package org.apache.flink.runtime.preaggregatedaccumulators;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.taskexecutor.JobManagerConnection;
import org.apache.flink.runtime.taskexecutor.JobManagerTable;
import org.apache.flink.util.Preconditions;

/**
 * {@link AccumulatorAggregationManager} that pre-aggregates accumulator values locally and
 * forwards commits and queries to the job manager gateway looked up in the {@link JobManagerTable}.
 *
 * <p>Two independent pieces of state are maintained:
 * <ul>
 *   <li>the per-job aggregation state, guarded by the monitor of
 *       {@code perJobAggregatedAccumulators};</li>
 *   <li>the per-job pending query futures and cached query results, guarded by {@code queryLock}.</li>
 * </ul>
 */
public class RPCBasedAccumulatorAggregationManager implements AccumulatorAggregationManager {
    private final JobManagerTable jobManagerTable;

    /** Aggregation state per job and accumulator name; every access synchronizes on this map. */
    private final Map<JobID, Map<String, AggregatedAccumulator>> perJobAggregatedAccumulators = new HashMap<>();

    private final Object queryLock = new Object();

    /** Futures handed out to callers whose query has been sent but not answered yet. */
    @GuardedBy("queryLock")
    private final Map<JobID, Map<String, List<CompletableFuture<Accumulator>>>> perJobUnfulfilledUserQueryFutures = new HashMap<>();

    /** Answered queries; each value is either an {@link Accumulator} or a {@link Throwable}. */
    @GuardedBy("queryLock")
    private final Map<JobID, Map<String, Object>> perJobCachedQueryResults = new HashMap<>();

    /**
     * Aggregation state of one named accumulator: which tasks registered for it, which of them
     * already committed, and the value merged from the commits so far.
     *
     * <p>Instances are not synchronized themselves; the enclosing manager only touches them while
     * holding the monitor of its aggregation map.
     */
    public static final class AggregatedAccumulator {
        private final Set<Integer> registeredTasks = new HashSet<>();
        private final Set<Integer> committedTasks = new HashSet<>();
        private final JobVertexID jobVertexId;

        /** Merged value of all commits so far; {@code null} until the first commit. */
        private Accumulator aggregatedValue;

        AggregatedAccumulator(JobVertexID jobVertexID) {
            this.jobVertexId = jobVertexID;
        }

        /**
         * Records that task {@code i} will contribute to this accumulator.
         *
         * @throws IllegalArgumentException if {@code jobVertexID} differs from the vertex this
         *     accumulator was created for
         * @throws IllegalStateException if task {@code i} is already registered
         */
        void registerForTask(JobVertexID jobVertexID, int i) {
            Preconditions.checkArgument(this.jobVertexId.equals(jobVertexID), "The registered task belongs to different JobVertex with previous registered ones");
            Preconditions.checkState(!this.registeredTasks.contains(i), "This task has already registered.");
            this.registeredTasks.add(i);
        }

        /**
         * Merges the value committed by task {@code i} into the aggregated value. The first
         * committed value is cloned so that later merges do not modify the caller's instance.
         *
         * @throws IllegalArgumentException if {@code jobVertexID} differs from the vertex this
         *     accumulator was created for
         * @throws IllegalStateException if task {@code i} has not been registered
         */
        @SuppressWarnings({"unchecked", "rawtypes"}) // the accumulator's value types are not tracked here
        void commitForTask(JobVertexID jobVertexID, int i, Accumulator accumulator) {
            Preconditions.checkArgument(this.jobVertexId.equals(jobVertexID), "The registered task belongs to different JobVertex with previous registered ones");
            Preconditions.checkState(this.registeredTasks.contains(i), "Can not commit for an accumulator that has not been registered before");
            if (this.aggregatedValue == null) {
                this.aggregatedValue = accumulator.clone();
            } else {
                this.aggregatedValue.merge(accumulator);
            }
            this.committedTasks.add(i);
        }

        /**
         * Withdraws the registration of task {@code i} unless it has already committed; committed
         * tasks stay registered so that their contribution keeps counting.
         */
        void clearRegistrationForTask(int i) {
            if (!this.committedTasks.contains(i)) {
                // Removing a task that never registered is a no-op.
                this.registeredTasks.remove(Integer.valueOf(i));
            }
        }

