package org.apache.flink.runtime.io.network.partition.external;

import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.FixedLengthBufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/external/ExternalBlockResultPartitionManager.class */
public class ExternalBlockResultPartitionManager implements ResultPartitionProvider {
    private static final Logger LOG = LoggerFactory.getLogger(ExternalBlockResultPartitionManager.class);
    private final ExternalBlockShuffleServiceConfiguration shuffleServiceConfiguration;
    private final LocalResultPartitionResolver resultPartitionResolver;

    @VisibleForTesting
    final FixedLengthBufferPool bufferPool;
    private final ScheduledExecutorService resultPartitionRecyclerExecutorService;

    @VisibleForTesting
    final Map<String, ThreadPoolExecutor> dirToThreadPool = new HashMap();

    @VisibleForTesting
    final ConcurrentHashMap<ResultPartitionID, ExternalBlockResultPartitionMeta> resultPartitionMetaMap = new ConcurrentHashMap<>();
    private final AtomicBoolean isStopped = new AtomicBoolean(false);

    public ExternalBlockResultPartitionManager(ExternalBlockShuffleServiceConfiguration externalBlockShuffleServiceConfiguration) throws Exception {
        this.shuffleServiceConfiguration = externalBlockShuffleServiceConfiguration;
        this.resultPartitionResolver = LocalResultPartitionResolverFactory.create(externalBlockShuffleServiceConfiguration);
        this.bufferPool = new FixedLengthBufferPool(externalBlockShuffleServiceConfiguration.getBufferNumber().intValue(), externalBlockShuffleServiceConfiguration.getMemorySizePerBufferInBytes().intValue(), MemoryType.OFF_HEAP);
        constructThreadPools();
        this.resultPartitionRecyclerExecutorService = Executors.newSingleThreadScheduledExecutor();
        this.resultPartitionRecyclerExecutorService.scheduleWithFixedDelay(() -> {
            recycleResultPartitions();
        }, 0L, externalBlockShuffleServiceConfiguration.getDiskScanIntervalInMS().longValue(), TimeUnit.MILLISECONDS);
        LOG.info("Final configurations: " + externalBlockShuffleServiceConfiguration);
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultPartitionProvider
    public ResultSubpartitionView createSubpartitionView(ResultPartitionID resultPartitionID, int i, BufferAvailabilityListener bufferAvailabilityListener) throws IOException {
        if (this.isStopped.get()) {
            throw new IOException("ExternalBlockResultPartitionManager has already been stopped.");
        }
        ExternalBlockResultPartitionMeta externalBlockResultPartitionMeta = this.resultPartitionMetaMap.get(resultPartitionID);
        if (externalBlockResultPartitionMeta == null) {
            externalBlockResultPartitionMeta = new ExternalBlockResultPartitionMeta(resultPartitionID, this.shuffleServiceConfiguration.getFileSystem(), this.resultPartitionResolver.getResultPartitionDir(resultPartitionID));
            ExternalBlockResultPartitionMeta putIfAbsent = this.resultPartitionMetaMap.putIfAbsent(resultPartitionID, externalBlockResultPartitionMeta);
            if (putIfAbsent != null) {
                externalBlockResultPartitionMeta = putIfAbsent;
            }
        }
        ExternalBlockSubpartitionView externalBlockSubpartitionView = new ExternalBlockSubpartitionView(externalBlockResultPartitionMeta, i, this.dirToThreadPool.get(externalBlockResultPartitionMeta.getRootDir()), resultPartitionID, this.bufferPool, this.shuffleServiceConfiguration.getWaitCreditDelay().longValue(), bufferAvailabilityListener);
        externalBlockResultPartitionMeta.notifySubpartitionStartConsuming(i);
        return externalBlockSubpartitionView;
    }

    public void initializeApplication(String str, String str2) {
        this.resultPartitionResolver.initializeApplication(str, str2);
    }

    public void stopApplication(String str) {
        Set<ResultPartitionID> stopApplication = this.resultPartitionResolver.stopApplication(str);
        if (stopApplication.isEmpty()) {
            return;
        }
        stopApplication.forEach(resultPartitionID -> {
            this.resultPartitionMetaMap.remove(resultPartitionID);
        });
    }

    public void stop() {
        LOG.warn("Stop ExternalBlockResultPartitionManager, probably ShuffleService is stopped");
        try {
            if (!this.isStopped.compareAndSet(false, true)) {
                LOG.info("ExternalBlockResultPartitionManager has already been stopped.");
                return;
            }
            Iterator<Map.Entry<String, ThreadPoolExecutor>> it = this.dirToThreadPool.entrySet().iterator();
            while (it.hasNext()) {
                it.next().getValue().shutdownNow();
            }
            this.resultPartitionRecyclerExecutorService.shutdownNow();
            this.resultPartitionResolver.stop();
            this.bufferPool.lazyDestroy();
            this.resultPartitionMetaMap.clear();
        } catch (Throwable th) {
            LOG.error("Exception occurs when stopping ExternalBlockResultPartitionManager", th);
        }
    }

    private void constructThreadPools() {
        ThreadGroup threadGroup = new ThreadGroup("Disk IO Thread Group");
        this.shuffleServiceConfiguration.getDirToDiskType().forEach((str, str2) -> {
            Integer num = this.shuffleServiceConfiguration.getDiskTypeToIOThreadNum().get(str2);
            Comparator newSubpartitionViewComparator = this.shuffleServiceConfiguration.newSubpartitionViewComparator();
            this.dirToThreadPool.put(str, new ThreadPoolExecutor(num.intValue(), num.intValue(), 0L, TimeUnit.MILLISECONDS, (BlockingQueue<Runnable>) (newSubpartitionViewComparator != null ? new PriorityBlockingQueue(200, newSubpartitionViewComparator) : new LinkedBlockingQueue()), new DispatcherThreadFactory(threadGroup, "IO thread [" + str2 + "] [" + str + "]")));
        });
    }

    @VisibleForTesting
    void recycleResultPartitions() {
        long currentTimeMillis = System.currentTimeMillis();
        HashMap<ResultPartitionID, ExternalBlockResultPartitionMeta> hashMap = new HashMap<>();
        HashMap<ResultPartitionID, ExternalBlockResultPartitionMeta> hashMap2 = new HashMap<>();
        for (Map.Entry<ResultPartitionID, ExternalBlockResultPartitionMeta> entry : this.resultPartitionMetaMap.entrySet()) {
            ResultPartitionID key = entry.getKey();
            ExternalBlockResultPartitionMeta value = entry.getValue();
            if (value.hasInitialized() && value.getReferenceCount() <= 0) {
                if (value.getUnconsumedSubpartitionCount() <= 0) {
                    if (currentTimeMillis - value.getLastActiveTimeInMs() > value.getConsumedPartitionTTL()) {
                        hashMap.put(key, value);
                    }
                } else if (currentTimeMillis - value.getLastActiveTimeInMs() > value.getPartialConsumedPartitionTTL()) {
                    hashMap2.put(key, value);
                }
            }
        }
        removeResultPartitionAndMeta(hashMap, "CONSUMED_PARTITION_TTL_TIMEOUT", LOG.isDebugEnabled());
        removeResultPartitionAndMeta(hashMap2, "PARTIAL_CONSUMED_PARTITION_TTL_TIMEOUT", true);
    }

    private void removeResultPartitionAndMeta(HashMap<ResultPartitionID, ExternalBlockResultPartitionMeta> hashMap, String str, boolean z) {
        if (hashMap.isEmpty()) {
            return;
        }
        Iterator<Map.Entry<ResultPartitionID, ExternalBlockResultPartitionMeta>> it = hashMap.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<ResultPartitionID, ExternalBlockResultPartitionMeta> next = it.next();
            ResultPartitionID key = next.getKey();
            ExternalBlockResultPartitionMeta value = next.getValue();
            if (value.getReferenceCount() > 0) {
                it.remove();
            } else {
                this.resultPartitionMetaMap.remove(key);
                this.resultPartitionResolver.recycleResultPartition(key);
                if (z) {
                    LOG.info("Delete partition's directory: {}, reason: {}, lastActiveTime: {}", new Object[]{value.getResultPartitionDir(), str, Long.valueOf(value.getLastActiveTimeInMs())});
                }
            }
        }
    }
}
