/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class RocksDBIncrementalCheckpointUtils {
    private static Score stateHandleEvaluator(KeyedStateHandle stateHandle, KeyGroupRange targetKeyGroupRange, double overlapFractionThreshold) {
        KeyGroupRange handleKeyGroupRange = stateHandle.getKeyGroupRange();
        KeyGroupRange intersectGroup = handleKeyGroupRange.getIntersection(targetKeyGroupRange);
        double overlapFraction = (double)intersectGroup.getNumberOfKeyGroups() / (double)handleKeyGroupRange.getNumberOfKeyGroups();
        if (overlapFraction < overlapFractionThreshold) {
            return Score.MIN;
        }
        return new Score(intersectGroup.getNumberOfKeyGroups(), overlapFraction);
    }

    public static void clipDBWithKeyGroupRange(@Nonnull RocksDB db, @Nonnull List<ColumnFamilyHandle> columnFamilyHandles, @Nonnull KeyGroupRange targetKeyGroupRange, @Nonnull KeyGroupRange currentKeyGroupRange, @Nonnegative int keyGroupPrefixBytes) throws RocksDBException {
        byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes];
        byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes];
        if (currentKeyGroupRange.getStartKeyGroup() < targetKeyGroupRange.getStartKeyGroup()) {
            CompositeKeySerializationUtils.serializeKeyGroup((int)currentKeyGroupRange.getStartKeyGroup(), (byte[])beginKeyGroupBytes);
            CompositeKeySerializationUtils.serializeKeyGroup((int)targetKeyGroupRange.getStartKeyGroup(), (byte[])endKeyGroupBytes);
            RocksDBIncrementalCheckpointUtils.deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
        }
        if (currentKeyGroupRange.getEndKeyGroup() > targetKeyGroupRange.getEndKeyGroup()) {
            CompositeKeySerializationUtils.serializeKeyGroup((int)(targetKeyGroupRange.getEndKeyGroup() + 1), (byte[])beginKeyGroupBytes);
            CompositeKeySerializationUtils.serializeKeyGroup((int)(currentKeyGroupRange.getEndKeyGroup() + 1), (byte[])endKeyGroupBytes);
            RocksDBIncrementalCheckpointUtils.deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
        }
    }

    private static void deleteRange(RocksDB db, List<ColumnFamilyHandle> columnFamilyHandles, byte[] beginKeyBytes, byte[] endKeyBytes) throws RocksDBException {
        for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
            db.deleteRange(columnFamilyHandle, beginKeyBytes, endKeyBytes);
        }
    }

    public static boolean beforeThePrefixBytes(@Nonnull byte[] bytes, @Nonnull byte[] prefixBytes) {
        int prefixLength = prefixBytes.length;
        for (int i = 0; i < prefixLength; ++i) {
            int r = (char)prefixBytes[i] - (char)bytes[i];
            if (r == 0) continue;
            return r > 0;
        }
        return false;
    }

    @Nullable
    public static KeyedStateHandle chooseTheBestStateHandleForInitial(@Nonnull Collection<KeyedStateHandle> restoreStateHandles, @Nonnull KeyGroupRange targetKeyGroupRange, double overlapFractionThreshold) {
        KeyedStateHandle bestStateHandle = null;
        Score bestScore = Score.MIN;
        for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
            Score handleScore = RocksDBIncrementalCheckpointUtils.stateHandleEvaluator(rawStateHandle, targetKeyGroupRange, overlapFractionThreshold);
            if (handleScore.compareTo(bestScore) <= 0) continue;
            bestStateHandle = rawStateHandle;
            bestScore = handleScore;
        }
        return bestStateHandle;
    }

    private static class Score
    implements Comparable<Score> {
        public static final Score MIN = new Score(Integer.MIN_VALUE, -1.0);
        private final int intersectGroupRange;
        private final double overlapFraction;

        public Score(int intersectGroupRange, double overlapFraction) {
            this.intersectGroupRange = intersectGroupRange;
            this.overlapFraction = overlapFraction;
        }

        public int getIntersectGroupRange() {
            return this.intersectGroupRange;
        }

        public double getOverlapFraction() {
            return this.overlapFraction;
        }

        @Override
        public int compareTo(@Nonnull Score other) {
            return Comparator.comparing(Score::getIntersectGroupRange).thenComparing(Score::getOverlapFraction).compare(this, other);
        }
    }
}

