package org.apache.flink.runtime.io.network.partition.external.writer;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.FixedLengthBufferPool;
import org.apache.flink.runtime.io.network.partition.external.ExternalBlockShuffleUtils;
import org.apache.flink.runtime.io.network.partition.external.PartitionIndex;
import org.apache.flink.runtime.io.network.partition.external.PersistentFileType;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/external/writer/PartitionHashFileWriter.class */
/**
 * A {@link PersistentFileWriter} that keeps one data file per subpartition.
 *
 * <p>Each record is serialized once and then copied into the buffer builder of every
 * target subpartition. Whenever a buffer fills up it is handed to that subpartition's
 * {@link BufferFileWriter}, and the number of bytes handed over is tracked per
 * subpartition so that {@link #generatePartitionIndices()} can describe the files.
 *
 * <p>The class keeps unsynchronized mutable state (serializer, buffer builders, byte
 * counts); NOTE(review): it is presumably meant to be driven by a single thread.
 *
 * @param <T> type of the records written
 */
public class PartitionHashFileWriter<T> implements PersistentFileWriter<T> {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionHashFileWriter.class);

    /** Number of subpartitions, and therefore of files, writers and buffer-builder slots. */
    private final int numPartitions;

    /** Manager the memory segments are given back to in {@link #clear()}. */
    private final MemoryManager memoryManager;

    /** Memory segments backing {@link #bufferPool}. */
    private final List<MemorySegment> memory;

    /** Serializer holding the bytes of the record currently being written. */
    private final RecordSerializer<IOReadableWritable> recordSerializer;

    /** Reusable wrapper that lets {@link #recordSerializer} serialize a {@code T}. */
    private final SerializationDelegate<T> serializationDelegate;

    /** Pool the per-subpartition buffer builders are requested from. */
    private final FixedLengthBufferPool bufferPool;

    /** One file writer per subpartition, indexed by subpartition. */
    private final BufferFileWriter[] fileWriters;

    /** Buffer builder currently being filled per subpartition; {@code null} if none is open. */
    private final BufferBuilder[] currentBufferBuilders;

    /** Number of bytes handed to the file writer so far, per subpartition. */
    private final long[] bytesWritten;

    /** Optional metric counting written bytes; may be {@code null}. */
    private final Counter numBytesOut;

    /** Optional metric counting written buffers; may be {@code null}. */
    private final Counter numBuffersOut;

    /**
     * Creates a writer that does not report any metrics.
     *
     * @param numPartitions number of subpartitions, must be positive
     * @param partitionDataRootPath root path passed to
     *     {@link ExternalBlockShuffleUtils#generateDataPath} to derive the per-subpartition files
     * @param memoryManager manager the memory segments are released to in {@link #clear()}
     * @param memory memory segments to write with, at least one per subpartition
     * @param ioManager I/O manager used to create the file writers
     * @param typeSerializer serializer for the records
     * @throws IOException if one of the file writers cannot be created
     */
    public PartitionHashFileWriter(
            int numPartitions,
            String partitionDataRootPath,
            MemoryManager memoryManager,
            List<MemorySegment> memory,
            IOManager ioManager,
            TypeSerializer<T> typeSerializer) throws IOException {
        this(numPartitions, partitionDataRootPath, memoryManager, memory, ioManager, typeSerializer, null, null);
    }

    /**
     * Creates a writer and opens one file writer per subpartition.
     *
     * <p>If opening one of the file writers fails, the writers that were already opened
     * are closed again before the failure is propagated.
     *
     * @param numPartitions number of subpartitions, must be positive
     * @param partitionDataRootPath root path passed to
     *     {@link ExternalBlockShuffleUtils#generateDataPath} to derive the per-subpartition files
     * @param memoryManager manager the memory segments are released to in {@link #clear()}
     * @param memory memory segments to write with, at least one per subpartition
     * @param ioManager I/O manager used to create the file writers
     * @param typeSerializer serializer for the records
     * @param numBytesOut counter increased by the size of every written buffer, or {@code null}
     * @param numBuffersOut counter increased by one for every written buffer, or {@code null}
     * @throws IOException if one of the file writers cannot be created
     * @throws IllegalArgumentException if {@code numPartitions} is not positive or there are
     *     fewer memory segments than subpartitions
     */
    public PartitionHashFileWriter(
            int numPartitions,
            String partitionDataRootPath,
            MemoryManager memoryManager,
            List<MemorySegment> memory,
            IOManager ioManager,
            TypeSerializer<T> typeSerializer,
            Counter numBytesOut,
            Counter numBuffersOut) throws IOException {
        Preconditions.checkArgument(numPartitions > 0, "The number of subpartitions should be larger than 0, but actually is: " + numPartitions);
        this.numPartitions = numPartitions;
        Preconditions.checkNotNull(memory);
        Preconditions.checkArgument(memory.size() >= numPartitions, "The number of memory segments should be more than that of subpartitions, but actually numMemory: " + memory.size() + ", numPartitions: " + numPartitions);
        this.memoryManager = Preconditions.checkNotNull(memoryManager);
        this.memory = memory;
        this.recordSerializer = new SpanningRecordSerializer<>();
        this.serializationDelegate = new SerializationDelegate<>(typeSerializer);
        this.bufferPool = new FixedLengthBufferPool(memory, false);
        this.fileWriters = new BufferFileWriter[numPartitions];
        this.currentBufferBuilders = new BufferBuilder[numPartitions];
        // Java zero-initializes the array, so every subpartition starts at 0 bytes written.
        this.bytesWritten = new long[numPartitions];
        this.numBytesOut = numBytesOut;
        this.numBuffersOut = numBuffersOut;

        int opened = 0;
        try {
            for (; opened < numPartitions; opened++) {
                File dataFile = new File(ExternalBlockShuffleUtils.generateDataPath(partitionDataRootPath, opened));
                this.fileWriters[opened] = ioManager.createStreamFileWriter(ioManager.createChannel(dataFile));
            }
        } catch (Throwable failure) {
            // The caller never gets a reference to this half-built object, so nobody else
            // could close the writers that were opened before the failure.
            for (int partition = 0; partition < opened; partition++) {
                try {
                    this.fileWriters[partition].close();
                } catch (Throwable closeFailure) {
                    failure.addSuppressed(closeFailure);
                }
            }
            throw failure;
        }
    }

    /**
     * Serializes the record and appends it to the given subpartition.
     *
     * @param record record to write
     * @param targetPartition index of the subpartition to append to
     * @throws IOException if a full buffer cannot be handed to the file writer
     * @throws InterruptedException if interrupted while requesting a buffer
     */
    @Override // org.apache.flink.runtime.io.network.partition.external.writer.PersistentFileWriter
    public void add(T record, int targetPartition) throws IOException, InterruptedException {
        serializationDelegate.setInstance(record);
        recordSerializer.serializeRecord(serializationDelegate);
        copyToTargetFile(targetPartition);
    }

    /**
     * Serializes the record once and appends it to every listed subpartition.
     *
     * @param record record to write
     * @param targetPartitions indices of the subpartitions to append to
     * @throws IOException if a full buffer cannot be handed to a file writer
     * @throws InterruptedException if interrupted while requesting a buffer
     */
    @Override // org.apache.flink.runtime.io.network.partition.external.writer.PersistentFileWriter
    public void add(T record, int[] targetPartitions) throws IOException, InterruptedException {
        serializationDelegate.setInstance(record);
        recordSerializer.serializeRecord(serializationDelegate);
        for (int targetPartition : targetPartitions) {
            copyToTargetFile(targetPartition);
        }
    }

    /**
     * Flushes the pending buffer of every subpartition and closes all file writers.
     *
     * <p>Every file writer is closed even if flushing or closing fails for some
     * subpartition. After the first failure no further pending buffers are flushed; the
     * first failure is rethrown once all writers have been closed, with later close
     * failures attached as suppressed exceptions.
     *
     * @throws IOException if flushing a buffer or closing a file writer fails
     */
    @Override // org.apache.flink.runtime.io.network.partition.external.writer.PersistentFileWriter
    public void finish() throws IOException, InterruptedException {
        IOException failure = null;
        for (int partition = 0; partition < numPartitions; partition++) {
            if (failure == null) {
                try {
                    tryFinishCurrentBufferBuilder(partition);
                } catch (IOException e) {
                    failure = e;
                }
            }
            try {
                fileWriters[partition].close();
            } catch (IOException e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Describes the written files: one {@link PartitionIndex} per subpartition, starting at
     * offset 0 and covering the bytes written to that subpartition so far.
     *
     * @return a single-element list holding the indices of all subpartitions
     */
    @Override // org.apache.flink.runtime.io.network.partition.external.writer.PersistentFileWriter
    public List<List<PartitionIndex>> generatePartitionIndices() throws IOException, InterruptedException {
        List<PartitionIndex> indices = new ArrayList<>(numPartitions);
        for (int partition = 0; partition < numPartitions; partition++) {
            indices.add(new PartitionIndex(partition, 0L, bytesWritten[partition]));
        }
        return Collections.singletonList(indices);
    }

    /**
     * Releases the memory segments to the memory manager and destroys the buffer pool.
     */
    @Override // org.apache.flink.runtime.io.network.partition.external.writer.PersistentFileWriter
    public void clear() throws IOException {
        memoryManager.release(memory);
        bufferPool.lazyDestroy();
    }

    /** {@inheritDoc} */
    @Override // org.apache.flink.runtime.io.network.partition.external.writer.PersistentFileWriter
    public PersistentFileType getExternalFileType() {
        return PersistentFileType.HASH_PARTITION_FILE;
    }

    /**
     * Copies the currently serialized record into the buffers of one subpartition,
     * flushing every buffer that becomes full along the way.
     *
     * @param targetPartition index of the subpartition to append to
     */
    private void copyToTargetFile(int targetPartition) throws IOException, InterruptedException {
        // Reset first: the same serialized record may already have been copied to another subpartition.
        recordSerializer.reset();
        RecordSerializer.SerializationResult result =
                recordSerializer.copyToBufferBuilder(getCurrentBufferBuilder(targetPartition));
        while (result.isFullBuffer()) {
            tryFinishCurrentBufferBuilder(targetPartition);
            if (result.isFullRecord()) {
                break;
            }
            // The record did not fit completely; continue in a fresh buffer.
            result = recordSerializer.copyToBufferBuilder(getCurrentBufferBuilder(targetPartition));
        }
        Preconditions.checkState(!recordSerializer.hasSerializedData(), "All data should be written at once");
    }

    /**
     * Returns the open buffer builder of the subpartition, requesting a new one from the
     * pool (blocking) if none is open.
     *
     * @param targetPartition index of the subpartition
     */
    private BufferBuilder getCurrentBufferBuilder(int targetPartition) throws InterruptedException {
        BufferBuilder builder = currentBufferBuilders[targetPartition];
        if (builder == null) {
            builder = bufferPool.requestBufferBuilderBlocking();
            currentBufferBuilders[targetPartition] = builder;
            Preconditions.checkState(builder != null, "Failed to request a buffer.");
        }
        return builder;
    }

    /**
     * Finishes the open buffer builder of the subpartition, if there is one, and hands the
     * resulting buffer to the subpartition's file writer.
     *
     * @param targetPartition index of the subpartition
     */
    private void tryFinishCurrentBufferBuilder(int targetPartition) throws IOException {
        BufferBuilder builder = currentBufferBuilders[targetPartition];
        if (builder == null) {
            return;
        }
        // Detach before doing anything that can fail, so that a failure cannot leave an
        // already finished builder in the slot to be finished a second time.
        currentBufferBuilders[targetPartition] = null;

        builder.finish();
        BufferConsumer consumer = builder.createBufferConsumer();
        try {
            Buffer buffer = consumer.build();
            // Read the size once, before the hand-off below; NOTE(review): the writer
            // presumably takes over the buffer, so it should not be touched afterwards.
            int size = buffer.getSize();
            bytesWritten[targetPartition] += size;
            fileWriters[targetPartition].writeBlock(buffer);
            if (numBytesOut != null) {
                numBytesOut.inc(size);
            }
            if (numBuffersOut != null) {
                numBuffersOut.inc();
            }
        } finally {
            // Close on every path; skipping this after a failed write would leak the consumer.
            consumer.close();
        }
    }
}
