/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.table;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;

public class KafkaOptions {
    public static final ConfigOption<String> TOPIC = ConfigOptions.key((String)"topic").stringType().noDefaultValue().withDescription("Required topic name from which the table is read");
    public static final ConfigOption<String> PROPS_BOOTSTRAP_SERVERS = ConfigOptions.key((String)"properties.bootstrap.servers").stringType().noDefaultValue().withDescription("Required Kafka server connection string");
    public static final ConfigOption<String> PROPS_GROUP_ID = ConfigOptions.key((String)"properties.group.id").stringType().noDefaultValue().withDescription("Required consumer group in Kafka consumer, no need for Kafka producer");
    public static final ConfigOption<String> SCAN_STARTUP_MODE = ConfigOptions.key((String)"scan.startup.mode").stringType().defaultValue((Object)"group-offsets").withDescription("Optional startup mode for Kafka consumer, valid enumerations are \"earliest-offset\", \"latest-offset\", \"group-offsets\", \"timestamp\"\nor \"specific-offsets\"");
    public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSETS = ConfigOptions.key((String)"scan.startup.specific-offsets").stringType().noDefaultValue().withDescription("Optional offsets used in case of \"specific-offsets\" startup mode");
    public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS = ConfigOptions.key((String)"scan.startup.timestamp-millis").longType().noDefaultValue().withDescription("Optional timestamp used in case of \"timestamp\" startup mode");
    public static final ConfigOption<String> SINK_PARTITIONER = ConfigOptions.key((String)"sink.partitioner").stringType().noDefaultValue().withDescription("Optional output partitioning from Flink's partitions\ninto Kafka's partitions valid enumerations are\n\"fixed\": (each Flink partition ends up in at most one Kafka partition),\n\"round-robin\": (a Flink partition is distributed to Kafka partitions round-robin)\n\"custom class name\": (use a custom FlinkKafkaPartitioner subclass)");
    public static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
    public static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
    public static final String SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS = "group-offsets";
    public static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets";
    public static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";
    private static final Set<String> SCAN_STARTUP_MODE_ENUMS = new HashSet<String>(Arrays.asList("earliest-offset", "latest-offset", "group-offsets", "specific-offsets", "timestamp"));
    public static final String SINK_PARTITIONER_VALUE_FIXED = "fixed";
    public static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin";
    private static final Set<String> SINK_PARTITIONER_ENUMS = new HashSet<String>(Arrays.asList("fixed", "round-robin"));
    public static final String PROPERTIES_PREFIX = "properties.";
    private static final String PARTITION = "partition";
    private static final String OFFSET = "offset";

    private KafkaOptions() {
    }

    public static void validateTableOptions(ReadableConfig tableOptions) {
        KafkaOptions.validateScanStartupMode(tableOptions);
        KafkaOptions.validateSinkPartitioner(tableOptions);
    }

