/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.PartitionSortedBuffer;
import org.apache.flink.runtime.io.network.partition.PartitionedFile;
import org.apache.flink.runtime.io.network.partition.PartitionedFileWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.SortBuffer;
import org.apache.flink.runtime.io.network.partition.SortMergeSubpartitionReader;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;

@NotThreadSafe
public class SortMergeResultPartition
extends ResultPartition {
    private final Object lock = new Object();
    @GuardedBy(value="lock")
    private final Set<SortMergeSubpartitionReader> readers = new HashSet<SortMergeSubpartitionReader>();
    @GuardedBy(value="lock")
    private PartitionedFile resultFile;
    private final int[] numDataBuffers;
    private final MemorySegment writeBuffer;
    private final int networkBufferSize;
    private final PartitionedFileWriter fileWriter;
    private SortBuffer currentSortBuffer;

    public SortMergeResultPartition(String owningTaskName, int partitionIndex, ResultPartitionID partitionId, ResultPartitionType partitionType, int numSubpartitions, int numTargetKeyGroups, int networkBufferSize, ResultPartitionManager partitionManager, String resultFileBasePath, @Nullable BufferCompressor bufferCompressor, SupplierWithException<BufferPool, IOException> bufferPoolFactory) {
        super(owningTaskName, partitionIndex, partitionId, partitionType, numSubpartitions, numTargetKeyGroups, partitionManager, bufferCompressor, bufferPoolFactory);
        this.networkBufferSize = networkBufferSize;
        this.numDataBuffers = new int[numSubpartitions];
        this.writeBuffer = MemorySegmentFactory.allocateUnpooledOffHeapMemory((int)networkBufferSize);
        PartitionedFileWriter fileWriter = null;
        try {
            fileWriter = new PartitionedFileWriter(numSubpartitions, 0x400000, resultFileBasePath);
        }
        catch (Throwable throwable) {
            ExceptionUtils.rethrow((Throwable)throwable);
        }
        this.fileWriter = fileWriter;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void releaseInternal() {
        Object object = this.lock;
        synchronized (object) {
            if (this.resultFile == null) {
                this.fileWriter.releaseQuietly();
            }
            if (this.readers.isEmpty() && this.resultFile != null) {
                this.resultFile.deleteQuietly();
                this.resultFile = null;
            }
        }
    }

    @Override
    public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
        this.emit(record, targetSubpartition, Buffer.DataType.DATA_BUFFER);
    }

    @Override
    public void broadcastRecord(ByteBuffer record) throws IOException {
        this.broadcast(record, Buffer.DataType.DATA_BUFFER);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
        Buffer buffer = EventSerializer.toBuffer(event, isPriorityEvent);
        try {
            ByteBuffer serializedEvent = buffer.getNioBufferReadable();
            this.broadcast(serializedEvent, buffer.getDataType());
        }
        finally {
            buffer.recycleBuffer();
        }
    }

    private void broadcast(ByteBuffer record, Buffer.DataType dataType) throws IOException {
        for (int channelIndex = 0; channelIndex < this.numSubpartitions; ++channelIndex) {
            record.rewind();
            this.emit(record, channelIndex, dataType);
        }
    }

    private void emit(ByteBuffer record, int targetSubpartition, Buffer.DataType dataType) throws IOException {
        this.checkInProduceState();
        SortBuffer sortBuffer = this.getSortBuffer();
        if (sortBuffer.append(record, targetSubpartition, dataType)) {
            return;
        }
        if (!sortBuffer.hasRemaining()) {
            this.currentSortBuffer.finish();
            this.currentSortBuffer.release();
            this.writeLargeRecord(record, targetSubpartition, dataType);
            return;
        }
        this.flushCurrentSortBuffer();
        this.emit(record, targetSubpartition, dataType);
    }

    private void releaseCurrentSortBuffer() {
        if (this.currentSortBuffer != null) {
            this.currentSortBuffer.release();
        }
    }

    private SortBuffer getSortBuffer() {
        if (this.currentSortBuffer != null && !this.currentSortBuffer.isFinished()) {
            return this.currentSortBuffer;
        }
        this.currentSortBuffer = new PartitionSortedBuffer(this.lock, this.bufferPool, this.numSubpartitions, this.networkBufferSize, null);
        return this.currentSortBuffer;
    }

    private void flushCurrentSortBuffer() throws IOException {
        if (this.currentSortBuffer == null) {
            return;
        }
        this.currentSortBuffer.finish();
        if (this.currentSortBuffer.hasRemaining()) {
            this.fileWriter.startNewRegion();
            while (this.currentSortBuffer.hasRemaining()) {
                SortBuffer.BufferWithChannel bufferWithChannel = this.currentSortBuffer.copyIntoSegment(this.writeBuffer);
                Buffer buffer = bufferWithChannel.getBuffer();
                int subpartitionIndex = bufferWithChannel.getChannelIndex();
                this.writeCompressedBufferIfPossible(buffer, subpartitionIndex);
            }
        }
        this.currentSortBuffer.release();
    }

    private void writeCompressedBufferIfPossible(Buffer buffer, int targetSubpartition) throws IOException {
        this.updateStatistics(buffer, targetSubpartition);
        try {
            if (this.canBeCompressed(buffer)) {
                buffer = this.bufferCompressor.compressToIntermediateBuffer(buffer);
            }
            this.fileWriter.writeBuffer(buffer, targetSubpartition);
        }
        finally {
            buffer.recycleBuffer();
        }
    }

    private void updateStatistics(Buffer buffer, int subpartitionIndex) {
        this.numBuffersOut.inc();
        this.numBytesOut.inc((long)buffer.readableBytes());
        if (buffer.isBuffer()) {
            int n = subpartitionIndex;
            this.numDataBuffers[n] = this.numDataBuffers[n] + 1;
        }
    }

    private void writeLargeRecord(ByteBuffer record, int targetSubpartition, Buffer.DataType dataType) throws IOException {
        this.fileWriter.startNewRegion();
        while (record.hasRemaining()) {
            int toCopy = Math.min(record.remaining(), this.writeBuffer.size());
            this.writeBuffer.put(0, record, toCopy);
            NetworkBuffer buffer = new NetworkBuffer(this.writeBuffer, buf -> {}, dataType, toCopy);
            this.writeCompressedBufferIfPossible(buffer, targetSubpartition);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void releaseReader(SortMergeSubpartitionReader reader) {
        Object object = this.lock;
        synchronized (object) {
            this.readers.remove(reader);
            if (this.readers.isEmpty() && this.isReleased()) {
                this.releaseInternal();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void finish() throws IOException {
        this.broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
        this.flushCurrentSortBuffer();
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState((!this.isReleased() ? 1 : 0) != 0, (Object)"Result partition is already released.");
            this.resultFile = this.fileWriter.finish();
            LOG.info("New partitioned file produced: {}.", (Object)this.resultFile);
        }
        super.finish();
    }

    @Override
    public void close() {
        this.releaseCurrentSortBuffer();
        super.close();
        IOUtils.closeQuietly((AutoCloseable)this.fileWriter);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public ResultSubpartitionView createSubpartitionView(int subpartitionIndex, BufferAvailabilityListener availabilityListener) throws IOException {
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkElementIndex((int)subpartitionIndex, (int)this.numSubpartitions, (String)"Subpartition not found.");
            Preconditions.checkState((!this.isReleased() ? 1 : 0) != 0, (Object)"Partition released.");
            Preconditions.checkState((boolean)this.isFinished(), (Object)"Trying to read unfinished blocking partition.");
            SortMergeSubpartitionReader reader = new SortMergeSubpartitionReader(subpartitionIndex, this.numDataBuffers[subpartitionIndex], this.networkBufferSize, this, availabilityListener, this.resultFile);
            this.readers.add(reader);
            return reader;
        }
    }

    @Override
    public void flushAll() {
        try {
            this.flushCurrentSortBuffer();
        }
        catch (IOException e) {
            LOG.error("Failed to flush the current sort buffer.", (Throwable)e);
        }
    }

    @Override
    public void flush(int subpartitionIndex) {
        try {
            this.flushCurrentSortBuffer();
        }
        catch (IOException e) {
            LOG.error("Failed to flush the current sort buffer.", (Throwable)e);
        }
    }

    @Override
    public CompletableFuture<?> getAvailableFuture() {
        return AVAILABLE;
    }

    @Override
    public int getNumberOfQueuedBuffers() {
        return 0;
    }

    @Override
    public int getNumberOfQueuedBuffers(int targetSubpartition) {
        return 0;
    }

    @VisibleForTesting
    PartitionedFile getResultFile() {
        return this.resultFile;
    }
}

