/*
 * Decompiled with CFR 0.152.
 */
package io.openmessaging.storage.dledger;

import com.alibaba.fastjson.JSON;
import io.openmessaging.storage.dledger.AppendFuture;
import io.openmessaging.storage.dledger.BatchAppendFuture;
import io.openmessaging.storage.dledger.DLedgerConfig;
import io.openmessaging.storage.dledger.DLedgerRpcService;
import io.openmessaging.storage.dledger.MemberState;
import io.openmessaging.storage.dledger.ShutdownAbleThread;
import io.openmessaging.storage.dledger.TimeoutFuture;
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.openmessaging.storage.dledger.exception.DLedgerException;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
import io.openmessaging.storage.dledger.protocol.PushEntryRequest;
import io.openmessaging.storage.dledger.protocol.PushEntryResponse;
import io.openmessaging.storage.dledger.protocol.RequestOrResponse;
import io.openmessaging.storage.dledger.store.DLedgerMemoryStore;
import io.openmessaging.storage.dledger.store.DLedgerStore;
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import io.openmessaging.storage.dledger.utils.Pair;
import io.openmessaging.storage.dledger.utils.PreConditions;
import io.openmessaging.storage.dledger.utils.Quota;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DLedgerEntryPusher {
    private static Logger logger = LoggerFactory.getLogger(DLedgerEntryPusher.class);
    private DLedgerConfig dLedgerConfig;
    private DLedgerStore dLedgerStore;
    private final MemberState memberState;
    private DLedgerRpcService dLedgerRpcService;
    private Map<Long, ConcurrentMap<String, Long>> peerWaterMarksByTerm = new ConcurrentHashMap<Long, ConcurrentMap<String, Long>>();
    private Map<Long, ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>>> pendingAppendResponsesByTerm = new ConcurrentHashMap<Long, ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>>>();
    private EntryHandler entryHandler;
    private QuorumAckChecker quorumAckChecker;
    private Map<String, EntryDispatcher> dispatcherMap = new HashMap<String, EntryDispatcher>();

    public DLedgerEntryPusher(DLedgerConfig dLedgerConfig, MemberState memberState, DLedgerStore dLedgerStore, DLedgerRpcService dLedgerRpcService) {
        this.dLedgerConfig = dLedgerConfig;
        this.memberState = memberState;
        this.dLedgerStore = dLedgerStore;
        this.dLedgerRpcService = dLedgerRpcService;
        for (String peer : memberState.getPeerMap().keySet()) {
            if (peer.equals(memberState.getSelfId())) continue;
            this.dispatcherMap.put(peer, new EntryDispatcher(peer, logger));
        }
        this.entryHandler = new EntryHandler(logger);
        this.quorumAckChecker = new QuorumAckChecker(logger);
    }

    public void startup() {
        this.entryHandler.start();
        this.quorumAckChecker.start();
        for (EntryDispatcher dispatcher : this.dispatcherMap.values()) {
            dispatcher.start();
        }
    }

    public void shutdown() {
        this.entryHandler.shutdown();
        this.quorumAckChecker.shutdown();
        for (EntryDispatcher dispatcher : this.dispatcherMap.values()) {
            dispatcher.shutdown();
        }
    }

    public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request) throws Exception {
        return this.entryHandler.handlePush(request);
    }

    private void checkTermForWaterMark(long term, String env) {
        if (!this.peerWaterMarksByTerm.containsKey(term)) {
            logger.info("Initialize the watermark in {} for term={}", (Object)env, (Object)term);
            ConcurrentHashMap<String, Long> waterMarks = new ConcurrentHashMap<String, Long>();
            for (String peer : this.memberState.getPeerMap().keySet()) {
                waterMarks.put(peer, -1L);
            }
            this.peerWaterMarksByTerm.putIfAbsent(term, waterMarks);
        }
    }

    private void checkTermForPendingMap(long term, String env) {
        if (!this.pendingAppendResponsesByTerm.containsKey(term)) {
            logger.info("Initialize the pending append map in {} for term={}", (Object)env, (Object)term);
            this.pendingAppendResponsesByTerm.putIfAbsent(term, new ConcurrentHashMap());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updatePeerWaterMark(long term, String peerId, long index) {
        Map<Long, ConcurrentMap<String, Long>> map = this.peerWaterMarksByTerm;
        synchronized (map) {
            this.checkTermForWaterMark(term, "updatePeerWaterMark");
            if ((Long)this.peerWaterMarksByTerm.get(term).get(peerId) < index) {
                this.peerWaterMarksByTerm.get(term).put(peerId, index);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public long getPeerWaterMark(long term, String peerId) {
        Map<Long, ConcurrentMap<String, Long>> map = this.peerWaterMarksByTerm;
        synchronized (map) {
            this.checkTermForWaterMark(term, "getPeerWaterMark");
            return (Long)this.peerWaterMarksByTerm.get(term).get(peerId);
        }
    }

    public boolean isPendingFull(long currTerm) {
        this.checkTermForPendingMap(currTerm, "isPendingFull");
        return this.pendingAppendResponsesByTerm.get(currTerm).size() > this.dLedgerConfig.getMaxPendingRequestsNum();
    }

    public CompletableFuture<AppendEntryResponse> waitAck(DLedgerEntry entry, boolean isBatchWait) {
        this.updatePeerWaterMark(entry.getTerm(), this.memberState.getSelfId(), entry.getIndex());
        if (this.memberState.getPeerMap().size() == 1) {
            AppendEntryResponse response = new AppendEntryResponse();
            response.setGroup(this.memberState.getGroup());
            response.setLeaderId(this.memberState.getSelfId());
            response.setIndex(entry.getIndex());
            response.setTerm(entry.getTerm());
            response.setPos(entry.getPos());
            if (isBatchWait) {
                return BatchAppendFuture.newCompletedFuture(entry.getPos(), response);
            }
            return AppendFuture.newCompletedFuture(entry.getPos(), response);
        }
        this.checkTermForPendingMap(entry.getTerm(), "waitAck");
        AppendFuture future = isBatchWait ? new BatchAppendFuture(this.dLedgerConfig.getMaxWaitAckTimeMs()) : new AppendFuture(this.dLedgerConfig.getMaxWaitAckTimeMs());
        future.setPos(entry.getPos());
        CompletableFuture old = this.pendingAppendResponsesByTerm.get(entry.getTerm()).put(entry.getIndex(), future);
        if (old != null) {
            logger.warn("[MONITOR] get old wait at index={}", (Object)entry.getIndex());
        }
        return future;
    }

    public void wakeUpDispatchers() {
        for (EntryDispatcher dispatcher : this.dispatcherMap.values()) {
            dispatcher.wakeup();
        }
    }

    private class EntryHandler
    extends ShutdownAbleThread {
        private long lastCheckFastForwardTimeMs;
        ConcurrentMap<Long, Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>> writeRequestMap;
        BlockingQueue<Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>> compareOrTruncateRequests;

        public EntryHandler(Logger logger) {
            super("EntryHandler-" + DLedgerEntryPusher.this.memberState.getSelfId(), logger);
            this.lastCheckFastForwardTimeMs = System.currentTimeMillis();
            this.writeRequestMap = new ConcurrentHashMap<Long, Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>>();
            this.compareOrTruncateRequests = new ArrayBlockingQueue<Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>>(100);
        }

        public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request) throws Exception {
            TimeoutFuture<PushEntryResponse> future = new TimeoutFuture<PushEntryResponse>(1000L);
            switch (request.getType()) {
                case APPEND: {
                    if (request.isBatch()) {
                        PreConditions.check(request.getBatchEntry() != null && request.getCount() > 0, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
                    } else {
                        PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
                    }
                    long index = request.getFirstEntryIndex();
                    Pair<PushEntryRequest, TimeoutFuture<PushEntryResponse>> old = this.writeRequestMap.putIfAbsent(index, new Pair<PushEntryRequest, TimeoutFuture<PushEntryResponse>>(request, future));
                    if (old == null) break;
                    this.logger.warn("[MONITOR]The index {} has already existed with {} and curr is {}", new Object[]{index, old.getKey().baseInfo(), request.baseInfo()});
                    future.complete(this.buildResponse(request, DLedgerResponseCode.REPEATED_PUSH.getCode()));
                    break;
                }
                case COMMIT: {
                    this.compareOrTruncateRequests.put(new Pair(request, future));
                    break;
                }
                case COMPARE: 
                case TRUNCATE: {
                    PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
                    this.writeRequestMap.clear();
                    this.compareOrTruncateRequests.put(new Pair<PushEntryRequest, TimeoutFuture<PushEntryResponse>>(request, future));
                    break;
                }
                default: {
                    this.logger.error("[BUG]Unknown type {} from {}", (Object)request.getType(), (Object)request.baseInfo());
                    future.complete(this.buildResponse(request, DLedgerResponseCode.UNEXPECTED_ARGUMENT.getCode()));
                }
            }
            this.wakeup();
            return future;
        }

        private PushEntryResponse buildResponse(PushEntryRequest request, int code) {
            PushEntryResponse response = new PushEntryResponse();
            response.setGroup(request.getGroup());
            response.setCode(code);
            response.setTerm(request.getTerm());
            if (request.getType() != PushEntryRequest.Type.COMMIT) {
                response.setIndex(request.getEntry().getIndex());
            }
            response.setBeginIndex(DLedgerEntryPusher.this.dLedgerStore.getLedgerBeginIndex());
            response.setEndIndex(DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex());
            return response;
        }

        private PushEntryResponse buildBatchAppendResponse(PushEntryRequest request, int code) {
            PushEntryResponse response = new PushEntryResponse();
            response.setGroup(request.getGroup());
            response.setCode(code);
            response.setTerm(request.getTerm());
            response.setIndex(request.getLastEntryIndex());
            response.setBeginIndex(DLedgerEntryPusher.this.dLedgerStore.getLedgerBeginIndex());
            response.setEndIndex(DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex());
            return response;
        }

        private void handleDoAppend(long writeIndex, PushEntryRequest request, CompletableFuture<PushEntryResponse> future) {
            try {
                PreConditions.check(writeIndex == request.getEntry().getIndex(), DLedgerResponseCode.INCONSISTENT_STATE);
                DLedgerEntry entry = DLedgerEntryPusher.this.dLedgerStore.appendAsFollower(request.getEntry(), request.getTerm(), request.getLeaderId());
                PreConditions.check(entry.getIndex() == writeIndex, DLedgerResponseCode.INCONSISTENT_STATE);
                future.complete(this.buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
                DLedgerEntryPusher.this.dLedgerStore.updateCommittedIndex(request.getTerm(), request.getCommitIndex());
            }
            catch (Throwable t) {
                this.logger.error("[HandleDoWrite] writeIndex={}", (Object)writeIndex, (Object)t);
                future.complete(this.buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
            }
        }

        private CompletableFuture<PushEntryResponse> handleDoCompare(long compareIndex, PushEntryRequest request, CompletableFuture<PushEntryResponse> future) {
            try {
                PreConditions.check(compareIndex == request.getEntry().getIndex(), DLedgerResponseCode.UNKNOWN);
                PreConditions.check(request.getType() == PushEntryRequest.Type.COMPARE, DLedgerResponseCode.UNKNOWN);
                DLedgerEntry local = DLedgerEntryPusher.this.dLedgerStore.get(compareIndex);
                PreConditions.check(request.getEntry().equals(local), DLedgerResponseCode.INCONSISTENT_STATE);
                future.complete(this.buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
            }
            catch (Throwable t) {
                this.logger.error("[HandleDoCompare] compareIndex={}", (Object)compareIndex, (Object)t);
                future.complete(this.buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
            }
            return future;
        }

        private CompletableFuture<PushEntryResponse> handleDoCommit(long committedIndex, PushEntryRequest request, CompletableFuture<PushEntryResponse> future) {
            try {
                PreConditions.check(committedIndex == request.getCommitIndex(), DLedgerResponseCode.UNKNOWN);
                PreConditions.check(request.getType() == PushEntryRequest.Type.COMMIT, DLedgerResponseCode.UNKNOWN);
                DLedgerEntryPusher.this.dLedgerStore.updateCommittedIndex(request.getTerm(), committedIndex);
                future.complete(this.buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
            }
            catch (Throwable t) {
                this.logger.error("[HandleDoCommit] committedIndex={}", (Object)request.getCommitIndex(), (Object)t);
                future.complete(this.buildResponse(request, DLedgerResponseCode.UNKNOWN.getCode()));
            }
            return future;
        }

        private CompletableFuture<PushEntryResponse> handleDoTruncate(long truncateIndex, PushEntryRequest request, CompletableFuture<PushEntryResponse> future) {
            try {
                this.logger.info("[HandleDoTruncate] truncateIndex={} pos={}", (Object)truncateIndex, (Object)request.getEntry().getPos());
                PreConditions.check(truncateIndex == request.getEntry().getIndex(), DLedgerResponseCode.UNKNOWN);
                PreConditions.check(request.getType() == PushEntryRequest.Type.TRUNCATE, DLedgerResponseCode.UNKNOWN);
                long index = DLedgerEntryPusher.this.dLedgerStore.truncate(request.getEntry(), request.getTerm(), request.getLeaderId());
                PreConditions.check(index == truncateIndex, DLedgerResponseCode.INCONSISTENT_STATE);
                future.complete(this.buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
                DLedgerEntryPusher.this.dLedgerStore.updateCommittedIndex(request.getTerm(), request.getCommitIndex());
            }
            catch (Throwable t) {
                this.logger.error("[HandleDoTruncate] truncateIndex={}", (Object)truncateIndex, (Object)t);
                future.complete(this.buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
            }
            return future;
        }

        private void handleDoBatchAppend(long writeIndex, PushEntryRequest request, CompletableFuture<PushEntryResponse> future) {
            try {
                PreConditions.check(writeIndex == request.getFirstEntryIndex(), DLedgerResponseCode.INCONSISTENT_STATE);
                for (DLedgerEntry entry : request.getBatchEntry()) {
                    DLedgerEntryPusher.this.dLedgerStore.appendAsFollower(entry, request.getTerm(), request.getLeaderId());
                }
                future.complete(this.buildBatchAppendResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
                DLedgerEntryPusher.this.dLedgerStore.updateCommittedIndex(request.getTerm(), request.getCommitIndex());
            }
            catch (Throwable t) {
                this.logger.error("[HandleDoBatchAppend]", t);
            }
        }

        private void checkAppendFuture(long endIndex) {
            long minFastForwardIndex = Long.MAX_VALUE;
            for (Pair pair : this.writeRequestMap.values()) {
                long firstEntryIndex = ((PushEntryRequest)pair.getKey()).getFirstEntryIndex();
                long lastEntryIndex = ((PushEntryRequest)pair.getKey()).getLastEntryIndex();
                if (lastEntryIndex <= endIndex) {
                    try {
                        if (((PushEntryRequest)pair.getKey()).isBatch()) {
                            for (DLedgerEntry dLedgerEntry : ((PushEntryRequest)pair.getKey()).getBatchEntry()) {
                                PreConditions.check(dLedgerEntry.equals(DLedgerEntryPusher.this.dLedgerStore.get(dLedgerEntry.getIndex())), DLedgerResponseCode.INCONSISTENT_STATE);
                            }
                        } else {
                            DLedgerEntry dLedgerEntry = ((PushEntryRequest)pair.getKey()).getEntry();
                            PreConditions.check(dLedgerEntry.equals(DLedgerEntryPusher.this.dLedgerStore.get(dLedgerEntry.getIndex())), DLedgerResponseCode.INCONSISTENT_STATE);
                        }
                        ((CompletableFuture)pair.getValue()).complete(this.buildBatchAppendResponse((PushEntryRequest)pair.getKey(), DLedgerResponseCode.SUCCESS.getCode()));
                        this.logger.warn("[PushFallBehind]The leader pushed an batch append entry last index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", (Object)lastEntryIndex, (Object)endIndex);
                    }
                    catch (Throwable t) {
                        this.logger.error("[PushFallBehind]The leader pushed an batch append entry last index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", new Object[]{lastEntryIndex, endIndex, t});
                        ((CompletableFuture)pair.getValue()).complete(this.buildBatchAppendResponse((PushEntryRequest)pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
                    }
                    this.writeRequestMap.remove(((PushEntryRequest)pair.getKey()).getFirstEntryIndex());
                    continue;
                }
                if (firstEntryIndex == endIndex + 1L) {
                    return;
                }
                TimeoutFuture future = (TimeoutFuture)pair.getValue();
                if (!future.isTimeOut() || firstEntryIndex >= minFastForwardIndex) continue;
                minFastForwardIndex = firstEntryIndex;
            }
            if (minFastForwardIndex == Long.MAX_VALUE) {
                return;
            }
            Pair pair = (Pair)this.writeRequestMap.get(minFastForwardIndex);
            if (pair == null) {
                return;
            }
            this.logger.warn("[PushFastForward] ledgerEndIndex={} entryIndex={}", (Object)endIndex, (Object)minFastForwardIndex);
            ((CompletableFuture)pair.getValue()).complete(this.buildBatchAppendResponse((PushEntryRequest)pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
        }

        private void checkAbnormalFuture(long endIndex) {
            if (DLedgerUtils.elapsed(this.lastCheckFastForwardTimeMs) < 1000L) {
                return;
            }
            this.lastCheckFastForwardTimeMs = System.currentTimeMillis();
            if (this.writeRequestMap.isEmpty()) {
                return;
            }
            this.checkAppendFuture(endIndex);
        }

        @Override
        public void doWork() {
            try {
                if (!DLedgerEntryPusher.this.memberState.isFollower()) {
                    this.waitForRunning(1L);
                    return;
                }
                if (this.compareOrTruncateRequests.peek() != null) {
                    Pair pair = (Pair)this.compareOrTruncateRequests.poll();
                    PreConditions.check(pair != null, DLedgerResponseCode.UNKNOWN);
                    switch (((PushEntryRequest)pair.getKey()).getType()) {
                        case TRUNCATE: {
                            this.handleDoTruncate(((PushEntryRequest)pair.getKey()).getEntry().getIndex(), (PushEntryRequest)pair.getKey(), (CompletableFuture)pair.getValue());
                            break;
                        }
                        case COMPARE: {
                            this.handleDoCompare(((PushEntryRequest)pair.getKey()).getEntry().getIndex(), (PushEntryRequest)pair.getKey(), (CompletableFuture)pair.getValue());
                            break;
                        }
                        case COMMIT: {
                            this.handleDoCommit(((PushEntryRequest)pair.getKey()).getCommitIndex(), (PushEntryRequest)pair.getKey(), (CompletableFuture)pair.getValue());
                            break;
                        }
                    }
                } else {
                    long nextIndex = DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex() + 1L;
                    Pair pair = (Pair)this.writeRequestMap.remove(nextIndex);
                    if (pair == null) {
                        this.checkAbnormalFuture(DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex());
                        this.waitForRunning(1L);
                        return;
                    }
                    PushEntryRequest request = (PushEntryRequest)pair.getKey();
                    if (request.isBatch()) {
                        this.handleDoBatchAppend(nextIndex, request, (CompletableFuture)pair.getValue());
                    } else {
                        this.handleDoAppend(nextIndex, request, (CompletableFuture)pair.getValue());
                    }
                }
            }
            catch (Throwable t) {
                logger.error("Error in {}", (Object)this.getName(), (Object)t);
                DLedgerUtils.sleep(100L);
            }
        }
    }

    private class EntryDispatcher
    extends ShutdownAbleThread {
        private AtomicReference<PushEntryRequest.Type> type;
        private long lastPushCommitTimeMs;
        private String peerId;
        private long compareIndex;
        private long writeIndex;
        private int maxPendingSize;
        private long term;
        private String leaderId;
        private long lastCheckLeakTimeMs;
        private ConcurrentMap<Long, Long> pendingMap;
        private ConcurrentMap<Long, Pair<Long, Integer>> batchPendingMap;
        private PushEntryRequest batchAppendEntryRequest;
        private Quota quota;

        public EntryDispatcher(String peerId, Logger logger) {
            super("EntryDispatcher-" + DLedgerEntryPusher.this.memberState.getSelfId() + "-" + peerId, logger);
            this.type = new AtomicReference<PushEntryRequest.Type>(PushEntryRequest.Type.COMPARE);
            this.lastPushCommitTimeMs = -1L;
            this.compareIndex = -1L;
            this.writeIndex = -1L;
            this.maxPendingSize = 1000;
            this.term = -1L;
            this.leaderId = null;
            this.lastCheckLeakTimeMs = System.currentTimeMillis();
            this.pendingMap = new ConcurrentHashMap<Long, Long>();
            this.batchPendingMap = new ConcurrentHashMap<Long, Pair<Long, Integer>>();
            this.batchAppendEntryRequest = new PushEntryRequest();
            this.quota = new Quota(DLedgerEntryPusher.this.dLedgerConfig.getPeerPushQuota());
            this.peerId = peerId;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private boolean checkAndFreshState() {
            if (!DLedgerEntryPusher.this.memberState.isLeader()) {
                return false;
            }
            if (this.term != DLedgerEntryPusher.this.memberState.currTerm() || this.leaderId == null || !this.leaderId.equals(DLedgerEntryPusher.this.memberState.getLeaderId())) {
                MemberState memberState = DLedgerEntryPusher.this.memberState;
                synchronized (memberState) {
                    if (!DLedgerEntryPusher.this.memberState.isLeader()) {
                        return false;
                    }
                    PreConditions.check(DLedgerEntryPusher.this.memberState.getSelfId().equals(DLedgerEntryPusher.this.memberState.getLeaderId()), DLedgerResponseCode.UNKNOWN);
                    this.term = DLedgerEntryPusher.this.memberState.currTerm();
                    this.leaderId = DLedgerEntryPusher.this.memberState.getSelfId();
                    this.changeState(-1L, PushEntryRequest.Type.COMPARE);
                }
            }
            return true;
        }

        private PushEntryRequest buildPushRequest(DLedgerEntry entry, PushEntryRequest.Type target) {
            PushEntryRequest request = new PushEntryRequest();
            request.setGroup(DLedgerEntryPusher.this.memberState.getGroup());
            request.setRemoteId(this.peerId);
            request.setLeaderId(this.leaderId);
            request.setTerm(this.term);
            request.setEntry(entry);
            request.setType(target);
            request.setCommitIndex(DLedgerEntryPusher.this.dLedgerStore.getCommittedIndex());
            return request;
        }

        private void resetBatchAppendEntryRequest() {
            this.batchAppendEntryRequest.setGroup(DLedgerEntryPusher.this.memberState.getGroup());
            this.batchAppendEntryRequest.setRemoteId(this.peerId);
            this.batchAppendEntryRequest.setLeaderId(this.leaderId);
            this.batchAppendEntryRequest.setTerm(this.term);
            this.batchAppendEntryRequest.setType(PushEntryRequest.Type.APPEND);
            this.batchAppendEntryRequest.clear();
        }

        private void checkQuotaAndWait(DLedgerEntry entry) {
            if (DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex() - entry.getIndex() <= (long)this.maxPendingSize) {
                return;
            }
            if (DLedgerEntryPusher.this.dLedgerStore instanceof DLedgerMemoryStore) {
                return;
            }
            DLedgerMmapFileStore mmapFileStore = (DLedgerMmapFileStore)DLedgerEntryPusher.this.dLedgerStore;
            if (mmapFileStore.getDataFileList().getMaxWrotePosition() - entry.getPos() < (long)DLedgerEntryPusher.this.dLedgerConfig.getPeerPushThrottlePoint()) {
                return;
            }
            this.quota.sample(entry.getSize());
            if (this.quota.validateNow()) {
                long leftNow = this.quota.leftNow();
                this.logger.warn("[Push-{}]Quota exhaust, will sleep {}ms", (Object)this.peerId, (Object)leftNow);
                DLedgerUtils.sleep(leftNow);
            }
        }

        private void doAppendInner(long index) throws Exception {
            DLedgerEntry entry = this.getDLedgerEntryForAppend(index);
            if (null == entry) {
                return;
            }
            this.checkQuotaAndWait(entry);
            PushEntryRequest request = this.buildPushRequest(entry, PushEntryRequest.Type.APPEND);
            CompletableFuture<PushEntryResponse> responseFuture = DLedgerEntryPusher.this.dLedgerRpcService.push(request);
            this.pendingMap.put(index, System.currentTimeMillis());
            responseFuture.whenComplete((x, ex) -> {
                try {
                    PreConditions.check(ex == null, DLedgerResponseCode.UNKNOWN);
                    DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode());
                    switch (responseCode) {
                        case SUCCESS: {
                            this.pendingMap.remove(x.getIndex());
                            DLedgerEntryPusher.this.updatePeerWaterMark(x.getTerm(), this.peerId, x.getIndex());
                            DLedgerEntryPusher.this.quorumAckChecker.wakeup();
                            break;
                        }
                        case INCONSISTENT_STATE: {
                            this.logger.info("[Push-{}]Get INCONSISTENT_STATE when push index={} term={}", new Object[]{this.peerId, x.getIndex(), x.getTerm()});
                            this.changeState(-1L, PushEntryRequest.Type.COMPARE);
                            break;
                        }
                        default: {
                            this.logger.warn("[Push-{}]Get error response code {} {}", new Object[]{this.peerId, responseCode, x.baseInfo()});
                            break;
                        }
                    }
                }
                catch (Throwable t) {
                    this.logger.error("", t);
                }
            });
            this.lastPushCommitTimeMs = System.currentTimeMillis();
        }

        private DLedgerEntry getDLedgerEntryForAppend(long index) {
            DLedgerEntry entry;
            try {
                entry = DLedgerEntryPusher.this.dLedgerStore.get(index);
            }
            catch (DLedgerException e) {
                if (DLedgerResponseCode.INDEX_LESS_THAN_LOCAL_BEGIN.equals((Object)e.getCode())) {
                    this.logger.info("[Push-{}]Get INDEX_LESS_THAN_LOCAL_BEGIN when requested index is {}, try to compare", (Object)this.peerId, (Object)index);
                    this.changeState(-1L, PushEntryRequest.Type.COMPARE);
                    return null;
                }
                throw e;
            }
            PreConditions.check(entry != null, DLedgerResponseCode.UNKNOWN, "writeIndex=%d", index);
            return entry;
        }

        private void doCommit() throws Exception {
            if (DLedgerUtils.elapsed(this.lastPushCommitTimeMs) > 1000L) {
                PushEntryRequest request = this.buildPushRequest(null, PushEntryRequest.Type.COMMIT);
                DLedgerEntryPusher.this.dLedgerRpcService.push(request);
                this.lastPushCommitTimeMs = System.currentTimeMillis();
            }
        }

        private void doCheckAppendResponse() throws Exception {
            long peerWaterMark = DLedgerEntryPusher.this.getPeerWaterMark(this.term, this.peerId);
            Long sendTimeMs = (Long)this.pendingMap.get(peerWaterMark + 1L);
            if (sendTimeMs != null && System.currentTimeMillis() - sendTimeMs > (long)DLedgerEntryPusher.this.dLedgerConfig.getMaxPushTimeOutMs()) {
                this.logger.warn("[Push-{}]Retry to push entry at {}", (Object)this.peerId, (Object)(peerWaterMark + 1L));
                this.doAppendInner(peerWaterMark + 1L);
            }
        }

        private void doAppend() throws Exception {
            while (this.checkAndFreshState() && this.type.get() == PushEntryRequest.Type.APPEND) {
                if (this.writeIndex > DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex()) {
                    this.doCommit();
                    this.doCheckAppendResponse();
                    break;
                }
                if (this.pendingMap.size() >= this.maxPendingSize || DLedgerUtils.elapsed(this.lastCheckLeakTimeMs) > 1000L) {
                    long peerWaterMark = DLedgerEntryPusher.this.getPeerWaterMark(this.term, this.peerId);
                    for (Long index : this.pendingMap.keySet()) {
                        if (index >= peerWaterMark) continue;
                        this.pendingMap.remove(index);
                    }
                    this.lastCheckLeakTimeMs = System.currentTimeMillis();
                }
                if (this.pendingMap.size() >= this.maxPendingSize) {
                    this.doCheckAppendResponse();
                    break;
                }
                this.doAppendInner(this.writeIndex);
                ++this.writeIndex;
            }
        }

        private void sendBatchAppendEntryRequest() throws Exception {
            this.batchAppendEntryRequest.setCommitIndex(DLedgerEntryPusher.this.dLedgerStore.getCommittedIndex());
            CompletableFuture<PushEntryResponse> responseFuture = DLedgerEntryPusher.this.dLedgerRpcService.push(this.batchAppendEntryRequest);
            this.batchPendingMap.put(this.batchAppendEntryRequest.getFirstEntryIndex(), new Pair<Long, Integer>(System.currentTimeMillis(), this.batchAppendEntryRequest.getCount()));
            responseFuture.whenComplete((x, ex) -> {
                try {
                    PreConditions.check(ex == null, DLedgerResponseCode.UNKNOWN);
                    DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode());
                    switch (responseCode) {
                        case SUCCESS: {
                            this.batchPendingMap.remove(x.getIndex());
                            DLedgerEntryPusher.this.updatePeerWaterMark(x.getTerm(), this.peerId, x.getIndex());
                            break;
                        }
                        case INCONSISTENT_STATE: {
                            this.logger.info("[Push-{}]Get INCONSISTENT_STATE when batch push index={} term={}", new Object[]{this.peerId, x.getIndex(), x.getTerm()});
                            this.changeState(-1L, PushEntryRequest.Type.COMPARE);
                            break;
                        }
                        default: {
                            this.logger.warn("[Push-{}]Get error response code {} {}", new Object[]{this.peerId, responseCode, x.baseInfo()});
                            break;
                        }
                    }
                }
                catch (Throwable t) {
                    this.logger.error("", t);
                }
            });
            this.lastPushCommitTimeMs = System.currentTimeMillis();
            this.batchAppendEntryRequest.clear();
        }

        private void doBatchAppendInner(long index) throws Exception {
            DLedgerEntry entry = this.getDLedgerEntryForAppend(index);
            if (null == entry) {
                return;
            }
            this.batchAppendEntryRequest.addEntry(entry);
            if (this.batchAppendEntryRequest.getTotalSize() >= (long)DLedgerEntryPusher.this.dLedgerConfig.getMaxBatchPushSize()) {
                this.sendBatchAppendEntryRequest();
            }
        }

        private void doCheckBatchAppendResponse() throws Exception {
            long peerWaterMark = DLedgerEntryPusher.this.getPeerWaterMark(this.term, this.peerId);
            Pair pair = (Pair)this.batchPendingMap.get(peerWaterMark + 1L);
            if (pair != null && System.currentTimeMillis() - (Long)pair.getKey() > (long)DLedgerEntryPusher.this.dLedgerConfig.getMaxPushTimeOutMs()) {
                long firstIndex = peerWaterMark + 1L;
                long lastIndex = firstIndex + (long)((Integer)pair.getValue()).intValue() - 1L;
                this.logger.warn("[Push-{}]Retry to push entry from {} to {}", new Object[]{this.peerId, firstIndex, lastIndex});
                this.batchAppendEntryRequest.clear();
                for (long i = firstIndex; i <= lastIndex; ++i) {
                    DLedgerEntry entry = DLedgerEntryPusher.this.dLedgerStore.get(i);
                    this.batchAppendEntryRequest.addEntry(entry);
                }
                this.sendBatchAppendEntryRequest();
            }
        }

        private void doBatchAppend() throws Exception {
            while (this.checkAndFreshState() && this.type.get() == PushEntryRequest.Type.APPEND) {
                if (this.writeIndex > DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex()) {
                    if (this.batchAppendEntryRequest.getCount() > 0) {
                        this.sendBatchAppendEntryRequest();
                    }
                    this.doCommit();
                    this.doCheckBatchAppendResponse();
                    break;
                }
                if (this.batchPendingMap.size() >= this.maxPendingSize || DLedgerUtils.elapsed(this.lastCheckLeakTimeMs) > 1000L) {
                    long peerWaterMark = DLedgerEntryPusher.this.getPeerWaterMark(this.term, this.peerId);
                    for (Map.Entry entry : this.batchPendingMap.entrySet()) {
                        if ((Long)entry.getKey() + (long)((Integer)((Pair)entry.getValue()).getValue()).intValue() - 1L > peerWaterMark) continue;
                        this.batchPendingMap.remove(entry.getKey());
                    }
                    this.lastCheckLeakTimeMs = System.currentTimeMillis();
                }
                if (this.batchPendingMap.size() >= this.maxPendingSize) {
                    this.doCheckBatchAppendResponse();
                    break;
                }
                this.doBatchAppendInner(this.writeIndex);
                ++this.writeIndex;
            }
        }

        private void doTruncate(long truncateIndex) throws Exception {
            PreConditions.check(this.type.get() == PushEntryRequest.Type.TRUNCATE, DLedgerResponseCode.UNKNOWN);
            DLedgerEntry truncateEntry = DLedgerEntryPusher.this.dLedgerStore.get(truncateIndex);
            PreConditions.check(truncateEntry != null, DLedgerResponseCode.UNKNOWN);
            this.logger.info("[Push-{}]Will push data to truncate truncateIndex={} pos={}", new Object[]{this.peerId, truncateIndex, truncateEntry.getPos()});
            PushEntryRequest truncateRequest = this.buildPushRequest(truncateEntry, PushEntryRequest.Type.TRUNCATE);
            PushEntryResponse truncateResponse = DLedgerEntryPusher.this.dLedgerRpcService.push(truncateRequest).get(3L, TimeUnit.SECONDS);
            PreConditions.check(truncateResponse != null, DLedgerResponseCode.UNKNOWN, "truncateIndex=%d", truncateIndex);
            PreConditions.check(truncateResponse.getCode() == DLedgerResponseCode.SUCCESS.getCode(), DLedgerResponseCode.valueOf(truncateResponse.getCode()), "truncateIndex=%d", truncateIndex);
            this.lastPushCommitTimeMs = System.currentTimeMillis();
            this.changeState(truncateIndex, PushEntryRequest.Type.APPEND);
        }

        private synchronized void changeState(long index, PushEntryRequest.Type target) {
            this.logger.info("[Push-{}]Change state from {} to {} at {}", new Object[]{this.peerId, this.type.get(), target, index});
            switch (target) {
                case APPEND: {
                    this.compareIndex = -1L;
                    DLedgerEntryPusher.this.updatePeerWaterMark(this.term, this.peerId, index);
                    DLedgerEntryPusher.this.quorumAckChecker.wakeup();
                    this.writeIndex = index + 1L;
                    if (!DLedgerEntryPusher.this.dLedgerConfig.isEnableBatchPush()) break;
                    this.resetBatchAppendEntryRequest();
                    break;
                }
                case COMPARE: {
                    if (!this.type.compareAndSet(PushEntryRequest.Type.APPEND, PushEntryRequest.Type.COMPARE)) break;
                    this.compareIndex = -1L;
                    if (DLedgerEntryPusher.this.dLedgerConfig.isEnableBatchPush()) {
                        this.batchPendingMap.clear();
                        break;
                    }
                    this.pendingMap.clear();
                    break;
                }
                case TRUNCATE: {
                    this.compareIndex = -1L;
                    break;
                }
            }
            this.type.set(target);
        }

        private void doCompare() throws Exception {
            while (!(!this.checkAndFreshState() || this.type.get() != PushEntryRequest.Type.COMPARE && this.type.get() != PushEntryRequest.Type.TRUNCATE || this.compareIndex == -1L && DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex() == -1L)) {
                if (this.compareIndex == -1L) {
                    this.compareIndex = DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex();
                    this.logger.info("[Push-{}][DoCompare] compareIndex=-1 means start to compare", (Object)this.peerId);
                } else if (this.compareIndex > DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex() || this.compareIndex < DLedgerEntryPusher.this.dLedgerStore.getLedgerBeginIndex()) {
                    this.logger.info("[Push-{}][DoCompare] compareIndex={} out of range {}-{}", new Object[]{this.peerId, this.compareIndex, DLedgerEntryPusher.this.dLedgerStore.getLedgerBeginIndex(), DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex()});
                    this.compareIndex = DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex();
                }
                DLedgerEntry entry = DLedgerEntryPusher.this.dLedgerStore.get(this.compareIndex);
                PreConditions.check(entry != null, DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d", this.compareIndex);
                PushEntryRequest request = this.buildPushRequest(entry, PushEntryRequest.Type.COMPARE);
                CompletableFuture<PushEntryResponse> responseFuture = DLedgerEntryPusher.this.dLedgerRpcService.push(request);
                PushEntryResponse response = responseFuture.get(3L, TimeUnit.SECONDS);
                PreConditions.check(response != null, DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d", this.compareIndex);
                PreConditions.check(response.getCode() == DLedgerResponseCode.INCONSISTENT_STATE.getCode() || response.getCode() == DLedgerResponseCode.SUCCESS.getCode(), DLedgerResponseCode.valueOf(response.getCode()), "compareIndex=%d", this.compareIndex);
                long truncateIndex = -1L;
                if (response.getCode() == DLedgerResponseCode.SUCCESS.getCode()) {
                    if (this.compareIndex == response.getEndIndex()) {
                        this.changeState(this.compareIndex, PushEntryRequest.Type.APPEND);
                        break;
                    }
                    truncateIndex = this.compareIndex;
                } else if (response.getEndIndex() < DLedgerEntryPusher.this.dLedgerStore.getLedgerBeginIndex() || response.getBeginIndex() > DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex()) {
                    truncateIndex = DLedgerEntryPusher.this.dLedgerStore.getLedgerBeginIndex();
                } else if (this.compareIndex < response.getBeginIndex()) {
                    truncateIndex = DLedgerEntryPusher.this.dLedgerStore.getLedgerBeginIndex();
                } else {
                    this.compareIndex = this.compareIndex > response.getEndIndex() ? response.getEndIndex() : --this.compareIndex;
                }
                if (this.compareIndex < DLedgerEntryPusher.this.dLedgerStore.getLedgerBeginIndex()) {
                    truncateIndex = DLedgerEntryPusher.this.dLedgerStore.getLedgerBeginIndex();
                }
                if (truncateIndex == -1L) continue;
                this.changeState(truncateIndex, PushEntryRequest.Type.TRUNCATE);
                this.doTruncate(truncateIndex);
                break;
            }
        }

        @Override
        public void doWork() {
            try {
                if (!this.checkAndFreshState()) {
                    this.waitForRunning(1L);
                    return;
                }
                if (this.type.get() == PushEntryRequest.Type.APPEND) {
                    if (DLedgerEntryPusher.this.dLedgerConfig.isEnableBatchPush()) {
                        this.doBatchAppend();
                    } else {
                        this.doAppend();
                    }
                } else {
                    this.doCompare();
                }
                this.waitForRunning(1L);
            }
            catch (Throwable t) {
                logger.error("[Push-{}]Error in {} writeIndex={} compareIndex={}", new Object[]{this.peerId, this.getName(), this.writeIndex, this.compareIndex, t});
                this.changeState(-1L, PushEntryRequest.Type.COMPARE);
                DLedgerUtils.sleep(500L);
            }
        }
    }

    private class QuorumAckChecker
    extends ShutdownAbleThread {
        private long lastPrintWatermarkTimeMs;
        private long lastCheckLeakTimeMs;
        private long lastQuorumIndex;

        public QuorumAckChecker(Logger logger) {
            super("QuorumAckChecker-" + DLedgerEntryPusher.this.memberState.getSelfId(), logger);
            this.lastPrintWatermarkTimeMs = System.currentTimeMillis();
            this.lastCheckLeakTimeMs = System.currentTimeMillis();
            this.lastQuorumIndex = -1L;
        }

        @Override
        public void doWork() {
            try {
                Object response;
                if (DLedgerUtils.elapsed(this.lastPrintWatermarkTimeMs) > 3000L) {
                    this.logger.info("[{}][{}] term={} ledgerBegin={} ledgerEnd={} committed={} watermarks={}", new Object[]{DLedgerEntryPusher.this.memberState.getSelfId(), DLedgerEntryPusher.this.memberState.getRole(), DLedgerEntryPusher.this.memberState.currTerm(), DLedgerEntryPusher.this.dLedgerStore.getLedgerBeginIndex(), DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex(), DLedgerEntryPusher.this.dLedgerStore.getCommittedIndex(), JSON.toJSONString((Object)DLedgerEntryPusher.this.peerWaterMarksByTerm)});
                    this.lastPrintWatermarkTimeMs = System.currentTimeMillis();
                }
                if (!DLedgerEntryPusher.this.memberState.isLeader()) {
                    this.waitForRunning(1L);
                    return;
                }
                long currTerm = DLedgerEntryPusher.this.memberState.currTerm();
                DLedgerEntryPusher.this.checkTermForPendingMap(currTerm, "QuorumAckChecker");
                DLedgerEntryPusher.this.checkTermForWaterMark(currTerm, "QuorumAckChecker");
                if (DLedgerEntryPusher.this.pendingAppendResponsesByTerm.size() > 1) {
                    for (Long term : DLedgerEntryPusher.this.pendingAppendResponsesByTerm.keySet()) {
                        if (term == currTerm) continue;
                        for (Map.Entry futureEntry : ((ConcurrentMap)DLedgerEntryPusher.this.pendingAppendResponsesByTerm.get(term)).entrySet()) {
                            AppendEntryResponse response2 = new AppendEntryResponse();
                            response2.setGroup(DLedgerEntryPusher.this.memberState.getGroup());
                            response2.setIndex((Long)futureEntry.getKey());
                            response2.setCode(DLedgerResponseCode.TERM_CHANGED.getCode());
                            response2.setLeaderId(DLedgerEntryPusher.this.memberState.getLeaderId());
                            this.logger.info("[TermChange] Will clear the pending response index={} for term changed from {} to {}", new Object[]{futureEntry.getKey(), term, currTerm});
                            ((TimeoutFuture)futureEntry.getValue()).complete(response2);
                        }
                        DLedgerEntryPusher.this.pendingAppendResponsesByTerm.remove(term);
                    }
                }
                if (DLedgerEntryPusher.this.peerWaterMarksByTerm.size() > 1) {
                    for (Long term : DLedgerEntryPusher.this.peerWaterMarksByTerm.keySet()) {
                        if (term == currTerm) continue;
                        this.logger.info("[TermChange] Will clear the watermarks for term changed from {} to {}", (Object)term, (Object)currTerm);
                        DLedgerEntryPusher.this.peerWaterMarksByTerm.remove(term);
                    }
                }
                Map peerWaterMarks = (Map)DLedgerEntryPusher.this.peerWaterMarksByTerm.get(currTerm);
                List sortedWaterMarks = peerWaterMarks.values().stream().sorted(Comparator.reverseOrder()).collect(Collectors.toList());
                long quorumIndex = (Long)sortedWaterMarks.get(sortedWaterMarks.size() / 2);
                DLedgerEntryPusher.this.dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);
                ConcurrentMap responses = (ConcurrentMap)DLedgerEntryPusher.this.pendingAppendResponsesByTerm.get(currTerm);
                boolean needCheck = false;
                int ackNum = 0;
                Long i = quorumIndex;
                while (i > this.lastQuorumIndex) {
                    try {
                        CompletableFuture future = (CompletableFuture)responses.remove(i);
                        if (future == null) {
                            needCheck = true;
                            break;
                        }
                        if (!future.isDone()) {
                            response = new AppendEntryResponse();
                            ((RequestOrResponse)response).setGroup(DLedgerEntryPusher.this.memberState.getGroup());
                            ((RequestOrResponse)response).setTerm(currTerm);
                            ((AppendEntryResponse)response).setIndex(i);
                            ((RequestOrResponse)response).setLeaderId(DLedgerEntryPusher.this.memberState.getSelfId());
                            ((AppendEntryResponse)response).setPos(((AppendFuture)future).getPos());
                            future.complete(response);
                        }
                        ++ackNum;
                    }
                    catch (Throwable t) {
                        this.logger.error("Error in ack to index={} term={}", new Object[]{i, currTerm, t});
                    }
                    Long t = i;
                    i = i - 1L;
                    response = i;
                }
                if (ackNum == 0) {
                    TimeoutFuture future;
                    for (long i2 = quorumIndex + 1L; i2 < Integer.MAX_VALUE && (future = (TimeoutFuture)responses.get(i2)) != null && future.isTimeOut(); ++i2) {
                        AppendEntryResponse response3 = new AppendEntryResponse();
                        response3.setGroup(DLedgerEntryPusher.this.memberState.getGroup());
                        response3.setCode(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT.getCode());
                        response3.setTerm(currTerm);
                        response3.setIndex(i2);
                        response3.setLeaderId(DLedgerEntryPusher.this.memberState.getSelfId());
                        future.complete(response3);
                    }
                    this.waitForRunning(1L);
                }
                if (DLedgerUtils.elapsed(this.lastCheckLeakTimeMs) > 1000L || needCheck) {
                    DLedgerEntryPusher.this.updatePeerWaterMark(currTerm, DLedgerEntryPusher.this.memberState.getSelfId(), DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex());
                    for (Map.Entry futureEntry : responses.entrySet()) {
                        if ((Long)futureEntry.getKey() >= quorumIndex) continue;
                        response = new AppendEntryResponse();
                        ((RequestOrResponse)response).setGroup(DLedgerEntryPusher.this.memberState.getGroup());
                        ((RequestOrResponse)response).setTerm(currTerm);
                        ((AppendEntryResponse)response).setIndex((Long)futureEntry.getKey());
                        ((RequestOrResponse)response).setLeaderId(DLedgerEntryPusher.this.memberState.getSelfId());
                        ((AppendEntryResponse)response).setPos(((AppendFuture)futureEntry.getValue()).getPos());
                        ((TimeoutFuture)futureEntry.getValue()).complete(response);
                        responses.remove(futureEntry.getKey());
                    }
                    this.lastCheckLeakTimeMs = System.currentTimeMillis();
                }
                this.lastQuorumIndex = quorumIndex;
            }
            catch (Throwable t) {
                logger.error("Error in {}", (Object)this.getName(), (Object)t);
                DLedgerUtils.sleep(100L);
            }
        }
    }
}

