package com._4paradigm.openmldb.synctool;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.io.PrintStream;
import java.lang.invoke.SerializedLambda;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/_4paradigm/openmldb/synctool/FlinkTunnel.class */
public class FlinkTunnel {
    private static final Logger log = LoggerFactory.getLogger(FlinkTunnel.class);
    private static FlinkTunnel instance;
    private MiniCluster miniCluster;
    private URI miniClusterAddr;
    private StreamExecutionEnvironment env;
    private Map<Integer, JobID> sourceMap = new ConcurrentHashMap();

    /* loaded from: input_file:com/_4paradigm/openmldb/synctool/FlinkTunnel$KeyBucketAssigner.class */
    public static final class KeyBucketAssigner implements BucketAssigner<String, String> {
        private static final long serialVersionUID = 987325769970523326L;

        public String getBucketId(String str, BucketAssigner.Context context) {
            return "0";
        }

        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }

    public static synchronized FlinkTunnel getInstance() {
        if (instance == null) {
            instance = new FlinkTunnel();
            try {
                SyncToolConfig.parse();
                instance.init(SyncToolConfig.getProp());
            } catch (Exception e) {
                e.printStackTrace();
                instance = null;
            }
        }
        return instance;
    }

    private FlinkTunnel() {
    }

    private void init(Properties properties) throws Exception {
        Configuration fromMap = Configuration.fromMap(Maps.fromProperties(properties));
        fromMap.setString("execution.runtime-mode", "STREAMING");
        fromMap.setString("high-availability", "zookeeper");
        fromMap.setString("high-availability.zookeeper.quorum", SyncToolConfig.ZK_CLUSTER);
        this.miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().setConfiguration(fromMap).setNumTaskManagers(1).setNumSlotsPerTaskManager(SyncToolConfig.FLINK_SLOTS).build());
        this.miniCluster.start();
        this.miniClusterAddr = (URI) this.miniCluster.getRestAddress().get();
        log.info("start a mini cluster, addr: {}", this.miniClusterAddr);
        this.env = StreamExecutionEnvironment.createRemoteEnvironment(this.miniClusterAddr.getHost(), this.miniClusterAddr.getPort(), fromMap, new String[0]);
        this.env.setParallelism(1);
        this.env.enableCheckpointing(5000L);
        CheckpointConfig checkpointConfig = this.env.getCheckpointConfig();
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        checkpointConfig.setCheckpointStorage("file:///tmp/flink-checkpoints/");
        for (JobStatusMessage jobStatusMessage : (Collection) this.miniCluster.listJobs().get()) {
            if (!jobStatusMessage.getJobState().isGloballyTerminalState()) {
                log.info("old job: {}", jobStatusMessage);
                int parseInt = Integer.parseInt(jobStatusMessage.getJobName().split("-")[2]);
                Path path = Paths.get(SyncToolImpl.genCacheDir(parseInt), new String[0]);
                if (Files.notExists(path, new LinkOption[0])) {
                    log.warn("cache dir not exists(weird), create it: {}", path);
                    Files.createDirectories(path, new FileAttribute[0]);
                }
                this.sourceMap.put(Integer.valueOf(parseInt), jobStatusMessage.getJobId());
            }
        }
        log.info("recovered jobs: {}", this.sourceMap);
    }

    public synchronized boolean recoverTunnel(int i) {
        return this.sourceMap.containsKey(Integer.valueOf(i));
    }

    public synchronized boolean createTunnel(int i, String str, String str2) {
        if (this.sourceMap.containsKey(Integer.valueOf(i))) {
            log.warn("tunnel for tid: {} already exists", Integer.valueOf(i));
            return false;
        }
        String str3 = "file://" + str;
        log.info("create source for tid: {}, fsURI: {}", Integer.valueOf(i), str3);
        TextInputFormat textInputFormat = new TextInputFormat(new org.apache.flink.core.fs.Path(str3));
        textInputFormat.setFilesFilter(new CsvFilePathFilter());
        this.env.readFile(textInputFormat, str3, FileProcessingMode.PROCESS_CONTINUOUSLY, 100L).setParallelism(1).name("source" + i).sinkTo(FileSink.forRowFormat(new org.apache.flink.core.fs.Path(str2), (str4, outputStream) -> {
            new PrintStream(outputStream).println(str4);
        }).withBucketAssigner(new KeyBucketAssigner()).withRollingPolicy(OnCheckpointRollingPolicy.build()).build());
        try {
            JobClient executeAsync = this.env.executeAsync(genJobName(i));
            this.sourceMap.put(Integer.valueOf(i), executeAsync.getJobID());
            log.info("create job for tid: {} success, job {}", Integer.valueOf(i), executeAsync.getJobID());
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public JobStatus getJobStatus(int i) {
        JobID jobID = this.sourceMap.get(Integer.valueOf(i));
        if (jobID == null) {
            log.error("tid: {} not exist", Integer.valueOf(i));
            return null;
        }
        try {
            return (JobStatus) this.miniCluster.getJobStatus(jobID).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
            return null;
        }
    }

    public void close() {
    }

    public String genJobName(int i) {
        return "stream-tid-" + i;
    }

    public synchronized void closeTunnel(int i) {
        JobID jobID = (JobID) Preconditions.checkNotNull(this.sourceMap.get(Integer.valueOf(i)));
        try {
            URL url = this.miniClusterAddr.resolve("/jobs/" + jobID).toURL();
            log.info("check job: {}", url);
            HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
            httpURLConnection.setRequestMethod("GET");
            log.info(httpURLConnection.getResponseMessage());
        } catch (Exception e) {
        }
        this.miniCluster.cancelJob(jobID);
    }

    public MiniCluster getMiniCluster() {
        return this.miniCluster;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 2014337214:
                if (implMethodName.equals("lambda$createTunnel$6c77c0e9$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/serialization/Encoder") && serializedLambda.getFunctionalInterfaceMethodName().equals("encode") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/io/OutputStream;)V") && serializedLambda.getImplClass().equals("com/_4paradigm/openmldb/synctool/FlinkTunnel") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ljava/io/OutputStream;)V")) {
                    return (str4, outputStream) -> {
                        new PrintStream(outputStream).println(str4);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