        /** Returns {@code true} if no task is registered any more. */
        boolean isEmpty() {
            return this.registeredTasks.isEmpty();
        }

        /** Returns {@code true} if at least one task is registered and every registered task committed. */
        boolean isAllCommitted() {
            return !this.registeredTasks.isEmpty() && this.registeredTasks.size() == this.committedTasks.size();
        }

        Accumulator getAggregatedValue() {
            return this.aggregatedValue;
        }

        JobVertexID getJobVertexId() {
            return this.jobVertexId;
        }

        Set<Integer> getCommittedTasks() {
            return this.committedTasks;
        }

        @VisibleForTesting
        Set<Integer> getRegisteredTasks() {
            return this.registeredTasks;
        }
    }

    public RPCBasedAccumulatorAggregationManager(JobManagerTable jobManagerTable) {
        this.jobManagerTable = jobManagerTable;
    }

    /**
     * Registers task {@code i} of {@code jobVertexID} as a contributor to the accumulator
     * {@code str} of job {@code jobID}, creating the aggregation state on first use.
     *
     * @throws IllegalArgumentException if the accumulator already exists for a different job vertex
     * @throws IllegalStateException if the task is already registered for this accumulator
     */
    @Override
    public void registerPreAggregatedAccumulator(JobID jobID, JobVertexID jobVertexID, int i, String str) {
        synchronized (this.perJobAggregatedAccumulators) {
            this.perJobAggregatedAccumulators
                    .computeIfAbsent(jobID, key -> new HashMap<>())
                    .computeIfAbsent(str, key -> new AggregatedAccumulator(jobVertexID))
                    .registerForTask(jobVertexID, i);
        }
    }

    /**
     * Merges the value of task {@code i} into the accumulator {@code str}. Once all registered
     * tasks have committed, the aggregated value is sent to the job manager and the local state
     * for the accumulator is dropped.
     *
     * @throws IllegalStateException if the accumulator is unknown or the task never registered
     * @throws IllegalArgumentException if {@code jobVertexID} does not match the accumulator's vertex
     */
    @Override
    public void commitPreAggregatedAccumulator(JobID jobID, JobVertexID jobVertexID, int i, String str, Accumulator accumulator) {
        synchronized (this.perJobAggregatedAccumulators) {
            Map<String, AggregatedAccumulator> jobAccumulators = this.perJobAggregatedAccumulators.get(jobID);
            AggregatedAccumulator aggregated = jobAccumulators != null ? jobAccumulators.get(str) : null;
            Preconditions.checkState(aggregated != null, "The committed accumulator does not exist.");

            aggregated.commitForTask(jobVertexID, i, accumulator);
            if (aggregated.isAllCommitted()) {
                commitAggregatedAccumulators(jobID, Collections.singletonList(toCommitAccumulator(str, aggregated)));
                jobAccumulators.remove(str);
            }
            if (jobAccumulators.isEmpty()) {
                this.perJobAggregatedAccumulators.remove(jobID);
            }
        }
    }

