/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state.restore;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Function;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.contrib.streaming.state.RocksDBIncrementalCheckpointUtils;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
import org.apache.flink.contrib.streaming.state.RocksDBOperationUtils;
import org.apache.flink.contrib.streaming.state.RocksDBStateDownloader;
import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
import org.apache.flink.contrib.streaming.state.restore.RocksDBHandle;
import org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreResult;
import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.DirectoryStateHandle;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.StateMigrationException;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksDBIncrementalRestoreOperation<K>
implements RocksDBRestoreOperation {
    private static final Logger logger = LoggerFactory.getLogger(RocksDBIncrementalRestoreOperation.class);
    private final String operatorIdentifier;
    private final SortedMap<Long, Set<StateHandleID>> restoredSstFiles;
    private final RocksDBHandle rocksHandle;
    private final Collection<KeyedStateHandle> restoreStateHandles;
    private final CloseableRegistry cancelStreamRegistry;
    private final KeyGroupRange keyGroupRange;
    private final File instanceBasePath;
    private final int numberOfTransferringThreads;
    private final int keyGroupPrefixBytes;
    private final StateSerializerProvider<K> keySerializerProvider;
    private final ClassLoader userCodeClassLoader;
    private long lastCompletedCheckpointId;
    private UUID backendUID;
    private final long writeBatchSize;
    private boolean isKeySerializerCompatibilityChecked;

    public RocksDBIncrementalRestoreOperation(String operatorIdentifier, KeyGroupRange keyGroupRange, int keyGroupPrefixBytes, int numberOfTransferringThreads, CloseableRegistry cancelStreamRegistry, ClassLoader userCodeClassLoader, Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation, StateSerializerProvider<K> keySerializerProvider, File instanceBasePath, File instanceRocksDBPath, DBOptions dbOptions, Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, RocksDBNativeMetricOptions nativeMetricOptions, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> restoreStateHandles, @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, @Nonnegative long writeBatchSize, Long writeBufferManagerCapacity) {
        this.rocksHandle = new RocksDBHandle(kvStateInformation, instanceRocksDBPath, dbOptions, columnFamilyOptionsFactory, nativeMetricOptions, metricGroup, ttlCompactFiltersManager, writeBufferManagerCapacity);
        this.operatorIdentifier = operatorIdentifier;
        this.restoredSstFiles = new TreeMap<Long, Set<StateHandleID>>();
        this.lastCompletedCheckpointId = -1L;
        this.backendUID = UUID.randomUUID();
        this.writeBatchSize = writeBatchSize;
        this.restoreStateHandles = restoreStateHandles;
        this.cancelStreamRegistry = cancelStreamRegistry;
        this.keyGroupRange = keyGroupRange;
        this.instanceBasePath = instanceBasePath;
        this.numberOfTransferringThreads = numberOfTransferringThreads;
        this.keyGroupPrefixBytes = keyGroupPrefixBytes;
        this.keySerializerProvider = keySerializerProvider;
        this.userCodeClassLoader = userCodeClassLoader;
    }

    @Override
    public RocksDBRestoreResult restore() throws Exception {
        boolean isRescaling;
        if (this.restoreStateHandles == null || this.restoreStateHandles.isEmpty()) {
            return null;
        }
        KeyedStateHandle theFirstStateHandle = this.restoreStateHandles.iterator().next();
        boolean bl = isRescaling = this.restoreStateHandles.size() > 1 || !Objects.equals(theFirstStateHandle.getKeyGroupRange(), this.keyGroupRange);
        if (isRescaling) {
            this.restoreWithRescaling(this.restoreStateHandles);
        } else {
            this.restoreWithoutRescaling(theFirstStateHandle);
        }
        return new RocksDBRestoreResult(this.rocksHandle.getDb(), this.rocksHandle.getDefaultColumnFamilyHandle(), this.rocksHandle.getNativeMetricMonitor(), this.lastCompletedCheckpointId, this.backendUID, this.restoredSstFiles);
    }

    private void restoreWithoutRescaling(KeyedStateHandle keyedStateHandle) throws Exception {
        logger.info("Starting to restore from state handle: {} without rescaling.", (Object)keyedStateHandle);
        if (keyedStateHandle instanceof IncrementalRemoteKeyedStateHandle) {
            IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle = (IncrementalRemoteKeyedStateHandle)keyedStateHandle;
            this.restorePreviousIncrementalFilesStatus((IncrementalKeyedStateHandle)incrementalRemoteKeyedStateHandle);
            this.restoreFromRemoteState(incrementalRemoteKeyedStateHandle);
        } else if (keyedStateHandle instanceof IncrementalLocalKeyedStateHandle) {
            IncrementalLocalKeyedStateHandle incrementalLocalKeyedStateHandle = (IncrementalLocalKeyedStateHandle)keyedStateHandle;
            this.restorePreviousIncrementalFilesStatus((IncrementalKeyedStateHandle)incrementalLocalKeyedStateHandle);
            this.restoreFromLocalState(incrementalLocalKeyedStateHandle);
        } else {
            throw StateUtil.unexpectedStateHandleException((Class[])new Class[]{IncrementalRemoteKeyedStateHandle.class, IncrementalLocalKeyedStateHandle.class}, keyedStateHandle.getClass());
        }
        logger.info("Finished restoring from state handle: {} without rescaling.", (Object)keyedStateHandle);
    }

    private void restorePreviousIncrementalFilesStatus(IncrementalKeyedStateHandle localKeyedStateHandle) {
        this.backendUID = localKeyedStateHandle.getBackendIdentifier();
        this.restoredSstFiles.put(localKeyedStateHandle.getCheckpointId(), localKeyedStateHandle.getSharedStateHandleIDs());
        this.lastCompletedCheckpointId = localKeyedStateHandle.getCheckpointId();
    }

    private void restoreFromRemoteState(IncrementalRemoteKeyedStateHandle stateHandle) throws Exception {
        Path tmpRestoreInstancePath = this.instanceBasePath.getAbsoluteFile().toPath().resolve(UUID.randomUUID().toString());
        try {
            this.restoreFromLocalState(this.transferRemoteStateToLocalDirectory(tmpRestoreInstancePath, stateHandle));
        }
        finally {
            this.cleanUpPathQuietly(tmpRestoreInstancePath);
        }
    }

    private void restoreFromLocalState(IncrementalLocalKeyedStateHandle localKeyedStateHandle) throws Exception {
        KeyedBackendSerializationProxy<K> serializationProxy = this.readMetaData(localKeyedStateHandle.getMetaDataState());
        List stateMetaInfoSnapshots = serializationProxy.getStateMetaInfoSnapshots();
        Path restoreSourcePath = localKeyedStateHandle.getDirectoryStateHandle().getDirectory();
        logger.debug("Restoring keyed backend uid in operator {} from incremental snapshot to {}.", (Object)this.operatorIdentifier, (Object)this.backendUID);
        this.rocksHandle.openDB(this.createlumnFamilyDescriptors(stateMetaInfoSnapshots, true), stateMetaInfoSnapshots, restoreSourcePath);
    }

    private IncrementalLocalKeyedStateHandle transferRemoteStateToLocalDirectory(Path temporaryRestoreInstancePath, IncrementalRemoteKeyedStateHandle restoreStateHandle) throws Exception {
        try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(this.numberOfTransferringThreads);){
            rocksDBStateDownloader.transferAllStateDataToDirectory(restoreStateHandle, temporaryRestoreInstancePath, this.cancelStreamRegistry);
        }
        return new IncrementalLocalKeyedStateHandle(restoreStateHandle.getBackendIdentifier(), restoreStateHandle.getCheckpointId(), new DirectoryStateHandle(temporaryRestoreInstancePath), restoreStateHandle.getKeyGroupRange(), restoreStateHandle.getMetaStateHandle(), restoreStateHandle.getSharedState().keySet());
    }

    private void cleanUpPathQuietly(@Nonnull Path path) {
        try {
            FileUtils.deleteDirectory((File)path.toFile());
        }
        catch (IOException ex) {
            logger.warn("Failed to clean up path " + path, (Throwable)ex);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {
        KeyedStateHandle initialHandle = RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(restoreStateHandles, this.keyGroupRange);
        if (initialHandle != null) {
            restoreStateHandles.remove(initialHandle);
            this.initDBWithRescaling(initialHandle);
        } else {
            this.rocksHandle.openDB();
        }
        byte[] startKeyGroupPrefixBytes = new byte[this.keyGroupPrefixBytes];
        CompositeKeySerializationUtils.serializeKeyGroup((int)this.keyGroupRange.getStartKeyGroup(), (byte[])startKeyGroupPrefixBytes);
        byte[] stopKeyGroupPrefixBytes = new byte[this.keyGroupPrefixBytes];
        CompositeKeySerializationUtils.serializeKeyGroup((int)(this.keyGroupRange.getEndKeyGroup() + 1), (byte[])stopKeyGroupPrefixBytes);
        for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
            if (!(rawStateHandle instanceof IncrementalRemoteKeyedStateHandle)) {
                throw StateUtil.unexpectedStateHandleException(IncrementalRemoteKeyedStateHandle.class, rawStateHandle.getClass());
            }
            logger.info("Starting to restore from state handle: {} with rescaling.", (Object)rawStateHandle);
            Path temporaryRestoreInstancePath = this.instanceBasePath.getAbsoluteFile().toPath().resolve(UUID.randomUUID().toString());
            try {
                RestoredDBInstance tmpRestoreDBInfo = this.restoreDBInstanceFromStateHandle((IncrementalRemoteKeyedStateHandle)rawStateHandle, temporaryRestoreInstancePath);
                Throwable throwable = null;
                try {
                    RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), this.writeBatchSize);
                    Throwable throwable2 = null;
                    try {
                        List tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors;
                        List tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;
                        for (int i = 0; i < tmpColumnFamilyDescriptors.size(); ++i) {
                            ColumnFamilyHandle tmpColumnFamilyHandle = (ColumnFamilyHandle)tmpColumnFamilyHandles.get(i);
                            ColumnFamilyHandle targetColumnFamilyHandle = this.rocksHandle.getOrRegisterStateColumnFamilyHandle(null, (StateMetaInfoSnapshot)((StateMetaInfoSnapshot)((RestoredDBInstance)tmpRestoreDBInfo).stateMetaInfoSnapshots.get((int)i))).columnFamilyHandle;
                            try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle, tmpRestoreDBInfo.readOptions);){
                                iterator.seek(startKeyGroupPrefixBytes);
                                while (iterator.isValid() && RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(iterator.key(), stopKeyGroupPrefixBytes)) {
                                    writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
                                    iterator.next();
                                }
                                continue;
                            }
                        }
                        logger.info("Finished restoring from state handle: {} with rescaling.", (Object)rawStateHandle);
                    }
                    catch (Throwable throwable3) {
                        throwable2 = throwable3;
                        throw throwable3;
                    }
                    finally {
                        if (writeBatchWrapper == null) continue;
                        if (throwable2 != null) {
                            try {
                                writeBatchWrapper.close();
                            }
                            catch (Throwable throwable4) {
                                throwable2.addSuppressed(throwable4);
                            }
                            continue;
                        }
                        writeBatchWrapper.close();
                    }
                }
                catch (Throwable throwable5) {
                    throwable = throwable5;
                    throw throwable5;
                }
                finally {
                    if (tmpRestoreDBInfo == null) continue;
                    if (throwable != null) {
                        try {
                            tmpRestoreDBInfo.close();
                        }
                        catch (Throwable throwable6) {
                            throwable.addSuppressed(throwable6);
                        }
                        continue;
                    }
                    tmpRestoreDBInfo.close();
                }
            }
            finally {
                this.cleanUpPathQuietly(temporaryRestoreInstancePath);
            }
        }
    }

    private void initDBWithRescaling(KeyedStateHandle initialHandle) throws Exception {
        assert (initialHandle instanceof IncrementalRemoteKeyedStateHandle);
        this.restoreFromRemoteState((IncrementalRemoteKeyedStateHandle)initialHandle);
        try {
            RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(this.rocksHandle.getDb(), this.rocksHandle.getColumnFamilyHandles(), this.keyGroupRange, initialHandle.getKeyGroupRange(), this.keyGroupPrefixBytes, this.writeBatchSize);
        }
        catch (RocksDBException e) {
            String errMsg = "Failed to clip DB after initialization.";
            logger.error(errMsg, (Throwable)e);
            throw new BackendBuildingException(errMsg, (Throwable)e);
        }
    }

    private RestoredDBInstance restoreDBInstanceFromStateHandle(IncrementalRemoteKeyedStateHandle restoreStateHandle, Path temporaryRestoreInstancePath) throws Exception {
        try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader(this.numberOfTransferringThreads);){
            rocksDBStateDownloader.transferAllStateDataToDirectory(restoreStateHandle, temporaryRestoreInstancePath, this.cancelStreamRegistry);
        }
        KeyedBackendSerializationProxy<K> serializationProxy = this.readMetaData(restoreStateHandle.getMetaStateHandle());
        List stateMetaInfoSnapshots = serializationProxy.getStateMetaInfoSnapshots();
        List<ColumnFamilyDescriptor> columnFamilyDescriptors = this.createlumnFamilyDescriptors(stateMetaInfoSnapshots, false);
        ArrayList<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<ColumnFamilyHandle>(stateMetaInfoSnapshots.size() + 1);
        RocksDB restoreDb = RocksDBOperationUtils.openDB(temporaryRestoreInstancePath.toString(), columnFamilyDescriptors, columnFamilyHandles, RocksDBOperationUtils.createColumnFamilyOptions(this.rocksHandle.getColumnFamilyOptionsFactory(), "default"), this.rocksHandle.getDbOptions());
        return new RestoredDBInstance(restoreDb, columnFamilyHandles, columnFamilyDescriptors, stateMetaInfoSnapshots);
    }

    private List<ColumnFamilyDescriptor> createlumnFamilyDescriptors(List<StateMetaInfoSnapshot> stateMetaInfoSnapshots, boolean registerTtlCompactFilter) {
        ArrayList<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<ColumnFamilyDescriptor>(stateMetaInfoSnapshots.size());
        for (StateMetaInfoSnapshot stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
            RegisteredStateMetaInfoBase metaInfoBase = RegisteredStateMetaInfoBase.fromMetaInfoSnapshot((StateMetaInfoSnapshot)stateMetaInfoSnapshot);
            ColumnFamilyDescriptor columnFamilyDescriptor = RocksDBOperationUtils.createColumnFamilyDescriptor(metaInfoBase, this.rocksHandle.getColumnFamilyOptionsFactory(), registerTtlCompactFilter ? this.rocksHandle.getTtlCompactFiltersManager() : null, this.rocksHandle.getWriteBufferManagerCapacity());
            columnFamilyDescriptors.add(columnFamilyDescriptor);
        }
        return columnFamilyDescriptors;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private KeyedBackendSerializationProxy<K> readMetaData(StreamStateHandle metaStateHandle) throws Exception {
        FSDataInputStream inputStream = null;
        try {
            inputStream = metaStateHandle.openInputStream();
            this.cancelStreamRegistry.registerCloseable((Closeable)inputStream);
            DataInputViewStreamWrapper in = new DataInputViewStreamWrapper((InputStream)inputStream);
            KeyedBackendSerializationProxy<K> keyedBackendSerializationProxy = this.readMetaData((DataInputView)in);
            return keyedBackendSerializationProxy;
        }
        finally {
            if (this.cancelStreamRegistry.unregisterCloseable((Closeable)inputStream)) {
                inputStream.close();
            }
        }
    }

    KeyedBackendSerializationProxy<K> readMetaData(DataInputView dataInputView) throws IOException, StateMigrationException {
        KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(this.userCodeClassLoader);
        serializationProxy.read(dataInputView);
        if (!this.isKeySerializerCompatibilityChecked) {
            TypeSerializer currentSerializer = this.keySerializerProvider.currentSchemaSerializer();
            TypeSerializerSchemaCompatibility keySerializerSchemaCompat = this.keySerializerProvider.setPreviousSerializerSnapshotForRestoredState(serializationProxy.getKeySerializerSnapshot());
            if (keySerializerSchemaCompat.isCompatibleAfterMigration() || keySerializerSchemaCompat.isIncompatible()) {
                throw new StateMigrationException("The new key serializer (" + currentSerializer + ") must be compatible with the previous key serializer (" + this.keySerializerProvider.previousSchemaSerializer() + ").");
            }
            this.isKeySerializerCompatibilityChecked = true;
        }
        return serializationProxy;
    }

    @Override
    public void close() throws Exception {
        this.rocksHandle.close();
    }

    private static class RestoredDBInstance
    implements AutoCloseable {
        @Nonnull
        private final RocksDB db;
        @Nonnull
        private final ColumnFamilyHandle defaultColumnFamilyHandle;
        @Nonnull
        private final List<ColumnFamilyHandle> columnFamilyHandles;
        @Nonnull
        private final List<ColumnFamilyDescriptor> columnFamilyDescriptors;
        @Nonnull
        private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
        private final ReadOptions readOptions;

        private RestoredDBInstance(@Nonnull RocksDB db, @Nonnull List<ColumnFamilyHandle> columnFamilyHandles, @Nonnull List<ColumnFamilyDescriptor> columnFamilyDescriptors, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
            this.db = db;
            this.defaultColumnFamilyHandle = columnFamilyHandles.remove(0);
            this.columnFamilyHandles = columnFamilyHandles;
            this.columnFamilyDescriptors = columnFamilyDescriptors;
            this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
            this.readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
        }

        @Override
        public void close() {
            ArrayList<ColumnFamilyOptions> columnFamilyOptions = new ArrayList<ColumnFamilyOptions>(this.columnFamilyDescriptors.size() + 1);
            this.columnFamilyDescriptors.forEach(cfd -> columnFamilyOptions.add(cfd.getOptions()));
            RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(columnFamilyOptions, this.defaultColumnFamilyHandle);
            IOUtils.closeQuietly((AutoCloseable)this.defaultColumnFamilyHandle);
            IOUtils.closeAllQuietly(this.columnFamilyHandles);
            IOUtils.closeQuietly((AutoCloseable)this.db);
            IOUtils.closeAllQuietly(columnFamilyOptions);
            IOUtils.closeQuietly((AutoCloseable)this.readOptions);
        }
    }
}

