package org.apache.flink.runtime.io.network.partition.external;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.serialization.SerializerManager;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.external.writer.PartitionHashFileWriter;
import org.apache.flink.runtime.io.network.partition.external.writer.PartitionMergeFileWriter;
import org.apache.flink.runtime.io.network.partition.external.writer.PersistentFileWriter;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/external/ExternalResultPartition.class */
/**
 * A {@link ResultPartition} that persists emitted records into files below a per-partition
 * root directory on the local file system instead of handing them to in-memory subpartitions.
 *
 * <p>The partition is initialized lazily on the first emitted record (or on {@link #finish()}
 * if no record was emitted): the root directory is (re)created, a small config file holding the
 * partition TTLs is written, and a {@link PersistentFileWriter} is chosen. On {@link #finish()}
 * the writer is finished and the index files plus a "finished" marker file are written.
 *
 * <p>Whenever initialization, emitting or finishing fails, the partition root directory is
 * deleted on a best-effort basis so that no partially written data is left behind.
 *
 * @param <T> type of the records written to this partition
 */
public class ExternalResultPartition<T> extends ResultPartition<T> {
    private static final Logger LOG = LoggerFactory.getLogger(ExternalResultPartition.class);

    /** Number of retries granted for creating the partition root directory. */
    private static final int MAX_MKDIRS_RETRIES = 100;

    private final MemoryManager memoryManager;
    private final IOManager ioManager;

    /** Root directory under which all files of this partition are stored. */
    private final String partitionRootPath;

    /** Upper bound of subpartitions for which the hash file writer is still used. */
    private final int hashMaxSubpartitions;
    private final int mergeFactor;
    private final boolean enableAsyncMerging;
    private final boolean mergeToOneFile;

    /** Configured shuffle memory in megabytes. */
    private final double shuffleMemory;

    /** Number of memory pages derived from {@link #shuffleMemory} and the page size. */
    private final int numPages;
    private final SerializerManager<SerializationDelegate<T>> serializerManager;

    // TTLs are configured in seconds and stored in milliseconds.
    private final long consumedPartitionTTL;
    private final long partialConsumedPartitionTTL;
    private final long unconsumedPartitionTTL;
    private final long unfinishedPartitionTTL;

    /** Writer that persists the records; {@code null} before initialization and after release. */
    private PersistentFileWriter<T> fileWriter;

    /** Set once {@link #initialize()} completed successfully. */
    private volatile boolean initialized;