    /**
     * Queries the value of accumulator {@code str} of job {@code jobID}.
     *
     * <p>Answers are served from the cache when available. Otherwise the returned future joins an
     * already running query for the same name, or a new query is sent to the job manager. If no
     * job manager connection exists for the job, the future is completed with {@code null}.
     *
     * @return a future completed with the accumulator, with {@code null}, or exceptionally with
     *     the failure reported for the query
     */
    @Override
    public <V, A extends Serializable> CompletableFuture<Accumulator<V, A>> queryPreAggregatedAccumulator(JobID jobID, String str) {
        synchronized (this.queryLock) {
            CompletableFuture<Accumulator<V, A>> result = new CompletableFuture<>();

            Map<String, Object> cachedForJob = this.perJobCachedQueryResults.get(jobID);
            Object cached = cachedForJob == null ? null : cachedForJob.get(str);
            if (cached != null) {
                if (cached instanceof Accumulator) {
                    @SuppressWarnings("unchecked") // the cache does not track the accumulator's value types
                    Accumulator<V, A> typed = (Accumulator<V, A>) cached;
                    result.complete(typed);
                } else if (cached instanceof Throwable) {
                    result.completeExceptionally((Throwable) cached);
                } else {
                    throw new IllegalStateException("The cached result should be either accumulator or throwable.");
                }
                return result;
            }

            Map<String, List<CompletableFuture<Accumulator>>> pendingForJob = this.perJobUnfulfilledUserQueryFutures.get(jobID);
            List<CompletableFuture<Accumulator>> pending = pendingForJob == null ? null : pendingForJob.get(str);
            if (pending != null) {
                // A query for this name is already in flight; piggyback on its answer.
                pending.add(asPendingFuture(result));
                return result;
            }

            JobManagerConnection connection = this.jobManagerTable.get(jobID);
            if (connection == null) {
                result.complete(null);
                return result;
            }

            // Register the future before issuing the RPC so that the completion callback always finds it.
            this.perJobUnfulfilledUserQueryFutures
                    .computeIfAbsent(jobID, key -> new HashMap<>())
                    .computeIfAbsent(str, key -> new ArrayList<>())
                    .add(asPendingFuture(result));
            connection.getJobManagerGateway().queryPreAggregatedAccumulator(str).whenComplete((accumulator, failure) -> {
                onAccumulatorQueryFinished(jobID, str, accumulator, failure);
            });
            return result;
        }
    }

    /**
     * Withdraws the registrations of task {@code i} of {@code jobVertexID} from all accumulators
     * of job {@code jobID} it has not committed to yet. Accumulators that thereby become fully
     * committed are sent to the job manager; fully committed or empty accumulators are dropped.
     */
    @Override
    public void clearRegistrationForTask(JobID jobID, JobVertexID jobVertexID, int i) {
        synchronized (this.perJobAggregatedAccumulators) {
            Map<String, AggregatedAccumulator> jobAccumulators = this.perJobAggregatedAccumulators.get(jobID);
            if (jobAccumulators == null) {
                return;
            }

            List<CommitAccumulator> readyToCommit = new ArrayList<>();
            List<String> finishedNames = new ArrayList<>();
            for (Map.Entry<String, AggregatedAccumulator> entry : jobAccumulators.entrySet()) {
                AggregatedAccumulator aggregated = entry.getValue();
                if (!aggregated.getJobVertexId().equals(jobVertexID)) {
                    continue;
                }
                aggregated.clearRegistrationForTask(i);
                if (aggregated.isAllCommitted()) {
                    readyToCommit.add(toCommitAccumulator(entry.getKey(), aggregated));
                    finishedNames.add(entry.getKey());
                } else if (aggregated.isEmpty()) {
                    finishedNames.add(entry.getKey());
                }
            }

            // Commit first and remove afterwards, so a failing commit leaves the local state untouched.
            if (!readyToCommit.isEmpty()) {
                commitAggregatedAccumulators(jobID, readyToCommit);
            }
            for (String finishedName : finishedNames) {
                jobAccumulators.remove(finishedName);
            }
            if (jobAccumulators.isEmpty()) {
                this.perJobAggregatedAccumulators.remove(jobID);
            }
        }
    }

    /**
     * Drops all aggregation state, pending query futures and cached query results of the job.
     *
     * <p>NOTE(review): pending query futures are discarded without being completed, so callers
     * still holding one will never see a result. Confirm this is intended for a cleared job.
     */
    @Override
    public void clearAccumulatorsForJob(JobID jobID) {
        synchronized (this.perJobAggregatedAccumulators) {
            Map<String, AggregatedAccumulator> removedAccumulators = this.perJobAggregatedAccumulators.remove(jobID);
            if (removedAccumulators != null) {
                removedAccumulators.clear();
            }
        }
        synchronized (this.queryLock) {
            Map<String, List<CompletableFuture<Accumulator>>> removedPending = this.perJobUnfulfilledUserQueryFutures.remove(jobID);
            if (removedPending != null) {
                removedPending.clear();
            }
            Map<String, Object> removedCache = this.perJobCachedQueryResults.remove(jobID);
            if (removedCache != null) {
                removedCache.clear();
            }
        }
    }

