/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.store.ha;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.ha.HAConnection;
import org.apache.rocketmq.store.ha.WaitNotifyObject;

public class HAService {
    private static final InternalLogger log = InternalLoggerFactory.getLogger((String)"RocketmqStore");
    private final AtomicInteger connectionCount = new AtomicInteger(0);
    private final List<HAConnection> connectionList = new LinkedList<HAConnection>();
    private final AcceptSocketService acceptSocketService;
    private final DefaultMessageStore defaultMessageStore;
    private final WaitNotifyObject waitNotifyObject = new WaitNotifyObject();
    private final AtomicLong push2SlaveMaxOffset = new AtomicLong(0L);
    private final GroupTransferService groupTransferService;
    private final HAClient haClient;

    public HAService(DefaultMessageStore defaultMessageStore) throws IOException {
        this.defaultMessageStore = defaultMessageStore;
        this.acceptSocketService = new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
        this.groupTransferService = new GroupTransferService();
        this.haClient = new HAClient();
    }

    public void updateMasterAddress(String newAddr) {
        if (this.haClient != null) {
            this.haClient.updateMasterAddress(newAddr);
        }
    }

    public void putRequest(CommitLog.GroupCommitRequest request) {
        this.groupTransferService.putRequest(request);
    }

    public boolean isSlaveOK(long masterPutWhere) {
        boolean result = this.connectionCount.get() > 0;
        result = result && masterPutWhere - this.push2SlaveMaxOffset.get() < (long)this.defaultMessageStore.getMessageStoreConfig().getHaSlaveFallbehindMax();
        return result;
    }

    public void notifyTransferSome(long offset) {
        long value = this.push2SlaveMaxOffset.get();
        while (offset > value) {
            boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
            if (ok) {
                this.groupTransferService.notifyTransferSome();
                break;
            }
            value = this.push2SlaveMaxOffset.get();
        }
    }

    public AtomicInteger getConnectionCount() {
        return this.connectionCount;
    }

