package org.apache.flink.runtime.operators.sort;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.ChannelBackendMutableObjectIterator;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/operators/sort/RecordComparisonMerger.class */
public class RecordComparisonMerger<T> implements SortedDataFileMerger<T> {
    private static final Logger LOG = LoggerFactory.getLogger(RecordComparisonMerger.class);
    private List<SortedDataFile<T>> sortedDataFiles = new ArrayList();
    protected final SortedDataFileFactory<T> sortedDataFileFactory;
    protected final IOManager ioManager;
    protected final TypeSerializer<T> typeSerializer;
    protected final TypeComparator<T> typeComparator;
    private final int maxFileHandlesPerMerge;
    protected final boolean objectReuseEnabled;

    public RecordComparisonMerger(SortedDataFileFactory<T> sortedDataFileFactory, IOManager iOManager, TypeSerializer<T> typeSerializer, TypeComparator<T> typeComparator, int i, boolean z) {
        this.sortedDataFileFactory = sortedDataFileFactory;
        this.ioManager = iOManager;
        this.typeSerializer = typeSerializer;
        this.typeComparator = typeComparator;
        this.maxFileHandlesPerMerge = i;
        this.objectReuseEnabled = z;
    }

    private void merge(List<MemorySegment> list, List<MemorySegment> list2, ChannelDeleteRegistry<T> channelDeleteRegistry, AtomicBoolean atomicBoolean) throws IOException {
        int min = Math.min(this.maxFileHandlesPerMerge, list2.size());
        while (atomicBoolean.get() && this.sortedDataFiles.size() > min) {
            this.sortedDataFiles = mergeChannelList(this.sortedDataFiles, list2, list, channelDeleteRegistry, min);
        }
    }

    @Override // org.apache.flink.runtime.operators.sort.SortedDataFileMerger
    public MutableObjectIterator<T> getMergingIterator(List<SortedDataFile<T>> list, List<MemorySegment> list2, MutableObjectIterator<T> mutableObjectIterator, ChannelDeleteRegistry<T> channelDeleteRegistry) throws IOException {
        return getMergingIteratorWithSegmentedMemory(list, distributeReadMemory(list2, list.size()), null, mutableObjectIterator, channelDeleteRegistry);
    }

    @Override // org.apache.flink.runtime.operators.sort.SortedDataFileMerger
    public void notifyNewSortedDataFile(SortedDataFile<T> sortedDataFile, List<MemorySegment> list, List<MemorySegment> list2, ChannelDeleteRegistry<T> channelDeleteRegistry, AtomicBoolean atomicBoolean) throws IOException {
        this.sortedDataFiles.add(sortedDataFile);
    }

