/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid;

import java.io.IOException;
import java.util.Optional;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.hybrid.HsDataView;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionConsumerInternalOperations;
import org.apache.flink.util.Preconditions;

/**
 * {@link ResultSubpartitionView} implementation that serves buffers of one subpartition from two
 * {@link HsDataView}s: a disk view, which is always tried first, and a memory view, which is used
 * when the disk view has nothing to offer for the next buffer index.
 *
 * <p>All mutable state is protected by a private {@code lock}. The downstream
 * {@link BufferAvailabilityListener} is always invoked outside of that lock.
 */
public class HsSubpartitionConsumer
implements ResultSubpartitionView,
HsSubpartitionConsumerInternalOperations {
    private final BufferAvailabilityListener availabilityListener;
    private final Object lock = new Object();

    /** Index of the last buffer handed out by {@link #getNextBuffer()}; {@code -1} before the first one. */
    @GuardedBy(value="lock")
    private int lastConsumedBufferIndex = -1;

    /** Whether the next {@link #notifyDataAvailable()} call should be forwarded to the listener. */
    @GuardedBy(value="lock")
    private boolean needNotify = true;

    /** Data type announced by the most recently consumed buffer, or {@code null} if nothing was consumed. */
    @Nullable
    @GuardedBy(value="lock")
    private Buffer.DataType cachedNextDataType = null;

    /** Cause passed to the release that actually took effect; {@code null} for a regular release. */
    @Nullable
    @GuardedBy(value="lock")
    private Throwable failureCause = null;

    @GuardedBy(value="lock")
    private boolean isReleased = false;

    @Nullable
    @GuardedBy(value="lock")
    private HsDataView diskDataView;

    @Nullable
    @GuardedBy(value="lock")
    private HsDataView memoryDataView;

    /**
     * Creates a consumer that reports data availability to the given listener.
     *
     * @param availabilityListener listener invoked by {@link #notifyDataAvailable()}
     */
    public HsSubpartitionConsumer(BufferAvailabilityListener availabilityListener) {
        this.availabilityListener = availabilityListener;
    }

    /**
     * Consumes the buffer with index {@code lastConsumedBufferIndex + 1}, trying the disk view
     * first and falling back to the memory view.
     *
     * <p>Any {@link Throwable} raised while consuming is not propagated: the consumer is released
     * with that throwable as failure cause and {@code null} is returned.
     *
     * @return the next buffer together with its backlog, or {@code null} if neither view could
     *     provide it or an error occurred
     */
    @Override
    @Nullable
    public ResultSubpartition.BufferAndBacklog getNextBuffer() {
        try {
            synchronized (this.lock) {
                Preconditions.checkNotNull(this.diskDataView, "disk data view must be not null.");
                Preconditions.checkNotNull(this.memoryDataView, "memory data view must be not null.");

                Optional<ResultSubpartition.BufferAndBacklog> bufferToConsume = this.tryReadFromDisk();
                if (!bufferToConsume.isPresent()) {
                    bufferToConsume = this.memoryDataView.consumeBuffer(this.lastConsumedBufferIndex + 1);
                }
                this.updateConsumingStatus(bufferToConsume);
                return bufferToConsume.map(this::handleBacklog).orElse(null);
            }
        }
        catch (Throwable cause) {
            // Deliberately outside the synchronized block; releaseInternal takes the lock itself.
            this.releaseInternal(cause);
            return null;
        }
    }

    /**
     * Forwards a data-available notification to the listener if one is currently needed.
     *
     * <p>Does nothing once the consumer is released. The {@code needNotify} flag is cleared under
     * the lock, while the listener itself is called after the lock has been left.
     */
    @Override
    public void notifyDataAvailable() {
        boolean notifyDownStream = false;
        synchronized (this.lock) {
            if (this.isReleased) {
                return;
            }
            if (this.needNotify) {
                notifyDownStream = true;
                this.needNotify = false;
            }
        }
        if (notifyDownStream) {
            this.availabilityListener.notifyDataAvailable();
        }
    }

    /**
     * Reports availability and backlog for the given number of credits.
     *
     * <p>The view is available if there is at least one credit, or if the cached next data type
     * is {@link Buffer.DataType#EVENT_BUFFER}. When the backlog is zero, {@code needNotify} is
     * set so that the next {@link #notifyDataAvailable()} reaches the listener.
     *
     * @param numCreditsAvailable credits currently held by the caller
     * @return availability flag and current subpartition backlog
     */
    @Override
    public ResultSubpartitionView.AvailabilityWithBacklog getAvailabilityAndBacklog(int numCreditsAvailable) {
        synchronized (this.lock) {
            boolean availability = numCreditsAvailable > 0
                    || this.cachedNextDataType == Buffer.DataType.EVENT_BUFFER;

            int backlog = this.getSubpartitionBacklog();
            if (backlog == 0) {
                this.needNotify = true;
            }
            return new ResultSubpartitionView.AvailabilityWithBacklog(availability, backlog);
        }
    }

    /** Releases this consumer and both data views (if set) without recording a failure cause. */
    @Override
    public void releaseAllResources() throws IOException {
        this.releaseInternal(null);
    }

    /** Returns whether this consumer has been released. */
    @Override
    public boolean isReleased() {
        synchronized (this.lock) {
            return this.isReleased;
        }
    }

    /**
     * Returns the index of the last consumed buffer.
     *
     * @param withLock if {@code true} the value is read while holding the lock; if {@code false}
     *     the field is read without any synchronization
     * @return the last consumed buffer index, {@code -1} if nothing has been consumed yet
     */
    @Override
    public int getConsumingOffset(boolean withLock) {
        if (!withLock) {
            return this.lastConsumedBufferIndex;
        }
        synchronized (this.lock) {
            return this.lastConsumedBufferIndex;
        }
    }

    /** Returns the failure cause recorded on release, or {@code null} if there is none. */
    @Override
    public Throwable getFailureCause() {
        synchronized (this.lock) {
            return this.failureCause;
        }
    }

    /**
     * Sets the disk data view. May only be called once.
     *
     * @throws IllegalStateException if a disk data view has already been set
     */
    void setDiskDataView(HsDataView diskDataView) {
        synchronized (this.lock) {
            Preconditions.checkState(this.diskDataView == null, "repeatedly set disk data view is not allowed.");
            this.diskDataView = diskDataView;
        }
    }

    /**
     * Sets the memory data view. May only be called once.
     *
     * @throws IllegalStateException if a memory data view has already been set
     */
    void setMemoryDataView(HsDataView memoryDataView) {
        synchronized (this.lock) {
            Preconditions.checkState(this.memoryDataView == null, "repeatedly set memory data view is not allowed.");
            this.memoryDataView = memoryDataView;
        }
    }

    /**
     * Not supported by this view.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void resumeConsumption() {
        throw new UnsupportedOperationException("resumeConsumption should never be called.");
    }

    /** No-op for this view. */
    @Override
    public void acknowledgeAllDataProcessed() {
    }

    /** Returns the subpartition backlog without acquiring the lock. */
    @Override
    public int unsynchronizedGetNumberOfQueuedBuffers() {
        return this.getSubpartitionBacklog();
    }

    /** Returns the subpartition backlog, computed while holding the lock. */
    @Override
    public int getNumberOfQueuedBuffers() {
        synchronized (this.lock) {
            return this.getSubpartitionBacklog();
        }
    }

    /**
     * Not supported by this view.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void notifyNewBufferSize(int newBufferSize) {
        throw new UnsupportedOperationException("Method should never be called.");
    }

    /**
     * Computes the backlog as the larger of the two views' backlogs, or {@code 0} while either
     * view is still unset. Does not take the lock itself, because it is also used by
     * {@link #unsynchronizedGetNumberOfQueuedBuffers()}.
     */
    private int getSubpartitionBacklog() {
        if (this.memoryDataView == null || this.diskDataView == null) {
            return 0;
        }
        return Math.max(this.memoryDataView.getBacklog(), this.diskDataView.getBacklog());
    }

    /**
     * Replaces a zero backlog reported by a single view with the combined subpartition backlog;
     * a non-zero backlog is passed through unchanged.
     */
    private ResultSubpartition.BufferAndBacklog handleBacklog(ResultSubpartition.BufferAndBacklog bufferToConsume) {
        if (bufferToConsume.buffersInBacklog() != 0) {
            return bufferToConsume;
        }
        return new ResultSubpartition.BufferAndBacklog(
                bufferToConsume.buffer(),
                this.getSubpartitionBacklog(),
                bufferToConsume.getNextDataType(),
                bufferToConsume.getSequenceNumber());
    }

    /**
     * Tries to consume the next buffer from the disk view.
     *
     * <p>If the disk view reports {@link Buffer.DataType#NONE} as the next data type, the memory
     * view is peeked for the buffer after this one and its data type is used instead.
     */
    @GuardedBy(value="lock")
    private Optional<ResultSubpartition.BufferAndBacklog> tryReadFromDisk() throws Throwable {
        final int nextBufferIndexToConsume = this.lastConsumedBufferIndex + 1;
        return Preconditions.checkNotNull(this.diskDataView)
                .consumeBuffer(nextBufferIndexToConsume)
                .map(bufferAndBacklog -> {
                    if (bufferAndBacklog.getNextDataType() != Buffer.DataType.NONE) {
                        return bufferAndBacklog;
                    }
                    return new ResultSubpartition.BufferAndBacklog(
                            bufferAndBacklog.buffer(),
                            bufferAndBacklog.buffersInBacklog(),
                            Preconditions.checkNotNull(this.memoryDataView)
                                    .peekNextToConsumeDataType(nextBufferIndexToConsume + 1),
                            bufferAndBacklog.getSequenceNumber());
                });
    }

    /**
     * Updates the consuming state after a consume attempt.
     *
     * <p>On success the consumed index is advanced and checked against the buffer's sequence
     * number. {@code needNotify} is set when no further data is available, and the next data type
     * is cached ({@code null} when nothing was consumed).
     */
    @GuardedBy(value="lock")
    private void updateConsumingStatus(Optional<ResultSubpartition.BufferAndBacklog> bufferAndBacklog) {
        assert (Thread.holdsLock(this.lock));
        if (bufferAndBacklog.isPresent()) {
            ++this.lastConsumedBufferIndex;
            Preconditions.checkState(bufferAndBacklog.get().getSequenceNumber() == this.lastConsumedBufferIndex);
        }
        this.needNotify = !bufferAndBacklog.map(ResultSubpartition.BufferAndBacklog::isDataAvailable).orElse(false);
        this.cachedNextDataType = bufferAndBacklog.map(ResultSubpartition.BufferAndBacklog::getNextDataType).orElse(null);
    }

    /**
     * Marks the consumer as released, records the failure cause and releases both data views.
     * Only the first call has any effect.
     *
     * @param throwable the failure that triggered the release, or {@code null} for a regular release
     */
    private void releaseInternal(@Nullable Throwable throwable) {
        final HsDataView diskViewToRelease;
        final HsDataView memoryViewToRelease;
        synchronized (this.lock) {
            if (this.isReleased) {
                return;
            }
            this.isReleased = true;
            this.failureCause = throwable;
            // Snapshot the guarded references here so they are never dereferenced outside the lock.
            diskViewToRelease = this.diskDataView;
            memoryViewToRelease = this.memoryDataView;
        }
        // The views are released outside of the lock.
        if (diskViewToRelease != null) {
            diskViewToRelease.releaseDataView();
        }
        if (memoryViewToRelease != null) {
            memoryViewToRelease.releaseDataView();
        }
    }
}