    public void start() throws Exception {
        this.acceptSocketService.beginAccept();
        this.acceptSocketService.start();
        this.groupTransferService.start();
        this.haClient.start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addConnection(HAConnection conn) {
        List<HAConnection> list = this.connectionList;
        synchronized (list) {
            this.connectionList.add(conn);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void removeConnection(HAConnection conn) {
        List<HAConnection> list = this.connectionList;
        synchronized (list) {
            this.connectionList.remove(conn);
        }
    }

    public void shutdown() {
        this.haClient.shutdown();
        this.acceptSocketService.shutdown(true);
        this.destroyConnections();
        this.groupTransferService.shutdown();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void destroyConnections() {
        List<HAConnection> list = this.connectionList;
        synchronized (list) {
            for (HAConnection c : this.connectionList) {
                c.shutdown();
            }
            this.connectionList.clear();
        }
    }

    public DefaultMessageStore getDefaultMessageStore() {
        return this.defaultMessageStore;
    }

    public WaitNotifyObject getWaitNotifyObject() {
        return this.waitNotifyObject;
    }

    public AtomicLong getPush2SlaveMaxOffset() {
        return this.push2SlaveMaxOffset;
    }

    class HAClient
    extends ServiceThread {
        private static final int READ_MAX_BUFFER_SIZE = 0x400000;
        private final AtomicReference<String> masterAddress = new AtomicReference();
        private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
        private SocketChannel socketChannel;
        private Selector selector;
        private long lastWriteTimestamp = System.currentTimeMillis();
        private long currentReportedOffset = 0L;
        private int dispatchPosition = 0;
        private ByteBuffer byteBufferRead = ByteBuffer.allocate(0x400000);
        private ByteBuffer byteBufferBackup = ByteBuffer.allocate(0x400000);

        public HAClient() throws IOException {
            this.selector = RemotingUtil.openSelector();
        }

        public void updateMasterAddress(String newAddr) {
            String currentAddr = this.masterAddress.get();
            if (currentAddr == null || !currentAddr.equals(newAddr)) {
                this.masterAddress.set(newAddr);
                log.info("update master address, OLD: " + currentAddr + " NEW: " + newAddr);
            }
        }

        private boolean isTimeToReportOffset() {
            long interval = HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
            boolean needHeart = interval > (long)HAService.this.defaultMessageStore.getMessageStoreConfig().getHaSendHeartbeatInterval();
            return needHeart;
        }

        private boolean reportSlaveMaxOffset(long maxOffset) {
            this.reportOffset.position(0);
            this.reportOffset.limit(8);
            this.reportOffset.putLong(maxOffset);
            this.reportOffset.position(0);
            this.reportOffset.limit(8);
            for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); ++i) {
                try {
                    this.socketChannel.write(this.reportOffset);
                    continue;
                }
                catch (IOException e) {
                    log.error(this.getServiceName() + "reportSlaveMaxOffset this.socketChannel.write exception", (Throwable)e);
                    return false;
                }
            }
            this.lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
            return !this.reportOffset.hasRemaining();
        }

        private void reallocateByteBuffer() {
            int remain = 0x400000 - this.dispatchPosition;
            if (remain > 0) {
                this.byteBufferRead.position(this.dispatchPosition);
                this.byteBufferBackup.position(0);
                this.byteBufferBackup.limit(0x400000);
                this.byteBufferBackup.put(this.byteBufferRead);
            }
            this.swapByteBuffer();
            this.byteBufferRead.position(remain);
            this.byteBufferRead.limit(0x400000);
            this.dispatchPosition = 0;
        }

        private void swapByteBuffer() {
            ByteBuffer tmp = this.byteBufferRead;
            this.byteBufferRead = this.byteBufferBackup;
            this.byteBufferBackup = tmp;
        }

        private boolean processReadEvent() {
            int readSizeZeroTimes = 0;
            while (this.byteBufferRead.hasRemaining()) {
                try {
                    int readSize = this.socketChannel.read(this.byteBufferRead);
                    if (readSize > 0) {
                        readSizeZeroTimes = 0;
                        boolean result = this.dispatchReadRequest();
                        if (result) continue;
                        log.error("HAClient, dispatchReadRequest error");
                        return false;
                    }
                    if (readSize == 0) {
                        if (++readSizeZeroTimes < 3) continue;
                        break;
                    }
                    log.info("HAClient, processReadEvent read socket < 0");
                    return false;
                }
                catch (IOException e) {
                    log.info("HAClient, processReadEvent read socket exception", (Throwable)e);
                    return false;
                }
            }
            return true;
        }

        private boolean dispatchReadRequest() {
            block2: {
                int diff;
                int msgHeaderSize = 12;
                int readSocketPos = this.byteBufferRead.position();
                while ((diff = this.byteBufferRead.position() - this.dispatchPosition) >= 12) {
                    long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
                    int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
                    long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
                    if (slavePhyOffset != 0L && slavePhyOffset != masterPhyOffset) {
                        log.error("master pushed offset not equal the max phy offset in slave, SLAVE: " + slavePhyOffset + " MASTER: " + masterPhyOffset);
                        return false;
                    }
                    if (diff < 12 + bodySize) break;
                    byte[] bodyData = new byte[bodySize];
                    this.byteBufferRead.position(this.dispatchPosition + 12);
                    this.byteBufferRead.get(bodyData);
                    HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
                    this.byteBufferRead.position(readSocketPos);
                    this.dispatchPosition += 12 + bodySize;
                    if (this.reportSlaveMaxOffsetPlus()) continue;
                    return false;
                }
                if (this.byteBufferRead.hasRemaining()) break block2;
                this.reallocateByteBuffer();
            }
            return true;
        }

        private boolean reportSlaveMaxOffsetPlus() {
            boolean result = true;
            long currentPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
            if (currentPhyOffset > this.currentReportedOffset) {
                this.currentReportedOffset = currentPhyOffset;
                result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                if (!result) {
                    this.closeMaster();
                    log.error("HAClient, reportSlaveMaxOffset error, " + this.currentReportedOffset);
                }
            }
            return result;
        }

        private boolean connectMaster() throws ClosedChannelException {
            if (null == this.socketChannel) {
                SocketAddress socketAddress;
                String addr = this.masterAddress.get();
                if (addr != null && (socketAddress = RemotingUtil.string2SocketAddress((String)addr)) != null) {
                    this.socketChannel = RemotingUtil.connect((SocketAddress)socketAddress);
                    if (this.socketChannel != null) {
                        this.socketChannel.register(this.selector, 1);
                    }
                }
                this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
                this.lastWriteTimestamp = System.currentTimeMillis();
            }
            return this.socketChannel != null;
        }

        private void closeMaster() {
            if (null != this.socketChannel) {
                try {
                    SelectionKey sk = this.socketChannel.keyFor(this.selector);
                    if (sk != null) {
                        sk.cancel();
                    }
                    this.socketChannel.close();
                    this.socketChannel = null;
                }
                catch (IOException e) {
                    log.warn("closeMaster exception. ", (Throwable)e);
                }
                this.lastWriteTimestamp = 0L;
                this.dispatchPosition = 0;
                this.byteBufferBackup.position(0);
                this.byteBufferBackup.limit(0x400000);
                this.byteBufferRead.position(0);
                this.byteBufferRead.limit(0x400000);
            }
        }

        public void run() {
            log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                try {
                    if (this.connectMaster()) {
                        long interval;
                        boolean result;
                        if (this.isTimeToReportOffset() && !(result = this.reportSlaveMaxOffset(this.currentReportedOffset))) {
                            this.closeMaster();
                        }
                        this.selector.select(1000L);
                        boolean ok = this.processReadEvent();
                        if (!ok) {
                            this.closeMaster();
                        }
                        if (!this.reportSlaveMaxOffsetPlus() || (interval = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp) <= (long)HAService.this.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) continue;
                        log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress + "] expired, " + interval);
                        this.closeMaster();
                        log.warn("HAClient, master not response some time, so close connection");
                        continue;
                    }
                    this.waitForRunning(5000L);
                }
                catch (Exception e) {
                    log.warn(this.getServiceName() + " service has exception. ", (Throwable)e);
                    this.waitForRunning(5000L);
                }
            }
            log.info(this.getServiceName() + " service end");
        }

        public void shutdown() {
            super.shutdown();
            this.closeMaster();
        }

        public String getServiceName() {
            return HAClient.class.getSimpleName();
        }
    }

