/*
 * Decompiled with CFR 0.152.
 */
package org.nd4j.parameterserver.distributed.v2.transport.impl;

import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.jetbrains.annotations.NotNull;
import org.nd4j.linalg.exception.ND4JIllegalStateException;
import org.nd4j.linalg.factory.Nd4j;
import org.nd4j.linalg.primitives.Atomic;
import org.nd4j.linalg.primitives.AtomicBoolean;
import org.nd4j.linalg.primitives.Optional;
import org.nd4j.parameterserver.distributed.conf.VoidConfiguration;
import org.nd4j.parameterserver.distributed.v2.chunks.VoidChunk;
import org.nd4j.parameterserver.distributed.v2.enums.MeshBuildMode;
import org.nd4j.parameterserver.distributed.v2.enums.PropagationMode;
import org.nd4j.parameterserver.distributed.v2.messages.BroadcastableMessage;
import org.nd4j.parameterserver.distributed.v2.messages.INDArrayMessage;
import org.nd4j.parameterserver.distributed.v2.messages.MessagesHistoryHolder;
import org.nd4j.parameterserver.distributed.v2.messages.RequestMessage;
import org.nd4j.parameterserver.distributed.v2.messages.ResponseMessage;
import org.nd4j.parameterserver.distributed.v2.messages.VoidMessage;
import org.nd4j.parameterserver.distributed.v2.messages.history.HashHistoryHolder;
import org.nd4j.parameterserver.distributed.v2.messages.impl.MeshUpdateMessage;
import org.nd4j.parameterserver.distributed.v2.messages.pairs.handshake.HandshakeRequest;
import org.nd4j.parameterserver.distributed.v2.messages.pairs.handshake.HandshakeResponse;
import org.nd4j.parameterserver.distributed.v2.messages.pairs.ping.PingMessage;
import org.nd4j.parameterserver.distributed.v2.messages.pairs.ping.PongMessage;
import org.nd4j.parameterserver.distributed.v2.transport.RestartCallback;
import org.nd4j.parameterserver.distributed.v2.transport.Transport;
import org.nd4j.parameterserver.distributed.v2.util.MeshOrganizer;
import org.nd4j.parameterserver.distributed.v2.util.MessageSplitter;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseTransport
implements Transport {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(BaseTransport.class);
    /** Locally produced messages; {@code launch()} subscribes to this flow and propagates every message through the mesh. */
    protected final MessageFlow<VoidMessage> outgoingFlow = new MessageFlow();
    /** Array messages handed over by {@code forwardToParameterServer}; exposed through {@code incomingPublisher()}. */
    protected final MessageFlow<INDArrayMessage> incomingFlow = new MessageFlow();
    /** Holder of the current mesh; it is also used as the monitor object in several {@code synchronized} sections of this class. */
    protected final Atomic<MeshOrganizer> mesh = new Atomic();
    /** Id of this node. In this class it is only assigned by the three-argument constructor. */
    protected String id;
    /** Id of the root node; non-master nodes send their handshake request to it in {@code launch()}. */
    protected String rootId;
    /** True when this node acts as master (own id equals root id, or {@code launchAsMaster()} was called). */
    protected boolean masterMode = false;
    /** Responses keyed by request id; filled by {@code internalProcessMessage} and polled by the {@code sendMessageBlocking} overloads. */
    protected final Map<String, ResponseMessage> replies = new ConcurrentHashMap<String, ResponseMessage>();
    /** Optional callback invoked when a handshake response carries the restart flag. */
    protected RestartCallback restartCallback;
    /** Request consumers keyed by the canonical class name of the request type (see {@code addRequestConsumer}). */
    protected Map<String, Consumer> consumers = new HashMap<String, Consumer>();
    /** Configuration supplied at construction; read here for mesh build mode, max chunk size and chunks buffer size. */
    protected final VoidConfiguration voidConfiguration;
    /** Build mode used by {@code launchAsMaster()} when it has to create a mesh itself. */
    protected final MeshBuildMode meshBuildMode = MeshBuildMode.MESH;
    /** Node counter: incremented when an unknown node handshakes, overwritten in {@code onMeshUpdate}. */
    protected final AtomicInteger numerOfNodes = new AtomicInteger(0);
    /** Hand-off queue between {@code processMessage} (producer) and the worker tasks started in {@code launch()} (consumers). */
    protected final TransferQueue<VoidMessage> messageQueue = new LinkedTransferQueue<VoidMessage>();
    /** Splits array messages into chunks and merges chunks back. Not assigned anywhere in this class - presumably set by subclasses; TODO confirm. */
    protected MessageSplitter splitter;
    /** History of message ids, consulted before forwarding/relaying messages. NOTE(review): 2048 looks like a capacity - confirm against HashHistoryHolder. */
    protected MessagesHistoryHolder<String> historyHolder = new HashHistoryHolder<String>(2048);
    /** Set to true once a handshake response has been processed (see {@code isIntroduced()}). */
    protected AtomicBoolean handshakeFlag = new AtomicBoolean(false);
    /** Fixed-size pool (at least two threads, otherwise one per available processor) that runs the message workers and the heartbeat task. */
    protected final ThreadPoolExecutor executorService = (ThreadPoolExecutor)Executors.newFixedThreadPool(Math.max(2, Runtime.getRuntime().availableProcessors()), new ThreadFactory(){

        @Override
        public Thread newThread(@NotNull Runnable r) {
            Thread t = Executors.defaultThreadFactory().newThread(r);
            // daemon threads: the pool must not keep the JVM alive
            t.setDaemon(true);
            // every pool thread is attached to device 0 through the ND4J affinity manager
            Nd4j.getAffinityManager().attachThreadToDevice(t, Integer.valueOf(0));
            return t;
        }
    });

    /**
     * Creates a transport whose root id is a freshly generated random UUID string
     * and whose configuration is the builder default (see the delegated constructors).
     */
    protected BaseTransport() {
        this(UUID.randomUUID().toString());
    }

    /**
     * Creates a transport for the given root id with a default-built {@link VoidConfiguration}.
     *
     * @param rootId id of the root node; must not be null
     * @throws NullPointerException if {@code rootId} is null (raised by the delegated constructor)
     */
    protected BaseTransport(@NonNull String rootId) {
        // The delegated constructor already rejects a null rootId with the same message,
        // so a second check after this call could never be reached.
        this(rootId, VoidConfiguration.builder().build());
    }

    /**
     * Creates a transport for the given root id and configuration. A new mesh is created
     * with the build mode taken from the configuration. The node's own id is not set here.
     *
     * @param rootId            id of the root node; must not be null
     * @param voidConfiguration transport configuration; must not be null
     * @throws NullPointerException if either argument is null
     */
    protected BaseTransport(@NonNull String rootId, @NonNull VoidConfiguration voidConfiguration) {
        if (rootId == null) {
            throw new NullPointerException("rootId is marked @NonNull but is null");
        }
        if (voidConfiguration == null) {
            throw new NullPointerException("voidConfiguration is marked @NonNull but is null");
        }

        MeshOrganizer organizer = new MeshOrganizer(voidConfiguration.getMeshBuildMode());
        this.mesh.set(organizer);

        this.rootId = rootId;
        this.voidConfiguration = voidConfiguration;
    }

    protected BaseTransport(@NonNull String ownId, @NonNull String rootId, @NonNull VoidConfiguration voidConfiguration) {
        if (ownId == null) {
            throw new NullPointerException("ownId is marked @NonNull but is null");
        }
        if (rootId == null) {
            throw new NullPointerException("rootId is marked @NonNull but is null");
        }
        if (voidConfiguration == null) {
            throw new NullPointerException("voidConfiguration is marked @NonNull but is null");
        }
        this.mesh.set((Serializable)new MeshOrganizer(voidConfiguration.getMeshBuildMode()));
        this.id = ownId;
        this.rootId = rootId;
        this.voidConfiguration = voidConfiguration;
        this.masterMode = ownId.equalsIgnoreCase(rootId);
        if (this.masterMode) {
            ((MeshOrganizer)this.mesh.get()).getRootNode().setId(rootId);
        }
    }

    /**
     * @return the consumer side of the outgoing flow; messages accepted by it are
     *         delivered to the subscriber registered in {@code launch()}
     */
    @Override
    public Consumer<VoidMessage> outgoingConsumer() {
        return outgoingFlow;
    }

    /**
     * @return the publisher side of the incoming flow, which emits the array messages
     *         passed to {@code forwardToParameterServer}
     */
    @Override
    public Publisher<INDArrayMessage> incomingPublisher() {
        return incomingFlow;
    }

    /**
     * Returns the id of this node's upstream node, or this node's own id when this node
     * is the root of the mesh.
     *
     * @return upstream node id, or own id for the root node
     */
    @Override
    public String getUpstreamId() {
        String rootNodeId = ((MeshOrganizer)this.mesh.get()).getRootNode().getId();
        if (!rootNodeId.equals(this.id())) {
            MeshOrganizer.Node self = ((MeshOrganizer)this.mesh.get()).getNodeById(this.id());
            return self.getUpstreamNode().getId();
        }
        return this.id();
    }

    /**
     * Starts the transport:
     * <ol>
     *   <li>submits worker tasks that drain {@code messageQueue} and hand every message to
     *       {@code internalProcessMessage};</li>
     *   <li>subscribes to {@code outgoingFlow} so that locally produced messages are stamped with
     *       this node's id and propagated both ways through the mesh;</li>
     *   <li>when not in master mode, performs a blocking handshake with the root node.</li>
     * </ol>
     *
     * @throws ND4JIllegalStateException if the handshake with the root node fails
     */
    @Override
    public synchronized void launch() {
        // In master mode one pool thread is left unused by the workers - presumably for the
        // heartbeat task submitted by launchAsMaster(); verify if the pool layout changes.
        int reserved = this.masterMode ? 1 : 0;
        int workerCount = this.executorService.getMaximumPoolSize() - reserved;

        Runnable worker = () -> {
            while (true) {
                try {
                    // take() blocks until a message is available and never returns null
                    VoidMessage message = this.messageQueue.take();
                    this.internalProcessMessage(message);
                }
                catch (InterruptedException e) {
                    // restore the interrupt status for the owner of this thread and stop the worker
                    Thread.currentThread().interrupt();
                    return;
                }
                catch (Exception e) {
                    // a failing message must not kill the worker: log and keep draining the queue
                    log.error("Exception: {}", (Throwable)e);
                }
            }
        };
        for (int i = 0; i < workerCount; ++i) {
            this.executorService.submit(worker);
        }

        // the returned Disposable is intentionally not kept: the subscription lives as long as the transport
        Flowable.fromPublisher(this.outgoingFlow).subscribe(voidMessage -> {
            if (this.mesh.get() == null) {
                log.warn("Mesh wasn't received yet!");
                return;
            }
            voidMessage.setOriginatorId(this.id);
            this.propagateMessage((VoidMessage)voidMessage, PropagationMode.BOTH_WAYS);
        });

        if (!this.masterMode) {
            try {
                this.sendMessageBlocking(new HandshakeRequest(), this.rootId);
            }
            catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // keep the interrupt visible to callers even though the exception gets wrapped
                    Thread.currentThread().interrupt();
                }
                throw new ND4JIllegalStateException("Can't proceed with handshake from [" + this.id() + "] to [" + this.rootId + "]", (Throwable)e);
            }
        }
    }

    /**
     * Starts this node as the master: makes sure a mesh exists, marks this node as master,
     * gives the mesh root node this node's id, schedules the heartbeat task and finally
     * runs the regular {@code launch()} sequence.
     */
    @Override
    public synchronized void launchAsMaster() {
        if (this.mesh.get() == null) {
            MeshOrganizer freshMesh = new MeshOrganizer(this.meshBuildMode);
            this.mesh.set(freshMesh);
        }
        this.masterMode = true;

        MeshOrganizer.Node rootNode = ((MeshOrganizer)this.mesh.get()).getRootNode();
        rootNode.setId(this.id());

        // heartbeat period in milliseconds, as passed to the HeartbeatThread constructor
        long heartbeatDelayMs = 120000L;
        HeartbeatThread heartbeat = new HeartbeatThread(heartbeatDelayMs, this, this.mesh);
        this.executorService.submit(heartbeat);

        this.launch();
    }

    /**
     * Requests an orderly shutdown of the internal executor; no other resources are
     * released by this method.
     */
    @Override
    public synchronized void shutdown() {
        executorService.shutdown();
    }

    /**
     * Splits an array message into chunks and sends every chunk to this node's neighbours
     * in the mesh, in the direction(s) selected by {@code mode}.
     *
     * @param message array message to split and send
     * @param mode    {@code BOTH_WAYS}, {@code ONLY_UP} or {@code ONLY_DOWN}
     * @throws IOException declared by the original signature; nothing visible in this body throws
     *                     it directly (presumably reserved for the splitter; TODO confirm)
     */
    protected void propagateArrayMessage(INDArrayMessage message, PropagationMode mode) throws IOException {
        MeshOrganizer.Node node = ((MeshOrganizer)this.mesh.get()).getNodeById(this.id);
        MeshOrganizer.Node upstream = node.getUpstreamNode();
        Collection<MeshOrganizer.Node> downstreams = node.getDownstreamNodes();
        Collection<VoidChunk> chunks = this.splitter.split(message, this.voidConfiguration.getMaxChunkSize());

        boolean sendUp = PropagationMode.BOTH_WAYS == mode || PropagationMode.ONLY_UP == mode;
        boolean sendDown = PropagationMode.BOTH_WAYS == mode || PropagationMode.ONLY_DOWN == mode;

        // the root node has no upstream to send to
        if (!node.isRootNode() && sendUp) {
            chunks.forEach(c -> this.sendMessage((VoidMessage)c, upstream.getId()));
        }
        if (sendDown) {
            // downstream nodes are served in parallel; each of them receives the chunks in order
            downstreams.parallelStream().forEach(n -> chunks.forEach(c -> this.sendMessage((VoidMessage)c, n.getId())));
        }
    }

    /**
     * Propagates a message to this node's mesh neighbours.
     * <ul>
     *   <li>When the mesh consists of a single node the message is processed locally instead.</li>
     *   <li>Broadcastable messages get this node's id as relay id before being sent.</li>
     *   <li>Array messages are delegated to {@code propagateArrayMessage} (chunked transfer).</li>
     *   <li>All other messages are sent as-is upstream and/or downstream, depending on {@code mode}.</li>
     * </ul>
     *
     * @param voidMessage message to propagate; must not be null
     * @param mode        {@code BOTH_WAYS}, {@code ONLY_UP} or {@code ONLY_DOWN}
     * @throws NullPointerException if {@code voidMessage} is null
     * @throws IOException          propagated from {@code propagateArrayMessage}
     */
    @Override
    public void propagateMessage(@NonNull VoidMessage voidMessage, PropagationMode mode) throws IOException {
        if (voidMessage == null) {
            throw new NullPointerException("voidMessage is marked @NonNull but is null");
        }
        MeshOrganizer.Node node = ((MeshOrganizer)this.mesh.get()).getNodeById(this.id);
        if (((MeshOrganizer)this.mesh.get()).totalNodes() == 1L) {
            // nobody else to talk to: handle the message on this node
            this.internalProcessMessage(voidMessage);
            return;
        }
        MeshOrganizer.Node upstream = node.getUpstreamNode();
        Collection<MeshOrganizer.Node> downstreams = node.getDownstreamNodes();
        if (voidMessage instanceof BroadcastableMessage) {
            ((BroadcastableMessage)voidMessage).setRelayId(this.id);
        }
        if (voidMessage instanceof INDArrayMessage) {
            this.propagateArrayMessage((INDArrayMessage)voidMessage, mode);
            return;
        }

        boolean sendUp = PropagationMode.BOTH_WAYS == mode || PropagationMode.ONLY_UP == mode;
        boolean sendDown = PropagationMode.BOTH_WAYS == mode || PropagationMode.ONLY_DOWN == mode;

        // the root node has no upstream to send to
        if (!node.isRootNode() && sendUp) {
            this.sendMessage(voidMessage, upstream.getId());
        }
        if (sendDown) {
            for (MeshOrganizer.Node n : downstreams) {
                this.sendMessage(voidMessage, n.getId());
            }
        }
    }

    /**
     * Relays a broadcastable message to mesh neighbours while avoiding loops.
     * <p>
     * Nothing is sent when the message is a {@link MeshUpdateMessage}, when
     * {@code historyHolder.storeIfUnknownMessageId} returns true for the message id
     * (presumably: the id was already seen - confirm against MessagesHistoryHolder), or when
     * this node is the originator. Otherwise the relay id is replaced with this node's id and
     * the message is sent upstream/downstream according to {@code mode}, skipping every
     * neighbour that is the originator or the previous relay.
     *
     * @param voidMessage message to relay; must not be null
     * @param mode        {@code BOTH_WAYS}, {@code ONLY_UP} or {@code ONLY_DOWN}
     * @throws NullPointerException if {@code voidMessage} is null, or (via {@code isLoopedNode})
     *                              if the message carries a null originator or relay id
     */
    protected void propagateBroadcastableMessage(@NonNull BroadcastableMessage voidMessage, PropagationMode mode) {
        if (voidMessage == null) {
            throw new NullPointerException("voidMessage is marked @NonNull but is null");
        }
        if (voidMessage instanceof MeshUpdateMessage) {
            return;
        }
        if (this.historyHolder.storeIfUnknownMessageId(voidMessage.getMessageId())) {
            return;
        }
        MeshOrganizer.Node node = ((MeshOrganizer)this.mesh.get()).getNodeById(this.id);
        if (voidMessage.getOriginatorId() != null && this.id != null && voidMessage.getOriginatorId().equals(this.id)) {
            // our own message came back to us
            return;
        }
        MeshOrganizer.Node upstream = node.getUpstreamNode();
        Collection<MeshOrganizer.Node> downstreams = node.getDownstreamNodes();
        String ownId = this.id();
        String upstreamId = node.isRootNode() ? null : upstream.getId();
        String originatorId = voidMessage.getOriginatorId();
        // remember who relayed the message to us before overwriting the relay id with our own
        String relayId = voidMessage.getRelayId();
        voidMessage.setRelayId(ownId);

        boolean sendUp = PropagationMode.BOTH_WAYS == mode || PropagationMode.ONLY_UP == mode;
        boolean sendDown = PropagationMode.BOTH_WAYS == mode || PropagationMode.ONLY_DOWN == mode;

        // root has no upstream; never send back to the originator or to the previous relay
        if (!node.isRootNode() && sendUp && !this.isLoopedNode(upstream, originatorId, relayId)) {
            this.sendMessage(voidMessage, upstreamId);
        }
        if (sendDown) {
            for (MeshOrganizer.Node n : downstreams) {
                if (this.isLoopedNode(n, originatorId, relayId)) {
                    continue;
                }
                this.sendMessage(voidMessage, n.getId());
            }
        }
    }

    /**
     * Tells whether sending to {@code node} would send a message back to where it came from.
     *
     * @param node         candidate target node; must not be null
     * @param originatorId id of the node that created the message; must not be null
     * @param relayId      id of the node that relayed the message; must not be null
     * @return true when the node's id equals the originator id or the relay id
     * @throws NullPointerException if any argument is null
     */
    protected boolean isLoopedNode(@NonNull MeshOrganizer.Node node, @NonNull String originatorId, @NonNull String relayId) {
        if (node == null) {
            throw new NullPointerException("node is marked @NonNull but is null");
        }
        if (originatorId == null) {
            throw new NullPointerException("originatorId is marked @NonNull but is null");
        }
        if (relayId == null) {
            throw new NullPointerException("relayId is marked @NonNull but is null");
        }

        if (node.getId().equals(originatorId)) {
            return true;
        }
        return node.getId().equals(relayId);
    }

    /**
     * Publishes an array message on the incoming flow, i.e. to every subscriber of
     * {@code incomingPublisher()}.
     *
     * @param message message to publish
     * @throws RuntimeException wrapping any exception raised while delivering the message
     */
    private void forwardToParameterServer(INDArrayMessage message) {
        try {
            incomingFlow.accept(message);
        }
        catch (Exception deliveryFailure) {
            throw new RuntimeException(deliveryFailure);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Dispatches one message according to its runtime type. Called by the worker tasks
     * started in {@code launch()}, by {@code propagateMessage} for single-node meshes, and
     * recursively for messages reassembled from chunks.
     * <ul>
     *   <li>{@code PingMessage}: answered with a {@code PongMessage} carrying the same request id.</li>
     *   <li>{@code PongMessage}: no type-specific handling.</li>
     *   <li>{@code VoidChunk}: merged by the splitter; a completed message is processed recursively.</li>
     *   <li>{@code INDArrayMessage}: responses are stored in {@code replies}; other array messages
     *       are forwarded to the incoming flow unless their id is already known to the history holder.</li>
     *   <li>{@code HandshakeRequest}: the sender is added to (or remapped within) the mesh, a
     *       {@code HandshakeResponse} with a mesh clone is sent back and a {@code MeshUpdateMessage}
     *       is sent directly to all non-root nodes.</li>
     *   <li>{@code HandshakeResponse} / {@code MeshUpdateMessage}: a mesh with a higher version replaces
     *       the local one.</li>
     *   <li>Other {@code ResponseMessage}s are stored in {@code replies}; other {@code RequestMessage}s
     *       require a registered consumer.</li>
     * </ul>
     * Afterwards broadcastable messages are relayed further and request messages are handed to
     * their registered consumer, if any.
     *
     * @param message message to process
     * @throws ND4JIllegalStateException for request messages without a registered consumer and for
     *                                   unknown message types
     * @throws RuntimeException          wrapping failures of propagation or of a request consumer
     */
    protected void internalProcessMessage(VoidMessage message) {
        Consumer consumer;
        // NOTE(review): 'm' is never read in this method
        boolean m = message instanceof INDArrayMessage;
        if (message instanceof PingMessage) {
            // reply to the sender with the same request id and stop here
            PongMessage msg = new PongMessage();
            msg.setRequestId(((PingMessage)message).getRequestId());
            this.sendMessage(msg, message.getOriginatorId());
            return;
        }
        if (!(message instanceof PongMessage)) {
            Object reply;
            if (message instanceof VoidChunk) {
                // a chunk of a split message: merge() yields the full message once it is complete
                Optional opt = this.splitter.merge((VoidChunk)message, this.voidConfiguration.getChunksBufferSize());
                if (opt.isPresent()) {
                    this.internalProcessMessage((VoidMessage)opt.get());
                }
            } else if (message instanceof INDArrayMessage) {
                if (!(message instanceof ResponseMessage)) {
                    // forward array messages only if the history holder does not know this id yet
                    if (!this.historyHolder.isKnownMessageId(message.getMessageId())) {
                        this.forwardToParameterServer((INDArrayMessage)message);
                    }
                } else {
                    // NOTE(review): the Atomic<MeshOrganizer> cast looks like a decompiler artifact;
                    // the value is used as a ResponseMessage on the next line - confirm against the original source
                    reply = (Atomic<MeshOrganizer>)message;
                    this.replies.putIfAbsent(reply.getRequestId(), (ResponseMessage)reply);
                }
            } else {
                Atomic<MeshOrganizer> newMesh;
                HandshakeResponse response;
                if (message instanceof HandshakeRequest) {
                    reply = this.mesh;
                    synchronized (reply) {
                        // make sure this node itself is part of the mesh: if it is unknown, it becomes the root id
                        if (!((MeshOrganizer)this.mesh.get()).isKnownNode(this.id())) {
                            ((MeshOrganizer)this.mesh.get()).getRootNode().setId(this.id);
                        }
                    }
                    response = HandshakeResponse.builder().build();
                    Atomic<MeshOrganizer> atomic = this.mesh;
                    synchronized (atomic) {
                        if (((MeshOrganizer)this.mesh.get()).isKnownNode(message.getOriginatorId())) {
                            // a known node handshakes again: remap it and tell it to restart
                            log.warn("Got request from known node [{}]. Remapping.", (Object)message.getOriginatorId());
                            this.onRemap(message.getOriginatorId());
                            ((MeshOrganizer)this.mesh.get()).remapNodeAndDownstreams(message.getOriginatorId());
                            response.setRestart(true);
                        } else {
                            // a new node joins the mesh
                            ((MeshOrganizer)this.mesh.get()).addNode(message.getOriginatorId());
                            this.numerOfNodes.incrementAndGet();
                        }
                        // the response carries a clone of the mesh taken while the monitor is held
                        response.setMesh(((MeshOrganizer)this.mesh.get()).clone());
                    }
                    response.setRequestId(((HandshakeRequest)message).getRequestId());
                    this.sendMessage(response, message.getOriginatorId());
                    try {
                        // let every non-root node know about the changed mesh
                        this.propagateMessageDirect(new MeshUpdateMessage((MeshOrganizer)this.mesh.get()));
                    }
                    catch (Exception e) {
                        log.error("Wasn't able to propagate message from [{}]", (Object)this.id());
                        log.error("MeshUpdateMessage propagation failed:", (Throwable)e);
                        throw new RuntimeException(e);
                    }
                }
                // NOTE(review): this is a plain 'if', not 'else if' - after a HandshakeRequest has been handled
                // control continues into the chain below and may reach the RequestMessage branch, which throws
                // when no consumer is registered. Possibly a decompilation artifact; confirm against the original source.
                if (message instanceof HandshakeResponse) {
                    response = (HandshakeResponse)message;
                    newMesh = response.getMesh();
                    // install the received mesh if none is set yet ...
                    this.mesh.cas(null, (Serializable)response.getMesh());
                    Atomic<MeshOrganizer> atomic = this.mesh;
                    synchronized (atomic) {
                        // ... or if the received mesh has a higher version than the local one
                        long v1 = ((MeshOrganizer)this.mesh.get()).getVersion();
                        long v2 = newMesh.getVersion();
                        if (v1 < v2) {
                            this.mesh.set(newMesh);
                        }
                    }
                    if (response.isRestart()) {
                        log.info("Processing restart response...");
                        if (this.restartCallback != null) {
                            this.restartCallback.call(response);
                        } else {
                            log.warn("Got restart message from master, but there's no defined RestartCallback");
                        }
                    }
                    // the handshake is complete; also publish the response for a blocked sendMessageBlocking call
                    this.handshakeFlag.set(true);
                    ResponseMessage reply2 = (ResponseMessage)message;
                    this.replies.putIfAbsent(reply2.getRequestId(), reply2);
                } else if (message instanceof ResponseMessage) {
                    // generic response: store it for the caller polling in sendMessageBlocking
                    reply = (ResponseMessage)message;
                    this.replies.putIfAbsent(reply.getRequestId(), (ResponseMessage)reply);
                } else if (message instanceof MeshUpdateMessage) {
                    MeshOrganizer newMesh2 = ((MeshUpdateMessage)message).getMesh();
                    this.mesh.cas(null, (Serializable)newMesh2);
                    newMesh = this.mesh;
                    synchronized (newMesh) {
                        // accept the update only if it is newer than the local mesh
                        long v1 = ((MeshOrganizer)this.mesh.get()).getVersion();
                        long v2 = newMesh2.getVersion();
                        if (v1 < v2) {
                            this.mesh.set((Serializable)newMesh2);
                        }
                    }
                    // the hook is invoked with the received mesh, whether or not it replaced the local one
                    this.onMeshUpdate(newMesh2);
                } else if (message instanceof RequestMessage) {
                    // only verifies that a consumer exists; the consumer itself is invoked at the end of this method
                    String name = message.getClass().getCanonicalName();
                    Consumer consumer2 = this.consumers.get(name);
                    if (consumer2 == null) {
                        throw new ND4JIllegalStateException("Not supported RequestMessage received: [" + message.getClass().getCanonicalName() + "]");
                    }
                } else {
                    throw new ND4JIllegalStateException("Unknown message received: [" + message.getClass().getCanonicalName() + "]");
                }
            }
        }
        if (message instanceof BroadcastableMessage) {
            try {
                // relay further only when the node counter says there is somebody to relay to
                if (this.numerOfNodes.get() > 0) {
                    this.propagateBroadcastableMessage((BroadcastableMessage)message, PropagationMode.BOTH_WAYS);
                } else {
                    log.info("Skipping broadcast due to absence of nodes in mesh");
                }
            }
            catch (Exception e) {
                log.error("Wasn't able to propagate message [{}] from [{}]", (Object)message.getClass().getSimpleName(), (Object)message.getOriginatorId());
                log.error("BroadcastableMessage propagation exception:", (Throwable)e);
                throw new RuntimeException(e);
            }
        }
        // finally hand request messages to the consumer registered for their exact class, if any
        if (message instanceof RequestMessage && (consumer = this.consumers.get(message.getClass().getCanonicalName())) != null) {
            try {
                consumer.accept((Object)message);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends the message straight to every non-root node of the mesh, bypassing the
     * upstream/downstream relaying. The whole send loop runs while holding the monitor
     * of the {@code mesh} holder.
     *
     * @param message message to send; must not be null
     * @throws NullPointerException if {@code message} is null
     */
    public void propagateMessageDirect(@NonNull BroadcastableMessage message) {
        if (message == null) {
            throw new NullPointerException("message is marked @NonNull but is null");
        }
        synchronized (this.mesh) {
            Collection<MeshOrganizer.Node> allNodes = ((MeshOrganizer)this.mesh.get()).flatNodes();
            for (MeshOrganizer.Node target : allNodes) {
                if (target.isRootNode()) {
                    continue;
                }
                this.sendMessage(message, target.getId());
            }
        }
    }

    /**
     * Hands a received message over to the worker tasks started in {@code launch()}.
     * {@code TransferQueue.transfer} blocks the caller until a worker has taken the message.
     *
     * @param message message to process
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is
     *                          interrupted while waiting; the interrupt status is restored first
     */
    @Override
    public void processMessage(VoidMessage message) {
        try {
            this.messageQueue.transfer(message);
        }
        catch (InterruptedException e) {
            // wrapping would otherwise hide the interruption from the caller's thread
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Picks a random downstream node of the given node.
     *
     * @param id      id of the node whose downstreams are considered; must not be null
     * @param exclude id that must not be returned, or null for no exclusion
     * @return id of a randomly chosen downstream node, or null when the node has no
     *         downstreams or its only downstream is {@code exclude}
     * @throws NullPointerException if {@code id} is null
     */
    @Override
    public String getRandomDownstreamFrom(@NonNull String id, String exclude) {
        if (id == null) {
            throw new NullPointerException("id is marked @NonNull but is null");
        }
        Collection<MeshOrganizer.Node> nodes = ((MeshOrganizer)this.mesh.get()).getDownstreamsForNode(id);
        if (nodes.isEmpty()) {
            return null;
        }
        List<String> ids = nodes.stream().map(node -> node.getId()).collect(Collectors.toCollection(ArrayList::new));
        if (exclude != null) {
            ids.remove(exclude);
        }
        // the exclusion may have removed the only candidate; report "none" the same way as above
        if (ids.isEmpty()) {
            return null;
        }
        if (ids.size() > 1) {
            Collections.shuffle(ids);
        }
        return ids.get(0);
    }

    /**
     * Sends a request and waits, without a time limit, until a response with the same
     * request id shows up in {@code replies}. The map is polled every 10 milliseconds.
     * A random UUID is assigned as request id when the message has none.
     *
     * @param message request to send; must not be null
     * @param id      id of the target node; must not be null
     * @return the response registered for the request id
     * @throws NullPointerException if {@code message} or {@code id} is null
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Override
    public <T extends ResponseMessage> T sendMessageBlocking(@NonNull RequestMessage message, @NonNull String id) throws InterruptedException {
        if (message == null) {
            throw new NullPointerException("message is marked @NonNull but is null");
        }
        if (id == null) {
            throw new NullPointerException("id is marked @NonNull but is null");
        }
        if (message.getRequestId() == null) {
            message.setRequestId(UUID.randomUUID().toString());
        }
        this.sendMessage(message, id);

        ResponseMessage response;
        for (;;) {
            response = this.replies.get(message.getRequestId());
            if (response != null) {
                break;
            }
            Thread.sleep(10L);
        }
        // the reply is consumed exactly once
        this.replies.remove(message.getRequestId());
        return (T)response;
    }

    /**
     * Sends a request and waits for the matching response for at most the given time.
     * {@code replies} is polled with a 5000 ns park between attempts. A random UUID is
     * assigned as request id when the message has none.
     *
     * @param message  request to send; must not be null
     * @param id       id of the target node; must not be null
     * @param timeWait maximum time to wait, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code timeWait}; must not be null
     * @return the response, or null when none arrived within the time limit
     * @throws NullPointerException if {@code message}, {@code id} or {@code timeUnit} is null
     * @throws InterruptedException declared by the signature; the visible body does not throw it itself
     */
    @Override
    public <T extends ResponseMessage> T sendMessageBlocking(@NonNull RequestMessage message, @NonNull String id, long timeWait, @NonNull TimeUnit timeUnit) throws InterruptedException {
        if (message == null) {
            throw new NullPointerException("message is marked @NonNull but is null");
        }
        if (id == null) {
            throw new NullPointerException("id is marked @NonNull but is null");
        }
        if (timeUnit == null) {
            throw new NullPointerException("timeUnit is marked @NonNull but is null");
        }
        if (message.getRequestId() == null) {
            message.setRequestId(UUID.randomUUID().toString());
        }
        this.sendMessage(message, id);

        long budgetMs = TimeUnit.MILLISECONDS.convert(timeWait, timeUnit);
        long startedAt = System.currentTimeMillis();

        ResponseMessage response = this.replies.get(message.getRequestId());
        while (response == null && System.currentTimeMillis() - startedAt <= budgetMs) {
            LockSupport.parkNanos(5000L);
            response = this.replies.get(message.getRequestId());
        }
        // remove the entry whether or not a reply was found in time
        this.replies.remove(message.getRequestId());
        return (T)response;
    }

    /**
     * Registers the callback invoked when a handshake response carries the restart flag.
     *
     * @param callback callback to use; null disables the notification
     */
    @Override
    public void setRestartCallback(RestartCallback callback) {
        restartCallback = callback;
    }

    /**
     * Registers, replaces or removes the consumer for one request message type. Consumers
     * are keyed by the canonical name of the class.
     *
     * @param cls      request message class; must not be null
     * @param consumer consumer to register, or null to remove the current registration
     * @throws NullPointerException if {@code cls} is null
     */
    @Override
    public <T extends RequestMessage> void addRequestConsumer(@NonNull Class<T> cls, Consumer<T> consumer) {
        if (cls == null) {
            throw new NullPointerException("cls is marked @NonNull but is null");
        }
        if (consumer != null) {
            this.consumers.put(cls.getCanonicalName(), consumer);
            return;
        }
        this.consumers.remove(cls.getCanonicalName());
    }

    /**
     * Hook called after a mesh update message was processed; refreshes the node counter
     * from the given mesh.
     *
     * @param mesh mesh carried by the update message
     */
    @Override
    public void onMeshUpdate(MeshOrganizer mesh) {
        // totalNodes() is a long; the counter keeps it narrowed to int
        int nodeCount = (int)mesh.totalNodes();
        this.numerOfNodes.set(nodeCount);
    }

    /**
     * Hook called from {@code internalProcessMessage} right before an already known node
     * is remapped after a repeated handshake. This base implementation does nothing.
     *
     * @param id id of the node that is about to be remapped
     */
    @Override
    public void onRemap(String id) {
    }

    /**
     * @return id of the root node as given at construction time
     */
    @Override
    public String getRootId() {
        return rootId;
    }

    /**
     * @return current value of the node counter maintained by handshake processing and
     *         {@code onMeshUpdate}
     */
    @Override
    public int totalNumberOfNodes() {
        return numerOfNodes.get();
    }

    /**
     * @return always true in this base implementation; no connection state is tracked here
     */
    @Override
    public boolean isConnected() {
        return true;
    }

    /**
     * Tells whether this node is part of the mesh already.
     *
     * @return true for the master node, otherwise true once a handshake response was processed
     */
    @Override
    public boolean isIntroduced() {
        return this.masterMode || this.handshakeFlag.get();
    }

    /**
     * Does nothing in this base implementation.
     *
     * @param id id of the node a connection is requested for
     */
    @Override
    public void ensureConnection(String id) {
    }

    /**
     * Periodic liveness check. Every {@code delay} milliseconds all nodes of the mesh except
     * the local one are pinged; nodes that do not answer within 100 ms are remapped and marked
     * offline, and if that happened at least once a {@link MeshUpdateMessage} is propagated
     * downstream. The loop ends when the thread is interrupted.
     */
    protected static class HeartbeatThread
    extends Thread
    implements Runnable {
        /** Pause between two rounds of pings, in milliseconds. */
        protected final long delay;
        /** Holder of the mesh that is checked and modified. */
        protected final Atomic<MeshOrganizer> mesh;
        /** Transport used to ping nodes and to propagate mesh updates. */
        protected final Transport transport;

        /**
         * @param delayMilliseconds pause between two rounds of pings, in milliseconds
         * @param transport         transport used for pings and mesh updates; must not be null
         * @param mesh              holder of the mesh to supervise; must not be null
         * @throws NullPointerException if {@code transport} or {@code mesh} is null
         */
        protected HeartbeatThread(long delayMilliseconds, @NonNull Transport transport, @NonNull Atomic<MeshOrganizer> mesh) {
            if (transport == null) {
                throw new NullPointerException("transport is marked @NonNull but is null");
            }
            if (mesh == null) {
                throw new NullPointerException("mesh is marked @NonNull but is null");
            }
            this.delay = delayMilliseconds;
            this.mesh = mesh;
            this.transport = transport;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Thread.sleep(this.delay);
                    boolean remapped = false;
                    Collection<MeshOrganizer.Node> nodes = ((MeshOrganizer)this.mesh.get()).flatNodes();
                    for (MeshOrganizer.Node n : nodes) {
                        // never ping ourselves
                        if (this.transport.id().equals(n.getId())) {
                            continue;
                        }
                        PongMessage pong = (PongMessage)this.transport.sendMessageBlocking(new PingMessage(), n.getId(), 100L, TimeUnit.MILLISECONDS);
                        if (pong != null) {
                            continue;
                        }
                        // no answer in time: take the node out of the active mesh
                        ((MeshOrganizer)this.mesh.get()).remapNode(n);
                        ((MeshOrganizer)this.mesh.get()).markNodeOffline(n);
                        remapped = true;
                    }
                    if (!remapped) {
                        continue;
                    }
                    try {
                        this.transport.propagateMessage(new MeshUpdateMessage((MeshOrganizer)this.mesh.get()), PropagationMode.ONLY_DOWN);
                    }
                    catch (IOException ignored) {
                        // best effort: a failed update is not retried here, the next round runs after the usual delay
                    }
                }
            }
            catch (InterruptedException e) {
                // keep the interrupt visible to whoever runs this task, then stop
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Minimal bridge between a consumer and a publisher: every value passed to
     * {@code accept} is handed to the {@code onNext} method of all subscribers registered
     * so far. Subscribers are kept in a {@link CopyOnWriteArrayList}; no other subscriber
     * callback is invoked by this class.
     *
     * @param <T> type of the relayed values
     */
    public static class MessageFlow<T>
    implements Consumer<T>,
    Publisher<T> {
        private List<Subscriber<? super T>> subscribers = new CopyOnWriteArrayList<Subscriber<? super T>>();

        /**
         * Delivers the value to every registered subscriber, in registration order.
         *
         * @param voidMessage value to deliver
         * @throws Exception declared by the consumer contract
         */
        public void accept(T voidMessage) throws Exception {
            for (Subscriber<? super T> target : this.subscribers) {
                target.onNext(voidMessage);
            }
        }

        /**
         * Registers a subscriber for all values accepted from now on.
         *
         * @param subscriber subscriber to add
         */
        public void subscribe(Subscriber<? super T> subscriber) {
            subscribers.add(subscriber);
        }
    }
}

