/*
 * Decompiled with CFR 0.152.
 */
package org.nd4j.parameterserver.distributed.transport;

import io.aeron.Aeron;
import io.aeron.FragmentAssembler;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.driver.MediaDriver;
import io.aeron.logbuffer.FragmentHandler;
import io.aeron.logbuffer.Header;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import lombok.NonNull;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.SleepingIdleStrategy;
import org.agrona.concurrent.UnsafeBuffer;
import org.nd4j.linalg.exception.ND4JIllegalStateException;
import org.nd4j.linalg.factory.Nd4j;
import org.nd4j.parameterserver.distributed.conf.VoidConfiguration;
import org.nd4j.parameterserver.distributed.enums.NodeRole;
import org.nd4j.parameterserver.distributed.logic.completion.Clipboard;
import org.nd4j.parameterserver.distributed.messages.Frame;
import org.nd4j.parameterserver.distributed.messages.MeaningfulMessage;
import org.nd4j.parameterserver.distributed.messages.VoidMessage;
import org.nd4j.parameterserver.distributed.transport.Transport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Skeleton {@link Transport} implementation built on Aeron publications and subscriptions.
 *
 * <p>This class owns the inbound message queue ({@link #messages}), the map of responses that
 * have already arrived keyed by task id ({@link #completed}), and the polling threads started by
 * {@link #launch(Transport.ThreadingModel)}. Subclasses supply
 * {@link #sendCoordinationCommand(VoidMessage)} and {@link #sendFeedbackToClient(VoidMessage)}.
 *
 * <p>The class is marked {@code @Deprecated}; its replacement is not visible from this file.
 */
@Deprecated
public abstract class BaseTransport
implements Transport {
    private static final Logger log = LoggerFactory.getLogger(BaseTransport.class);

    /** Maximum number of fragments handed to a handler per {@code Subscription.poll} call. */
    private static final int FRAGMENT_LIMIT = 512;
    /** A blocking request is abandoned once its retransmit count exceeds this value. */
    private static final int MAX_RETRANSMITS = 20;
    /** Number of additional offer attempts made after a rejected first offer. */
    private static final int MAX_OFFER_RETRIES = 5;
    /** Pause between offer attempts, in milliseconds. */
    private static final long OFFER_RETRY_DELAY_MS = 1000L;
    /** Pause between checks that the client polling thread has started, in milliseconds. */
    private static final long LAUNCH_POLL_DELAY_MS = 50L;
    /** Pause at the end of {@link #shutdown()}, in milliseconds. */
    private static final long SHUTDOWN_GRACE_MS = 500L;
    /** Every N-th processed {@link Frame} is reported in the log. */
    private static final long FRAME_LOG_INTERVAL = 1000L;

    protected VoidConfiguration voidConfiguration;
    protected NodeRole nodeRole;
    protected Aeron aeron;
    protected Aeron.Context context;
    protected String unicastChannelUri;
    protected String ip;
    protected int port = 0;
    protected MediaDriver driver;
    protected Publication publicationForShards;
    protected Publication publicationForClients;
    protected Subscription subscriptionForShards;
    protected Subscription subscriptionForClients;
    protected FragmentAssembler messageHandlerForShards;
    protected FragmentAssembler messageHandlerForClients;
    /** Inbound messages waiting to be consumed through {@link #takeMessage()}. */
    protected LinkedBlockingQueue<VoidMessage> messages = new LinkedBlockingQueue<>();
    /** Responses received by {@link #clientMessageHandler}, keyed by task id. */
    protected Map<Long, MeaningfulMessage> completed = new ConcurrentHashMap<>();
    /** Polling threads keep running while this flag is {@code true}; cleared by {@link #shutdown()}. */
    protected AtomicBoolean runner = new AtomicBoolean(true);
    /** Thread polling {@link #subscriptionForClients} (and, in SINGLE_THREAD mode, the shard subscription too). */
    protected Thread threadA;
    /** Thread polling {@link #subscriptionForShards} in DEDICATED_THREADS mode; may stay {@code null}. */
    protected Thread threadB;
    protected Clipboard clipboard;
    /** Counts {@link Frame} messages answered through {@link #sendMessageAndGetResponse(VoidMessage)}. */
    protected AtomicLong frameCount = new AtomicLong(0L);
    /** Idle strategy used by the subscription polling loops. */
    protected IdleStrategy idler = new SleepingIdleStrategy(1000L);
    /** Idle strategy used while waiting for a response to a blocking request. */
    protected IdleStrategy feedbackIdler = new SleepingIdleStrategy(100000L);
    protected Transport.ThreadingModel threadingModel = Transport.ThreadingModel.DEDICATED_THREADS;
    protected long originatorId;
    protected short targetIndex = 0;
    protected short shardIndex = 0;

    /**
     * @return the originator id stored in this transport
     */
    @Override
    public long getOwnOriginatorId() {
        return this.originatorId;
    }

    /**
     * Sends {@code message} to a shard and waits until a response with the same task id shows up
     * in {@link #completed}.
     *
     * <p>If no response arrives within {@code voidConfiguration.getResponseTimeout()} milliseconds
     * of the last transmission, the message's retransmit count is incremented and it is sent
     * again. The response is removed from {@link #completed} before it is returned.
     *
     * @param message request to deliver; must not be {@code null}
     * @return the response registered for the message's task id
     * @throws NullPointerException if {@code message} is {@code null}
     * @throws RuntimeException if the retransmit count exceeds the limit, or if the message cannot
     *         be handed to the publication (see {@link #sendCommandToShard(VoidMessage)})
     */
    @Override
    public MeaningfulMessage sendMessageAndGetResponse(@NonNull VoidMessage message) {
        if (message == null) {
            throw new NullPointerException("message is marked @NonNull but is null");
        }
        long startTime = System.currentTimeMillis();
        long taskId = message.getTaskId();
        this.sendCommandToShard(message);
        long lastSendTime = System.currentTimeMillis();

        MeaningfulMessage response;
        // remove() both fetches and clears the entry, so a response is consumed exactly once.
        while ((response = this.completed.remove(taskId)) == null) {
            this.feedbackIdler.idle();
            if (System.currentTimeMillis() - lastSendTime <= this.voidConfiguration.getResponseTimeout()) {
                continue;
            }
            log.info("Resending request for taskId [{}]", taskId);
            message.incrementRetransmitCount();
            if (message.getRetransmitCount() > MAX_RETRANSMITS) {
                throw new RuntimeException("Giving up on message delivery...");
            }
            // Retransmit in place instead of recursing: keeps the stack flat and the
            // overall start time intact for the timing log below.
            this.sendCommandToShard(message);
            lastSendTime = System.currentTimeMillis();
        }

        if (message instanceof Frame && this.frameCount.incrementAndGet() % FRAME_LOG_INTERVAL == 0L) {
            long timeSpent = System.currentTimeMillis() - startTime;
            log.info("Frame of {} messages [{}] processed in {} ms", ((Frame) message).size(), message.getTaskId(), timeSpent);
        }
        return response;
    }

    /**
     * Stores the address this transport reports through {@link #getIp()} and {@link #getPort()}.
     *
     * @param ip   address to store; must not be {@code null}
     * @param port port to store
     * @throws NullPointerException if {@code ip} is {@code null}
     */
    @Override
    public void setIpAndPort(@NonNull String ip, int port) {
        if (ip == null) {
            throw new NullPointerException("ip is marked @NonNull but is null");
        }
        this.ip = ip;
        this.port = port;
    }

    /**
     * Routes {@code message} according to its numeric message type:
     * <ul>
     *   <li>0-9: sent to a shard; blocking messages go through
     *       {@link #sendMessageAndGetResponse(VoidMessage)} (the response is discarded), others
     *       through {@link #sendCommandToShard(VoidMessage)};</li>
     *   <li>10-13 and 19: {@link #sendFeedbackToClient(VoidMessage)};</li>
     *   <li>20-22 and 28: {@link #sendCoordinationCommand(VoidMessage)}.</li>
     * </ul>
     *
     * @param message message to deliver; must not be {@code null}
     * @throws NullPointerException if {@code message} is {@code null}
     * @throws RuntimeException if the message type is none of the values listed above
     */
    @Override
    public void sendMessage(@NonNull VoidMessage message) {
        if (message == null) {
            throw new NullPointerException("message is marked @NonNull but is null");
        }
        switch (message.getMessageType()) {
            case 0:
            case 1:
            case 2:
            case 3:
            case 4:
            case 5:
            case 6:
            case 7:
            case 8:
            case 9: {
                if (message.isBlockingMessage()) {
                    this.sendMessageAndGetResponse(message);
                } else {
                    this.sendCommandToShard(message);
                }
                break;
            }
            case 10:
            case 11:
            case 12:
            case 13:
            case 19: {
                this.sendFeedbackToClient(message);
                break;
            }
            case 20:
            case 21:
            case 22:
            case 28: {
                this.sendCoordinationCommand(message);
                break;
            }
            default: {
                throw new RuntimeException("Unknown messageType passed for delivery");
            }
        }
    }

    /**
     * Fragment callback for the shard subscription.
     *
     * <p>Messages with type code 7 are queued locally; every other message is re-offered
     * unchanged to {@link #publicationForShards}. The result of that offer is not checked.
     * NOTE(review): the meaning of type code 7 is defined outside this file - confirm.
     *
     * @param buffer buffer containing the serialized message
     * @param offset start of the message inside {@code buffer}
     * @param length length of the message in bytes
     * @param header Aeron fragment header (unused)
     */
    protected void shardMessageHandler(DirectBuffer buffer, int offset, int length, Header header) {
        byte[] data = new byte[length];
        buffer.getBytes(offset, data);
        VoidMessage message = VoidMessage.fromBytes(data);
        if (message.getMessageType() == 7) {
            this.messages.add(message);
        } else {
            this.publicationForShards.offer(buffer, offset, length);
        }
    }

    /**
     * Fragment callback that deserializes a message and queues it locally, regardless of type.
     *
     * @param buffer buffer containing the serialized message
     * @param offset start of the message inside {@code buffer}
     * @param length length of the message in bytes
     * @param header Aeron fragment header (unused)
     */
    protected void internalMessageHandler(DirectBuffer buffer, int offset, int length, Header header) {
        byte[] data = new byte[length];
        buffer.getBytes(offset, data);
        VoidMessage message = VoidMessage.fromBytes(data);
        this.messages.add(message);
    }

    /**
     * Fragment callback for the client subscription: deserializes a response and registers it in
     * {@link #completed} under its task id, where
     * {@link #sendMessageAndGetResponse(VoidMessage)} picks it up.
     *
     * @param buffer buffer containing the serialized message
     * @param offset start of the message inside {@code buffer}
     * @param length length of the message in bytes
     * @param header Aeron fragment header (unused)
     * @throws ClassCastException if the decoded message is not a {@link MeaningfulMessage}
     */
    protected void clientMessageHandler(DirectBuffer buffer, int offset, int length, Header header) {
        byte[] data = new byte[length];
        buffer.getBytes(offset, data);
        MeaningfulMessage message = (MeaningfulMessage) VoidMessage.fromBytes(data);
        this.completed.put(message.getTaskId(), message);
    }

    /**
     * Sets the message's target id to {@code -1} and passes it to
     * {@link #sendCoordinationCommand(VoidMessage)}.
     *
     * @param message message to send
     */
    @Override
    public void sendMessageToAllShards(VoidMessage message) {
        message.setTargetId((short) -1);
        this.sendCoordinationCommand(message);
    }

    /**
     * Does nothing in this base class.
     * NOTE(review): concrete transports presumably override this to set up Aeron - confirm.
     */
    @Override
    public void init(VoidConfiguration voidConfiguration, Clipboard clipboard, NodeRole role, String localIp, int localPort, short shardIndex) {
    }

    /**
     * Starts subscription polling according to {@code threading}.
     *
     * <ul>
     *   <li>{@code SINGLE_THREAD}: one daemon thread polls the shard subscription (when present)
     *       and the client subscription in turn.</li>
     *   <li>{@code DEDICATED_THREADS}: one daemon thread polls the client subscription; on
     *       SHARD, BACKUP and MASTER nodes with a shard handler a second daemon thread polls the
     *       shard subscription. The call returns only after the client polling thread has
     *       started running.</li>
     *   <li>{@code SAME_THREAD}: no thread is started; polling happens inside
     *       {@link #takeMessage()}.</li>
     * </ul>
     *
     * @param threading threading model to use; must not be {@code null}
     * @throws NullPointerException if {@code threading} is {@code null}
     * @throws ND4JIllegalStateException if DEDICATED_THREADS is requested while the node role is NONE
     * @throws IllegalStateException if the threading model is not one of the three above
     */
    @Override
    public void launch(@NonNull Transport.ThreadingModel threading) {
        if (threading == null) {
            throw new NullPointerException("threading is marked @NonNull but is null");
        }
        this.threadingModel = threading;
        switch (threading) {
            case SINGLE_THREAD: {
                log.warn("SINGLE_THREAD model is used, performance will be significantly reduced");
                this.threadA = new Thread(() -> {
                    while (this.runner.get()) {
                        // Count work from both subscriptions so the idler does not sleep
                        // right after shard fragments were processed.
                        int fragments = 0;
                        if (this.subscriptionForShards != null) {
                            fragments += this.subscriptionForShards.poll(this.messageHandlerForShards, FRAGMENT_LIMIT);
                        }
                        fragments += this.subscriptionForClients.poll(this.messageHandlerForClients, FRAGMENT_LIMIT);
                        this.idler.idle(fragments);
                    }
                });
                this.threadA.setDaemon(true);
                this.threadA.start();
                break;
            }
            case DEDICATED_THREADS: {
                if (this.nodeRole == NodeRole.NONE) {
                    throw new ND4JIllegalStateException("No role is set for current node!");
                }
                AtomicBoolean clientLoopStarted = new AtomicBoolean(false);
                boolean servesShards = this.nodeRole == NodeRole.SHARD
                        || this.nodeRole == NodeRole.BACKUP
                        || this.nodeRole == NodeRole.MASTER;

                if (servesShards && this.messageHandlerForShards != null) {
                    this.threadB = this.newShardPollingThread();
                }
                this.threadA = this.newClientPollingThread(clientLoopStarted);

                if (servesShards && this.threadB != null) {
                    Nd4j.getAffinityManager().attachThreadToDevice(this.threadB, Nd4j.getAffinityManager().getDeviceForCurrentThread());
                    this.threadB.setDaemon(true);
                    this.threadB.setName("VoidParamServer subscription threadB [" + this.nodeRole + "]");
                    this.threadB.start();
                }

                Nd4j.getAffinityManager().attachThreadToDevice(this.threadA, Nd4j.getAffinityManager().getDeviceForCurrentThread());
                this.threadA.setDaemon(true);
                this.threadA.setName("VoidParamServer subscription threadA [" + this.nodeRole + "]");
                this.threadA.start();

                this.awaitFlag(clientLoopStarted);
                break;
            }
            case SAME_THREAD: {
                log.warn("SAME_THREAD model is used, performance will be dramatically reduced");
                break;
            }
            default: {
                throw new IllegalStateException("Unknown thread model: [" + threading.toString() + "]");
            }
        }
    }

    /** Builds (without starting) the thread that polls the shard subscription until {@link #runner} is cleared. */
    private Thread newShardPollingThread() {
        return new Thread(() -> {
            while (this.runner.get()) {
                this.idler.idle(this.subscriptionForShards.poll(this.messageHandlerForShards, FRAGMENT_LIMIT));
            }
        });
    }

    /** Builds (without starting) the thread that raises {@code started} and then polls the client subscription. */
    private Thread newClientPollingThread(AtomicBoolean started) {
        return new Thread(() -> {
            started.set(true);
            while (this.runner.get()) {
                this.idler.idle(this.subscriptionForClients.poll(this.messageHandlerForClients, FRAGMENT_LIMIT));
            }
        });
    }

    /** Sleeps in short steps until {@code flag} becomes true; an interrupt is remembered and re-asserted afterwards. */
    private void awaitFlag(AtomicBoolean flag) {
        boolean interrupted = false;
        while (!flag.get()) {
            try {
                Thread.sleep(LAUNCH_POLL_DELAY_MS);
            }
            catch (InterruptedException e) {
                // Keep waiting (as before), but do not lose the interrupt.
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Quietly closes both publications, both subscriptions, the Aeron client, its context and the
     * media driver, in that order. Does not stop the polling threads.
     */
    protected void shutdownSilent() {
        log.info("Shutting down Aeron infrastructure...");
        CloseHelper.quietClose(this.publicationForClients);
        CloseHelper.quietClose(this.publicationForShards);
        CloseHelper.quietClose(this.subscriptionForShards);
        CloseHelper.quietClose(this.subscriptionForClients);
        CloseHelper.quietClose(this.aeron);
        CloseHelper.quietClose(this.context);
        CloseHelper.quietClose(this.driver);
    }

    /**
     * Stops the polling threads, waits for them to finish, quietly closes the media driver and
     * then pauses briefly.
     *
     * <p>Threads that were never created (for example in SAME_THREAD mode) are skipped. If the
     * calling thread is interrupted while waiting, the remaining joins are skipped, the driver is
     * still closed, and the interrupt status is restored before returning.
     */
    @Override
    public void shutdown() {
        this.runner.set(false);
        boolean interrupted = false;
        try {
            if (this.threadA != null) {
                this.threadA.join();
            }
            if (this.threadB != null) {
                this.threadB.join();
            }
        }
        catch (InterruptedException e) {
            interrupted = true;
        }
        CloseHelper.quietClose(this.driver);
        try {
            Thread.sleep(SHUTDOWN_GRACE_MS);
        }
        catch (InterruptedException e) {
            interrupted = true;
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Queues a message that was received by other means. Failures are logged rather than
     * propagated; an interrupt additionally restores the caller's interrupt status.
     *
     * @param message message to queue
     */
    @Override
    public void receiveMessage(VoidMessage message) {
        try {
            log.info("Message received, saving...");
            this.messages.put(message);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("Interrupted while saving received message; message was not queued", e);
        }
        catch (Exception e) {
            log.warn("Failed to save received message", e);
        }
    }

    /**
     * Returns the next inbound message.
     *
     * <p>In SAME_THREAD mode this polls both subscriptions once and returns the head of the
     * queue, or {@code null} if it is empty. In every other mode it blocks until a message is
     * available.
     *
     * @return the next message, or {@code null} if none is available (SAME_THREAD) or the wait
     *         was interrupted (other modes; the interrupt status is restored)
     */
    @Override
    public VoidMessage takeMessage() {
        if (this.threadingModel != Transport.ThreadingModel.SAME_THREAD) {
            try {
                return this.messages.take();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        if (this.subscriptionForShards != null) {
            this.subscriptionForShards.poll(this.messageHandlerForShards, FRAGMENT_LIMIT);
        }
        this.subscriptionForClients.poll(this.messageHandlerForClients, FRAGMENT_LIMIT);
        return this.messages.poll();
    }

    /**
     * Appends {@code message} to the inbound queue.
     *
     * @param message message to queue; must not be {@code null}
     * @throws NullPointerException if {@code message} is {@code null}
     */
    @Override
    public void putMessage(@NonNull VoidMessage message) {
        if (message == null) {
            throw new NullPointerException("message is marked @NonNull but is null");
        }
        this.messages.add(message);
    }

    /**
     * @return the head of the inbound queue without removing it, or {@code null} if it is empty
     */
    @Override
    public VoidMessage peekMessage() {
        return this.messages.peek();
    }

    /**
     * Delivers {@code message} to a shard.
     *
     * <p>On a SHARD node nothing goes over the wire: the message is stamped with this node's
     * {@link #shardIndex} and put on the local queue. Otherwise it is stamped with
     * {@link #targetIndex}, serialized, and offered to {@link #publicationForShards}. A negative
     * offer result is retried a bounded number of times with a pause in between.
     *
     * @param message message to deliver
     * @throws RuntimeException if the publication still rejects the message after all retries
     */
    protected synchronized void sendCommandToShard(VoidMessage message) {
        if (this.nodeRole == NodeRole.SHARD) {
            message.setTargetId(this.shardIndex);
            this.messages.add(message);
            return;
        }
        message.setTargetId(this.targetIndex);
        UnsafeBuffer buffer = message.asUnsafeBuffer();
        long result = this.publicationForShards.offer(buffer);

        boolean interrupted = false;
        for (int attempt = 0; attempt < MAX_OFFER_RETRIES && result < 0L; ++attempt) {
            try {
                Thread.sleep(OFFER_RETRY_DELAY_MS);
            }
            catch (InterruptedException e) {
                // Keep retrying (as before), but remember the interrupt for the caller.
                interrupted = true;
            }
            result = this.publicationForShards.offer(buffer);
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        if (result < 0L) {
            throw new RuntimeException("Unable to send message over the wire. Error code: " + result);
        }
    }

    /**
     * Sends a coordination command; called by {@link #sendMessage(VoidMessage)} and
     * {@link #sendMessageToAllShards(VoidMessage)}.
     */
    protected abstract void sendCoordinationCommand(VoidMessage var1);

    /** Sends feedback to a client; called by {@link #sendMessage(VoidMessage)}. */
    protected abstract void sendFeedbackToClient(VoidMessage var1);

    /** Does nothing in this base class. */
    @Override
    public void addClient(String ip, int port) {
    }

    /**
     * @return the address stored by {@link #setIpAndPort(String, int)}
     */
    @Override
    public String getIp() {
        return this.ip;
    }

    /**
     * @return the port stored by {@link #setIpAndPort(String, int)}
     */
    @Override
    public int getPort() {
        return this.port;
    }

    /**
     * @return always {@code 0} in this base class
     */
    @Override
    public int numberOfKnownClients() {
        return 0;
    }

    /**
     * @return always {@code 0} in this base class
     */
    @Override
    public int numberOfKnownShards() {
        return 0;
    }

    /** Does nothing in this base class. */
    @Override
    public void addShard(String ip, int port) {
    }

    /** Does nothing in this base class. */
    @Override
    public void sendMessageToAllClients(VoidMessage message, Long ... exclusions) {
    }

    /**
     * @return the target index stamped on messages sent to a shard from a non-SHARD node
     */
    @Override
    public short getTargetIndex() {
        return this.targetIndex;
    }

    /**
     * @return the shard index stamped on messages queued locally on a SHARD node
     */
    @Override
    public short getShardIndex() {
        return this.shardIndex;
    }
}

