/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network;

import java.io.File;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.LocalConnectionManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionFactory;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageConfiguration;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyServiceImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartitionFactory;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
import org.apache.flink.runtime.shuffle.ShuffleServiceFactory;
import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ShuffleServiceFactory} for the Netty-based shuffle service. It creates the
 * {@link NettyShuffleMaster} and wires together all components of a
 * {@link NettyShuffleEnvironment}.
 */
public class NettyShuffleServiceFactory
        implements ShuffleServiceFactory<NettyShuffleDescriptor, ResultPartition, SingleInputGate> {
    private static final Logger LOG = LoggerFactory.getLogger(NettyShuffleServiceFactory.class);

    /** Prefix handed to the {@link FileChannelManagerImpl} together with the configured temp dirs. */
    private static final String DIR_NAME_PREFIX = "netty-shuffle";

    /**
     * Creates a {@link NettyShuffleMaster} from the configuration carried by the given context.
     *
     * @param shuffleMasterContext context providing the configuration
     * @return a new shuffle master
     */
    public NettyShuffleMaster createShuffleMaster(ShuffleMasterContext shuffleMasterContext) {
        return new NettyShuffleMaster(shuffleMasterContext.getConfiguration());
    }

    /**
     * Derives a {@link NettyShuffleEnvironmentConfiguration} from the given context and builds a
     * {@link NettyShuffleEnvironment} from it.
     *
     * @param shuffleEnvironmentContext context providing configuration, network memory size, host
     *     address, resource id, event publisher, metric group, IO executor, slot count and temp
     *     directory paths
     * @return a new shuffle environment
     * @throws NullPointerException if {@code shuffleEnvironmentContext} is {@code null}
     */
    public NettyShuffleEnvironment createShuffleEnvironment(
            ShuffleEnvironmentContext shuffleEnvironmentContext) {
        Preconditions.checkNotNull(shuffleEnvironmentContext);
        NettyShuffleEnvironmentConfiguration networkConfig =
                NettyShuffleEnvironmentConfiguration.fromConfiguration(
                        shuffleEnvironmentContext.getConfiguration(),
                        shuffleEnvironmentContext.getNetworkMemorySize(),
                        shuffleEnvironmentContext.isLocalCommunicationOnly(),
                        shuffleEnvironmentContext.getHostAddress());
        return NettyShuffleServiceFactory.createNettyShuffleEnvironment(
                networkConfig,
                shuffleEnvironmentContext.getTaskExecutorResourceId(),
                shuffleEnvironmentContext.getEventPublisher(),
                shuffleEnvironmentContext.getParentMetricGroup(),
                shuffleEnvironmentContext.getIoExecutor(),
                shuffleEnvironmentContext.getNumberOfSlots(),
                shuffleEnvironmentContext.getTmpDirPaths());
    }

    /**
     * Builds a {@link NettyShuffleEnvironment} using a freshly created
     * {@link ResultPartitionManager}; all other arguments are passed through unchanged.
     */
    @VisibleForTesting
    static NettyShuffleEnvironment createNettyShuffleEnvironment(
            NettyShuffleEnvironmentConfiguration config,
            ResourceID taskExecutorResourceId,
            TaskEventPublisher taskEventPublisher,
            MetricGroup metricGroup,
            Executor ioExecutor,
            int numberOfSlots,
            String[] tmpDirPaths) {
        return NettyShuffleServiceFactory.createNettyShuffleEnvironment(
                config,
                taskExecutorResourceId,
                taskEventPublisher,
                new ResultPartitionManager(),
                metricGroup,
                ioExecutor,
                numberOfSlots,
                tmpDirPaths);
    }

    /**
     * Builds a {@link NettyShuffleEnvironment}, choosing the {@link ConnectionManager} from the
     * configuration: a {@link NettyConnectionManager} when {@code config.nettyConfig()} is
     * non-null, otherwise a {@link LocalConnectionManager}.
     */
    @VisibleForTesting
    static NettyShuffleEnvironment createNettyShuffleEnvironment(
            NettyShuffleEnvironmentConfiguration config,
            ResourceID taskExecutorResourceId,
            TaskEventPublisher taskEventPublisher,
            ResultPartitionManager resultPartitionManager,
            MetricGroup metricGroup,
            Executor ioExecutor,
            int numberOfSlots,
            String[] tmpDirPaths) {
        NettyConfig nettyConfig = config.nettyConfig();
        ConnectionManager connectionManager =
                nettyConfig != null
                        ? new NettyConnectionManager(
                                resultPartitionManager,
                                taskEventPublisher,
                                nettyConfig,
                                config.getMaxNumberOfConnections(),
                                config.isConnectionReuseEnabled())
                        : new LocalConnectionManager();
        return NettyShuffleServiceFactory.createNettyShuffleEnvironment(
                config,
                taskExecutorResourceId,
                taskEventPublisher,
                resultPartitionManager,
                connectionManager,
                metricGroup,
                ioExecutor,
                numberOfSlots,
                tmpDirPaths);
    }

    /**
     * Assembles a {@link NettyShuffleEnvironment} from the given collaborators.
     *
     * <p>In order, this creates the file channel manager, the network buffer pool, the batch
     * shuffle read buffer pool with its scheduled IO executor, registers the shuffle metrics,
     * optionally sets up the tiered storage components, and finally creates the result partition
     * and input gate factories handed to the environment.
     *
     * @param config shuffle environment configuration
     * @param taskExecutorResourceId resource id passed to the input gate factory and environment
     * @param taskEventPublisher publisher passed to the input gate factory
     * @param resultPartitionManager manager shared by the partition and gate factories
     * @param connectionManager connection manager used by the input gate factory and environment
     * @param metricGroup group under which the shuffle metrics are registered
     * @param ioExecutor executor handed to the environment
     * @param numberOfSlots slot count; used when sizing the batch shuffle read IO thread pool
     * @param tmpDirPaths temp directory paths; only the array length is used here, for sizing the
     *     batch shuffle read IO thread pool
     * @return the fully wired shuffle environment
     * @throws NullPointerException if {@code config}, {@code taskExecutorResourceId},
     *     {@code taskEventPublisher}, {@code resultPartitionManager}, {@code metricGroup},
     *     {@code connectionManager} or {@code tmpDirPaths} is {@code null}
     */
    @VisibleForTesting
    public static NettyShuffleEnvironment createNettyShuffleEnvironment(
            NettyShuffleEnvironmentConfiguration config,
            ResourceID taskExecutorResourceId,
            TaskEventPublisher taskEventPublisher,
            ResultPartitionManager resultPartitionManager,
            ConnectionManager connectionManager,
            MetricGroup metricGroup,
            Executor ioExecutor,
            int numberOfSlots,
            String[] tmpDirPaths) {
        Preconditions.checkNotNull(config);
        Preconditions.checkNotNull(taskExecutorResourceId);
        Preconditions.checkNotNull(taskEventPublisher);
        Preconditions.checkNotNull(resultPartitionManager);
        Preconditions.checkNotNull(metricGroup);
        Preconditions.checkNotNull(connectionManager);
        // tmpDirPaths.length is read further down, after the file channel manager and both buffer
        // pools exist. Reject null here so the call fails before any of them is constructed.
        Preconditions.checkNotNull(tmpDirPaths);

        FileChannelManagerImpl fileChannelManager =
                new FileChannelManagerImpl(config.getTempDirs(), DIR_NAME_PREFIX);
        // Guarded because building the directory list is wasted work when INFO is disabled.
        if (LOG.isInfoEnabled()) {
            LOG.info(
                    "Created a new {} for storing result partitions of BLOCKING shuffles. Used directories:\n\t{}",
                    FileChannelManager.class.getSimpleName(),
                    Arrays.stream(fileChannelManager.getPaths())
                            .map(File::getAbsolutePath)
                            .collect(Collectors.joining("\n\t")));
        }

        NetworkBufferPool networkBufferPool =
                new NetworkBufferPool(
                        config.numNetworkBuffers(),
                        config.networkBufferSize(),
                        config.getRequestSegmentsTimeout());
        BatchShuffleReadBufferPool batchShuffleReadBufferPool =
                new BatchShuffleReadBufferPool(
                        config.batchShuffleReadMemoryBytes(), config.networkBufferSize());

        // Thread count: max(numberOfSlots, tmpDirPaths.length), capped by the read buffer pool's
        // maximum concurrent requests, but never fewer than one thread.
        int batchShuffleReadIOThreads =
                Math.max(
                        1,
                        Math.min(
                                batchShuffleReadBufferPool.getMaxConcurrentRequests(),
                                Math.max(numberOfSlots, tmpDirPaths.length)));
        ScheduledExecutorService batchShuffleReadIOExecutor =
                Executors.newScheduledThreadPool(
                        batchShuffleReadIOThreads,
                        new ExecutorThreadFactory("blocking-shuffle-io"));

        NettyShuffleMetricFactory.registerShuffleMetrics(metricGroup, networkBufferPool);

        // Tiered storage is optional: without a tiered storage configuration the factory stays
        // empty and the netty service passed to the input gate factory stays null.
        Optional<TieredResultPartitionFactory> tieredResultPartitionFactory = Optional.empty();
        TieredStorageConfiguration tieredStorageConfiguration =
                config.getTieredStorageConfiguration();
        TieredStorageNettyServiceImpl tieredStorageNettyService = null;
        if (tieredStorageConfiguration != null) {
            TieredStorageResourceRegistry tieredStorageResourceRegistry =
                    new TieredStorageResourceRegistry();
            tieredStorageNettyService =
                    new TieredStorageNettyServiceImpl(tieredStorageResourceRegistry);
            tieredResultPartitionFactory =
                    Optional.of(
                            new TieredResultPartitionFactory(
                                    tieredStorageConfiguration,
                                    tieredStorageNettyService,
                                    tieredStorageResourceRegistry));
        }

        ResultPartitionFactory resultPartitionFactory =
                new ResultPartitionFactory(
                        resultPartitionManager,
                        fileChannelManager,
                        networkBufferPool,
                        batchShuffleReadBufferPool,
                        batchShuffleReadIOExecutor,
                        config.getBlockingSubpartitionType(),
                        config.networkBuffersPerChannel(),
                        config.floatingNetworkBuffersPerGate(),
                        config.networkBufferSize(),
                        config.isBatchShuffleCompressionEnabled(),
                        config.getCompressionCodec(),
                        config.getMaxBuffersPerChannel(),
                        config.sortShuffleMinBuffers(),
                        config.sortShuffleMinParallelism(),
                        config.isSSLEnabled(),
                        config.getMaxOverdraftBuffersPerGate(),
                        config.getHybridShuffleSpilledIndexRegionGroupSize(),
                        config.getHybridShuffleNumRetainedInMemoryRegionsMax(),
                        tieredResultPartitionFactory);
        SingleInputGateFactory singleInputGateFactory =
                new SingleInputGateFactory(
                        taskExecutorResourceId,
                        config,
                        connectionManager,
                        resultPartitionManager,
                        taskEventPublisher,
                        networkBufferPool,
                        tieredStorageConfiguration,
                        tieredStorageNettyService);

        return new NettyShuffleEnvironment(
                taskExecutorResourceId,
                config,
                networkBufferPool,
                connectionManager,
                resultPartitionManager,
                fileChannelManager,
                resultPartitionFactory,
                singleInputGateFactory,
                ioExecutor,
                batchShuffleReadBufferPool,
                batchShuffleReadIOExecutor);
    }
}