    class GroupTransferService
    extends ServiceThread {
        private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject();
        private volatile List<CommitLog.GroupCommitRequest> requestsWrite = new ArrayList<CommitLog.GroupCommitRequest>();
        private volatile List<CommitLog.GroupCommitRequest> requestsRead = new ArrayList<CommitLog.GroupCommitRequest>();

        GroupTransferService() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public synchronized void putRequest(CommitLog.GroupCommitRequest request) {
            List<CommitLog.GroupCommitRequest> list = this.requestsWrite;
            synchronized (list) {
                this.requestsWrite.add(request);
            }
            this.wakeup();
        }

        public void notifyTransferSome() {
            this.notifyTransferObject.wakeup();
        }

        private void swapRequests() {
            List<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;
            this.requestsWrite = this.requestsRead;
            this.requestsRead = tmp;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void doWaitTransfer() {
            List<CommitLog.GroupCommitRequest> list = this.requestsRead;
            synchronized (list) {
                if (!this.requestsRead.isEmpty()) {
                    for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                        boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                        long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now() + (long)HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
                        while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
                            this.notifyTransferObject.waitForRunning(1000L);
                            transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                        }
                        if (!transferOK) {
                            log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                        }
                        req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                    }
                    this.requestsRead.clear();
                }
            }
        }

        public void run() {
            log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                try {
                    this.waitForRunning(10L);
                    this.doWaitTransfer();
                }
                catch (Exception e) {
                    log.warn(this.getServiceName() + " service has exception. ", (Throwable)e);
                }
            }
            log.info(this.getServiceName() + " service end");
        }

        protected void onWaitEnd() {
            this.swapRequests();
        }

        public String getServiceName() {
            return GroupTransferService.class.getSimpleName();
        }
    }

    class AcceptSocketService
    extends ServiceThread {
        private final SocketAddress socketAddressListen;
        private ServerSocketChannel serverSocketChannel;
        private Selector selector;

        public AcceptSocketService(int port) {
            this.socketAddressListen = new InetSocketAddress(port);
        }

        public void beginAccept() throws Exception {
            this.serverSocketChannel = ServerSocketChannel.open();
            this.selector = RemotingUtil.openSelector();
            this.serverSocketChannel.socket().setReuseAddress(true);
            this.serverSocketChannel.socket().bind(this.socketAddressListen);
            this.serverSocketChannel.configureBlocking(false);
            this.serverSocketChannel.register(this.selector, 16);
        }

        public void shutdown(boolean interrupt) {
            super.shutdown(interrupt);
            try {
                this.serverSocketChannel.close();
                this.selector.close();
            }
            catch (IOException e) {
                log.error("AcceptSocketService shutdown exception", (Throwable)e);
            }
        }

        public void run() {
            log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                try {
                    this.selector.select(1000L);
                    Set<SelectionKey> selected = this.selector.selectedKeys();
                    if (selected == null) continue;
                    for (SelectionKey k : selected) {
                        if ((k.readyOps() & 0x10) != 0) {
                            SocketChannel sc = ((ServerSocketChannel)k.channel()).accept();
                            if (sc == null) continue;
                            log.info("HAService receive new connection, " + sc.socket().getRemoteSocketAddress());
                            try {
                                HAConnection conn = new HAConnection(HAService.this, sc);
                                conn.start();
                                HAService.this.addConnection(conn);
                            }
                            catch (Exception e) {
                                log.error("new HAConnection exception", (Throwable)e);
                                sc.close();
                            }
                            continue;
                        }
                        log.warn("Unexpected ops in select " + k.readyOps());
                    }
                    selected.clear();
                }
                catch (Exception e) {
                    log.error(this.getServiceName() + " service has exception.", (Throwable)e);
                }
            }
            log.info(this.getServiceName() + " service end");
        }

        public String getServiceName() {
            return AcceptSocketService.class.getSimpleName();
        }
    }
}

