/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.InputViewIterator;
import org.apache.flink.runtime.io.disk.SpillingBuffer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.ListMemorySegmentSource;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.util.CloseableInputProvider;
import org.apache.flink.util.MutableObjectIterator;

/**
 * Input provider that buffers an entire input before making it available for reading.
 *
 * <p>A background thread ({@link TempWritingThread}) pulls records from the wrapped iterator and
 * serializes them into a {@link SpillingBuffer}. {@link #getIterator()} blocks until that thread
 * has consumed the whole input (or failed) and then returns an iterator over the buffered data.
 *
 * <p>State shared between the writer thread and callers ({@code exception}, {@code writingDone},
 * {@code closed}) is modified while holding {@code lock}, and waiters are woken through it.
 *
 * @param <T> type of the records passing through the barrier
 */
public class TempBarrier<T>
implements CloseableInputProvider<T> {
    /** Buffer the writer thread fills and {@link #getIterator()} later reads from. */
    private final SpillingBuffer buffer;
    /** Serializer handed to the iterator returned by {@link #getIterator()}. */
    private final TypeSerializer<T> serializer;
    /** Background thread that drains the input into {@link #buffer}. */
    private final TempWritingThread tempWriter;
    private final MemoryManager memManager;
    /** Monitor guarding the state flags below and used for wait/notify. */
    private final Object lock = new Object();
    /** First failure of the writer thread, or a marker exception once the barrier was closed. */
    private volatile Throwable exception;
    /** Memory pages backing {@link #buffer} (pre-allocated plus pages obtained from the manager). */
    private final ArrayList<MemorySegment> memory;
    /** Set once the writer thread has left its write loop without an error. */
    private volatile boolean writingDone;
    /** Set once the buffer and memory have been collected by a successful close. */
    private volatile boolean closed;

    /**
     * Creates the barrier and its (not yet started) writer thread.
     *
     * @param owner             task passed to the memory manager when pages are allocated
     * @param input             iterator whose records are buffered
     * @param serializerFactory source of the serializers used for writing and reading
     * @param memManager        manager used to allocate missing pages and to release memory on close
     * @param ioManager         I/O manager handed to the {@link SpillingBuffer}
     * @param numPages          total number of memory pages the barrier should hold
     * @param preAllocated      pages that are already available; they are copied into the barrier's
     *                          own list and count towards {@code numPages}
     * @throws MemoryAllocationException propagated from {@code memManager.allocatePages} when the
     *                                   missing pages cannot be obtained
     */
    public TempBarrier(AbstractInvokable owner, MutableObjectIterator<T> input, TypeSerializerFactory<T> serializerFactory, MemoryManager memManager, IOManager ioManager, int numPages, List<MemorySegment> preAllocated) throws MemoryAllocationException {
        this.serializer = serializerFactory.getSerializer();
        this.memManager = memManager;
        this.memory = new ArrayList<MemorySegment>(numPages);
        this.memory.addAll(preAllocated);
        // Top up from the memory manager only when the pre-allocated pages do not suffice.
        final int missingPages = numPages - this.memory.size();
        if (missingPages > 0) {
            memManager.allocatePages(owner, this.memory, missingPages);
        }
        this.buffer = new SpillingBuffer(ioManager, new ListMemorySegmentSource(this.memory), memManager.getPageSize());
        // The writer thread gets its own getSerializer() result rather than sharing this.serializer
        // (presumably because serializer instances are not thread-safe -- TODO confirm).
        this.tempWriter = new TempWritingThread(input, serializerFactory.getSerializer(), this.buffer);
    }

    /**
     * Starts the background thread that reads the input into the buffer.
     */
    public void startReading() {
        this.tempWriter.start();
    }

    /**
     * Blocks until the writer thread has finished or failed, then returns an iterator over the
     * buffered records.
     *
     * @return iterator over the buffered data
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws IOException          propagated from flipping the buffer for reading
     * @throws RuntimeException     if the writer thread failed or the barrier has been closed; the
     *                              recorded failure is attached as the cause
     */
    @Override
    public MutableObjectIterator<T> getIterator() throws InterruptedException, IOException {
        synchronized (this.lock) {
            // Timed wait: the state is re-checked at least every five seconds even if a
            // notification was missed.
            while (this.exception == null && !this.writingDone) {
                this.lock.wait(5000L);
            }
        }

        // Read the volatile field once so the check and the thrown cause agree.
        final Throwable failure = this.exception;
        if (failure != null) {
            throw new RuntimeException("An error occurred creating the temp table.", failure);
        }
        if (this.writingDone) {
            final DataInputView in = this.buffer.flip();
            return new InputViewIterator<T>(in, this.serializer);
        }
        // Not expected to be reached: the wait loop only exits on failure or completion.
        return null;
    }

    /**
     * Stops the writer thread and releases all memory held by the barrier to the memory manager.
     *
     * @throws IOException if waiting for the writer thread is interrupted, or propagated from
     *                     closing the buffer
     */
    @Override
    public void close() throws IOException {
        this.memManager.release(this.prepareToClose());
    }

    /**
     * Stops the writer thread like {@link #close()}, but returns the memory to the caller instead
     * of releasing it to the memory manager.
     *
     * @return the memory segments the barrier held; empty if it was already closed
     * @throws IOException if waiting for the writer thread is interrupted, or propagated from
     *                     closing the buffer
     */
    List<MemorySegment> closeAndGetLeftoverMemory() throws IOException {
        return this.prepareToClose();
    }

    /**
     * Shuts the writer thread down and collects every memory segment owned by the barrier.
     *
     * <p>The barrier is only marked as closed after the memory has been collected, so a call that
     * fails part-way (for example because the join was interrupted) can be retried, while a call
     * following a successful one returns an empty list.
     *
     * @return segments from the buffer plus any remaining segments of the page list
     * @throws IOException if interrupted while joining the writer thread, or propagated from
     *                     closing the buffer
     */
    private List<MemorySegment> prepareToClose() throws IOException {
        synchronized (this.lock) {
            if (this.closed) {
                return Collections.emptyList();
            }
            // Mark the barrier as failed so that threads blocked in getIterator() stop waiting;
            // an error already recorded by the writer thread is kept.
            if (this.exception == null) {
                this.exception = new Exception("The dam has been closed.");
            }
            this.lock.notifyAll();
        }

        try {
            this.tempWriter.shutdown();
            this.tempWriter.join();
        }
        catch (InterruptedException iex) {
            // Keep the interruption visible to the caller and preserve the cause.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted", iex);
        }

        final List<MemorySegment> toRelease = new ArrayList<MemorySegment>();
        toRelease.addAll(this.buffer.close());
        toRelease.addAll(this.memory);
        this.memory.clear();

        synchronized (this.lock) {
            this.closed = true;
        }
        return toRelease;
    }

    /**
     * Records a failure of the writer thread, wakes up waiting readers and attempts to close the
     * barrier.
     *
     * @param t the error raised while buffering the input
     */
    private void setException(Throwable t) {
        synchronized (this.lock) {
            this.exception = t;
            this.lock.notifyAll();
        }
        try {
            // NOTE(review): this runs on the writer thread, so close() interrupts and then joins
            // the calling thread itself; that join is expected to fail with "Interrupted" before
            // any memory is released, leaving the release to the owner's later close().
            // Confirm this is the intended behaviour.
            this.close();
        }
        catch (Throwable ignored) {
            // Best effort only: the original failure is already stored in 'exception' and is
            // reported by getIterator(); a secondary close failure must not replace it.
        }
    }

    /**
     * Flags the buffering as complete and wakes up threads waiting in {@link #getIterator()}.
     */
    private void writingDone() {
        synchronized (this.lock) {
            this.writingDone = true;
            this.lock.notifyAll();
        }
    }

    /**
     * Daemon thread that copies all records of the input into the spilling buffer.
     */
    private final class TempWritingThread
    extends Thread {
        private final MutableObjectIterator<T> input;
        private final TypeSerializer<T> serializer;
        private final SpillingBuffer buffer;
        /** Cleared by {@link #shutdown()} to make the write loop stop. */
        private volatile boolean running;

        private TempWritingThread(MutableObjectIterator<T> input, TypeSerializer<T> serializer, SpillingBuffer buffer) {
            super("Temp writer");
            this.running = true;
            this.setDaemon(true);
            this.input = input;
            this.serializer = serializer;
            this.buffer = buffer;
        }

        /**
         * Serializes records from the input into the buffer until the input is exhausted or the
         * thread is shut down, then signals completion. Any error is reported through
         * {@code setException}.
         */
        @Override
        public void run() {
            // Local copies of the fields used inside the loop.
            final MutableObjectIterator<T> input = this.input;
            final TypeSerializer<T> serializer = this.serializer;
            final SpillingBuffer buffer = this.buffer;
            try {
                T record = serializer.createInstance();
                while (this.running && (record = input.next(record)) != null) {
                    serializer.serialize(record, (DataOutputView)buffer);
                }
                TempBarrier.this.writingDone();
            }
            catch (Throwable t) {
                TempBarrier.this.setException(t);
            }
        }

        /**
         * Asks the write loop to stop and interrupts the thread in case it is blocked.
         */
        public void shutdown() {
            this.running = false;
            this.interrupt();
        }
    }
}