    private static void validateScanStartupMode(ReadableConfig tableOptions) {
        tableOptions.getOptional(SCAN_STARTUP_MODE).map(String::toLowerCase).ifPresent(mode -> {
            if (!SCAN_STARTUP_MODE_ENUMS.contains(mode)) {
                throw new ValidationException(String.format("Invalid value for option '%s'. Supported values are %s, but was: %s", SCAN_STARTUP_MODE.key(), "[earliest-offset, latest-offset, group-offsets, specific-offsets, timestamp]", mode));
            }
            if (mode.equals(SCAN_STARTUP_MODE_VALUE_TIMESTAMP) && !tableOptions.getOptional(SCAN_STARTUP_TIMESTAMP_MILLIS).isPresent()) {
                throw new ValidationException(String.format("'%s' is required in '%s' startup mode but missing.", SCAN_STARTUP_TIMESTAMP_MILLIS.key(), SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
            }
            if (mode.equals(SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS)) {
                if (!tableOptions.getOptional(SCAN_STARTUP_SPECIFIC_OFFSETS).isPresent()) {
                    throw new ValidationException(String.format("'%s' is required in '%s' startup mode but missing.", SCAN_STARTUP_SPECIFIC_OFFSETS.key(), SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS));
                }
                String specificOffsets = (String)tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
                KafkaOptions.parseSpecificOffsets(specificOffsets, SCAN_STARTUP_SPECIFIC_OFFSETS.key());
            }
        });
    }

    private static void validateSinkPartitioner(ReadableConfig tableOptions) {
        tableOptions.getOptional(SINK_PARTITIONER).ifPresent(partitioner -> {
            if (!SINK_PARTITIONER_ENUMS.contains(partitioner.toLowerCase()) && partitioner.isEmpty()) {
                throw new ValidationException(String.format("Option '%s' should be a non-empty string.", SINK_PARTITIONER.key()));
            }
        });
    }

    public static StartupOptions getStartupOptions(ReadableConfig tableOptions, String topic) {
        HashMap<KafkaTopicPartition, Long> specificOffsets = new HashMap<KafkaTopicPartition, Long>();
        StartupMode startupMode = tableOptions.getOptional(SCAN_STARTUP_MODE).map(modeString -> {
            switch (modeString) {
                case "earliest-offset": {
                    return StartupMode.EARLIEST;
                }
                case "latest-offset": {
                    return StartupMode.LATEST;
                }
                case "group-offsets": {
                    return StartupMode.GROUP_OFFSETS;
                }
                case "specific-offsets": {
                    KafkaOptions.buildSpecificOffsets(tableOptions, topic, specificOffsets);
                    return StartupMode.SPECIFIC_OFFSETS;
                }
                case "timestamp": {
                    return StartupMode.TIMESTAMP;
                }
            }
            throw new TableException("Unsupported startup mode. Validator should have checked that.");
        }).orElse(StartupMode.GROUP_OFFSETS);
        StartupOptions options = new StartupOptions();
        options.startupMode = startupMode;
        options.specificOffsets = specificOffsets;
        if (startupMode == StartupMode.TIMESTAMP) {
            options.startupTimestampMillis = (Long)tableOptions.get(SCAN_STARTUP_TIMESTAMP_MILLIS);
        }
        return options;
    }

    private static void buildSpecificOffsets(ReadableConfig tableOptions, String topic, Map<KafkaTopicPartition, Long> specificOffsets) {
        String specificOffsetsStrOpt = (String)tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
        Map<Integer, Long> offsetMap = KafkaOptions.parseSpecificOffsets(specificOffsetsStrOpt, SCAN_STARTUP_SPECIFIC_OFFSETS.key());
        offsetMap.forEach((partition, offset) -> {
            KafkaTopicPartition topicPartition = new KafkaTopicPartition(topic, (int)partition);
            specificOffsets.put(topicPartition, (Long)offset);
        });
    }

    public static Properties getKafkaProperties(Map<String, String> tableOptions) {
        Properties kafkaProperties = new Properties();
        if (KafkaOptions.hasKafkaClientProperties(tableOptions)) {
            tableOptions.keySet().stream().filter(key -> key.startsWith(PROPERTIES_PREFIX)).forEach(key -> {
                String value2 = (String)tableOptions.get(key);
                String subKey = key.substring(PROPERTIES_PREFIX.length());
                kafkaProperties.put(subKey, value2);
            });
        }
        return kafkaProperties;
    }

    public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(ReadableConfig tableOptions, ClassLoader classLoader) {
        return tableOptions.getOptional(SINK_PARTITIONER).flatMap(partitioner -> {
            switch (partitioner) {
                case "fixed": {
                    return Optional.of(new FlinkFixedPartitioner());
                }
                case "round-robin": {
                    return Optional.empty();
                }
            }
            return Optional.of(KafkaOptions.initializePartitioner(partitioner, classLoader));
        });
    }

    public static Map<Integer, Long> parseSpecificOffsets(String specificOffsetsStr, String optionKey) {
        HashMap<Integer, Long> offsetMap = new HashMap<Integer, Long>();
        String[] pairs = specificOffsetsStr.split(";");
        String validationExceptionMessage = String.format("Invalid properties '%s' should follow the format 'partition:0,offset:42;partition:1,offset:300', but is '%s'.", optionKey, specificOffsetsStr);
        if (pairs.length == 0) {
            throw new ValidationException(validationExceptionMessage);
        }
        for (String pair : pairs) {
            if (null == pair || pair.length() == 0 || !pair.contains(",")) {
                throw new ValidationException(validationExceptionMessage);
            }
            String[] kv = pair.split(",");
            if (kv.length != 2 || !kv[0].startsWith("partition:") || !kv[1].startsWith("offset:")) {
                throw new ValidationException(validationExceptionMessage);
            }
            String partitionValue = kv[0].substring(kv[0].indexOf(":") + 1);
            String offsetValue = kv[1].substring(kv[1].indexOf(":") + 1);
            try {
                Integer partition = Integer.valueOf(partitionValue);
                Long offset = Long.valueOf(offsetValue);
                offsetMap.put(partition, offset);
            }
            catch (NumberFormatException e) {
                throw new ValidationException(validationExceptionMessage, (Throwable)e);
            }
        }
        return offsetMap;
    }

    private static boolean hasKafkaClientProperties(Map<String, String> tableOptions) {
        return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
    }

    private static <T> FlinkKafkaPartitioner<T> initializePartitioner(String name, ClassLoader classLoader) {
        try {
            Class<?> clazz = Class.forName(name, true, classLoader);
            if (!FlinkKafkaPartitioner.class.isAssignableFrom(clazz)) {
                throw new ValidationException(String.format("Sink partitioner class '%s' should extend from the required class %s", name, FlinkKafkaPartitioner.class.getName()));
            }
            FlinkKafkaPartitioner kafkaPartitioner = (FlinkKafkaPartitioner)InstantiationUtil.instantiate((String)name, FlinkKafkaPartitioner.class, (ClassLoader)classLoader);
            return kafkaPartitioner;
        }
        catch (ClassNotFoundException | FlinkException e) {
            throw new ValidationException(String.format("Could not find and instantiate partitioner class '%s'", name), e);
        }
    }

    public static class StartupOptions {
        public StartupMode startupMode;
        public Map<KafkaTopicPartition, Long> specificOffsets;
        public long startupTimestampMillis;
    }
}

