/*
 * Decompiled with CFR 0.152.
 */
package cn.tenmg.clink.cdc.mysql.source;

import cn.tenmg.clink.cdc.mysql.debezium.MultiTableDebeziumDeserializationSchema;
import cn.tenmg.clink.source.SourceFactory;
import cn.tenmg.clink.utils.ConfigurationUtils;
import cn.tenmg.dsl.utils.MapUtils;
import cn.tenmg.dsl.utils.StringUtils;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import com.ververica.cdc.connectors.mysql.source.config.ServerIdRange;
import com.ververica.cdc.connectors.mysql.table.StartupMode;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DebeziumOptions;
import com.ververica.cdc.debezium.table.MetadataConverter;
import io.debezium.data.Envelope;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TimeUtils;

public class MySqlCdcSourceFactory
implements SourceFactory<MySqlSource<Tuple2<String, Row>>> {
    public static final String IDENTIFIER = "mysql-cdc";
    private static final String SINGLE_QUOTATION_MARK = "'";
    private static final String JDBC_PROPERTIES_PREFIX = "jdbc.properties.";
    private static final String INCLUDE_SCHEMA_CHANGES = "include-schema-changes";
    private static final String CONVERT_DELETE_TO_UPDATE = "convert-delete-to-update";
    private static final String SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED = "scan.incremental.close-idle-reader.enabled";
    private static final String CLOSE_IDLE_READERS_METHOD_NAME = "closeIdleReaders";
    private static final String SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP = "scan.incremental.snapshot.backfill.skip";
    private static final String SKIP_SNAPSHOT_BACKFILL_METHOD_NAME = "skipSnapshotBackfill";
    private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
    private static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
    private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
    private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET = "specific-offset";
    private static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";
    private static final Map<String, MetadataConverter> METADATA_CONVERTERS = MapUtils.newHashMapBuilder(String.class, MetadataConverter.class).put((Object)"table_name", (Object)new MetadataConverter(){
        private static final long serialVersionUID = 1L;

        public Object read(SourceRecord record) {
            Struct messageStruct = (Struct)record.value();
            Struct sourceStruct = messageStruct.getStruct("source");
            return StringData.fromString((String)sourceStruct.getString("table"));
        }
    }).put((Object)"database_name", (Object)new MetadataConverter(){
        private static final long serialVersionUID = 1L;

        public Object read(SourceRecord record) {
            Struct messageStruct = (Struct)record.value();
            Struct sourceStruct = messageStruct.getStruct("source");
            return StringData.fromString((String)sourceStruct.getString("db"));
        }
    }).put((Object)"op_ts", (Object)new MetadataConverter(){
        private static final long serialVersionUID = 1L;

        public Object read(SourceRecord record) {
            Struct messageStruct = (Struct)record.value();
            Struct sourceStruct = messageStruct.getStruct("source");
            return TimestampData.fromEpochMillis((long)((Long)sourceStruct.get("ts_ms")));
        }
    }).put((Object)"op", (Object)new MetadataConverter(){
        private static final long serialVersionUID = 1L;

        public Object read(SourceRecord record) {
            Envelope.Operation op = Envelope.operationFor((SourceRecord)record);
            if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
                return StringData.fromString((String)"c");
            }
            if (op == Envelope.Operation.DELETE) {
                return StringData.fromString((String)"d");
            }
            return StringData.fromString((String)"u");
        }
    }).build();

    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    public MySqlSource<Tuple2<String, Row>> create(Map<String, String> config, Map<String, RowType> rowTypes, Map<String, Map<Integer, String>> metadatas) {
        Set<String> tables;
        String databaseName = MySqlCdcSourceFactory.getOrDefault(config, (ConfigOption<String>)MySqlSourceOptions.DATABASE_NAME);
        HashSet<String> databases = new HashSet<String>();
        if (StringUtils.isBlank((String)databaseName)) {
            tables = rowTypes.keySet();
            for (String tableName : tables) {
                String[] parts = tableName.split("\\.", 2);
                if (parts.length <= 1) continue;
                databases.add(parts[0]);
            }
        } else {
            if (databaseName.startsWith(SINGLE_QUOTATION_MARK) && databaseName.endsWith(SINGLE_QUOTATION_MARK)) {
                databaseName = databaseName.substring(1, databaseName.length() - 1);
            }
            databases.add(databaseName);
            tables = new HashSet<String>();
            for (String tableName : rowTypes.keySet()) {
                String[] parts = tableName.split("\\.", 2);
                if (parts.length > 1) {
                    databases.add(parts[0]);
                    tables.add(tableName);
                    continue;
                }
                tables.add(StringUtils.concat((String[])new String[]{databaseName, ".", tableName}));
            }
        }
        MySqlSourceBuilder builder = MySqlSource.builder().hostname(config.get(MySqlSourceOptions.HOSTNAME.key())).port(MySqlCdcSourceFactory.getIntegerOrDefault(config, (ConfigOption<Integer>)MySqlSourceOptions.PORT).intValue()).username(config.get(MySqlSourceOptions.USERNAME.key())).password(config.get(MySqlSourceOptions.PASSWORD.key())).databaseList(MySqlCdcSourceFactory.toArray(databases)).tableList(MySqlCdcSourceFactory.toArray(tables)).serverId(this.validateAndGetServerId(config)).debeziumProperties(DebeziumOptions.getDebeziumProperties(config)).jdbcProperties(ConfigurationUtils.getPrefixedKeyValuePairs(config, (String)JDBC_PROPERTIES_PREFIX, (boolean)false));
        StartupOptions startupOptions = MySqlCdcSourceFactory.getStartupOptions(config);
        builder.startupOptions(startupOptions);
        if (config.containsKey(MySqlSourceOptions.SERVER_TIME_ZONE.key())) {
            builder.serverTimeZone(config.get(MySqlSourceOptions.SERVER_TIME_ZONE.key()));
        }
        this.validateStartupOption(startupOptions);
        int splitSize = MySqlCdcSourceFactory.getIntegerOrDefault(config, (ConfigOption<Integer>)MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
        this.validateIntegerOption((ConfigOption<Integer>)MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
        builder.splitSize(splitSize);
        int fetchSize = MySqlCdcSourceFactory.getIntegerOrDefault(config, (ConfigOption<Integer>)MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE);
        this.validateIntegerOption((ConfigOption<Integer>)MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
        builder.fetchSize(fetchSize);
        int splitMetaGroupSize = MySqlCdcSourceFactory.getIntegerOrDefault(config, (ConfigOption<Integer>)MySqlSourceOptions.CHUNK_META_GROUP_SIZE);
        this.validateIntegerOption((ConfigOption<Integer>)MySqlSourceOptions.CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
        builder.splitMetaGroupSize(splitMetaGroupSize);
        int connectionPoolSize = MySqlCdcSourceFactory.getIntegerOrDefault(config, (ConfigOption<Integer>)MySqlSourceOptions.CONNECTION_POOL_SIZE);
        this.validateIntegerOption((ConfigOption<Integer>)MySqlSourceOptions.CONNECTION_POOL_SIZE, connectionPoolSize, 1);
        builder.connectionPoolSize(connectionPoolSize);
        int connectMaxRetries = MySqlCdcSourceFactory.getIntegerOrDefault(config, (ConfigOption<Integer>)MySqlSourceOptions.CONNECT_MAX_RETRIES);
        this.validateIntegerOption((ConfigOption<Integer>)MySqlSourceOptions.CONNECT_MAX_RETRIES, connectMaxRetries, 0);
        builder.connectMaxRetries(connectMaxRetries);
        builder.distributionFactorLower(Double.parseDouble(MySqlCdcSourceFactory.getOrDefault(config, Arrays.asList("chunk-key.even-distribution.factor.lower-bound", "split-key.even-distribution.factor.lower-bound"), "0.05")));
        builder.distributionFactorUpper(Double.parseDouble(MySqlCdcSourceFactory.getOrDefault(config, Arrays.asList("chunk-key.even-distribution.factor.upper-bound", "split-key.even-distribution.factor.upper-bound"), "1000")));
        builder.connectTimeout(MySqlCdcSourceFactory.getDurationOrDefault(config, (ConfigOption<Duration>)MySqlSourceOptions.CONNECT_TIMEOUT));
        builder.scanNewlyAddedTableEnabled(MySqlCdcSourceFactory.getBooleanOrDefault(config, (ConfigOption<Boolean>)MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED).booleanValue());
        builder.heartbeatInterval(MySqlCdcSourceFactory.getDurationOrDefault(config, (ConfigOption<Duration>)MySqlSourceOptions.HEARTBEAT_INTERVAL));
        if (config.containsKey(INCLUDE_SCHEMA_CHANGES)) {
            builder.includeSchemaChanges(Boolean.valueOf(config.get(INCLUDE_SCHEMA_CHANGES)).booleanValue());
        }
        if (config.containsKey(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED)) {
            this.setbooleanOptionIfPossible(builder, CLOSE_IDLE_READERS_METHOD_NAME, Boolean.valueOf(config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED)));
        }
        if (config.containsKey(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP)) {
            this.setbooleanOptionIfPossible(builder, SKIP_SNAPSHOT_BACKFILL_METHOD_NAME, Boolean.valueOf(config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP)));
        }
        String convertDeleteToUpdate = config.get(CONVERT_DELETE_TO_UPDATE);
        return builder.deserializer((DebeziumDeserializationSchema)new MultiTableDebeziumDeserializationSchema(rowTypes, this.toMetadataConverters(metadatas), convertDeleteToUpdate == null ? false : Boolean.parseBoolean(convertDeleteToUpdate))).build();
    }

    private static String[] toArray(Set<String> strs) {
        return strs.toArray(new String[strs.size()]);
    }

    private static String getOrDefault(Map<String, String> config, ConfigOption<String> option) {
        if (config.containsKey(option.key())) {
            return config.get(option.key());
        }
        return (String)option.defaultValue();
    }

    private static Integer getIntegerOrDefault(Map<String, String> config, ConfigOption<Integer> option) {
        if (config.containsKey(option.key())) {
            return Integer.parseInt(config.get(option.key()));
        }
        return (Integer)option.defaultValue();
    }

    private static String getOrDefault(Map<String, String> config, List<String> priorityKeys, String defaultValue) {
        int size = priorityKeys.size();
        for (int i = 0; i < size; ++i) {
            String key = priorityKeys.get(i);
            if (!config.containsKey(key)) continue;
            return config.get(key);
        }
        return defaultValue;
    }

    private static Boolean getBooleanOrDefault(Map<String, String> config, ConfigOption<Boolean> option) {
        if (config.containsKey(option.key())) {
            return Boolean.parseBoolean(config.get(option.key()));
        }
        return (Boolean)option.defaultValue();
    }

    private static Duration getDurationOrDefault(Map<String, String> config, ConfigOption<Duration> option) {
        if (config.containsKey(option.key())) {
            return TimeUtils.parseDuration((String)config.get(option.key()));
        }
        return (Duration)option.defaultValue();
    }

    private String validateAndGetServerId(Map<String, String> config) {
        String serverIdValue = config.get(MySqlSourceOptions.SERVER_ID.key());
        if (serverIdValue != null) {
            try {
                ServerIdRange.from((String)serverIdValue);
            }
            catch (Exception e) {
                throw new ValidationException(String.format("The value of option 'server-id' is invalid: '%s'", serverIdValue), (Throwable)e);
            }
        }
        return serverIdValue;
    }

    private void setbooleanOptionIfPossible(Object obj, String name, boolean value) {
        try {
            obj.getClass().getDeclaredMethod(name, Boolean.TYPE).invoke(obj, value);
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    private static StartupOptions getStartupOptions(Map<String, String> config) {
        String modeString = MySqlCdcSourceFactory.getOrDefault(config, (ConfigOption<String>)MySqlSourceOptions.SCAN_STARTUP_MODE);
        switch (modeString.toLowerCase()) {
            case "initial": {
                return StartupOptions.initial();
            }
            case "latest-offset": {
                return StartupOptions.latest();
            }
            case "earliest-offset": 
            case "specific-offset": 
            case "timestamp": {
                throw new ValidationException(String.format("Unsupported option value '%s', the options [%s, %s, %s] are not supported correctly, please do not use them until they're correctly supported", modeString, SCAN_STARTUP_MODE_VALUE_EARLIEST, SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET, SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
            }
        }
        throw new ValidationException(String.format("Invalid value for option '%s'. Supported values are [%s, %s], but was: %s", MySqlSourceOptions.SCAN_STARTUP_MODE.key(), SCAN_STARTUP_MODE_VALUE_INITIAL, SCAN_STARTUP_MODE_VALUE_LATEST, modeString));
    }

    private void validateStartupOption(StartupOptions startupOptions) {
        Preconditions.checkState((startupOptions.startupMode == StartupMode.INITIAL || startupOptions.startupMode == StartupMode.LATEST_OFFSET ? 1 : 0) != 0, (Object)String.format("MySql Parallel Source only supports startup mode 'initial' and 'latest-offset', but actual is %s", startupOptions.startupMode));
    }

    private void validateIntegerOption(ConfigOption<Integer> option, int optionValue, int exclusiveMin) {
        Preconditions.checkState((optionValue > exclusiveMin ? 1 : 0) != 0, (Object)String.format("The value of option '%s' must larger than %d, but is %d", option.key(), exclusiveMin, optionValue));
    }

    private Map<String, Map<Integer, MetadataConverter>> toMetadataConverters(Map<String, Map<Integer, String>> metadatas) {
        if (MapUtils.isEmpty(metadatas)) {
            return Collections.emptyMap();
        }
        HashMap metadataConverterses = MapUtils.newHashMap();
        for (Map.Entry<String, Map<Integer, String>> entry : metadatas.entrySet()) {
            HashMap metadataConverters = MapUtils.newHashMap();
            for (Map.Entry<Integer, String> e : entry.getValue().entrySet()) {
                MetadataConverter metadataConverter = METADATA_CONVERTERS.get(e.getValue());
                if (metadataConverter == null) {
                    throw new UnsupportedOperationException("Invalid metadata: " + e.getValue() + " for the connector: " + IDENTIFIER);
                }
                metadataConverters.put(e.getKey(), metadataConverter);
            }
            metadataConverterses.put(entry.getKey(), metadataConverters);
        }
        return metadataConverterses;
    }
}

