package org.apache.flink.runtime.io.network.partition.external;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.SynchronousBufferFileReader;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.FixedLengthBufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.external.ExternalBlockResultPartitionMeta;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/external/ExternalBlockSubpartitionView.class */
public class ExternalBlockSubpartitionView implements ResultSubpartitionView, Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ExternalBlockSubpartitionView.class);
    private final ExternalBlockResultPartitionMeta externalResultPartitionMeta;
    private final int subpartitionIndex;
    private final ExecutorService threadPool;
    private final ResultPartitionID resultPartitionId;
    private final FixedLengthBufferPool bufferPool;
    private final long waitCreditTimeoutInMills;
    private long totalLength;

    @GuardedBy("lock")
    private volatile Throwable cause;
    private Iterator<ExternalBlockResultPartitionMeta.ExternalSubpartitionMeta> metaIterator;
    private final BufferAvailabilityListener listener;

    @GuardedBy("lock")
    private volatile boolean isReleased;

    @GuardedBy("lock")
    private boolean isRunning;
    private final Object lock = new Object();

    @GuardedBy("lock")
    private final ArrayDeque<Buffer> buffers = new ArrayDeque<>();
    private long totalReadLength = 0;
    private SynchronousBufferFileReader currFsIn = null;
    private long currRemainLength = 0;

    @GuardedBy("lock")
    private volatile int currentCredit = 0;

    public ExternalBlockSubpartitionView(ExternalBlockResultPartitionMeta externalBlockResultPartitionMeta, int i, ExecutorService executorService, ResultPartitionID resultPartitionID, FixedLengthBufferPool fixedLengthBufferPool, long j, BufferAvailabilityListener bufferAvailabilityListener) {
        this.externalResultPartitionMeta = (ExternalBlockResultPartitionMeta) Preconditions.checkNotNull(externalBlockResultPartitionMeta);
        this.subpartitionIndex = i;
        this.threadPool = (ExecutorService) Preconditions.checkNotNull(executorService);
        this.resultPartitionId = (ResultPartitionID) Preconditions.checkNotNull(resultPartitionID);
        this.bufferPool = (FixedLengthBufferPool) Preconditions.checkNotNull(fixedLengthBufferPool);
        this.waitCreditTimeoutInMills = j;
        this.listener = (BufferAvailabilityListener) Preconditions.checkNotNull(bufferAvailabilityListener);
    }

    /* JADX WARN: Code restructure failed: missing block: B:71:0x012a, code lost:
    
        r0 = r6.lock;
     */
    /* JADX WARN: Code restructure failed: missing block: B:72:0x012f, code lost:
    
        monitor-enter(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:75:0x0134, code lost:
    
        if (r6.isReleased == false) goto L99;
     */
    /* JADX WARN: Code restructure failed: missing block: B:76:0x0137, code lost:
    
        closeCurrentFileReader();
     */
    /* JADX WARN: Code restructure failed: missing block: B:77:0x013b, code lost:
    
        r6.isRunning = false;
     */
    /* JADX WARN: Code restructure failed: missing block: B:78:0x0144, code lost:
    
        if (isAvailableForReadUnsafe() == false) goto L102;
     */
    /* JADX WARN: Code restructure failed: missing block: B:79:0x0147, code lost:
    
        r6.threadPool.submit(r6);
     */
    /* JADX WARN: Code restructure failed: missing block: B:81:0x0153, code lost:
    
        monitor-exit(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:83:0x01fc, code lost:
    
        return;
     */
    @Override // java.lang.Runnable
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void run() {
        /*
            Method dump skipped, instructions count: 509
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.runtime.io.network.partition.external.ExternalBlockSubpartitionView.run():void");
    }

    private void initializeMeta() throws IOException {
        if (!this.externalResultPartitionMeta.hasInitialized()) {
            this.externalResultPartitionMeta.initialize();
        }
        List<ExternalBlockResultPartitionMeta.ExternalSubpartitionMeta> subpartitionMeta = this.externalResultPartitionMeta.getSubpartitionMeta(this.subpartitionIndex);
        this.metaIterator = subpartitionMeta.iterator();
        Iterator<ExternalBlockResultPartitionMeta.ExternalSubpartitionMeta> it = subpartitionMeta.iterator();
        while (it.hasNext()) {
            this.totalLength += it.next().getLength();
        }
    }

    private boolean isAvailableForReadUnsafe() {
        return hasMoreDataToReadUnsafe() && this.currentCredit > 0;
    }

    private boolean hasMoreDataToReadUnsafe() {
        return !this.isReleased && this.totalReadLength < this.totalLength;
    }

    @Nonnull
    private Buffer readNextBuffer() throws IOException, InterruptedException {
        if (this.currFsIn == null) {
            this.currFsIn = getNextFileReader();
        }
        Preconditions.checkState(this.currFsIn != null, "No more data to read.");
        Buffer requestBufferBlocking = this.bufferPool.requestBufferBlocking();
        Preconditions.checkState(requestBufferBlocking != null, "Failed to request a buffer.");
        Preconditions.checkState(this.currRemainLength > 0, "Should have data to read from the current file.");
        try {
            long min = Math.min(this.currRemainLength, requestBufferBlocking.getMaxCapacity());
            this.currFsIn.readInto(requestBufferBlocking, min);
            this.currRemainLength -= min;
            if (this.currRemainLength == 0) {
                closeCurrentFileReader();
            }
            return requestBufferBlocking;
        } catch (Throwable th) {
            if (!requestBufferBlocking.isRecycled()) {
                requestBufferBlocking.recycleBuffer();
            }
            throw th;
        }
    }

    private void closeCurrentFileReader() {
        if (this.currFsIn != null) {
            try {
                this.currFsIn.close();
            } catch (IOException e) {
                LOG.error("Ignore the close file exception.", e);
            }
            this.currFsIn = null;
        }
    }

    private SynchronousBufferFileReader getNextFileReader() throws IOException {
        SynchronousBufferFileReader synchronousBufferFileReader = null;
        while (true) {
            if (!this.metaIterator.hasNext()) {
                break;
            }
            ExternalBlockResultPartitionMeta.ExternalSubpartitionMeta next = this.metaIterator.next();
            this.currRemainLength = next.getLength();
            if (this.currRemainLength > 0) {
                synchronousBufferFileReader = new SynchronousBufferFileReader(new FileIOChannel.ID(next.getDataFile().getPath()), false, false);
                synchronousBufferFileReader.seekToPosition(next.getOffset());
                break;
            }
        }
        return synchronousBufferFileReader;
    }

    private void enqueueBuffer(Buffer buffer) throws IOException {
        synchronized (this.lock) {
            if (this.isReleased) {
                buffer.recycleBuffer();
                return;
            }
            this.buffers.add(buffer);
            if (buffer.isBuffer()) {
                this.currentCredit--;
                this.totalReadLength += buffer.getSize();
            }
            if (this.totalReadLength == this.totalLength && this.totalLength != 0) {
                this.buffers.add(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE));
            }
            this.listener.notifyDataAvailable();
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public ResultSubpartition.BufferAndBacklog getNextBuffer() {
        synchronized (this.lock) {
            Buffer poll = this.buffers.poll();
            Buffer peek = this.buffers.peek();
            if (poll != null) {
                return new ResultSubpartition.BufferAndBacklog(poll, peek != null, this.buffers.size(), (peek == null || peek.isBuffer()) ? false : true);
            }
            return null;
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public boolean nextBufferIsEvent() {
        synchronized (this.lock) {
            if (this.cause == null) {
                return this.buffers.size() > 0 && !this.buffers.peek().isBuffer();
            }
            Preconditions.checkState(this.buffers.size() == 0, "All the buffer should be cleared after errors occur and released.");
            return true;
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public boolean isAvailable() {
        boolean z;
        synchronized (this.lock) {
            z = this.buffers.size() > 0 || this.cause != null;
        }
        return z;
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public void notifyCreditAdded(int i) {
        synchronized (this.lock) {
            int i2 = this.currentCredit;
            this.currentCredit += i;
            if (i2 == 0) {
                if (this.isRunning) {
                    this.lock.notifyAll();
                } else {
                    this.threadPool.submit(this);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int getCreditUnsafe() {
        return this.currentCredit;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String getResultPartitionDir() {
        return this.externalResultPartitionMeta.getResultPartitionDir();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int getSubpartitionIndex() {
        return this.subpartitionIndex;
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public void notifyDataAvailable() {
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public void releaseAllResources() {
        releaseAllResources(null);
    }

    private void releaseAllResources(Throwable th) {
        synchronized (this.lock) {
            if (this.isReleased) {
                return;
            }
            if (th != null) {
                this.cause = th;
            }
            while (true) {
                Buffer poll = this.buffers.poll();
                if (poll == null) {
                    break;
                } else {
                    poll.recycleBuffer();
                }
            }
            if (!this.isRunning) {
                closeCurrentFileReader();
            }
            this.externalResultPartitionMeta.notifySubpartitionConsumed(this.subpartitionIndex);
            this.isReleased = true;
            this.lock.notifyAll();
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public void notifySubpartitionConsumed() {
        releaseAllResources();
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public boolean isReleased() {
        return this.isReleased;
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public Throwable getFailureCause() {
        return this.cause;
    }

    public String toString() {
        Object[] objArr = new Object[1];
        objArr[0] = this.currFsIn == null ? null : this.currFsIn.getChannelID().getPath();
        return String.format("ExternalSubpartitionView [current read file path : %s]", objArr);
    }

    @VisibleForTesting
    long getTotalLength() {
        return this.totalLength;
    }

    @VisibleForTesting
    Iterator<ExternalBlockResultPartitionMeta.ExternalSubpartitionMeta> getMetaIterator() {
        return this.metaIterator;
    }

    @VisibleForTesting
    int getCurrentCredit() {
        int i;
        synchronized (this.lock) {
            i = this.currentCredit;
        }
        return i;
    }

    @VisibleForTesting
    public boolean isRunning() {
        boolean z;
        synchronized (this.lock) {
            z = this.isRunning;
        }
        return z;
    }
}
