/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.common.io;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.annotation.Public;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Public
public final class LocatableInputSplitAssigner
implements InputSplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(LocatableInputSplitAssigner.class);
    private final BlockingQueue<AssigningInputSplit> unassigned = new LinkedBlockingQueue<AssigningInputSplit>();
    private Map<String, BlockingQueue<AssigningInputSplit>> splitHostMap = new HashMap<String, BlockingQueue<AssigningInputSplit>>();
    private AtomicInteger localAssignments = new AtomicInteger();
    private AtomicInteger remoteAssignments = new AtomicInteger();

    public LocatableInputSplitAssigner(Collection<LocatableInputSplit> splits) {
        this(splits.toArray(new LocatableInputSplit[splits.size()]));
    }

    public LocatableInputSplitAssigner(LocatableInputSplit[] splits) {
        int maxReplicaSize = 0;
        for (LocatableInputSplit locatableInputSplit : splits) {
            AssigningInputSplit split = new AssigningInputSplit(locatableInputSplit);
            this.unassigned.add(split);
            maxReplicaSize = maxReplicaSize > split.getHostNames().length ? maxReplicaSize : split.getHostNames().length;
        }
        for (int i = 0; i < maxReplicaSize; ++i) {
            for (AssigningInputSplit split : this.unassigned) {
                String hostName;
                if (split.getHostNames().length <= i || (hostName = split.getHostNames()[i]) == null) continue;
                String host = NetUtils.getHostnameFromFQDN(hostName.toLowerCase());
                this.splitHostMap.computeIfAbsent(host, k -> new LinkedBlockingQueue()).add(split);
            }
        }
    }

    @Override
    public LocatableInputSplit getNextInputSplit(String host, int taskId) {
        LocatableInputSplit split;
        BlockingQueue<AssigningInputSplit> inputSplitQueue;
        if (host != null && (inputSplitQueue = this.splitHostMap.get(host = host.toLowerCase(Locale.US))) != null && (split = this.takeUnAssignedSplit(inputSplitQueue)) != null) {
            this.localAssignments.incrementAndGet();
            LOG.info("Assigning local split to host " + host);
            return split;
        }
        LocatableInputSplit split2 = this.takeUnAssignedSplit(this.unassigned);
        if (split2 != null) {
            this.remoteAssignments.incrementAndGet();
            LOG.info("Assigning remote split to host " + host);
        }
        return split2;
    }

    @Override
    public void inputSplitsAssigned(int taskId, List<InputSplit> inputSplits) {
        for (InputSplit inputSplit : inputSplits) {
            boolean found = false;
            for (AssigningInputSplit split : this.unassigned) {
                if (!split.getSplit().equals(inputSplit)) continue;
                this.unassigned.remove(split);
                found = true;
                break;
            }
            if (found) continue;
            throw new FlinkRuntimeException("InputSplit not found for " + inputSplit.getSplitNumber());
        }
    }

    public int getNumberOfLocalAssignments() {
        return this.localAssignments.get();
    }

    public int getNumberOfRemoteAssignments() {
        return this.remoteAssignments.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private LocatableInputSplit takeUnAssignedSplit(BlockingQueue<AssigningInputSplit> queue) {
        AssigningInputSplit split = (AssigningInputSplit)queue.poll();
        while (split != null) {
            if (!split.isAssigned()) {
                AssigningInputSplit assigningInputSplit = split;
                synchronized (assigningInputSplit) {
                    if (!split.isAssigned()) {
                        split.setAssigned();
                        return split.getSplit();
                    }
                }
            }
            split = (AssigningInputSplit)queue.poll();
        }
        return null;
    }

    private static class AssigningInputSplit {
        private final LocatableInputSplit split;
        public volatile boolean isAssigned = false;

        public AssigningInputSplit(LocatableInputSplit split) {
            this.split = split;
        }

        public String[] getHostNames() {
            return this.split.getHostnames();
        }

        public LocatableInputSplit getSplit() {
            return this.split;
        }

        public boolean isAssigned() {
            return this.isAssigned;
        }

        public void setAssigned() {
            this.isAssigned = true;
        }
    }
}

