/*
 * Decompiled with CFR 0.152.
 */
package io.openmessaging.storage.dledger;

import io.openmessaging.storage.dledger.AppendFuture;
import io.openmessaging.storage.dledger.BatchAppendFuture;
import io.openmessaging.storage.dledger.DLedgerConfig;
import io.openmessaging.storage.dledger.DLedgerEntryPusher;
import io.openmessaging.storage.dledger.DLedgerLeaderElector;
import io.openmessaging.storage.dledger.DLedgerRpcNettyService;
import io.openmessaging.storage.dledger.DLedgerRpcService;
import io.openmessaging.storage.dledger.MemberState;
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.openmessaging.storage.dledger.exception.DLedgerException;
import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.DLedgerProtocolHander;
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
import io.openmessaging.storage.dledger.protocol.GetEntriesRequest;
import io.openmessaging.storage.dledger.protocol.GetEntriesResponse;
import io.openmessaging.storage.dledger.protocol.HeartBeatRequest;
import io.openmessaging.storage.dledger.protocol.HeartBeatResponse;
import io.openmessaging.storage.dledger.protocol.LeadershipTransferRequest;
import io.openmessaging.storage.dledger.protocol.LeadershipTransferResponse;
import io.openmessaging.storage.dledger.protocol.MetadataRequest;
import io.openmessaging.storage.dledger.protocol.MetadataResponse;
import io.openmessaging.storage.dledger.protocol.PullEntriesRequest;
import io.openmessaging.storage.dledger.protocol.PullEntriesResponse;
import io.openmessaging.storage.dledger.protocol.PushEntryRequest;
import io.openmessaging.storage.dledger.protocol.PushEntryResponse;
import io.openmessaging.storage.dledger.protocol.VoteRequest;
import io.openmessaging.storage.dledger.protocol.VoteResponse;
import io.openmessaging.storage.dledger.store.DLedgerMemoryStore;
import io.openmessaging.storage.dledger.store.DLedgerStore;
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import io.openmessaging.storage.dledger.utils.PreConditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DLedgerServer
implements DLedgerProtocolHander {
    private static Logger logger = LoggerFactory.getLogger(DLedgerServer.class);
    private MemberState memberState;
    private DLedgerConfig dLedgerConfig;
    private DLedgerStore dLedgerStore;
    private DLedgerRpcService dLedgerRpcService;
    private DLedgerEntryPusher dLedgerEntryPusher;
    private DLedgerLeaderElector dLedgerLeaderElector;
    private ScheduledExecutorService executorService;

    public DLedgerServer(DLedgerConfig dLedgerConfig) {
        this.dLedgerConfig = dLedgerConfig;
        this.memberState = new MemberState(dLedgerConfig);
        this.dLedgerStore = this.createDLedgerStore(dLedgerConfig.getStoreType(), this.dLedgerConfig, this.memberState);
        this.dLedgerRpcService = new DLedgerRpcNettyService(this);
        this.dLedgerEntryPusher = new DLedgerEntryPusher(dLedgerConfig, this.memberState, this.dLedgerStore, this.dLedgerRpcService);
        this.dLedgerLeaderElector = new DLedgerLeaderElector(dLedgerConfig, this.memberState, this.dLedgerRpcService);
        this.executorService = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("DLedgerServer-ScheduledExecutor");
            return t;
        });
    }

    public void startup() {
        this.dLedgerStore.startup();
        this.dLedgerRpcService.startup();
        this.dLedgerEntryPusher.startup();
        this.dLedgerLeaderElector.startup();
        this.executorService.scheduleAtFixedRate(this::checkPreferredLeader, 1000L, 1000L, TimeUnit.MILLISECONDS);
    }

    public void shutdown() {
        this.dLedgerLeaderElector.shutdown();
        this.dLedgerEntryPusher.shutdown();
        this.dLedgerRpcService.shutdown();
        this.dLedgerStore.shutdown();
        this.executorService.shutdown();
    }

    private DLedgerStore createDLedgerStore(String storeType, DLedgerConfig config, MemberState memberState) {
        if (storeType.equals("MEMORY")) {
            return new DLedgerMemoryStore(config, memberState);
        }
        return new DLedgerMmapFileStore(config, memberState);
    }

    public MemberState getMemberState() {
        return this.memberState;
    }

    @Override
    public CompletableFuture<HeartBeatResponse> handleHeartBeat(HeartBeatRequest request) throws Exception {
        try {
            PreConditions.check(this.memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), this.memberState.getSelfId());
            PreConditions.check(this.memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), this.memberState.getGroup());
            return this.dLedgerLeaderElector.handleHeartBeat(request);
        }
        catch (DLedgerException e) {
            logger.error("[{}][HandleHeartBeat] failed", (Object)this.memberState.getSelfId(), (Object)e);
            HeartBeatResponse response = new HeartBeatResponse();
            response.copyBaseInfo(request);
            response.setCode(e.getCode().getCode());
            response.setLeaderId(this.memberState.getLeaderId());
            return CompletableFuture.completedFuture(response);
        }
    }

    @Override
    public CompletableFuture<VoteResponse> handleVote(VoteRequest request) throws Exception {
        try {
            PreConditions.check(this.memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), this.memberState.getSelfId());
            PreConditions.check(this.memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), this.memberState.getGroup());
            return this.dLedgerLeaderElector.handleVote(request, false);
        }
        catch (DLedgerException e) {
            logger.error("[{}][HandleVote] failed", (Object)this.memberState.getSelfId(), (Object)e);
            VoteResponse response = new VoteResponse();
            response.copyBaseInfo(request);
            response.setCode(e.getCode().getCode());
            response.setLeaderId(this.memberState.getLeaderId());
            return CompletableFuture.completedFuture(response);
        }
    }

    @Override
    public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest request) throws IOException {
        try {
            PreConditions.check(this.memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), this.memberState.getSelfId());
            PreConditions.check(this.memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), this.memberState.getGroup());
            PreConditions.check(this.memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
            PreConditions.check(this.memberState.getTransferee() == null, DLedgerResponseCode.LEADER_TRANSFERRING);
            long currTerm = this.memberState.currTerm();
            if (this.dLedgerEntryPusher.isPendingFull(currTerm)) {
                AppendEntryResponse appendEntryResponse = new AppendEntryResponse();
                appendEntryResponse.setGroup(this.memberState.getGroup());
                appendEntryResponse.setCode(DLedgerResponseCode.LEADER_PENDING_FULL.getCode());
                appendEntryResponse.setTerm(currTerm);
                appendEntryResponse.setLeaderId(this.memberState.getSelfId());
                return AppendFuture.newCompletedFuture(-1L, appendEntryResponse);
            }
            if (request instanceof BatchAppendEntryRequest) {
                BatchAppendEntryRequest batchRequest = (BatchAppendEntryRequest)request;
                if (batchRequest.getBatchMsgs() != null && batchRequest.getBatchMsgs().size() != 0) {
                    long[] positions = new long[batchRequest.getBatchMsgs().size()];
                    DLedgerEntry resEntry = null;
                    int index = 0;
                    Iterator<byte[]> iterator = batchRequest.getBatchMsgs().iterator();
                    while (iterator.hasNext()) {
                        DLedgerEntry dLedgerEntry = new DLedgerEntry();
                        dLedgerEntry.setBody(iterator.next());
                        resEntry = this.dLedgerStore.appendAsLeader(dLedgerEntry);
                        positions[index++] = resEntry.getPos();
                    }
                    BatchAppendFuture batchAppendFuture = (BatchAppendFuture)this.dLedgerEntryPusher.waitAck(resEntry, true);
                    batchAppendFuture.setPositions(positions);
                    return batchAppendFuture;
                }
                throw new DLedgerException(DLedgerResponseCode.REQUEST_WITH_EMPTY_BODYS, "BatchAppendEntryRequest with empty bodys");
            }
            DLedgerEntry dLedgerEntry = new DLedgerEntry();
            dLedgerEntry.setBody(request.getBody());
            DLedgerEntry resEntry = this.dLedgerStore.appendAsLeader(dLedgerEntry);
            return this.dLedgerEntryPusher.waitAck(resEntry, false);
        }
        catch (DLedgerException e) {
            logger.error("[{}][HandleAppend] failed", (Object)this.memberState.getSelfId(), (Object)e);
            AppendEntryResponse response = new AppendEntryResponse();
            response.copyBaseInfo(request);
            response.setCode(e.getCode().getCode());
            response.setLeaderId(this.memberState.getLeaderId());
            return AppendFuture.newCompletedFuture(-1L, response);
        }
    }

    @Override
    public CompletableFuture<GetEntriesResponse> handleGet(GetEntriesRequest request) throws IOException {
        try {
            PreConditions.check(this.memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), this.memberState.getSelfId());
            PreConditions.check(this.memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), this.memberState.getGroup());
            PreConditions.check(this.memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
            DLedgerEntry entry = this.dLedgerStore.get(request.getBeginIndex());
            GetEntriesResponse response = new GetEntriesResponse();
            response.setGroup(this.memberState.getGroup());
            if (entry != null) {
                response.setEntries(Collections.singletonList(entry));
            }
            return CompletableFuture.completedFuture(response);
        }
        catch (DLedgerException e) {
            logger.error("[{}][HandleGet] failed", (Object)this.memberState.getSelfId(), (Object)e);
            GetEntriesResponse response = new GetEntriesResponse();
            response.copyBaseInfo(request);
            response.setLeaderId(this.memberState.getLeaderId());
            response.setCode(e.getCode().getCode());
            return CompletableFuture.completedFuture(response);
        }
    }

    @Override
    public CompletableFuture<MetadataResponse> handleMetadata(MetadataRequest request) throws Exception {
        try {
            PreConditions.check(this.memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), this.memberState.getSelfId());
            PreConditions.check(this.memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), this.memberState.getGroup());
            MetadataResponse metadataResponse = new MetadataResponse();
            metadataResponse.setGroup(this.memberState.getGroup());
            metadataResponse.setPeers(this.memberState.getPeerMap());
            metadataResponse.setLeaderId(this.memberState.getLeaderId());
            return CompletableFuture.completedFuture(metadataResponse);
        }
        catch (DLedgerException e) {
            logger.error("[{}][HandleMetadata] failed", (Object)this.memberState.getSelfId(), (Object)e);
            MetadataResponse response = new MetadataResponse();
            response.copyBaseInfo(request);
            response.setCode(e.getCode().getCode());
            response.setLeaderId(this.memberState.getLeaderId());
            return CompletableFuture.completedFuture(response);
        }
    }

    @Override
    public CompletableFuture<PullEntriesResponse> handlePull(PullEntriesRequest request) {
        return null;
    }

    @Override
    public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request) throws Exception {
        try {
            PreConditions.check(this.memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), this.memberState.getSelfId());
            PreConditions.check(this.memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), this.memberState.getGroup());
            return this.dLedgerEntryPusher.handlePush(request);
        }
        catch (DLedgerException e) {
            logger.error("[{}][HandlePush] failed", (Object)this.memberState.getSelfId(), (Object)e);
            PushEntryResponse response = new PushEntryResponse();
            response.copyBaseInfo(request);
            response.setCode(e.getCode().getCode());
            response.setLeaderId(this.memberState.getLeaderId());
            return CompletableFuture.completedFuture(response);
        }
    }

    @Override
    public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(LeadershipTransferRequest request) throws Exception {
        try {
            PreConditions.check(this.memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), this.memberState.getSelfId());
            PreConditions.check(this.memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), this.memberState.getGroup());
            if (this.memberState.getSelfId().equals(request.getTransferId())) {
                PreConditions.check(this.memberState.isPeerMember(request.getTransfereeId()), DLedgerResponseCode.UNKNOWN_MEMBER, "transferee=%s is not a peer member", request.getTransfereeId());
                PreConditions.check(this.memberState.currTerm() == request.getTerm(), DLedgerResponseCode.INCONSISTENT_TERM, "currTerm(%s) != request.term(%s)", this.memberState.currTerm(), request.getTerm());
                PreConditions.check(this.memberState.isLeader(), DLedgerResponseCode.NOT_LEADER, "selfId=%s is not leader=%s", this.memberState.getSelfId(), this.memberState.getLeaderId());
                long transfereeFallBehind = this.dLedgerStore.getLedgerEndIndex() - this.dLedgerEntryPusher.getPeerWaterMark(request.getTerm(), request.getTransfereeId());
                PreConditions.check(transfereeFallBehind < this.dLedgerConfig.getMaxLeadershipTransferWaitIndex(), DLedgerResponseCode.FALL_BEHIND_TOO_MUCH, "transferee fall behind too much, diff=%s", transfereeFallBehind);
                return this.dLedgerLeaderElector.handleLeadershipTransfer(request);
            }
            if (this.memberState.getSelfId().equals(request.getTransfereeId())) {
                PreConditions.check(request.getTransferId().equals(this.memberState.getLeaderId()), DLedgerResponseCode.INCONSISTENT_LEADER, "transfer=%s is not leader", request.getTransferId());
                long costTime = 0L;
                long startTime = System.currentTimeMillis();
                long fallBehind = request.getTakeLeadershipLedgerIndex() - this.memberState.getLedgerEndIndex();
                while (fallBehind > 0L) {
                    if (costTime > this.dLedgerConfig.getLeadershipTransferWaitTimeout()) {
                        throw new DLedgerException(DLedgerResponseCode.TAKE_LEADERSHIP_FAILED, "transferee fall behind, wait timeout. timeout = {}, diff = {}", this.dLedgerConfig.getLeadershipTransferWaitTimeout(), fallBehind);
                    }
                    logger.warn("transferee fall behind, diff = {}", (Object)fallBehind);
                    Thread.sleep(10L);
                    fallBehind = request.getTakeLeadershipLedgerIndex() - this.memberState.getLedgerEndIndex();
                    costTime = System.currentTimeMillis() - startTime;
                }
                return this.dLedgerLeaderElector.handleTakeLeadership(request);
            }
            return CompletableFuture.completedFuture(new LeadershipTransferResponse().term(this.memberState.currTerm()).code(DLedgerResponseCode.UNEXPECTED_ARGUMENT.getCode()));
        }
        catch (DLedgerException e) {
            logger.error("[{}][handleLeadershipTransfer] failed", (Object)this.memberState.getSelfId(), (Object)e);
            LeadershipTransferResponse response = new LeadershipTransferResponse();
            response.copyBaseInfo(request);
            response.setCode(e.getCode().getCode());
            response.setLeaderId(this.memberState.getLeaderId());
            return CompletableFuture.completedFuture(response);
        }
    }

    private void checkPreferredLeader() {
        if (!this.memberState.isLeader()) {
            return;
        }
        if (this.dLedgerConfig.getPreferredLeaderIds() == null) {
            return;
        }
        if (this.memberState.getTransferee() != null) {
            return;
        }
        ArrayList<String> preferredLeaderIds = new ArrayList<String>(Arrays.asList(this.dLedgerConfig.getPreferredLeaderIds().split(";")));
        if (preferredLeaderIds.contains(this.dLedgerConfig.getSelfId())) {
            return;
        }
        Iterator it = preferredLeaderIds.iterator();
        while (it.hasNext()) {
            String preferredLeaderId = (String)it.next();
            if (!this.memberState.isPeerMember(preferredLeaderId)) {
                it.remove();
                logger.warn("preferredLeaderId = {} is not a peer member", (Object)preferredLeaderId);
                continue;
            }
            if (!this.memberState.getPeersLiveTable().containsKey(preferredLeaderId) || this.memberState.getPeersLiveTable().get(preferredLeaderId).booleanValue() == Boolean.FALSE.booleanValue()) {
                it.remove();
                logger.warn("preferredLeaderId = {} is not online", (Object)preferredLeaderId);
                continue;
            }
            long fallBehind = this.dLedgerStore.getLedgerEndIndex() - this.dLedgerEntryPusher.getPeerWaterMark(this.memberState.currTerm(), preferredLeaderId);
            if (fallBehind < this.dLedgerConfig.getMaxLeadershipTransferWaitIndex()) continue;
            logger.warn("preferredLeaderId = {} transferee fall behind index : {}", (Object)preferredLeaderId, (Object)fallBehind);
        }
        if (preferredLeaderIds.size() == 0) {
            return;
        }
        long minFallBehind = Long.MAX_VALUE;
        String preferredLeaderId = (String)preferredLeaderIds.get(0);
        for (String peerId : preferredLeaderIds) {
            long fallBehind = this.dLedgerStore.getLedgerEndIndex() - this.dLedgerEntryPusher.getPeerWaterMark(this.memberState.currTerm(), peerId);
            if (fallBehind >= minFallBehind) continue;
            minFallBehind = fallBehind;
            preferredLeaderId = peerId;
        }
        logger.info("preferredLeaderId = {}, which has the smallest fall behind index = {} and is decided to be transferee.", (Object)preferredLeaderId, (Object)minFallBehind);
        if (minFallBehind < this.dLedgerConfig.getMaxLeadershipTransferWaitIndex()) {
            LeadershipTransferRequest request = new LeadershipTransferRequest();
            request.setTerm(this.memberState.currTerm());
            request.setTransfereeId(preferredLeaderId);
            try {
                long startTransferTime = System.currentTimeMillis();
                LeadershipTransferResponse response = this.dLedgerLeaderElector.handleLeadershipTransfer(request).get();
                logger.info("transfer finished. request={},response={},cost={}ms", new Object[]{request, response, DLedgerUtils.elapsed(startTransferTime)});
            }
            catch (Throwable t) {
                logger.error("[checkPreferredLeader] error, request={}", (Object)request, (Object)t);
            }
        }
    }

    public DLedgerStore getdLedgerStore() {
        return this.dLedgerStore;
    }

    public DLedgerRpcService getdLedgerRpcService() {
        return this.dLedgerRpcService;
    }

    public DLedgerLeaderElector getdLedgerLeaderElector() {
        return this.dLedgerLeaderElector;
    }

    public DLedgerConfig getdLedgerConfig() {
        return this.dLedgerConfig;
    }
}