    /**
     * Creates the partition and reads all shuffle related settings from the configuration.
     *
     * @param configuration task manager configuration the shuffle options are read from
     * @param str passed through to the super constructor (presumably the owning task name)
     * @param jobID id of the job, also used to pick the spill root directory
     * @param resultPartitionID id of this partition
     * @param resultPartitionType type of this partition
     * @param i passed through to the super constructor (presumably the number of subpartitions)
     * @param i2 passed through to the super constructor
     * @param memoryManager memory manager the write buffers are allocated from
     * @param iOManager I/O manager handed to the file writers
     * @throws IllegalArgumentException if a numeric option is not strictly positive
     * @throws IllegalStateException if no local output directory is configured
     */
    public ExternalResultPartition(Configuration configuration, String str, JobID jobID, ResultPartitionID resultPartitionID, ResultPartitionType resultPartitionType, int i, int i2, MemoryManager memoryManager, IOManager iOManager) {
        super(str, jobID, resultPartitionID, resultPartitionType, i, i2);
        Preconditions.checkNotNull(configuration);
        this.memoryManager = (MemoryManager) Preconditions.checkNotNull(memoryManager);
        this.ioManager = (IOManager) Preconditions.checkNotNull(iOManager);

        String spillRootPath = getSpillRootPath(configuration, jobID.toString(), resultPartitionID.toString());
        this.partitionRootPath = ExternalBlockShuffleUtils.generatePartitionRootPath(
            spillRootPath,
            resultPartitionID.getProducerId().toString(),
            resultPartitionID.getPartitionId().toString());

        this.hashMaxSubpartitions = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_OUTPUT_HASH_MAX_SUBPARTITIONS);
        this.mergeFactor = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_OUTPUT_MERGE_FACTOR);
        this.enableAsyncMerging = configuration.getBoolean(TaskManagerOptions.TASK_MANAGER_OUTPUT_ENABLE_ASYNC_MERGE);
        this.mergeToOneFile = configuration.getBoolean(TaskManagerOptions.TASK_MANAGER_OUTPUT_MERGE_TO_ONE_FILE);
        this.shuffleMemory = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_OUTPUT_MEMORY_MB);
        // Megabytes -> bytes -> pages.
        this.numPages = (int) (((this.shuffleMemory * 1024.0d) * 1024.0d) / memoryManager.getPageSize());

        Preconditions.checkArgument(this.hashMaxSubpartitions > 0, "The max allowed number of subpartitions should be larger than 0, but actually is: " + this.hashMaxSubpartitions);
        Preconditions.checkArgument(this.mergeFactor > 0, "The merge factor should be larger than 0, but actually is: " + this.mergeFactor);
        Preconditions.checkArgument(this.shuffleMemory > 0.0d, "The shuffle memory should be larger than 0, but actually is: " + this.shuffleMemory);
        Preconditions.checkArgument(this.numPages > 0, "The number of pages should be larger than 0, but actually is: " + this.numPages);

        this.serializerManager = new SerializerManager<>(ResultPartitionType.BLOCKING, configuration);

        this.consumedPartitionTTL = secondsToMillis(configuration.getInteger(TaskManagerOptions.TASK_EXTERNAL_SHUFFLE_CONSUMED_PARTITION_TTL_IN_SECONDS));
        this.partialConsumedPartitionTTL = secondsToMillis(configuration.getInteger(TaskManagerOptions.TASK_EXTERNAL_SHUFFLE_PARTIAL_CONSUMED_PARTITION_TTL_IN_SECONDS));
        this.unconsumedPartitionTTL = secondsToMillis(configuration.getInteger(TaskManagerOptions.TASK_EXTERNAL_SHUFFLE_UNCONSUMED_PARTITION_TTL_IN_SECONDS));
        this.unfinishedPartitionTTL = secondsToMillis(configuration.getInteger(TaskManagerOptions.TASK_EXTERNAL_SHUFFLE_UNFINISHED_PARTITION_TTL_IN_SECONDS));
    }

    /** Converts seconds to milliseconds in {@code long} arithmetic so large TTLs cannot overflow. */
    private static long secondsToMillis(int seconds) {
        return seconds * 1000L;
    }

    /**
     * Prepares the partition for writing: recreates the root directory, writes the config file,
     * allocates the memory pages and instantiates the file writer.
     *
     * <p>The merge file writer is used when there are more subpartitions than
     * {@link #hashMaxSubpartitions} or than allocated pages, or when compression is enabled;
     * otherwise the hash file writer is used.
     *
     * @throws RuntimeException wrapping any failure; the partition directory is deleted first
     */
    private void initialize() {
        Preconditions.checkNotNull(this.typeSerializer);
        Preconditions.checkNotNull(this.parentTask);
        try {
            Path rootDir = new Path(this.partitionRootPath);
            FileSystem localFs = FileSystem.getLocalFileSystem();
            // Start from a clean directory in case an earlier attempt left data behind.
            if (localFs.exists(rootDir)) {
                localFs.delete(rootDir, true);
            }
            createPartitionRootDir(localFs, rootDir);
            writeConfigFile(localFs);

            List<MemorySegment> pages = this.memoryManager.allocatePages(this.parentTask, this.numPages);
            boolean useMergeWriter = this.numberOfSubpartitions > this.hashMaxSubpartitions
                || this.numberOfSubpartitions > pages.size()
                || this.serializerManager.useCompression();
            if (useMergeWriter) {
                this.fileWriter = new PartitionMergeFileWriter(this.numberOfSubpartitions, this.partitionRootPath, this.mergeFactor, this.enableAsyncMerging, this.mergeToOneFile, this.memoryManager, pages, this.ioManager, this.typeSerializer, this.serializerManager, this.parentTask, this.numBytesOut, this.numBuffersOut);
            } else {
                this.fileWriter = new PartitionHashFileWriter(this.numberOfSubpartitions, this.partitionRootPath, this.memoryManager, pages, this.ioManager, this.typeSerializer, this.numBytesOut, this.numBuffersOut);
            }
            this.initialized = true;
            LOG.info(toString() + " initialized successfully.");
        } catch (Throwable t) {
            deletePartitionDirOnFailure();
            throw new RuntimeException(t);
        }
    }

    /**
     * Creates the partition root directory, retrying up to {@link #MAX_MKDIRS_RETRIES} times.
     *
     * <p>An attempt counts as failed when {@code mkdirs} throws or when the directory still
     * does not exist afterwards, so the loop always terminates.
     *
     * @throws IOException if the directory could not be created within the retry budget
     */
    private void createPartitionRootDir(FileSystem fileSystem, Path rootDir) throws IOException {
        int retriesLeft = MAX_MKDIRS_RETRIES;
        while (true) {
            IOException failure = null;
            try {
                fileSystem.mkdirs(rootDir);
            } catch (IOException e) {
                failure = e;
            }
            if (fileSystem.exists(rootDir)) {
                return;
            }
            if (retriesLeft-- <= 0) {
                LOG.error("Reach retry limit, fail to create partition root path: " + this.partitionRootPath, failure);
                throw failure != null
                    ? failure
                    : new IOException("Fail to create partition root path: " + this.partitionRootPath);
            }
            LOG.error("Fail to create partition root path: " + this.partitionRootPath + ", left retry times: " + retriesLeft, failure);
        }
    }

    /**
     * Writes the four partition TTLs (consumed, partially consumed, unconsumed, unfinished; in
     * milliseconds, in that order) as longs into the config file of this partition.
     *
     * @param fileSystem file system the config file is created on
     * @throws IOException if the file cannot be created or written; the failure is logged first
     */
    @VisibleForTesting
    void writeConfigFile(FileSystem fileSystem) throws IOException {
        String configPath = ExternalBlockShuffleUtils.generateConfigPath(this.partitionRootPath);
        // try-with-resources guarantees the stream is closed even when a write fails.
        try (FSDataOutputStream configOut = fileSystem.create(new Path(configPath), FileSystem.WriteMode.OVERWRITE)) {
            DataOutputViewStreamWrapper configView = new DataOutputViewStreamWrapper(configOut);
            configView.writeLong(this.consumedPartitionTTL);
            configView.writeLong(this.partialConsumedPartitionTTL);
            configView.writeLong(this.unconsumedPartitionTTL);
            configView.writeLong(this.unfinishedPartitionTTL);
        } catch (IOException e) {
            LOG.error("Write the config file " + configPath + " fail.", e);
            throw e;
        }
    }

    /**
     * Hands the record to the file writer for all given target channels, initializing the
     * partition first if necessary. The two boolean flags are not used by this implementation.
     *
     * <p>On any failure while adding the record the partition directory is deleted and the
     * failure is rethrown unchanged.
     */
    @Override // org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public void emitRecord(T t, int[] iArr, boolean z, boolean z2) throws IOException, InterruptedException {
        if (!this.initialized) {
            initialize();
        }
        try {
            checkInProduceState();
            this.fileWriter.add(t, iArr);
        } catch (Throwable failure) {
            deletePartitionDirOnFailure();
            throw failure;
        }
    }

    /**
     * Hands the record to the file writer for a single target channel, initializing the
     * partition first if necessary. The two boolean flags are not used by this implementation.
     *
     * <p>On any failure while adding the record the partition directory is deleted and the
     * failure is rethrown unchanged.
     */
    @Override // org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public void emitRecord(T t, int i, boolean z, boolean z2) throws IOException, InterruptedException {
        if (!this.initialized) {
            initialize();
        }
        try {
            checkInProduceState();
            this.fileWriter.add(t, i);
        } catch (Throwable failure) {
            deletePartitionDirOnFailure();
            throw failure;
        }
    }

    /**
     * Events cannot be written to this partition.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public void broadcastEvent(AbstractEvent abstractEvent, boolean z) throws IOException {
        throw new UnsupportedOperationException("Event is not supported in external result partition.");
    }

    /** No-op for this partition. */
    @Override // org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public void clearBuffers() {
    }

    /** No-op for this partition. */
    @Override // org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public void flushAll() {
    }

    /** No-op for this partition. */
    @Override // org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public void flush(int i) {
    }

    /**
     * Clears and drops the file writer, if there is one. An {@link IOException} raised while
     * clearing is logged and not propagated; in that case the writer reference is kept.
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResultPartition
    protected void releaseInternal() {
        try {
            if (this.fileWriter != null) {
                this.fileWriter.clear();
                this.fileWriter = null;
            }
        } catch (IOException e) {
            LOG.error("Fail to clear external shuffler", e);
        }
    }

    /**
     * Finishes the partition: finishes the file writer, writes one index file per entry of the
     * generated partition indices and finally writes the "finished" marker file.
     *
     * <p>If the partition was already released, the partition directory is deleted and the
     * method returns without marking the partition as finished. On any failure the partition
     * directory is deleted before the failure is propagated. The file writer is released in
     * every case.
     *
     * @throws IOException if writing the index or finished files fails
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResultPartition
    public void finish() throws IOException {
        try {
            if (!this.initialized) {
                initialize();
                LOG.warn("The result partition {} has no data before finish.", this.partitionId);
            }
            if (this.isReleased.get()) {
                LOG.warn("The result partition {} has been released already before finish.", this.partitionId);
                deletePartitionDirOnFailure();
                return;
            }
            checkInProduceState();

            FileSystem fileSystem = FileSystem.get(new Path(this.partitionRootPath).toUri());
            this.fileWriter.finish();

            List<List<PartitionIndex>> indicesPerFile = this.fileWriter.generatePartitionIndices();
            for (int fileIndex = 0; fileIndex < indicesPerFile.size(); fileIndex++) {
                writeIndexFile(fileSystem, fileIndex, indicesPerFile.get(fileIndex));
            }
            writeFinishedFile(fileSystem, indicesPerFile.size());
        } catch (Throwable failure) {
            // Never leave a partially finished partition on disk.
            deletePartitionDirOnFailure();
            if (failure instanceof IOException) {
                throw (IOException) failure;
            }
            ExceptionUtils.rethrow(failure);
        } finally {
            releaseInternal();
        }
        this.isFinished = true;
    }

    /** Serializes the given partition indices into the index file with the given number. */
    private void writeIndexFile(FileSystem fileSystem, int fileIndex, List<PartitionIndex> indices) throws IOException {
        Path indexPath = new Path(ExternalBlockShuffleUtils.generateIndexPath(this.partitionRootPath, fileIndex));
        try (FSDataOutputStream indexOut = fileSystem.create(indexPath, FileSystem.WriteMode.OVERWRITE)) {
            ExternalBlockShuffleUtils.serializeIndices(indices, new DataOutputViewStreamWrapper(indexOut));
        }
    }

    /**
     * Writes the "finished" marker file: the int 1, the external file type name (length followed
     * by its bytes), the number of index files and the number of subpartitions.
     */
    private void writeFinishedFile(FileSystem fileSystem, int numIndexFiles) throws IOException {
        Path finishedPath = new Path(ExternalBlockShuffleUtils.generateFinishedPath(this.partitionRootPath));
        try (FSDataOutputStream finishedOut = fileSystem.create(finishedPath, FileSystem.WriteMode.OVERWRITE)) {
            DataOutputViewStreamWrapper finishedView = new DataOutputViewStreamWrapper(finishedOut);
            // NOTE(review): presumably the version of the file format - confirm against the reader.
            finishedView.writeInt(1);
            String fileTypeName = this.fileWriter.getExternalFileType().name();
            // NOTE(review): the char count is written as length and the bytes use the platform
            // charset; this only matches for single-byte names - kept as is so the on-disk
            // format stays unchanged.
            finishedView.writeInt(fileTypeName.length());
            finishedView.write(fileTypeName.getBytes());
            finishedView.writeInt(numIndexFiles);
            finishedView.writeInt(this.numberOfSubpartitions);
        }
    }

    /**
     * Recursively deletes the partition root directory on a best-effort basis. Failures are
     * logged and never propagated.
     */
    private void deletePartitionDirOnFailure() {
        boolean deleted = false;
        try {
            deleted = FileSystem.getLocalFileSystem().delete(new Path(this.partitionRootPath), true);
        } catch (Throwable t) {
            LOG.error("Exception occurred on deletePartitionDirOnFailure.", t);
        }
        if (!deleted) {
            LOG.error("Failed to delete dirty data, directory path " + this.partitionRootPath);
        }
    }

    /**
     * Picks one of the configured, comma separated local output directories. The directories
     * are sorted and the one selected by the hash of the two given strings is returned.
     *
     * @param configuration configuration holding the local output directories
     * @param str first string fed into the hash
     * @param str2 second string fed into the hash
     * @return the chosen root directory
     * @throws IllegalStateException if no local output directory is configured
     */
    private String getSpillRootPath(Configuration configuration, String str, String str2) {
        String localDirs = configuration.getString(TaskManagerOptions.TASK_MANAGER_OUTPUT_LOCAL_OUTPUT_DIRS);
        if (localDirs == null || localDirs.isEmpty()) {
            throw new IllegalStateException("The root dir for external result partition is not properly set. Please check " + ExternalBlockShuffleServiceOptions.LOCAL_DIRS + " in hadoop configuration.");
        }
        String[] candidates = localDirs.split(",");
        Arrays.sort(candidates);
        // floorMod equals % for non-negative hashes but can never produce a negative index.
        int diskIndex = Math.floorMod(ExternalBlockShuffleUtils.hashPartitionToDisk(str, str2), candidates.length);
        return candidates[diskIndex];
    }

    /** Returns the root directory of this partition. */
    @VisibleForTesting
    String getPartitionRootPath() {
        return this.partitionRootPath;
    }

    /** Describes the partition and its settings; safe to call before initialization and after release. */
    @Override
    public String toString() {
        // Read the field once: it is null before initialization and after release.
        PersistentFileWriter<T> writer = this.fileWriter;
        String writerName = writer == null ? "null" : writer.getClass().getName();
        return "External Result Partition: {partitionId = " + this.partitionId
            + ", fileWriter = " + writerName
            + ", rootPath = " + this.partitionRootPath
            + ", numberOfSubpartitions = " + this.numberOfSubpartitions
            + ", hashMaxSubpartitions = " + this.hashMaxSubpartitions
            + ", mergeFactor = " + this.mergeFactor
            + ", shuffleMemory = " + this.shuffleMemory
            + ", numPages = " + this.numPages
            + ", enableAsyncMerging = " + this.enableAsyncMerging
            + ", mergeToOneFile = " + this.mergeToOneFile
            + ", consumedPartitionTTL = " + this.consumedPartitionTTL
            + ", partialConsumedPartitionTTL = " + this.partialConsumedPartitionTTL
            + ", unconsumedPartitionTTL = " + this.unconsumedPartitionTTL
            + ", unfinishedPartitionTTL = " + this.unfinishedPartitionTTL + "}";
    }
}
