/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedMap;
import java.util.Spliterators;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.contrib.streaming.state.RocksDBAggregatingState;
import org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet;
import org.apache.flink.contrib.streaming.state.RocksDBFoldingState;
import org.apache.flink.contrib.streaming.state.RocksDBIncrementalCheckpointUtils;
import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
import org.apache.flink.contrib.streaming.state.RocksDBListState;
import org.apache.flink.contrib.streaming.state.RocksDBMapState;
import org.apache.flink.contrib.streaming.state.RocksDBReducingState;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBValueState;
import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
import org.apache.flink.contrib.streaming.state.TreeOrderedSetCache;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.ByteArrayDataInputView;
import org.apache.flink.core.memory.ByteArrayDataOutputView;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.io.async.StoppableCallbackCallable;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DirectoryStateHandle;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.PriorityComparator;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.apache.flink.util.StateMigrationException;
import org.apache.flink.util.function.SupplierWithException;
import org.rocksdb.Checkpoint;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksDBKeyedStateBackend<K>
extends AbstractKeyedStateBackend<K> {
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackend.class);
    public static final String MERGE_OPERATOR_NAME = "stringappendtest";
    private static final String SST_FILE_SUFFIX = ".sst";
    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES = Stream.of(Tuple2.of(ValueStateDescriptor.class, RocksDBValueState::create), Tuple2.of(ListStateDescriptor.class, RocksDBListState::create), Tuple2.of(MapStateDescriptor.class, RocksDBMapState::create), Tuple2.of(AggregatingStateDescriptor.class, RocksDBAggregatingState::create), Tuple2.of(ReducingStateDescriptor.class, RocksDBReducingState::create), Tuple2.of(FoldingStateDescriptor.class, RocksDBFoldingState::create)).collect(Collectors.toMap(t -> (Class)t.f0, t -> (StateFactory)t.f1));
    private final String operatorIdentifier;
    private final ColumnFamilyOptions columnOptions;
    private final DBOptions dbOptions;
    private final File instanceBasePath;
    private final File instanceRocksDBPath;
    private final ResourceGuard rocksDBResourceGuard;
    protected RocksDB db;
    private ColumnFamilyHandle defaultColumnFamily;
    private final WriteOptions writeOptions;
    private final Map<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation;
    private final Map<String, StateMetaInfoSnapshot> restoredKvStateMetaInfos;
    private final int keyGroupPrefixBytes;
    private final boolean enableIncrementalCheckpointing;
    private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
    private long lastCompletedCheckpointId = -1L;
    private UUID backendUID;
    private final LocalRecoveryConfig localRecoveryConfig;
    private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> snapshotStrategy;
    private final PriorityQueueSetFactory priorityQueueFactory;
    private RocksDBWriteBatchWrapper writeBatchWrapper;

    public RocksDBKeyedStateBackend(String operatorIdentifier, ClassLoader userCodeClassLoader, File instanceBasePath, DBOptions dbOptions, ColumnFamilyOptions columnFamilyOptions, TaskKvStateRegistry kvStateRegistry, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, ExecutionConfig executionConfig, boolean enableIncrementalCheckpointing, LocalRecoveryConfig localRecoveryConfig, RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType, TtlTimeProvider ttlTimeProvider) throws IOException {
        super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig, ttlTimeProvider);
        this.operatorIdentifier = (String)Preconditions.checkNotNull((Object)operatorIdentifier);
        this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
        this.rocksDBResourceGuard = new ResourceGuard();
        this.columnOptions = ((ColumnFamilyOptions)Preconditions.checkNotNull((Object)columnFamilyOptions)).setMergeOperatorName(MERGE_OPERATOR_NAME);
        this.dbOptions = (DBOptions)Preconditions.checkNotNull((Object)dbOptions);
        this.instanceBasePath = (File)Preconditions.checkNotNull((Object)instanceBasePath);
        this.instanceRocksDBPath = new File(instanceBasePath, "db");
        RocksDBKeyedStateBackend.checkAndCreateDirectory(instanceBasePath);
        if (this.instanceRocksDBPath.exists()) {
            this.cleanInstanceBasePath();
        }
        this.localRecoveryConfig = (LocalRecoveryConfig)Preconditions.checkNotNull((Object)localRecoveryConfig);
        this.keyGroupPrefixBytes = RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(this.getNumberOfKeyGroups());
        this.kvStateInformation = new LinkedHashMap<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>>();
        this.restoredKvStateMetaInfos = new HashMap<String, StateMetaInfoSnapshot>();
        this.materializedSstFiles = new TreeMap<Long, Set<StateHandleID>>();
        this.backendUID = UUID.randomUUID();
        this.snapshotStrategy = enableIncrementalCheckpointing ? new IncrementalSnapshotStrategy() : new FullSnapshotStrategy();
        this.writeOptions = new WriteOptions().setDisableWAL(true);
        switch (priorityQueueStateType) {
            case HEAP: {
                this.priorityQueueFactory = new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
                break;
            }
            case ROCKSDB: {
                this.priorityQueueFactory = new RocksDBPriorityQueueSetFactory();
                break;
            }
            default: {
                throw new IllegalArgumentException("Unknown priority queue state type: " + (Object)((Object)priorityQueueStateType));
            }
        }
        LOG.debug("Setting initial keyed backend uid for operator {} to {}.", (Object)this.operatorIdentifier, (Object)this.backendUID);
    }

    private static void checkAndCreateDirectory(File directory) throws IOException {
        if (directory.exists()) {
            if (!directory.isDirectory()) {
                throw new IOException("Not a directory: " + directory);
            }
        } else if (!directory.mkdirs()) {
            throw new IOException(String.format("Could not create RocksDB data directory at %s.", directory));
        }
    }

    public <N> Stream<K> getKeys(String state, N namespace) {
        byte[] nameSpaceBytes;
        Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> columnInfo = this.kvStateInformation.get(state);
        if (columnInfo == null || !(columnInfo.f1 instanceof RegisteredKeyValueStateBackendMetaInfo)) {
            return Stream.empty();
        }
        RegisteredKeyValueStateBackendMetaInfo registeredKeyValueStateBackendMetaInfo = (RegisteredKeyValueStateBackendMetaInfo)columnInfo.f1;
        TypeSerializer namespaceSerializer = registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
        ByteArrayOutputStreamWithPos namespaceOutputStream = new ByteArrayOutputStreamWithPos(8);
        boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(this.keySerializer, namespaceSerializer);
        try {
            RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, namespaceOutputStream, (DataOutputView)new DataOutputViewStreamWrapper((OutputStream)namespaceOutputStream), ambiguousKeyPossible);
            nameSpaceBytes = namespaceOutputStream.toByteArray();
        }
        catch (IOException ex) {
            throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", (Throwable)ex);
        }
        RocksIteratorWrapper iterator2 = RocksDBKeyedStateBackend.getRocksIterator(this.db, (ColumnFamilyHandle)columnInfo.f0);
        iterator2.seekToFirst();
        RocksIteratorForKeysWrapper iteratorWrapper = new RocksIteratorForKeysWrapper(iterator2, state, this.keySerializer, this.keyGroupPrefixBytes, ambiguousKeyPossible, nameSpaceBytes);
        Stream targetStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorWrapper, 16), false);
        return (Stream)targetStream.onClose(iteratorWrapper::close);
    }

    @VisibleForTesting
    ColumnFamilyHandle getColumnFamilyHandle(String state) {
        Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> columnInfo = this.kvStateInformation.get(state);
        return columnInfo != null ? (ColumnFamilyHandle)columnInfo.f0 : null;
    }

    public void dispose() {
        super.dispose();
        this.rocksDBResourceGuard.close();
        if (this.db != null) {
            IOUtils.closeQuietly((AutoCloseable)this.writeBatchWrapper);
            IOUtils.closeQuietly((AutoCloseable)this.defaultColumnFamily);
            for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> columnMetaData : this.kvStateInformation.values()) {
                IOUtils.closeQuietly((AutoCloseable)((AutoCloseable)columnMetaData.f0));
            }
            IOUtils.closeQuietly((AutoCloseable)this.db);
            this.db = null;
            IOUtils.closeQuietly((AutoCloseable)this.columnOptions);
            IOUtils.closeQuietly((AutoCloseable)this.dbOptions);
            IOUtils.closeQuietly((AutoCloseable)this.writeOptions);
            this.kvStateInformation.clear();
            this.restoredKvStateMetaInfos.clear();
            this.cleanInstanceBasePath();
        }
    }

    @Nonnull
    public <T extends HeapPriorityQueueElement & PriorityComparable> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
        return this.priorityQueueFactory.create(stateName, byteOrderedElementSerializer);
    }

    private void cleanInstanceBasePath() {
        LOG.info("Deleting existing instance base directory {}.", (Object)this.instanceBasePath);
        try {
            FileUtils.deleteDirectory((File)this.instanceBasePath);
        }
        catch (IOException ex) {
            LOG.warn("Could not delete instance base path for RocksDB: " + this.instanceBasePath, ex);
        }
    }

    public int getKeyGroupPrefixBytes() {
        return this.keyGroupPrefixBytes;
    }

    @VisibleForTesting
    PriorityQueueSetFactory getPriorityQueueFactory() {
        return this.priorityQueueFactory;
    }

    public WriteOptions getWriteOptions() {
        return this.writeOptions;
    }

    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) throws Exception {
        this.writeBatchWrapper.flush();
        return this.snapshotStrategy.performSnapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
    }

    public void restore(Collection<KeyedStateHandle> restoreState) throws Exception {
        LOG.info("Initializing RocksDB keyed state backend.");
        if (LOG.isDebugEnabled()) {
            LOG.debug("Restoring snapshot from state handles: {}.", (Object)restoreState);
        }
        this.kvStateInformation.clear();
        this.restoredKvStateMetaInfos.clear();
        try {
            if (restoreState == null || restoreState.isEmpty()) {
                this.createDB();
            } else {
                KeyedStateHandle firstStateHandle = restoreState.iterator().next();
                if (firstStateHandle instanceof IncrementalKeyedStateHandle || firstStateHandle instanceof IncrementalLocalKeyedStateHandle) {
                    RocksDBIncrementalRestoreOperation restoreOperation = new RocksDBIncrementalRestoreOperation(this);
                    restoreOperation.restore(restoreState);
                } else {
                    RocksDBFullRestoreOperation restoreOperation = new RocksDBFullRestoreOperation(this);
                    restoreOperation.doRestore(restoreState);
                }
            }
        }
        catch (Exception ex) {
            this.dispose();
            throw ex;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void notifyCheckpointComplete(long completedCheckpointId) {
        if (!this.enableIncrementalCheckpointing) {
            return;
        }
        SortedMap<Long, Set<StateHandleID>> sortedMap = this.materializedSstFiles;
        synchronized (sortedMap) {
            if (completedCheckpointId < this.lastCompletedCheckpointId) {
                return;
            }
            this.materializedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId);
            this.lastCompletedCheckpointId = completedCheckpointId;
        }
    }

    private void createDB() throws IOException {
        ArrayList<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<ColumnFamilyHandle>(1);
        this.db = this.openDB(this.instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles);
        this.writeBatchWrapper = new RocksDBWriteBatchWrapper(this.db, this.writeOptions);
        this.defaultColumnFamily = (ColumnFamilyHandle)columnFamilyHandles.get(0);
    }

    private RocksDB openDB(String path, List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors, List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException {
        RocksDB dbRef;
        ArrayList<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<ColumnFamilyDescriptor>(1 + stateColumnFamilyDescriptors.size());
        columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, this.columnOptions));
        columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
        try {
            dbRef = RocksDB.open((DBOptions)Preconditions.checkNotNull((Object)this.dbOptions), (String)Preconditions.checkNotNull((Object)path), columnFamilyDescriptors, stateColumnFamilyHandles);
        }
        catch (RocksDBException e) {
            throw new IOException("Error while opening RocksDB instance.", e);
        }
        Preconditions.checkState((1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size() ? 1 : 0) != 0, (Object)"Not all requested column family handles have been created");
        return dbRef;
    }

    private <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, S>> tryRegisterKvStateInformation(StateDescriptor<?, S> stateDesc, TypeSerializer<N> namespaceSerializer, @Nullable StateSnapshotTransformer<S> snapshotTransformer) throws StateMigrationException {
        RegisteredKeyValueStateBackendMetaInfo newMetaInfo;
        Tuple2 stateInfo = this.kvStateInformation.get(stateDesc.getName());
        if (stateInfo != null) {
            StateMetaInfoSnapshot restoredMetaInfoSnapshot = this.restoredKvStateMetaInfos.get(stateDesc.getName());
            Preconditions.checkState((restoredMetaInfoSnapshot != null ? 1 : 0) != 0, (Object)"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo, but its corresponding restored snapshot cannot be found.");
            newMetaInfo = RegisteredKeyValueStateBackendMetaInfo.resolveKvStateCompatibility((StateMetaInfoSnapshot)restoredMetaInfoSnapshot, namespaceSerializer, stateDesc, snapshotTransformer);
            stateInfo.f1 = newMetaInfo;
        } else {
            String stateName = stateDesc.getName();
            newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo(stateDesc.getType(), stateName, namespaceSerializer, stateDesc.getSerializer(), snapshotTransformer);
            ColumnFamilyHandle columnFamily = this.createColumnFamily(stateName);
            stateInfo = Tuple2.of((Object)columnFamily, (Object)newMetaInfo);
            this.kvStateInformation.put(stateDesc.getName(), (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>)stateInfo);
        }
        return Tuple2.of((Object)stateInfo.f0, (Object)newMetaInfo);
    }

    private ColumnFamilyHandle createColumnFamily(String stateName) {
        byte[] nameBytes = stateName.getBytes(ConfigConstants.DEFAULT_CHARSET);
        Preconditions.checkState((!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes) ? 1 : 0) != 0, (Object)"The chosen state name 'default' collides with the name of the default column family!");
        ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(nameBytes, this.columnOptions);
        try {
            return this.db.createColumnFamily(columnDescriptor);
        }
        catch (RocksDBException e) {
            throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", (Throwable)e);
        }
    }

    @Nonnull
    public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(@Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<S, SV> stateDesc, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
        StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
        if (stateFactory == null) {
            String message2 = String.format("State %s is not supported by %s", stateDesc.getClass(), ((Object)((Object)this)).getClass());
            throw new FlinkRuntimeException(message2);
        }
        Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult = this.tryRegisterKvStateInformation(stateDesc, namespaceSerializer, this.getStateSnapshotTransformer(stateDesc, snapshotTransformFactory));
        return stateFactory.createState(stateDesc, registerResult, this);
    }

    private <SV, SEV> StateSnapshotTransformer<SV> getStateSnapshotTransformer(StateDescriptor<?, SV> stateDesc, StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) {
        if (stateDesc instanceof ListStateDescriptor) {
            Optional original = snapshotTransformFactory.createForDeserializedState();
            return original.map(est -> this.createRocksDBListStateTransformer(stateDesc, (StateSnapshotTransformer)est)).orElse(null);
        }
        if (stateDesc instanceof MapStateDescriptor) {
            Optional original = snapshotTransformFactory.createForSerializedState();
            return original.map(RocksDBMapState.StateSnapshotTransformerWrapper::new).orElse(null);
        }
        Optional original = snapshotTransformFactory.createForSerializedState();
        return original.orElse(null);
    }

    private <SV, SEV> StateSnapshotTransformer<SV> createRocksDBListStateTransformer(StateDescriptor<?, SV> stateDesc, StateSnapshotTransformer<SEV> elementTransformer) {
        return new RocksDBListState.StateSnapshotTransformerWrapper<SEV>(elementTransformer, ((ListStateDescriptor)stateDesc).getElementSerializer());
    }

    public File getInstanceBasePath() {
        return this.instanceBasePath;
    }

    public boolean supportsAsynchronousSnapshots() {
        return true;
    }

    @VisibleForTesting
    public int numKeyValueStateEntries() {
        int count2 = 0;
        for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> column : this.kvStateInformation.values()) {
            RocksIteratorWrapper rocksIterator = RocksDBKeyedStateBackend.getRocksIterator(this.db, (ColumnFamilyHandle)column.f0);
            Throwable throwable = null;
            try {
                rocksIterator.seekToFirst();
                while (rocksIterator.isValid()) {
                    ++count2;
                    rocksIterator.next();
                }
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (rocksIterator == null) continue;
                if (throwable != null) {
                    try {
                        rocksIterator.close();
                    }
                    catch (Throwable throwable3) {
                        throwable.addSuppressed(throwable3);
                    }
                    continue;
                }
                rocksIterator.close();
            }
        }
        return count2;
    }

    public static RocksIteratorWrapper getRocksIterator(RocksDB db) {
        return new RocksIteratorWrapper(db.newIterator());
    }

    public static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle) {
        return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle));
    }

    private static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle, RegisteredStateMetaInfoBase metaInfo, ReadOptions readOptions) {
        StateSnapshotTransformer stateSnapshotTransformer = null;
        if (metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo) {
            stateSnapshotTransformer = ((RegisteredKeyValueStateBackendMetaInfo)metaInfo).getSnapshotTransformer();
        }
        RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions);
        return stateSnapshotTransformer == null ? new RocksIteratorWrapper(rocksIterator) : new TransformingRocksIteratorWrapper(rocksIterator, (StateSnapshotTransformer<byte[]>)stateSnapshotTransformer);
    }

    @Nonnull
    private <T> Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tryRegisterPriorityQueueMetaInfo(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
        Tuple2 metaInfoTuple = this.kvStateInformation.get(stateName);
        if (metaInfoTuple == null) {
            ColumnFamilyHandle columnFamilyHandle = this.createColumnFamily(stateName);
            RegisteredPriorityQueueStateBackendMetaInfo metaInfo = new RegisteredPriorityQueueStateBackendMetaInfo(stateName, byteOrderedElementSerializer);
            metaInfoTuple = new Tuple2((Object)columnFamilyHandle, (Object)metaInfo);
            this.kvStateInformation.put(stateName, (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>)metaInfoTuple);
        } else {
            StateMetaInfoSnapshot restoredMetaInfoSnapshot = this.restoredKvStateMetaInfos.get(stateName);
            Preconditions.checkState((restoredMetaInfoSnapshot != null ? 1 : 0) != 0, (Object)"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo, but its corresponding restored snapshot cannot be found.");
            StateMetaInfoSnapshot.CommonSerializerKeys serializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER;
            TypeSerializer metaInfoTypeSerializer = restoredMetaInfoSnapshot.getTypeSerializer(serializerKey);
            if (metaInfoTypeSerializer != byteOrderedElementSerializer) {
                CompatibilityResult compatibilityResult = CompatibilityUtil.resolveCompatibilityResult((TypeSerializer)metaInfoTypeSerializer, null, (TypeSerializerConfigSnapshot)restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey), byteOrderedElementSerializer);
                if (compatibilityResult.isRequiresMigration()) {
                    throw new FlinkRuntimeException((Throwable)StateMigrationException.notSupported());
                }
                metaInfoTuple.f1 = new RegisteredPriorityQueueStateBackendMetaInfo(stateName, byteOrderedElementSerializer);
            }
        }
        return metaInfoTuple;
    }

    public boolean requiresLegacySynchronousTimerSnapshots() {
        return this.priorityQueueFactory instanceof HeapPriorityQueueSetFactory;
    }

    class RocksDBPriorityQueueSetFactory
    implements PriorityQueueSetFactory {
        private static final int DEFAULT_CACHES_SIZE = 128;
        @Nonnull
        private final ByteArrayDataOutputView sharedElementOutView = new ByteArrayDataOutputView();
        @Nonnull
        private final ByteArrayDataInputView sharedElementInView = new ByteArrayDataInputView();

        RocksDBPriorityQueueSetFactory() {
        }

        @Nonnull
        public <T extends HeapPriorityQueueElement & PriorityComparable> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, final @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
            Tuple2 metaInfoTuple = RocksDBKeyedStateBackend.this.tryRegisterPriorityQueueMetaInfo(stateName, byteOrderedElementSerializer);
            final ColumnFamilyHandle columnFamilyHandle = (ColumnFamilyHandle)metaInfoTuple.f0;
            return new KeyGroupPartitionedPriorityQueue(KeyExtractorFunction.forKeyedObjects(), PriorityComparator.forPriorityComparableObjects(), new KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<T, RocksDBCachingPriorityQueueSet<T>>(){

                @Nonnull
                public RocksDBCachingPriorityQueueSet<T> create(int keyGroupId, int numKeyGroups, @Nonnull KeyExtractorFunction<T> keyExtractor, @Nonnull PriorityComparator<T> elementPriorityComparator) {
                    TreeOrderedSetCache orderedSetCache = new TreeOrderedSetCache(128);
                    return new RocksDBCachingPriorityQueueSet(keyGroupId, RocksDBKeyedStateBackend.this.keyGroupPrefixBytes, RocksDBKeyedStateBackend.this.db, columnFamilyHandle, byteOrderedElementSerializer, RocksDBPriorityQueueSetFactory.this.sharedElementOutView, RocksDBPriorityQueueSetFactory.this.sharedElementInView, RocksDBKeyedStateBackend.this.writeBatchWrapper, orderedSetCache);
                }
            }, RocksDBKeyedStateBackend.this.keyGroupRange, RocksDBKeyedStateBackend.this.numberOfKeyGroups);
        }
    }

    private static final class RocksDBIncrementalSnapshotOperation<K> {
        private final RocksDBKeyedStateBackend<K> stateBackend;
        private final CheckpointStreamFactory checkpointStreamFactory;
        private final long checkpointId;
        private Set<StateHandleID> baseSstFiles;
        private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = new ArrayList<StateMetaInfoSnapshot>();
        private SnapshotDirectory localBackupDirectory;
        private final CloseableRegistry closeableRegistry = new CloseableRegistry();
        private final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<StateHandleID, StreamStateHandle>();
        private final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<StateHandleID, StreamStateHandle>();
        private final ResourceGuard.Lease dbLease;
        private SnapshotResult<StreamStateHandle> metaStateHandle = null;

        private RocksDBIncrementalSnapshotOperation(RocksDBKeyedStateBackend<K> stateBackend, CheckpointStreamFactory checkpointStreamFactory, SnapshotDirectory localBackupDirectory, long checkpointId) throws IOException {
            this.stateBackend = stateBackend;
            this.checkpointStreamFactory = checkpointStreamFactory;
            this.checkpointId = checkpointId;
            this.dbLease = ((RocksDBKeyedStateBackend)this.stateBackend).rocksDBResourceGuard.acquireResource();
            this.localBackupDirectory = localBackupDirectory;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private StreamStateHandle materializeStateData(Path filePath) throws Exception {
            FSDataInputStream inputStream = null;
            CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
            try {
                int numBytes;
                byte[] buffer = new byte[8192];
                FileSystem backupFileSystem = this.localBackupDirectory.getFileSystem();
                inputStream = backupFileSystem.open(filePath);
                this.closeableRegistry.registerCloseable((Closeable)inputStream);
                outputStream = this.checkpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
                this.closeableRegistry.registerCloseable((Closeable)outputStream);
                while ((numBytes = inputStream.read(buffer)) != -1) {
                    outputStream.write(buffer, 0, numBytes);
                }
                StreamStateHandle result2 = null;
                if (this.closeableRegistry.unregisterCloseable((Closeable)outputStream)) {
                    result2 = outputStream.closeAndGetHandle();
                    outputStream = null;
                }
                StreamStateHandle streamStateHandle = result2;
                return streamStateHandle;
            }
            finally {
                if (this.closeableRegistry.unregisterCloseable((Closeable)inputStream)) {
                    inputStream.close();
                }
                if (this.closeableRegistry.unregisterCloseable((Closeable)outputStream)) {
                    outputStream.close();
                }
            }
        }

        @Nonnull
        private SnapshotResult<StreamStateHandle> materializeMetaData() throws Exception {
            LocalRecoveryConfig localRecoveryConfig = ((RocksDBKeyedStateBackend)this.stateBackend).localRecoveryConfig;
            CheckpointStreamWithResultProvider streamWithResultProvider = localRecoveryConfig.isLocalRecoveryEnabled() ? CheckpointStreamWithResultProvider.createDuplicatingStream((long)this.checkpointId, (CheckpointedStateScope)CheckpointedStateScope.EXCLUSIVE, (CheckpointStreamFactory)this.checkpointStreamFactory, (LocalRecoveryDirectoryProvider)localRecoveryConfig.getLocalStateDirectoryProvider()) : CheckpointStreamWithResultProvider.createSimpleStream((CheckpointedStateScope)CheckpointedStateScope.EXCLUSIVE, (CheckpointStreamFactory)this.checkpointStreamFactory);
            try {
                this.closeableRegistry.registerCloseable((Closeable)streamWithResultProvider);
                KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(((RocksDBKeyedStateBackend)this.stateBackend).keySerializer, this.stateMetaInfoSnapshots, false);
                DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper((OutputStream)streamWithResultProvider.getCheckpointOutputStream());
                serializationProxy.write((DataOutputView)out);
                if (this.closeableRegistry.unregisterCloseable((Closeable)streamWithResultProvider)) {
                    SnapshotResult result2 = streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
                    streamWithResultProvider = null;
                    SnapshotResult snapshotResult = result2;
                    return snapshotResult;
                }
                throw new IOException("Stream already closed and cannot return a handle.");
            }
            finally {
                if (streamWithResultProvider != null && this.closeableRegistry.unregisterCloseable((Closeable)streamWithResultProvider)) {
                    IOUtils.closeQuietly((AutoCloseable)streamWithResultProvider);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void takeSnapshot() throws Exception {
            long lastCompletedCheckpoint;
            SortedMap sortedMap = ((RocksDBKeyedStateBackend)this.stateBackend).materializedSstFiles;
            synchronized (sortedMap) {
                lastCompletedCheckpoint = ((RocksDBKeyedStateBackend)this.stateBackend).lastCompletedCheckpointId;
                this.baseSstFiles = (Set)((RocksDBKeyedStateBackend)this.stateBackend).materializedSstFiles.get(lastCompletedCheckpoint);
            }
            LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} assuming the following (shared) files as base: {}.", this.checkpointId, lastCompletedCheckpoint, this.baseSstFiles);
            for (Map.Entry entry : ((RocksDBKeyedStateBackend)this.stateBackend).kvStateInformation.entrySet()) {
                this.stateMetaInfoSnapshots.add(((RegisteredStateMetaInfoBase)((Tuple2)entry.getValue()).f1).snapshot());
            }
            LOG.trace("Local RocksDB checkpoint goes to backup path {}.", (Object)this.localBackupDirectory);
            if (this.localBackupDirectory.exists()) {
                throw new IllegalStateException("Unexpected existence of the backup directory.");
            }
            Throwable throwable = null;
            try (Checkpoint checkpoint = Checkpoint.create(this.stateBackend.db);){
                checkpoint.createCheckpoint(this.localBackupDirectory.getDirectory().getPath());
            }
            catch (Throwable throwable2) {
                Throwable throwable3 = throwable2;
                throw throwable2;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Nonnull
        SnapshotResult<KeyedStateHandle> runSnapshot() throws Exception {
            ((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.registerCloseable((Closeable)this.closeableRegistry);
            this.metaStateHandle = this.materializeMetaData();
            Preconditions.checkNotNull(this.metaStateHandle, (String)"Metadata was not properly created.");
            Preconditions.checkNotNull((Object)this.metaStateHandle.getJobManagerOwnedSnapshot(), (String)"Metadata for job manager was not properly created.");
            Preconditions.checkState((boolean)this.localBackupDirectory.exists());
            FileStatus[] fileStatuses = this.localBackupDirectory.listStatus();
            if (fileStatuses != null) {
                for (FileStatus fileStatus : fileStatuses) {
                    Path filePath = fileStatus.getPath();
                    String fileName = filePath.getName();
                    StateHandleID stateHandleID = new StateHandleID(fileName);
                    if (fileName.endsWith(RocksDBKeyedStateBackend.SST_FILE_SUFFIX)) {
                        boolean existsAlready;
                        boolean bl = existsAlready = this.baseSstFiles != null && this.baseSstFiles.contains(stateHandleID);
                        if (existsAlready) {
                            this.sstFiles.put(stateHandleID, (StreamStateHandle)new PlaceholderStreamStateHandle());
                            continue;
                        }
                        this.sstFiles.put(stateHandleID, this.materializeStateData(filePath));
                        continue;
                    }
                    StreamStateHandle fileHandle = this.materializeStateData(filePath);
                    this.miscFiles.put(stateHandleID, fileHandle);
                }
            }
            SortedMap sortedMap = ((RocksDBKeyedStateBackend)this.stateBackend).materializedSstFiles;
            synchronized (sortedMap) {
                ((RocksDBKeyedStateBackend)this.stateBackend).materializedSstFiles.put(this.checkpointId, this.sstFiles.keySet());
            }
            IncrementalKeyedStateHandle incrementalKeyedStateHandle = new IncrementalKeyedStateHandle(((RocksDBKeyedStateBackend)this.stateBackend).backendUID, ((RocksDBKeyedStateBackend)this.stateBackend).keyGroupRange, this.checkpointId, this.sstFiles, this.miscFiles, (StreamStateHandle)this.metaStateHandle.getJobManagerOwnedSnapshot());
            StreamStateHandle taskLocalSnapshotMetaDataStateHandle = (StreamStateHandle)this.metaStateHandle.getTaskLocalSnapshot();
            DirectoryStateHandle directoryStateHandle = null;
            try {
                directoryStateHandle = this.localBackupDirectory.completeSnapshotAndGetHandle();
            }
            catch (IOException ex) {
                Exception collector = ex;
                try {
                    taskLocalSnapshotMetaDataStateHandle.discardState();
                }
                catch (Exception discardEx) {
                    collector = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)discardEx, (Throwable)collector);
                }
                LOG.warn("Problem with local state snapshot.", collector);
            }
            if (directoryStateHandle != null && taskLocalSnapshotMetaDataStateHandle != null) {
                IncrementalLocalKeyedStateHandle localDirKeyedStateHandle = new IncrementalLocalKeyedStateHandle(((RocksDBKeyedStateBackend)this.stateBackend).backendUID, this.checkpointId, directoryStateHandle, ((RocksDBKeyedStateBackend)this.stateBackend).keyGroupRange, taskLocalSnapshotMetaDataStateHandle, this.sstFiles.keySet());
                return SnapshotResult.withLocalState((StateObject)incrementalKeyedStateHandle, (StateObject)localDirKeyedStateHandle);
            }
            return SnapshotResult.of((StateObject)incrementalKeyedStateHandle);
        }

        void stop() {
            if (((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.unregisterCloseable((Closeable)this.closeableRegistry)) {
                try {
                    this.closeableRegistry.close();
                }
                catch (IOException e) {
                    LOG.warn("Could not properly close io streams.", e);
                }
            }
        }

        void releaseResources(boolean canceled) {
            this.dbLease.close();
            if (((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.unregisterCloseable((Closeable)this.closeableRegistry)) {
                try {
                    this.closeableRegistry.close();
                }
                catch (IOException e) {
                    LOG.warn("Exception on closing registry.", e);
                }
            }
            try {
                if (this.localBackupDirectory.exists()) {
                    LOG.trace("Running cleanup for local RocksDB backup directory {}.", (Object)this.localBackupDirectory);
                    boolean cleanupOk = this.localBackupDirectory.cleanup();
                    if (!cleanupOk) {
                        LOG.debug("Could not properly cleanup local RocksDB backup directory.");
                    }
                }
            }
            catch (IOException e) {
                LOG.warn("Could not properly cleanup local RocksDB backup directory.", e);
            }
            if (canceled) {
                ArrayList<Object> statesToDiscard = new ArrayList<Object>(1 + this.miscFiles.size() + this.sstFiles.size());
                statesToDiscard.add(this.metaStateHandle);
                statesToDiscard.addAll(this.miscFiles.values());
                statesToDiscard.addAll(this.sstFiles.values());
                try {
                    StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
                }
                catch (Exception e) {
                    LOG.warn("Could not properly discard states.", e);
                }
                if (this.localBackupDirectory.isSnapshotCompleted()) {
                    try {
                        DirectoryStateHandle directoryStateHandle = this.localBackupDirectory.completeSnapshotAndGetHandle();
                        if (directoryStateHandle != null) {
                            directoryStateHandle.discardState();
                        }
                    }
                    catch (Exception e) {
                        LOG.warn("Could not properly discard local state.", e);
                    }
                }
            }
        }
    }

    @VisibleForTesting
    static class RocksDBFullSnapshotOperation<K>
    extends AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> {
        static final int FIRST_BIT_IN_BYTE_MASK = 128;
        static final int END_OF_KEY_GROUP_MARK = 65535;
        private final RocksDBKeyedStateBackend<K> stateBackend;
        private final KeyGroupRangeOffsets keyGroupRangeOffsets;
        private final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier;
        private final CloseableRegistry snapshotCloseableRegistry;
        private final ResourceGuard.Lease dbLease;
        private Snapshot snapshot;
        private ReadOptions readOptions;
        private List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
        private List<Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> copiedMeta;
        private List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators;
        private CheckpointStreamWithResultProvider checkpointStreamWithResultProvider;
        private DataOutputView outputView;

        RocksDBFullSnapshotOperation(RocksDBKeyedStateBackend<K> stateBackend, SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier, CloseableRegistry registry) throws IOException {
            this.stateBackend = stateBackend;
            this.checkpointStreamSupplier = checkpointStreamSupplier;
            this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(((RocksDBKeyedStateBackend)stateBackend).keyGroupRange);
            this.snapshotCloseableRegistry = registry;
            this.dbLease = ((RocksDBKeyedStateBackend)this.stateBackend).rocksDBResourceGuard.acquireResource();
        }

        public void takeDBSnapShot() {
            Preconditions.checkArgument((this.snapshot == null ? 1 : 0) != 0, (Object)"Only one ongoing snapshot allowed!");
            this.stateMetaInfoSnapshots = new ArrayList<StateMetaInfoSnapshot>(((RocksDBKeyedStateBackend)this.stateBackend).kvStateInformation.size());
            this.copiedMeta = new ArrayList<Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>>(((RocksDBKeyedStateBackend)this.stateBackend).kvStateInformation.size());
            for (Tuple2 tuple23 : ((RocksDBKeyedStateBackend)this.stateBackend).kvStateInformation.values()) {
                this.stateMetaInfoSnapshots.add(((RegisteredStateMetaInfoBase)tuple23.f1).snapshot());
                this.copiedMeta.add((Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>)tuple23);
            }
            this.snapshot = this.stateBackend.db.getSnapshot();
        }

        public void openCheckpointStream() throws Exception {
            Preconditions.checkArgument((this.checkpointStreamWithResultProvider == null ? 1 : 0) != 0, (Object)"Output stream for snapshot is already set.");
            this.checkpointStreamWithResultProvider = (CheckpointStreamWithResultProvider)this.checkpointStreamSupplier.get();
            this.snapshotCloseableRegistry.registerCloseable((Closeable)this.checkpointStreamWithResultProvider);
            this.outputView = new DataOutputViewStreamWrapper((OutputStream)this.checkpointStreamWithResultProvider.getCheckpointOutputStream());
        }

        public void writeDBSnapshot() throws IOException, InterruptedException, RocksDBException {
            if (null == this.snapshot) {
                throw new IOException("No snapshot available. Might be released due to cancellation.");
            }
            Preconditions.checkNotNull((Object)this.checkpointStreamWithResultProvider, (String)"No output stream to write snapshot.");
            this.writeKVStateMetaData();
            this.writeKVStateData();
        }

        @Nonnull
        public SnapshotResult<KeyedStateHandle> getSnapshotResultStateHandle() throws IOException {
            if (this.snapshotCloseableRegistry.unregisterCloseable((Closeable)this.checkpointStreamWithResultProvider)) {
                SnapshotResult res = this.checkpointStreamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
                this.checkpointStreamWithResultProvider = null;
                return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult((SnapshotResult)res, (KeyGroupRangeOffsets)this.keyGroupRangeOffsets);
            }
            return SnapshotResult.empty();
        }

        public void releaseSnapshotResources() {
            this.checkpointStreamWithResultProvider = null;
            if (null != this.kvStateIterators) {
                for (Tuple2<RocksIteratorWrapper, Integer> kvStateIterator : this.kvStateIterators) {
                    IOUtils.closeQuietly((AutoCloseable)((AutoCloseable)kvStateIterator.f0));
                }
                this.kvStateIterators = null;
            }
            if (null != this.snapshot) {
                if (null != this.stateBackend.db) {
                    this.stateBackend.db.releaseSnapshot(this.snapshot);
                }
                IOUtils.closeQuietly((AutoCloseable)this.snapshot);
                this.snapshot = null;
            }
            if (null != this.readOptions) {
                IOUtils.closeQuietly((AutoCloseable)this.readOptions);
                this.readOptions = null;
            }
            this.dbLease.close();
        }

        private void writeKVStateMetaData() throws IOException {
            this.kvStateIterators = new ArrayList<Tuple2<RocksIteratorWrapper, Integer>>(this.copiedMeta.size());
            int kvStateId = 0;
            this.readOptions = new ReadOptions();
            this.readOptions.setSnapshot(this.snapshot);
            for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tuple23 : this.copiedMeta) {
                RocksIteratorWrapper rocksIteratorWrapper = RocksDBKeyedStateBackend.getRocksIterator(this.stateBackend.db, (ColumnFamilyHandle)tuple23.f0, (RegisteredStateMetaInfoBase)tuple23.f1, this.readOptions);
                this.kvStateIterators.add((Tuple2<RocksIteratorWrapper, Integer>)new Tuple2((Object)rocksIteratorWrapper, (Object)kvStateId));
                ++kvStateId;
            }
            KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(this.stateBackend.getKeySerializer(), this.stateMetaInfoSnapshots, !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, ((RocksDBKeyedStateBackend)this.stateBackend).keyGroupCompressionDecorator));
            serializationProxy.write(this.outputView);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void writeKVStateData() throws IOException, InterruptedException {
            byte[] previousKey = null;
            byte[] previousValue = null;
            DataOutputViewStreamWrapper kgOutView = null;
            OutputStream kgOutStream = null;
            CheckpointStreamFactory.CheckpointStateOutputStream checkpointOutputStream = this.checkpointStreamWithResultProvider.getCheckpointOutputStream();
            try {
                try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator(this.kvStateIterators, ((RocksDBKeyedStateBackend)this.stateBackend).keyGroupPrefixBytes);){
                    this.kvStateIterators = null;
                    if (mergeIterator.isValid()) {
                        this.keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), checkpointOutputStream.getPos());
                        kgOutStream = ((RocksDBKeyedStateBackend)this.stateBackend).keyGroupCompressionDecorator.decorateWithCompression((OutputStream)checkpointOutputStream);
                        kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
                        kgOutView.writeShort(mergeIterator.kvStateId());
                        previousKey = mergeIterator.key();
                        previousValue = mergeIterator.value();
                        mergeIterator.next();
                    }
                    while (mergeIterator.isValid()) {
                        assert (!RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(previousKey));
                        if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
                            RocksDBFullSnapshotOperation.checkInterrupted();
                            RocksDBFullSnapshotOperation.setMetaDataFollowsFlagInKey(previousKey);
                        }
                        this.writeKeyValuePair(previousKey, previousValue, (DataOutputView)kgOutView);
                        if (mergeIterator.isNewKeyGroup()) {
                            kgOutView.writeShort(65535);
                            kgOutStream.close();
                            this.keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), checkpointOutputStream.getPos());
                            kgOutStream = ((RocksDBKeyedStateBackend)this.stateBackend).keyGroupCompressionDecorator.decorateWithCompression((OutputStream)checkpointOutputStream);
                            kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
                            kgOutView.writeShort(mergeIterator.kvStateId());
                        } else if (mergeIterator.isNewKeyValueState()) {
                            kgOutView.writeShort(mergeIterator.kvStateId());
                        }
                        previousKey = mergeIterator.key();
                        previousValue = mergeIterator.value();
                        mergeIterator.next();
                    }
                }
                if (previousKey != null) {
                    assert (!RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(previousKey));
                    RocksDBFullSnapshotOperation.setMetaDataFollowsFlagInKey(previousKey);
                    this.writeKeyValuePair(previousKey, previousValue, (DataOutputView)kgOutView);
                    kgOutView.writeShort(65535);
                    kgOutStream.close();
                    kgOutStream = null;
                }
            }
            finally {
                IOUtils.closeQuietly(kgOutStream);
            }
        }

        private void writeKeyValuePair(byte[] key, byte[] value2, DataOutputView out) throws IOException {
            BytePrimitiveArraySerializer.INSTANCE.serialize(key, out);
            BytePrimitiveArraySerializer.INSTANCE.serialize(value2, out);
        }

        static void setMetaDataFollowsFlagInKey(byte[] key) {
            key[0] = (byte)(key[0] | 0x80);
        }

        static void clearMetaDataFollowsFlag(byte[] key) {
            key[0] = (byte)(key[0] & 0xFFFFFF7F);
        }

        static boolean hasMetaDataFollowsFlag(byte[] key) {
            return 0 != (key[0] & 0x80);
        }

        private static void checkInterrupted() throws InterruptedException {
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException("RocksDB snapshot interrupted.");
            }
        }

        protected void acquireResources() throws Exception {
            ((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.registerCloseable((Closeable)this.snapshotCloseableRegistry);
            this.openCheckpointStream();
        }

        protected void releaseResources() {
            this.closeLocalRegistry();
            this.releaseSnapshotOperationResources();
        }

        private void releaseSnapshotOperationResources() {
            this.releaseSnapshotResources();
        }

        protected void stopOperation() {
            this.closeLocalRegistry();
        }

        private void closeLocalRegistry() {
            if (((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.unregisterCloseable((Closeable)this.snapshotCloseableRegistry)) {
                try {
                    this.snapshotCloseableRegistry.close();
                }
                catch (Exception ex) {
                    LOG.warn("Error closing local registry", ex);
                }
            }
        }

        @Nonnull
        public SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
            long startTime = System.currentTimeMillis();
            if (this.isStopped()) {
                throw new IOException("RocksDB closed.");
            }
            this.writeDBSnapshot();
            LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.", this.checkpointStreamSupplier, Thread.currentThread(), System.currentTimeMillis() - startTime);
            return this.getSnapshotResultStateHandle();
        }
    }

    private class IncrementalSnapshotStrategy
    implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
        private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> savepointDelegate;

        public IncrementalSnapshotStrategy() {
            this.savepointDelegate = new FullSnapshotStrategy();
        }

        public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot(long checkpointId, long checkpointTimestamp, CheckpointStreamFactory checkpointStreamFactory, CheckpointOptions checkpointOptions) throws Exception {
            SnapshotDirectory snapshotDirectory;
            if (CheckpointType.SAVEPOINT == checkpointOptions.getCheckpointType()) {
                return this.savepointDelegate.performSnapshot(checkpointId, checkpointTimestamp, checkpointStreamFactory, checkpointOptions);
            }
            if (RocksDBKeyedStateBackend.this.db == null) {
                throw new IOException("RocksDB closed.");
            }
            if (RocksDBKeyedStateBackend.this.kvStateInformation.isEmpty()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", (Object)checkpointTimestamp);
                }
                return DoneFuture.of((Object)SnapshotResult.empty());
            }
            if (RocksDBKeyedStateBackend.this.localRecoveryConfig.isLocalRecoveryEnabled()) {
                LocalRecoveryDirectoryProvider directoryProvider = RocksDBKeyedStateBackend.this.localRecoveryConfig.getLocalStateDirectoryProvider();
                File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
                if (directory.exists()) {
                    FileUtils.deleteDirectory((File)directory);
                }
                if (!directory.mkdirs()) {
                    throw new IOException("Local state base directory for checkpoint " + checkpointId + " already exists: " + directory);
                }
                File rdbSnapshotDir = new File(directory, "rocks_db");
                Path path = new Path(rdbSnapshotDir.toURI());
                snapshotDirectory = SnapshotDirectory.permanent((Path)path);
            } else {
                Path path = new Path(RocksDBKeyedStateBackend.this.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
                snapshotDirectory = SnapshotDirectory.temporary((Path)path);
            }
            final RocksDBIncrementalSnapshotOperation snapshotOperation = new RocksDBIncrementalSnapshotOperation(RocksDBKeyedStateBackend.this, checkpointStreamFactory, snapshotDirectory, checkpointId);
            try {
                snapshotOperation.takeSnapshot();
            }
            catch (Exception e) {
                snapshotOperation.stop();
                snapshotOperation.releaseResources(true);
                throw e;
            }
            return new FutureTask<SnapshotResult<KeyedStateHandle>>(snapshotOperation::runSnapshot){

                @Override
                public boolean cancel(boolean mayInterruptIfRunning) {
                    snapshotOperation.stop();
                    return super.cancel(mayInterruptIfRunning);
                }

                @Override
                protected void done() {
                    snapshotOperation.releaseResources(this.isCancelled());
                }
            };
        }
    }

    private class FullSnapshotStrategy
    implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
        private FullSnapshotStrategy() {
        }

        public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot(long checkpointId, long timestamp, final CheckpointStreamFactory primaryStreamFactory, CheckpointOptions checkpointOptions) throws Exception {
            long startTime = System.currentTimeMillis();
            if (RocksDBKeyedStateBackend.this.kvStateInformation.isEmpty()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", (Object)timestamp);
                }
                return DoneFuture.of((Object)SnapshotResult.empty());
            }
            SupplierWithException supplier = RocksDBKeyedStateBackend.this.localRecoveryConfig.isLocalRecoveryEnabled() && CheckpointType.SAVEPOINT != checkpointOptions.getCheckpointType() ? () -> CheckpointStreamWithResultProvider.createDuplicatingStream((long)checkpointId, (CheckpointedStateScope)CheckpointedStateScope.EXCLUSIVE, (CheckpointStreamFactory)primaryStreamFactory, (LocalRecoveryDirectoryProvider)RocksDBKeyedStateBackend.this.localRecoveryConfig.getLocalStateDirectoryProvider()) : () -> CheckpointStreamWithResultProvider.createSimpleStream((CheckpointedStateScope)CheckpointedStateScope.EXCLUSIVE, (CheckpointStreamFactory)primaryStreamFactory);
            final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry();
            final RocksDBFullSnapshotOperation snapshotOperation = new RocksDBFullSnapshotOperation(RocksDBKeyedStateBackend.this, (SupplierWithException<CheckpointStreamWithResultProvider, Exception>)supplier, snapshotCloseableRegistry);
            snapshotOperation.takeDBSnapShot();
            AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable = new AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>(){

                protected void acquireResources() throws Exception {
                    RocksDBKeyedStateBackend.this.cancelStreamRegistry.registerCloseable((Closeable)snapshotCloseableRegistry);
                    snapshotOperation.openCheckpointStream();
                }

                protected void releaseResources() throws Exception {
                    this.closeLocalRegistry();
                    this.releaseSnapshotOperationResources();
                }

                private void releaseSnapshotOperationResources() {
                    snapshotOperation.releaseSnapshotResources();
                }

                protected void stopOperation() throws Exception {
                    this.closeLocalRegistry();
                }

                private void closeLocalRegistry() {
                    if (RocksDBKeyedStateBackend.this.cancelStreamRegistry.unregisterCloseable((Closeable)snapshotCloseableRegistry)) {
                        try {
                            snapshotCloseableRegistry.close();
                        }
                        catch (Exception ex) {
                            LOG.warn("Error closing local registry", ex);
                        }
                    }
                }

                @Nonnull
                public SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
                    long startTime = System.currentTimeMillis();
                    if (this.isStopped()) {
                        throw new IOException("RocksDB closed.");
                    }
                    snapshotOperation.writeDBSnapshot();
                    LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.", primaryStreamFactory, Thread.currentThread(), System.currentTimeMillis() - startTime);
                    return snapshotOperation.getSnapshotResultStateHandle();
                }
            };
            LOG.info("Asynchronous RocksDB snapshot ({}, synchronous part) in thread {} took {} ms.", primaryStreamFactory, Thread.currentThread(), System.currentTimeMillis() - startTime);
            return AsyncStoppableTaskWithCallback.from((StoppableCallbackCallable)ioCallable);
        }
    }

    static class RocksIteratorForKeysWrapper<K>
    implements Iterator<K>,
    AutoCloseable {
        private final RocksIteratorWrapper iterator;
        private final String state;
        private final TypeSerializer<K> keySerializer;
        private final int keyGroupPrefixBytes;
        private final byte[] namespaceBytes;
        private final boolean ambiguousKeyPossible;
        private K nextKey;
        private K previousKey;

        RocksIteratorForKeysWrapper(RocksIteratorWrapper iterator2, String state, TypeSerializer<K> keySerializer, int keyGroupPrefixBytes, boolean ambiguousKeyPossible, byte[] namespaceBytes) {
            this.iterator = (RocksIteratorWrapper)Preconditions.checkNotNull((Object)iterator2);
            this.state = (String)Preconditions.checkNotNull((Object)state);
            this.keySerializer = (TypeSerializer)Preconditions.checkNotNull(keySerializer);
            this.keyGroupPrefixBytes = (Integer)Preconditions.checkNotNull((Object)keyGroupPrefixBytes);
            this.namespaceBytes = (byte[])Preconditions.checkNotNull((Object)namespaceBytes);
            this.nextKey = null;
            this.previousKey = null;
            this.ambiguousKeyPossible = ambiguousKeyPossible;
        }

        @Override
        public boolean hasNext() {
            try {
                while (this.nextKey == null && this.iterator.isValid()) {
                    byte[] key = this.iterator.key();
                    ByteArrayInputStreamWithPos inputStream = new ByteArrayInputStreamWithPos(key, this.keyGroupPrefixBytes, key.length - this.keyGroupPrefixBytes);
                    DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper((InputStream)inputStream);
                    K value2 = RocksDBKeySerializationUtils.readKey(this.keySerializer, inputStream, (DataInputView)dataInput, this.ambiguousKeyPossible);
                    int namespaceByteStartPos = inputStream.getPosition();
                    if (this.isMatchingNameSpace(key, namespaceByteStartPos) && !Objects.equals(this.previousKey, value2)) {
                        this.previousKey = value2;
                        this.nextKey = value2;
                    }
                    this.iterator.next();
                }
            }
            catch (Exception e) {
                throw new FlinkRuntimeException("Failed to access state [" + this.state + "]", (Throwable)e);
            }
            return this.nextKey != null;
        }

        @Override
        public K next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException("Failed to access state [" + this.state + "]");
            }
            K tmpKey = this.nextKey;
            this.nextKey = null;
            return tmpKey;
        }

        private boolean isMatchingNameSpace(@Nonnull byte[] key, int beginPos) {
            int namespaceBytesLength = this.namespaceBytes.length;
            int basicLength = namespaceBytesLength + beginPos;
            if (key.length >= basicLength) {
                for (int i = 0; i < namespaceBytesLength; ++i) {
                    if (key[beginPos + i] == this.namespaceBytes[i]) continue;
                    return false;
                }
                return true;
            }
            return false;
        }

        @Override
        public void close() {
            this.iterator.close();
        }
    }

    private static final class TransformingRocksIteratorWrapper
    extends RocksIteratorWrapper {
        @Nonnull
        private final StateSnapshotTransformer<byte[]> stateSnapshotTransformer;
        private byte[] current;

        public TransformingRocksIteratorWrapper(@Nonnull RocksIterator iterator2, @Nonnull StateSnapshotTransformer<byte[]> stateSnapshotTransformer) {
            super(iterator2);
            this.stateSnapshotTransformer = stateSnapshotTransformer;
        }

        @Override
        public void seekToFirst() {
            super.seekToFirst();
            this.filterOrTransform(() -> super.next());
        }

        @Override
        public void seekToLast() {
            super.seekToLast();
            this.filterOrTransform(() -> super.prev());
        }

        @Override
        public void next() {
            super.next();
            this.filterOrTransform(() -> super.next());
        }

        @Override
        public void prev() {
            super.prev();
            this.filterOrTransform(() -> super.prev());
        }

        private void filterOrTransform(Runnable advance2) {
            while (this.isValid() && (this.current = (byte[])this.stateSnapshotTransformer.filterOrTransform((Object)super.value())) == null) {
                advance2.run();
            }
        }

        @Override
        public byte[] value() {
            if (!this.isValid()) {
                throw new IllegalStateException("value() method cannot be called if isValid() is false");
            }
            return this.current;
        }
    }

    @VisibleForTesting
    protected static final class MergeIterator
    implements AutoCloseable {
        private final RocksIteratorWrapper iterator;
        private byte[] currentKey;
        private final int kvStateId;

        MergeIterator(RocksIteratorWrapper iterator2, int kvStateId) {
            this.iterator = (RocksIteratorWrapper)Preconditions.checkNotNull((Object)iterator2);
            this.currentKey = iterator2.key();
            this.kvStateId = kvStateId;
        }

        public byte[] getCurrentKey() {
            return this.currentKey;
        }

        public void setCurrentKey(byte[] currentKey) {
            this.currentKey = currentKey;
        }

        public RocksIteratorWrapper getIterator() {
            return this.iterator;
        }

        public int getKvStateId() {
            return this.kvStateId;
        }

        @Override
        public void close() {
            IOUtils.closeQuietly((AutoCloseable)this.iterator);
        }

        static /* synthetic */ byte[] access$3702(MergeIterator x0, byte[] x1) {
            x0.currentKey = x1;
            return x1;
        }
    }

    @VisibleForTesting
    static class RocksDBMergeIterator
    implements AutoCloseable {
        private final PriorityQueue<MergeIterator> heap;
        private final int keyGroupPrefixByteCount;
        private boolean newKeyGroup;
        private boolean newKVState;
        private boolean valid;
        MergeIterator currentSubIterator;
        private static final List<Comparator<MergeIterator>> COMPARATORS;

        RocksDBMergeIterator(List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators, int keyGroupPrefixByteCount) {
            Preconditions.checkNotNull(kvStateIterators);
            Preconditions.checkArgument((keyGroupPrefixByteCount >= 1 ? 1 : 0) != 0);
            this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
            Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount - 1);
            if (kvStateIterators.size() > 0) {
                PriorityQueue<MergeIterator> iteratorPriorityQueue = new PriorityQueue<MergeIterator>(kvStateIterators.size(), iteratorComparator);
                for (Tuple2<RocksIteratorWrapper, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
                    RocksIteratorWrapper rocksIterator = (RocksIteratorWrapper)rocksIteratorWithKVStateId.f0;
                    rocksIterator.seekToFirst();
                    if (rocksIterator.isValid()) {
                        iteratorPriorityQueue.offer(new MergeIterator(rocksIterator, (Integer)rocksIteratorWithKVStateId.f1));
                        continue;
                    }
                    IOUtils.closeQuietly((AutoCloseable)rocksIterator);
                }
                kvStateIterators.clear();
                this.heap = iteratorPriorityQueue;
                this.valid = !this.heap.isEmpty();
                this.currentSubIterator = this.heap.poll();
            } else {
                this.heap = null;
                this.valid = false;
            }
            this.newKeyGroup = true;
            this.newKVState = true;
        }

        public void next() {
            this.newKeyGroup = false;
            this.newKVState = false;
            RocksIteratorWrapper rocksIterator = this.currentSubIterator.getIterator();
            rocksIterator.next();
            byte[] oldKey = this.currentSubIterator.getCurrentKey();
            if (rocksIterator.isValid()) {
                MergeIterator.access$3702(this.currentSubIterator, rocksIterator.key());
                if (this.isDifferentKeyGroup(oldKey, this.currentSubIterator.getCurrentKey())) {
                    this.heap.offer(this.currentSubIterator);
                    this.currentSubIterator = this.heap.poll();
                    this.newKVState = this.currentSubIterator.getIterator() != rocksIterator;
                    this.detectNewKeyGroup(oldKey);
                }
            } else {
                IOUtils.closeQuietly((AutoCloseable)rocksIterator);
                if (this.heap.isEmpty()) {
                    this.currentSubIterator = null;
                    this.valid = false;
                } else {
                    this.currentSubIterator = this.heap.poll();
                    this.newKVState = true;
                    this.detectNewKeyGroup(oldKey);
                }
            }
        }

        private boolean isDifferentKeyGroup(byte[] a, byte[] b) {
            return 0 != RocksDBMergeIterator.compareKeyGroupsForByteArrays(a, b, this.keyGroupPrefixByteCount);
        }

        private void detectNewKeyGroup(byte[] oldKey) {
            if (this.isDifferentKeyGroup(oldKey, this.currentSubIterator.currentKey)) {
                this.newKeyGroup = true;
            }
        }

        public int keyGroup() {
            int result2 = 0;
            for (int i = 0; i < this.keyGroupPrefixByteCount; ++i) {
                result2 <<= 8;
                result2 |= this.currentSubIterator.currentKey[i] & 0xFF;
            }
            return result2;
        }

        public byte[] key() {
            return this.currentSubIterator.getCurrentKey();
        }

        public byte[] value() {
            return this.currentSubIterator.getIterator().value();
        }

        public int kvStateId() {
            return this.currentSubIterator.getKvStateId();
        }

        public boolean isNewKeyValueState() {
            return this.newKVState;
        }

        public boolean isNewKeyGroup() {
            return this.newKeyGroup;
        }

        public boolean isValid() {
            return this.valid;
        }

        private static int compareKeyGroupsForByteArrays(byte[] a, byte[] b, int len) {
            for (int i = 0; i < len; ++i) {
                int diff2 = (a[i] & 0xFF) - (b[i] & 0xFF);
                if (diff2 == 0) continue;
                return diff2;
            }
            return 0;
        }

        @Override
        public void close() {
            IOUtils.closeQuietly((AutoCloseable)this.currentSubIterator);
            this.currentSubIterator = null;
            IOUtils.closeAllQuietly(this.heap);
            this.heap.clear();
        }

        static {
            int maxBytes = 2;
            COMPARATORS = new ArrayList<Comparator<MergeIterator>>(maxBytes);
            for (int i = 0; i < maxBytes; ++i) {
                int currentBytes = i + 1;
                COMPARATORS.add((o1, o2) -> {
                    int arrayCmpRes = RocksDBMergeIterator.compareKeyGroupsForByteArrays(((MergeIterator)o1).currentKey, ((MergeIterator)o2).currentKey, currentBytes);
                    return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
                });
            }
        }
    }

    private static class RocksDBIncrementalRestoreOperation<T> {
        private final RocksDBKeyedStateBackend<T> stateBackend;

        private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
            this.stateBackend = stateBackend;
        }

        void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {
            boolean isRescaling;
            if (restoreStateHandles.isEmpty()) {
                return;
            }
            KeyedStateHandle theFirstStateHandle = restoreStateHandles.iterator().next();
            boolean bl = isRescaling = restoreStateHandles.size() > 1 || !Objects.equals(theFirstStateHandle.getKeyGroupRange(), ((RocksDBKeyedStateBackend)this.stateBackend).keyGroupRange);
            if (!isRescaling) {
                this.restoreWithoutRescaling(theFirstStateHandle);
            } else {
                this.restoreWithRescaling(restoreStateHandles);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void restoreWithoutRescaling(KeyedStateHandle rawStateHandle) throws Exception {
            Path temporaryRestoreInstancePath = new Path(((RocksDBKeyedStateBackend)this.stateBackend).instanceBasePath.getAbsolutePath(), UUID.randomUUID().toString());
            try {
                IncrementalLocalKeyedStateHandle localKeyedStateHandle;
                List<ColumnFamilyDescriptor> columnFamilyDescriptors;
                List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
                if (rawStateHandle instanceof IncrementalKeyedStateHandle) {
                    IncrementalKeyedStateHandle restoreStateHandle = (IncrementalKeyedStateHandle)rawStateHandle;
                    this.transferAllStateDataToDirectory(restoreStateHandle, temporaryRestoreInstancePath);
                    stateMetaInfoSnapshots = this.readMetaData(restoreStateHandle.getMetaStateHandle());
                    columnFamilyDescriptors = this.createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots);
                    localKeyedStateHandle = new IncrementalLocalKeyedStateHandle(restoreStateHandle.getBackendIdentifier(), restoreStateHandle.getCheckpointId(), new DirectoryStateHandle(temporaryRestoreInstancePath), restoreStateHandle.getKeyGroupRange(), restoreStateHandle.getMetaStateHandle(), restoreStateHandle.getSharedState().keySet());
                } else if (rawStateHandle instanceof IncrementalLocalKeyedStateHandle) {
                    localKeyedStateHandle = (IncrementalLocalKeyedStateHandle)rawStateHandle;
                    stateMetaInfoSnapshots = this.readMetaData(localKeyedStateHandle.getMetaDataState());
                    columnFamilyDescriptors = this.createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots);
                } else {
                    throw new IllegalStateException("Unexpected state handle type, expected " + IncrementalKeyedStateHandle.class + " or " + IncrementalLocalKeyedStateHandle.class + ", but found " + rawStateHandle.getClass());
                }
                this.restoreLocalStateIntoFullInstance(localKeyedStateHandle, columnFamilyDescriptors, stateMetaInfoSnapshots);
            }
            finally {
                FileSystem restoreFileSystem = temporaryRestoreInstancePath.getFileSystem();
                if (restoreFileSystem.exists(temporaryRestoreInstancePath)) {
                    restoreFileSystem.delete(temporaryRestoreInstancePath, true);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {
            this.initTargetDB(restoreStateHandles, ((RocksDBKeyedStateBackend)this.stateBackend).keyGroupRange);
            byte[] startKeyGroupPrefixBytes = new byte[((RocksDBKeyedStateBackend)this.stateBackend).keyGroupPrefixBytes];
            RocksDBKeySerializationUtils.serializeKeyGroup(this.stateBackend.getKeyGroupRange().getStartKeyGroup(), startKeyGroupPrefixBytes);
            byte[] stopKeyGroupPrefixBytes = new byte[((RocksDBKeyedStateBackend)this.stateBackend).keyGroupPrefixBytes];
            RocksDBKeySerializationUtils.serializeKeyGroup(this.stateBackend.getKeyGroupRange().getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);
            for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
                if (!(rawStateHandle instanceof IncrementalKeyedStateHandle)) {
                    throw new IllegalStateException("Unexpected state handle type, expected " + IncrementalKeyedStateHandle.class + ", but found " + rawStateHandle.getClass());
                }
                Path temporaryRestoreInstancePath = new Path(((RocksDBKeyedStateBackend)this.stateBackend).instanceBasePath.getAbsolutePath() + UUID.randomUUID().toString());
                try {
                    RestoredDBInstance tmpRestoreDBInfo = this.restoreDBInstanceFromStateHandle((IncrementalKeyedStateHandle)rawStateHandle, temporaryRestoreInstancePath);
                    Throwable throwable = null;
                    try {
                        RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(this.stateBackend.db);
                        Throwable throwable2 = null;
                        try {
                            List tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors;
                            List tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;
                            for (int i = 0; i < tmpColumnFamilyDescriptors.size(); ++i) {
                                ColumnFamilyHandle tmpColumnFamilyHandle = (ColumnFamilyHandle)tmpColumnFamilyHandles.get(i);
                                ColumnFamilyDescriptor tmpColumnFamilyDescriptor = (ColumnFamilyDescriptor)tmpColumnFamilyDescriptors.get(i);
                                ColumnFamilyHandle targetColumnFamilyHandle = this.getOrRegisterColumnFamilyHandle(tmpColumnFamilyDescriptor, null, (StateMetaInfoSnapshot)tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i));
                                try (RocksIteratorWrapper iterator2 = RocksDBKeyedStateBackend.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle);){
                                    iterator2.seek(startKeyGroupPrefixBytes);
                                    while (iterator2.isValid() && RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(iterator2.key(), stopKeyGroupPrefixBytes)) {
                                        writeBatchWrapper.put(targetColumnFamilyHandle, iterator2.key(), iterator2.value());
                                        iterator2.next();
                                    }
                                    continue;
                                }
                            }
                        }
                        catch (Throwable throwable3) {
                            throwable2 = throwable3;
                            throw throwable3;
                        }
                        finally {
                            if (writeBatchWrapper == null) continue;
                            if (throwable2 != null) {
                                try {
                                    writeBatchWrapper.close();
                                }
                                catch (Throwable throwable4) {
                                    throwable2.addSuppressed(throwable4);
                                }
                                continue;
                            }
                            writeBatchWrapper.close();
                        }
                    }
                    catch (Throwable throwable5) {
                        throwable = throwable5;
                        throw throwable5;
                    }
                    finally {
                        if (tmpRestoreDBInfo == null) continue;
                        if (throwable != null) {
                            try {
                                tmpRestoreDBInfo.close();
                            }
                            catch (Throwable throwable6) {
                                throwable.addSuppressed(throwable6);
                            }
                            continue;
                        }
                        tmpRestoreDBInfo.close();
                    }
                }
                finally {
                    FileSystem restoreFileSystem = temporaryRestoreInstancePath.getFileSystem();
                    if (!restoreFileSystem.exists(temporaryRestoreInstancePath)) continue;
                    restoreFileSystem.delete(temporaryRestoreInstancePath, true);
                }
            }
        }

        private RestoredDBInstance restoreDBInstanceFromStateHandle(IncrementalKeyedStateHandle restoreStateHandle, Path temporaryRestoreInstancePath) throws Exception {
            this.transferAllStateDataToDirectory(restoreStateHandle, temporaryRestoreInstancePath);
            List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = this.readMetaData(restoreStateHandle.getMetaStateHandle());
            List<ColumnFamilyDescriptor> columnFamilyDescriptors = this.createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots);
            ArrayList<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<ColumnFamilyHandle>(stateMetaInfoSnapshots.size() + 1);
            RocksDB restoreDb = ((RocksDBKeyedStateBackend)this.stateBackend).openDB(temporaryRestoreInstancePath.getPath(), columnFamilyDescriptors, columnFamilyHandles);
            return new RestoredDBInstance(restoreDb, columnFamilyHandles, columnFamilyDescriptors, stateMetaInfoSnapshots);
        }

        private ColumnFamilyHandle getOrRegisterColumnFamilyHandle(ColumnFamilyDescriptor columnFamilyDescriptor, ColumnFamilyHandle columnFamilyHandle, StateMetaInfoSnapshot stateMetaInfoSnapshot) throws RocksDBException {
            Tuple2 registeredStateMetaInfoEntry = (Tuple2)((RocksDBKeyedStateBackend)this.stateBackend).kvStateInformation.get(stateMetaInfoSnapshot.getName());
            if (null == registeredStateMetaInfoEntry) {
                RegisteredStateMetaInfoBase stateMetaInfo = RegisteredStateMetaInfoBase.fromMetaInfoSnapshot((StateMetaInfoSnapshot)stateMetaInfoSnapshot);
                registeredStateMetaInfoEntry = new Tuple2((Object)(columnFamilyHandle != null ? columnFamilyHandle : this.stateBackend.db.createColumnFamily(columnFamilyDescriptor)), (Object)stateMetaInfo);
                ((RocksDBKeyedStateBackend)this.stateBackend).kvStateInformation.put(stateMetaInfoSnapshot.getName(), registeredStateMetaInfoEntry);
            }
            return (ColumnFamilyHandle)registeredStateMetaInfoEntry.f0;
        }

        private void initTargetDB(Collection<KeyedStateHandle> restoreStateHandles, KeyGroupRange targetKeyGroupRange) throws Exception {
            IncrementalKeyedStateHandle initialHandle = (IncrementalKeyedStateHandle)RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(restoreStateHandles, targetKeyGroupRange);
            if (initialHandle != null) {
                restoreStateHandles.remove(initialHandle);
                RestoredDBInstance restoreDBInfo = null;
                Path instancePath = new Path(((RocksDBKeyedStateBackend)this.stateBackend).instanceRocksDBPath.getAbsolutePath());
                try {
                    restoreDBInfo = this.restoreDBInstanceFromStateHandle(initialHandle, instancePath);
                    RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(restoreDBInfo.db, restoreDBInfo.columnFamilyHandles, targetKeyGroupRange, initialHandle.getKeyGroupRange(), ((RocksDBKeyedStateBackend)this.stateBackend).keyGroupPrefixBytes);
                    this.stateBackend.db = restoreDBInfo.db;
                    ((RocksDBKeyedStateBackend)this.stateBackend).defaultColumnFamily = restoreDBInfo.defaultColumnFamilyHandle;
                    ((RocksDBKeyedStateBackend)this.stateBackend).writeBatchWrapper = new RocksDBWriteBatchWrapper(this.stateBackend.db, ((RocksDBKeyedStateBackend)this.stateBackend).writeOptions);
                    for (int i = 0; i < restoreDBInfo.stateMetaInfoSnapshots.size(); ++i) {
                        this.getOrRegisterColumnFamilyHandle((ColumnFamilyDescriptor)restoreDBInfo.columnFamilyDescriptors.get(i), (ColumnFamilyHandle)restoreDBInfo.columnFamilyHandles.get(i), (StateMetaInfoSnapshot)restoreDBInfo.stateMetaInfoSnapshots.get(i));
                    }
                }
                catch (Exception e) {
                    FileSystem restoreFileSystem;
                    if (restoreDBInfo != null) {
                        restoreDBInfo.close();
                    }
                    if ((restoreFileSystem = instancePath.getFileSystem()).exists(instancePath)) {
                        restoreFileSystem.delete(instancePath, true);
                    }
                    throw e;
                }
            } else {
                ArrayList columnFamilyHandles = new ArrayList(1);
                this.stateBackend.db = ((RocksDBKeyedStateBackend)this.stateBackend).openDB(((RocksDBKeyedStateBackend)this.stateBackend).instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles);
                ((RocksDBKeyedStateBackend)this.stateBackend).defaultColumnFamily = (ColumnFamilyHandle)columnFamilyHandles.get(0);
                ((RocksDBKeyedStateBackend)this.stateBackend).writeBatchWrapper = new RocksDBWriteBatchWrapper(this.stateBackend.db, ((RocksDBKeyedStateBackend)this.stateBackend).writeOptions);
            }
        }

        private List<ColumnFamilyDescriptor> createAndRegisterColumnFamilyDescriptors(List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
            ArrayList<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<ColumnFamilyDescriptor>(stateMetaInfoSnapshots.size());
            for (StateMetaInfoSnapshot stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
                ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET), ((RocksDBKeyedStateBackend)this.stateBackend).columnOptions);
                columnFamilyDescriptors.add(columnFamilyDescriptor);
                ((RocksDBKeyedStateBackend)this.stateBackend).restoredKvStateMetaInfos.put(stateMetaInfoSnapshot.getName(), stateMetaInfoSnapshot);
            }
            return columnFamilyDescriptors;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void restoreLocalStateIntoFullInstance(IncrementalLocalKeyedStateHandle restoreStateHandle, List<ColumnFamilyDescriptor> columnFamilyDescriptors, List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) throws Exception {
            ((RocksDBKeyedStateBackend)this.stateBackend).backendUID = restoreStateHandle.getBackendIdentifier();
            LOG.debug("Restoring keyed backend uid in operator {} from incremental snapshot to {}.", (Object)((RocksDBKeyedStateBackend)this.stateBackend).operatorIdentifier, (Object)((RocksDBKeyedStateBackend)this.stateBackend).backendUID);
            if (!((RocksDBKeyedStateBackend)this.stateBackend).instanceRocksDBPath.mkdirs()) {
                throw new IOException("Could not create RocksDB data directory.");
            }
            Path restoreSourcePath = restoreStateHandle.getDirectoryStateHandle().getDirectory();
            this.restoreInstanceDirectoryFromPath(restoreSourcePath);
            ArrayList columnFamilyHandles = new ArrayList(1 + columnFamilyDescriptors.size());
            this.stateBackend.db = ((RocksDBKeyedStateBackend)this.stateBackend).openDB(((RocksDBKeyedStateBackend)this.stateBackend).instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles);
            ((RocksDBKeyedStateBackend)this.stateBackend).defaultColumnFamily = (ColumnFamilyHandle)columnFamilyHandles.remove(0);
            ((RocksDBKeyedStateBackend)this.stateBackend).writeBatchWrapper = new RocksDBWriteBatchWrapper(this.stateBackend.db, ((RocksDBKeyedStateBackend)this.stateBackend).writeOptions);
            for (int i = 0; i < columnFamilyDescriptors.size(); ++i) {
                StateMetaInfoSnapshot stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
                ColumnFamilyHandle columnFamilyHandle = (ColumnFamilyHandle)columnFamilyHandles.get(i);
                RegisteredStateMetaInfoBase stateMetaInfo = RegisteredStateMetaInfoBase.fromMetaInfoSnapshot((StateMetaInfoSnapshot)stateMetaInfoSnapshot);
                ((RocksDBKeyedStateBackend)this.stateBackend).kvStateInformation.put(stateMetaInfoSnapshot.getName(), new Tuple2((Object)columnFamilyHandle, (Object)stateMetaInfo));
            }
            SortedMap sortedMap = ((RocksDBKeyedStateBackend)this.stateBackend).materializedSstFiles;
            synchronized (sortedMap) {
                ((RocksDBKeyedStateBackend)this.stateBackend).materializedSstFiles.put(restoreStateHandle.getCheckpointId(), restoreStateHandle.getSharedStateHandleIDs());
            }
            ((RocksDBKeyedStateBackend)this.stateBackend).lastCompletedCheckpointId = restoreStateHandle.getCheckpointId();
        }

        private void restoreInstanceDirectoryFromPath(Path source2) throws IOException {
            FileSystem fileSystem = source2.getFileSystem();
            FileStatus[] fileStatuses = fileSystem.listStatus(source2);
            if (fileStatuses == null) {
                throw new IOException("Cannot list file statues. Directory " + source2 + " does not exist.");
            }
            for (FileStatus fileStatus : fileStatuses) {
                Path filePath = fileStatus.getPath();
                String fileName = filePath.getName();
                File restoreFile = new File(source2.getPath(), fileName);
                File targetFile = new File(((RocksDBKeyedStateBackend)this.stateBackend).instanceRocksDBPath.getPath(), fileName);
                if (fileName.endsWith(RocksDBKeyedStateBackend.SST_FILE_SUFFIX)) {
                    Files.createLink(targetFile.toPath(), restoreFile.toPath());
                    continue;
                }
                Files.copy(restoreFile.toPath(), targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private List<StateMetaInfoSnapshot> readMetaData(StreamStateHandle metaStateHandle) throws Exception {
            FSDataInputStream inputStream = null;
            try {
                inputStream = metaStateHandle.openInputStream();
                ((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.registerCloseable((Closeable)inputStream);
                KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(((RocksDBKeyedStateBackend)this.stateBackend).userCodeClassLoader, false);
                DataInputViewStreamWrapper in = new DataInputViewStreamWrapper((InputStream)inputStream);
                serializationProxy.read((DataInputView)in);
                if (CompatibilityUtil.resolveCompatibilityResult((TypeSerializer)serializationProxy.getKeySerializer(), UnloadableDummyTypeSerializer.class, (TypeSerializerConfigSnapshot)serializationProxy.getKeySerializerConfigSnapshot(), (TypeSerializer)((RocksDBKeyedStateBackend)this.stateBackend).keySerializer).isRequiresMigration()) {
                    throw new StateMigrationException("The new key serializer is not compatible to read previous keys. Aborting now since state migration is currently not available");
                }
                List list2 = serializationProxy.getStateMetaInfoSnapshots();
                return list2;
            }
            finally {
                if (((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.unregisterCloseable((Closeable)inputStream)) {
                    inputStream.close();
                }
            }
        }

        private void transferAllStateDataToDirectory(IncrementalKeyedStateHandle restoreStateHandle, Path dest) throws IOException {
            Map sstFiles = restoreStateHandle.getSharedState();
            Map miscFiles = restoreStateHandle.getPrivateState();
            this.transferAllDataFromStateHandles(sstFiles, dest);
            this.transferAllDataFromStateHandles(miscFiles, dest);
        }

        private void transferAllDataFromStateHandles(Map<StateHandleID, StreamStateHandle> stateHandleMap, Path restoreInstancePath) throws IOException {
            for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
                StateHandleID stateHandleID = entry.getKey();
                StreamStateHandle remoteFileHandle = entry.getValue();
                this.copyStateDataHandleData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void copyStateDataHandleData(Path restoreFilePath, StreamStateHandle remoteFileHandle) throws IOException {
            FileSystem restoreFileSystem = restoreFilePath.getFileSystem();
            FSDataInputStream inputStream = null;
            FSDataOutputStream outputStream = null;
            try {
                int numBytes;
                inputStream = remoteFileHandle.openInputStream();
                ((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.registerCloseable((Closeable)inputStream);
                outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
                ((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.registerCloseable((Closeable)outputStream);
                byte[] buffer = new byte[8192];
                while ((numBytes = inputStream.read(buffer)) != -1) {
                    outputStream.write(buffer, 0, numBytes);
                }
            }
            finally {
                if (((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.unregisterCloseable((Closeable)inputStream)) {
                    inputStream.close();
                }
                if (((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.unregisterCloseable((Closeable)outputStream)) {
                    outputStream.close();
                }
            }
        }

        private class RestoredDBInstance
        implements AutoCloseable {
            @Nonnull
            private final RocksDB db;
            @Nonnull
            private final ColumnFamilyHandle defaultColumnFamilyHandle;
            @Nonnull
            private final List<ColumnFamilyHandle> columnFamilyHandles;
            @Nonnull
            private final List<ColumnFamilyDescriptor> columnFamilyDescriptors;
            @Nonnull
            private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;

            RestoredDBInstance(@Nonnull RocksDB db, @Nonnull List<ColumnFamilyHandle> columnFamilyHandles, @Nonnull List<ColumnFamilyDescriptor> columnFamilyDescriptors, List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
                this.db = db;
                this.columnFamilyHandles = columnFamilyHandles;
                this.defaultColumnFamilyHandle = this.columnFamilyHandles.remove(0);
                this.columnFamilyDescriptors = columnFamilyDescriptors;
                this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
            }

            @Override
            public void close() {
                IOUtils.closeQuietly((AutoCloseable)this.defaultColumnFamilyHandle);
                for (ColumnFamilyHandle columnFamilyHandle : this.columnFamilyHandles) {
                    IOUtils.closeQuietly((AutoCloseable)columnFamilyHandle);
                }
                IOUtils.closeQuietly((AutoCloseable)this.db);
            }
        }
    }

    private static final class RocksDBFullRestoreOperation<K> {
        private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend;
        private KeyGroupsStateHandle currentKeyGroupsStateHandle;
        private FSDataInputStream currentStateHandleInStream;
        private DataInputView currentStateHandleInView;
        private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies;
        private StreamCompressionDecorator keygroupStreamCompressionDecorator;

        public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) {
            this.rocksDBKeyedStateBackend = (RocksDBKeyedStateBackend)((Object)Preconditions.checkNotNull(rocksDBKeyedStateBackend));
        }

        public void doRestore(Collection<KeyedStateHandle> keyedStateHandles) throws IOException, StateMigrationException, RocksDBException {
            ((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).createDB();
            for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
                if (keyedStateHandle == null) continue;
                if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
                    throw new IllegalStateException("Unexpected state handle type, expected: " + KeyGroupsStateHandle.class + ", but found: " + keyedStateHandle.getClass());
                }
                this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle)keyedStateHandle;
                this.restoreKeyGroupsInStateHandle();
            }
        }

        private void restoreKeyGroupsInStateHandle() throws IOException, StateMigrationException, RocksDBException {
            try {
                this.currentStateHandleInStream = this.currentKeyGroupsStateHandle.openInputStream();
                ((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).cancelStreamRegistry.registerCloseable((Closeable)this.currentStateHandleInStream);
                this.currentStateHandleInView = new DataInputViewStreamWrapper((InputStream)this.currentStateHandleInStream);
                this.restoreKVStateMetaData();
                this.restoreKVStateData();
            }
            finally {
                if (((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).cancelStreamRegistry.unregisterCloseable((Closeable)this.currentStateHandleInStream)) {
                    IOUtils.closeQuietly((AutoCloseable)this.currentStateHandleInStream);
                }
            }
        }

        private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException {
            KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).userCodeClassLoader, false);
            serializationProxy.read(this.currentStateHandleInView);
            if (CompatibilityUtil.resolveCompatibilityResult((TypeSerializer)serializationProxy.getKeySerializer(), UnloadableDummyTypeSerializer.class, (TypeSerializerConfigSnapshot)serializationProxy.getKeySerializerConfigSnapshot(), (TypeSerializer)((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).keySerializer).isRequiresMigration()) {
                throw new StateMigrationException("The new key serializer is not compatible to read previous keys. Aborting now since state migration is currently not available");
            }
            this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ? SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;
            List restoredMetaInfos = serializationProxy.getStateMetaInfoSnapshots();
            this.currentStateHandleKVStateColumnFamilies = new ArrayList<ColumnFamilyHandle>(restoredMetaInfos.size());
            for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) {
                Tuple2 registeredColumn = (Tuple2)((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).kvStateInformation.get(restoredMetaInfo.getName());
                if (registeredColumn == null) {
                    byte[] nameBytes = restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
                    ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(nameBytes, ((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).columnOptions);
                    RegisteredStateMetaInfoBase stateMetaInfo = RegisteredStateMetaInfoBase.fromMetaInfoSnapshot((StateMetaInfoSnapshot)restoredMetaInfo);
                    ((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
                    ColumnFamilyHandle columnFamily = this.rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
                    registeredColumn = new Tuple2((Object)columnFamily, (Object)stateMetaInfo);
                    ((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).kvStateInformation.put(stateMetaInfo.getName(), registeredColumn);
                }
                this.currentStateHandleKVStateColumnFamilies.add((ColumnFamilyHandle)registeredColumn.f0);
            }
        }

        private void restoreKVStateData() throws IOException, RocksDBException {
            try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(this.rocksDBKeyedStateBackend.db);){
                for (Tuple2 keyGroupOffset : this.currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
                    int keyGroup = (Integer)keyGroupOffset.f0;
                    Preconditions.checkState((boolean)this.rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup), (Object)"The key group must belong to the backend");
                    long offset = (Long)keyGroupOffset.f1;
                    if (0L == offset) continue;
                    this.currentStateHandleInStream.seek(offset);
                    InputStream compressedKgIn = this.keygroupStreamCompressionDecorator.decorateWithCompression((InputStream)this.currentStateHandleInStream);
                    Throwable throwable = null;
                    try {
                        DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
                        int kvStateId = compressedKgInputView.readShort();
                        ColumnFamilyHandle handle2 = this.currentStateHandleKVStateColumnFamilies.get(kvStateId);
                        boolean keyGroupHasMoreKeys = true;
                        while (keyGroupHasMoreKeys) {
                            byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize((DataInputView)compressedKgInputView);
                            byte[] value2 = BytePrimitiveArraySerializer.INSTANCE.deserialize((DataInputView)compressedKgInputView);
                            if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
                                RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
                                writeBatchWrapper.put(handle2, key, value2);
                                kvStateId = 0xFFFF & compressedKgInputView.readShort();
                                if (65535 == kvStateId) {
                                    keyGroupHasMoreKeys = false;
                                    continue;
                                }
                                handle2 = this.currentStateHandleKVStateColumnFamilies.get(kvStateId);
                                continue;
                            }
                            writeBatchWrapper.put(handle2, key, value2);
                        }
                    }
                    catch (Throwable throwable2) {
                        throwable = throwable2;
                        throw throwable2;
                    }
                    finally {
                        if (compressedKgIn == null) continue;
                        if (throwable != null) {
                            try {
                                compressedKgIn.close();
                            }
                            catch (Throwable throwable3) {
                                throwable.addSuppressed(throwable3);
                            }
                            continue;
                        }
                        compressedKgIn.close();
                    }
                }
            }
        }
    }

    private static interface StateFactory {
        public <K, N, SV, S extends State, IS extends S> IS createState(StateDescriptor<S, SV> var1, Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> var2, RocksDBKeyedStateBackend<K> var3) throws Exception;
    }
}