    @Override // org.apache.flink.runtime.operators.sort.SortedDataFileMerger
    public List<SortedDataFile<T>> finishMerging(List<MemorySegment> list, List<MemorySegment> list2, ChannelDeleteRegistry<T> channelDeleteRegistry, AtomicBoolean atomicBoolean) throws IOException {
        merge(list, list2, channelDeleteRegistry, atomicBoolean);
        return this.sortedDataFiles;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final MergeIterator<T> getMergingIteratorWithSegmentedMemory(List<SortedDataFile<T>> list, List<List<MemorySegment>> list2, List<FileIOChannel> list3, MutableObjectIterator<T> mutableObjectIterator, ChannelDeleteRegistry<T> channelDeleteRegistry) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Performing merge of " + list.size() + " sorted streams.");
        }
        ArrayList arrayList = new ArrayList(list.size() + 1);
        for (int i = 0; i < list.size(); i++) {
            ChannelBackendMutableObjectIterator<T> createReader = list.get(i).createReader(list2.get(i));
            if (list3 != null) {
                list3.add(createReader.getReaderChannel());
            }
            channelDeleteRegistry.registerOpenChannel(createReader.getReaderChannel());
            channelDeleteRegistry.registerChannelToBeDelete(createReader.getReaderChannel().getChannelID());
            arrayList.add(createReader);
        }
        if (mutableObjectIterator != null) {
            arrayList.add(mutableObjectIterator);
        }
        return new MergeIterator<>(arrayList, this.typeComparator);
    }

    protected final List<SortedDataFile<T>> mergeChannelList(List<SortedDataFile<T>> list, List<MemorySegment> list2, List<MemorySegment> list3, ChannelDeleteRegistry<T> channelDeleteRegistry, int i) throws IOException {
        double ceil = Math.ceil(Math.log(list.size()) / Math.log(i)) - 1.0d;
        int size = list.size();
        int pow = (int) Math.pow(i, ceil);
        int ceil2 = (int) Math.ceil((size - pow) / (i - 1));
        int i2 = pow - ceil2;
        int i3 = size - i2;
        ArrayList arrayList = new ArrayList(pow);
        arrayList.addAll(list.subList(0, i2));
        int ceil3 = (int) Math.ceil(i3 / ceil2);
        List<List<MemorySegment>> distributeReadMemory = distributeReadMemory(list2, ceil3);
        ArrayList arrayList2 = new ArrayList(ceil3);
        int i4 = i2;
        while (i4 < list.size()) {
            arrayList2.clear();
            int i5 = 0;
            while (i5 < ceil3 && i4 < list.size()) {
                arrayList2.add(list.get(i4));
                i5++;
                i4++;
            }
            arrayList.add(mergeToNewFile(arrayList2, distributeReadMemory, list3, channelDeleteRegistry));
        }
        return arrayList;
    }

    /* JADX WARN: Multi-variable type inference failed */
    protected SortedDataFile<T> mergeToNewFile(List<SortedDataFile<T>> list, List<List<MemorySegment>> list2, List<MemorySegment> list3, ChannelDeleteRegistry<T> channelDeleteRegistry) throws IOException {
        ArrayList arrayList = new ArrayList(list.size());
        MergeIterator<T> mergingIteratorWithSegmentedMemory = getMergingIteratorWithSegmentedMemory(list, list2, arrayList, null, channelDeleteRegistry);
        SortedDataFile<T> createFile = this.sortedDataFileFactory.createFile(list3);
        channelDeleteRegistry.registerChannelToBeDelete(createFile.getChannelID());
        channelDeleteRegistry.registerOpenChannel(createFile.getWriteChannel());
        if (!this.objectReuseEnabled) {
            while (true) {
                T next = mergingIteratorWithSegmentedMemory.next();
                if (next == null) {
                    break;
                }
                createFile.writeRecord(next);
            }
        } else {
            T createInstance = this.typeSerializer.createInstance();
            while (true) {
                T next2 = mergingIteratorWithSegmentedMemory.next(createInstance);
                createInstance = next2;
                if (next2 == null) {
                    break;
                }
                createFile.writeRecord(createInstance);
            }
        }
        createFile.finishWriting();
        channelDeleteRegistry.unregisterOpenChannel(createFile.getWriteChannel());
        for (int i = 0; i < arrayList.size(); i++) {
            FileIOChannel fileIOChannel = arrayList.get(i);
            fileIOChannel.closeAndDelete();
            channelDeleteRegistry.unregisterOpenChannel(fileIOChannel);
        }
        return createFile;
    }

    protected final List<List<MemorySegment>> distributeReadMemory(List<MemorySegment> list, int i) {
        ArrayList arrayList = new ArrayList(i);
        int size = list.size();
        int i2 = size / i;
        int i3 = size % i;
        Iterator<MemorySegment> it = list.iterator();
        int i4 = 0;
        while (i4 < i) {
            int i5 = i4 < i3 ? i2 + 1 : i2;
            ArrayList arrayList2 = new ArrayList(i5);
            arrayList.add(arrayList2);
            for (int i6 = 0; i6 < i5; i6++) {
                arrayList2.add(it.next());
            }
            i4++;
        }
        return arrayList;
    }
}