    @VisibleForTesting
    Map<JobID, Map<String, AggregatedAccumulator>> getPerJobAggregatedAccumulators() {
        return this.perJobAggregatedAccumulators;
    }

    @VisibleForTesting
    public Map<JobID, Map<String, List<CompletableFuture<Accumulator>>>> getPerJobUnfulfilledUserQueryFutures() {
        return this.perJobUnfulfilledUserQueryFutures;
    }

    @VisibleForTesting
    public Map<JobID, Map<String, Object>> getPerJobCachedQueryResults() {
        return this.perJobCachedQueryResults;
    }

    /** Builds the commit message for a named accumulator from its current aggregation state. */
    private static CommitAccumulator toCommitAccumulator(String name, AggregatedAccumulator aggregated) {
        return new CommitAccumulator(aggregated.getJobVertexId(), name, aggregated.getAggregatedValue(), aggregated.getCommittedTasks());
    }

    /**
     * Views a caller's typed future as an element of the pending list, which stores futures of the
     * raw {@link Accumulator} type. Generic types are invariant, so the conversion has to go
     * through the raw {@link CompletableFuture} type; it is safe because only the erased type
     * exists at runtime.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static <V, A extends Serializable> CompletableFuture<Accumulator> asPendingFuture(CompletableFuture<Accumulator<V, A>> future) {
        return (CompletableFuture) future;
    }

    /**
     * Sends the given commits to the job manager of the job, if a connection to it exists;
     * otherwise the commits are silently dropped. Must be called while holding the monitor of
     * {@code perJobAggregatedAccumulators}.
     */
    private void commitAggregatedAccumulators(JobID jobID, List<CommitAccumulator> commits) {
        assert Thread.holdsLock(this.perJobAggregatedAccumulators);

        JobManagerConnection connection = this.jobManagerTable.get(jobID);
        if (connection != null) {
            connection.getJobManagerGateway().commitPreAggregatedAccumulator(commits);
        }
    }

    /**
     * Completion callback of a job manager query: caches the answer and completes every future
     * waiting for it.
     *
     * @param accumulator the queried value, or {@code null} if none was returned
     * @param failure the failure of the query, or {@code null} if it succeeded
     * @throws IllegalStateException if an answer for the accumulator is already cached
     */
    private <V, A extends Serializable> void onAccumulatorQueryFinished(JobID jobID, String name, Accumulator<V, A> accumulator, Throwable failure) {
        synchronized (this.queryLock) {
            Map<String, List<CompletableFuture<Accumulator>>> pendingForJob = this.perJobUnfulfilledUserQueryFutures.get(jobID);
            List<CompletableFuture<Accumulator>> pending = pendingForJob == null ? null : pendingForJob.get(name);
            if (pending == null) {
                // The job was cleared while the query was in flight; nobody is waiting for the
                // answer any more and caching it would resurrect state of a cleared job.
                return;
            }

            Map<String, Object> cachedForJob = this.perJobCachedQueryResults.get(jobID);
            Preconditions.checkState(cachedForJob == null || !cachedForJob.containsKey(name), "The target accumulator " + name + " of job " + jobID + " should not be in the cached result list.");

            // Update the bookkeeping before completing any future: completion may synchronously
            // run dependent actions that query again, and those must not observe (or modify) the
            // pending list that is about to be iterated.
            Object answer = accumulator != null ? accumulator : failure;
            if (answer != null) {
                this.perJobCachedQueryResults.computeIfAbsent(jobID, key -> new HashMap<>()).put(name, answer);
            }
            // A query that succeeded without a value is not cached: the cache lookup treats a
            // null value as "no entry", so a later query simply asks the job manager again.
            pendingForJob.remove(name);
            if (pendingForJob.isEmpty()) {
                this.perJobUnfulfilledUserQueryFutures.remove(jobID);
            }

            if (accumulator == null && failure != null) {
                pending.forEach(future -> future.completeExceptionally(failure));
            } else {
                // Also covers "no value, no failure": completeExceptionally(null) would throw a
                // NullPointerException and leave the waiting futures incomplete forever.
                pending.forEach(future -> future.complete(accumulator));
            }
        }
    }
}
