/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Random;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TernaryBoolean;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.NativeLibraryLoader;
import org.rocksdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksDBStateBackend
extends AbstractStateBackend
implements ConfigurableStateBackend {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackend.class);
    private static final int ROCKSDB_LIB_LOADING_ATTEMPTS = 3;
    private static boolean rocksDbInitialized = false;
    private final StateBackend checkpointStreamBackend;
    @Nullable
    private File[] localRocksDbDirectories;
    private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT;
    @Nullable
    private OptionsFactory optionsFactory;
    private final TernaryBoolean enableIncrementalCheckpointing;
    private final PriorityQueueStateType priorityQueueStateType;
    private transient File[] initializedDbBasePaths;
    private transient JobID jobId;
    private transient int nextDirectory;
    private transient boolean isInitialized;

    public RocksDBStateBackend(String checkpointDataUri) throws IOException {
        this(new Path(checkpointDataUri).toUri());
    }

    public RocksDBStateBackend(String checkpointDataUri, boolean enableIncrementalCheckpointing) throws IOException {
        this(new Path(checkpointDataUri).toUri(), enableIncrementalCheckpointing);
    }

    public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
        this((AbstractStateBackend)new FsStateBackend(checkpointDataUri));
    }

    public RocksDBStateBackend(URI checkpointDataUri, boolean enableIncrementalCheckpointing) throws IOException {
        this((AbstractStateBackend)new FsStateBackend(checkpointDataUri), enableIncrementalCheckpointing);
    }

    public RocksDBStateBackend(StateBackend checkpointStreamBackend) {
        this(checkpointStreamBackend, TernaryBoolean.UNDEFINED);
    }

    public RocksDBStateBackend(StateBackend checkpointStreamBackend, TernaryBoolean enableIncrementalCheckpointing) {
        this.checkpointStreamBackend = (StateBackend)Preconditions.checkNotNull((Object)checkpointStreamBackend);
        this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
        this.priorityQueueStateType = PriorityQueueStateType.HEAP;
    }

    @Deprecated
    public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) {
        this((StateBackend)checkpointStreamBackend, TernaryBoolean.UNDEFINED);
    }

    @Deprecated
    public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend, boolean enableIncrementalCheckpointing) {
        this((StateBackend)checkpointStreamBackend, TernaryBoolean.fromBoolean((boolean)enableIncrementalCheckpointing));
    }

    private RocksDBStateBackend(RocksDBStateBackend original, Configuration config) {
        StateBackend originalStreamBackend = original.checkpointStreamBackend;
        this.checkpointStreamBackend = originalStreamBackend instanceof ConfigurableStateBackend ? ((ConfigurableStateBackend)originalStreamBackend).configure(config) : originalStreamBackend;
        this.enableIncrementalCheckpointing = original.enableIncrementalCheckpointing.resolveUndefined(config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS));
        String priorityQueueTypeString = config.getString(RocksDBOptions.TIMER_SERVICE_FACTORY);
        PriorityQueueStateType priorityQueueStateType = this.priorityQueueStateType = priorityQueueTypeString.length() > 0 ? PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : original.priorityQueueStateType;
        if (original.localRocksDbDirectories != null) {
            this.localRocksDbDirectories = original.localRocksDbDirectories;
        } else {
            String rocksdbLocalPaths = config.getString(RocksDBOptions.LOCAL_DIRECTORIES);
            if (rocksdbLocalPaths != null) {
                String[] directories = rocksdbLocalPaths.split(",|" + File.pathSeparator);
                try {
                    this.setDbStoragePaths(directories);
                }
                catch (IllegalArgumentException e) {
                    throw new IllegalConfigurationException("Invalid configuration for RocksDB state backend's local storage directories: " + e.getMessage(), (Throwable)e);
                }
            }
        }
        this.predefinedOptions = original.predefinedOptions;
        this.optionsFactory = original.optionsFactory;
    }

    public RocksDBStateBackend configure(Configuration config) {
        return new RocksDBStateBackend(this, config);
    }

    public StateBackend getCheckpointBackend() {
        return this.checkpointStreamBackend;
    }

    private void lazyInitializeForJob(Environment env, String operatorIdentifier) throws IOException {
        if (this.isInitialized) {
            return;
        }
        this.jobId = env.getJobID();
        if (this.localRocksDbDirectories == null) {
            this.initializedDbBasePaths = env.getIOManager().getSpillingDirectories();
        } else {
            ArrayList<File> dirs2 = new ArrayList<File>(this.localRocksDbDirectories.length);
            StringBuilder errorMessage2 = new StringBuilder();
            for (File f : this.localRocksDbDirectories) {
                File testDir = new File(f, UUID.randomUUID().toString());
                if (!testDir.mkdirs()) {
                    String msg = "Local DB files directory '" + f + "' does not exist and cannot be created. ";
                    LOG.error(msg);
                    errorMessage2.append(msg);
                } else {
                    dirs2.add(f);
                }
                testDir.delete();
            }
            if (dirs2.isEmpty()) {
                throw new IOException("No local storage directories available. " + errorMessage2);
            }
            this.initializedDbBasePaths = dirs2.toArray(new File[dirs2.size()]);
        }
        this.nextDirectory = new Random().nextInt(this.initializedDbBasePaths.length);
        this.isInitialized = true;
    }

    private File getNextStoragePath() {
        int ni = this.nextDirectory + 1;
        this.nextDirectory = ni = ni >= this.initializedDbBasePaths.length ? 0 : ni;
        return this.initializedDbBasePaths[ni];
    }

    public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException {
        return this.checkpointStreamBackend.resolveCheckpoint(pointer);
    }

    public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
        return this.checkpointStreamBackend.createCheckpointStorage(jobId);
    }

    public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider) throws IOException {
        String tempDir = env.getTaskManagerInfo().getTmpDirectories()[0];
        this.ensureRocksDBIsLoaded(tempDir);
        String fileCompatibleIdentifier = operatorIdentifier.replaceAll("[^a-zA-Z0-9\\-]", "_");
        this.lazyInitializeForJob(env, fileCompatibleIdentifier);
        File instanceBasePath = new File(this.getNextStoragePath(), "job_" + this.jobId + "_op_" + fileCompatibleIdentifier + "_uuid_" + UUID.randomUUID());
        LocalRecoveryConfig localRecoveryConfig = env.getTaskStateManager().createLocalRecoveryConfig();
        return new RocksDBKeyedStateBackend<K>(operatorIdentifier, env.getUserClassLoader(), instanceBasePath, this.getDbOptions(), this.getColumnOptions(), kvStateRegistry, keySerializer, numberOfKeyGroups, keyGroupRange, env.getExecutionConfig(), this.isIncrementalCheckpointsEnabled(), localRecoveryConfig, this.priorityQueueStateType, ttlTimeProvider);
    }

    public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception {
        boolean asyncSnapshots = true;
        return new DefaultOperatorStateBackend(env.getUserClassLoader(), env.getExecutionConfig(), true);
    }

    public void setDbStoragePath(String path) {
        String[] stringArray;
        if (path == null) {
            stringArray = null;
        } else {
            String[] stringArray2 = new String[1];
            stringArray = stringArray2;
            stringArray2[0] = path;
        }
        this.setDbStoragePaths(stringArray);
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public void setDbStoragePaths(String ... paths) {
        if (paths == null) {
            this.localRocksDbDirectories = null;
            return;
        }
        if (paths.length == 0) {
            throw new IllegalArgumentException("empty paths");
        }
        File[] pp = new File[paths.length];
        for (int i = 0; i < paths.length; ++i) {
            String path;
            String rawPath = paths[i];
            if (rawPath == null) {
                throw new IllegalArgumentException("null path");
            }
            URI uri = null;
            try {
                uri = new Path(rawPath).toUri();
            }
            catch (Exception exception) {
                // empty catch block
            }
            if (uri != null && uri.getScheme() != null) {
                if (!"file".equalsIgnoreCase(uri.getScheme())) throw new IllegalArgumentException("Path " + rawPath + " has a non-local scheme");
                path = uri.getPath();
            } else {
                path = rawPath;
            }
            pp[i] = new File(path);
            if (pp[i].isAbsolute()) continue;
            throw new IllegalArgumentException("Relative paths are not supported");
        }
        this.localRocksDbDirectories = pp;
    }

    public String[] getDbStoragePaths() {
        if (this.localRocksDbDirectories == null) {
            return null;
        }
        String[] paths = new String[this.localRocksDbDirectories.length];
        for (int i = 0; i < paths.length; ++i) {
            paths[i] = this.localRocksDbDirectories[i].toString();
        }
        return paths;
    }

    public boolean isIncrementalCheckpointsEnabled() {
        return this.enableIncrementalCheckpointing.getOrDefault(((Boolean)CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue()).booleanValue());
    }

    public void setPredefinedOptions(PredefinedOptions options) {
        this.predefinedOptions = (PredefinedOptions)((Object)Preconditions.checkNotNull((Object)((Object)options)));
    }

    public PredefinedOptions getPredefinedOptions() {
        return this.predefinedOptions;
    }

    public void setOptions(OptionsFactory optionsFactory) {
        this.optionsFactory = optionsFactory;
    }

    public OptionsFactory getOptions() {
        return this.optionsFactory;
    }

    public DBOptions getDbOptions() {
        DBOptions opt2 = this.predefinedOptions.createDBOptions();
        if (this.optionsFactory != null) {
            opt2 = this.optionsFactory.createDBOptions(opt2);
        }
        opt2 = opt2.setCreateIfMissing(true);
        return opt2;
    }

    public ColumnFamilyOptions getColumnOptions() {
        ColumnFamilyOptions opt2 = this.predefinedOptions.createColumnOptions();
        if (this.optionsFactory != null) {
            opt2 = this.optionsFactory.createColumnOptions(opt2);
        }
        return opt2;
    }

    public String toString() {
        return "RocksDBStateBackend{checkpointStreamBackend=" + this.checkpointStreamBackend + ", localRocksDbDirectories=" + Arrays.toString(this.localRocksDbDirectories) + ", enableIncrementalCheckpointing=" + this.enableIncrementalCheckpointing + '}';
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void ensureRocksDBIsLoaded(String tempDirectory) throws IOException {
        Class<RocksDBStateBackend> clazz = RocksDBStateBackend.class;
        synchronized (RocksDBStateBackend.class) {
            if (!rocksDbInitialized) {
                File tempDirParent = new File(tempDirectory).getAbsoluteFile();
                LOG.info("Attempting to load RocksDB native library and store it under '{}'", (Object)tempDirParent);
                Throwable lastException = null;
                for (int attempt2 = 1; attempt2 <= 3; ++attempt2) {
                    try {
                        File rocksLibFolder = new File(tempDirParent, "rocksdb-lib-" + new AbstractID());
                        LOG.debug("Attempting to create RocksDB native library folder {}", (Object)rocksLibFolder);
                        rocksLibFolder.mkdirs();
                        NativeLibraryLoader.getInstance().loadLibrary(rocksLibFolder.getAbsolutePath());
                        RocksDB.loadLibrary();
                        LOG.info("Successfully loaded RocksDB native library");
                        rocksDbInitialized = true;
                        // ** MonitorExit[var2_2] (shouldn't be in output)
                        return;
                    }
                    catch (Throwable t) {
                        lastException = t;
                        LOG.debug("RocksDB JNI library loading attempt {} failed", (Object)attempt2, (Object)t);
                        try {
                            RocksDBStateBackend.resetRocksDBLoadedFlag();
                        }
                        catch (Throwable tt) {
                            LOG.debug("Failed to reset 'initialized' flag in RocksDB native code loader", tt);
                        }
                        continue;
                    }
                }
                throw new IOException("Could not load the native RocksDB library", lastException);
            }
            // ** MonitorExit[var2_2] (shouldn't be in output)
            return;
        }
    }

    @VisibleForTesting
    static void resetRocksDBLoadedFlag() throws Exception {
        Field initField = NativeLibraryLoader.class.getDeclaredField("initialized");
        initField.setAccessible(true);
        initField.setBoolean(null, false);
    }

    public static enum PriorityQueueStateType {
        HEAP,
        ROCKSDB;

    }
}

