/*
 * Decompiled with CFR 0.152.
 */
package org.locationtech.geowave.test.kafka;

import java.io.File;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.locationtech.geowave.core.cli.api.OperationParams;
import org.locationtech.geowave.core.cli.parser.ManualOperationParams;
import org.locationtech.geowave.core.ingest.operations.KafkaToGeoWaveCommand;
import org.locationtech.geowave.core.ingest.operations.LocalToKafkaCommand;
import org.locationtech.geowave.core.ingest.operations.options.IngestFormatPluginOptions;
import org.locationtech.geowave.core.store.api.DataStore;
import org.locationtech.geowave.core.store.cli.store.AddStoreCommand;
import org.locationtech.geowave.core.store.cli.store.DataStorePluginOptions;
import org.locationtech.geowave.core.store.index.IndexPluginOptions;
import org.locationtech.geowave.core.store.index.IndexStore;
import org.locationtech.geowave.test.TestUtils;
import org.locationtech.geowave.test.kafka.KafkaTestEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers used by the Kafka integration tests: staging local files into Kafka, ingesting
 * them from Kafka into a GeoWave data store, and building the properties for a test broker.
 */
public class KafkaTestUtils {
    // Logger is keyed on this class so log output is attributed to KafkaTestUtils.
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTestUtils.class);

    /** Size limit (as a string) shared by the broker config and the consumer fetch setting. */
    private static final String MAX_MESSAGE_BYTES = "5000000";

    /** Name of the ingest format plugin selected by both the stage and ingest helpers. */
    private static final String INGEST_FORMAT = "gpx";

    /** Name under which the store is registered and later referenced by the ingest command. */
    private static final String TEST_STORE_NAME = "test-store";

    /** Name of the index that is looked up, created if absent, and passed to the ingest command. */
    private static final String TEST_INDEX_NAME = "testIndex";

    /** Directory under {@code TestUtils.TEMP_DIR} used as the broker's {@code log.dirs}. */
    protected static final File DEFAULT_LOG_DIR = new File(TestUtils.TEMP_DIR, "kafka-logs");

    /**
     * Runs a {@link LocalToKafkaCommand} against the test environment's bootstrap servers, using the
     * "gpx" ingest format plugin.
     *
     * @param ingestFilePath value passed to the command via {@code setParameters}
     * @throws Exception if configuring or executing the command fails
     */
    public static void testKafkaStage(String ingestFilePath) throws Exception {
        IngestFormatPluginOptions ingestFormatOptions = new IngestFormatPluginOptions();
        ingestFormatOptions.selectPlugin(INGEST_FORMAT);

        LocalToKafkaCommand localToKafka = new LocalToKafkaCommand();
        localToKafka.setParameters(ingestFilePath);
        localToKafka.setPluginFormats(ingestFormatOptions);
        localToKafka.getKafkaOptions().setBootstrapServers(KafkaTestEnvironment.getInstance().getBootstrapServers());
        localToKafka.getKafkaOptions().setRetryBackoffMs("1000");
        localToKafka.execute((OperationParams)new ManualOperationParams());
    }

    /**
     * Registers the given store as "test-store", makes sure an index named "testIndex" exists, then
     * runs a {@link KafkaToGeoWaveCommand} and waits on the driver's futures.
     *
     * @param options plugin options of the store to ingest into
     * @param spatialTemporal selects the "spatial_temporal" index plugin when {@code true},
     *        otherwise the "spatial" plugin; only used if the index has to be created
     * @param ingestFilePath only used in the log message emitted before ingesting
     * @throws RuntimeException wrapping an {@link InterruptedException} or
     *         {@link ExecutionException} raised while waiting on the driver's futures
     * @throws Exception if any other step fails
     */
    public static void testKafkaIngest(DataStorePluginOptions options, boolean spatialTemporal, String ingestFilePath) throws Exception {
        LOGGER.warn("Ingesting '{}' - this may take several minutes...", ingestFilePath);

        IngestFormatPluginOptions ingestFormatOptions = new IngestFormatPluginOptions();
        ingestFormatOptions.selectPlugin(INGEST_FORMAT);
        IndexPluginOptions indexOption = new IndexPluginOptions();
        indexOption.selectPlugin(spatialTemporal ? "spatial_temporal" : "spatial");
        KafkaToGeoWaveCommand kafkaToGeowave = new KafkaToGeoWaveCommand();

        // Throwaway properties file for this run; removed at JVM exit so test runs do not
        // accumulate files in the temp directory.
        File configFile = File.createTempFile("test_stats", null);
        configFile.deleteOnExit();
        ManualOperationParams params = new ManualOperationParams();
        params.getContext().put("properties-file", configFile);

        AddStoreCommand addStore = new AddStoreCommand();
        addStore.setParameters(TEST_STORE_NAME);
        addStore.setPluginOptions(options);
        addStore.execute((OperationParams)params);

        // Only create the index when the store does not already have one by that name.
        IndexStore indexStore = options.createIndexStore();
        DataStore dataStore = options.createDataStore();
        if (indexStore.getIndex(TEST_INDEX_NAME) == null) {
            indexOption.setName(TEST_INDEX_NAME);
            dataStore.addIndex(indexOption.createIndex(dataStore));
        }

        kafkaToGeowave.setPluginFormats(ingestFormatOptions);
        kafkaToGeowave.getKafkaOptions().setBootstrapServers(KafkaTestEnvironment.getInstance().getBootstrapServers());
        kafkaToGeowave.getKafkaOptions().setConsumerTimeoutMs("5000");
        kafkaToGeowave.getKafkaOptions().setReconnectOnTimeout(false);
        kafkaToGeowave.getKafkaOptions().setGroupId("testGroup");
        kafkaToGeowave.getKafkaOptions().setAutoOffsetReset("earliest");
        kafkaToGeowave.getKafkaOptions().setMaxPartitionFetchBytes(MAX_MESSAGE_BYTES);
        kafkaToGeowave.setParameters(TEST_STORE_NAME, TEST_INDEX_NAME);
        kafkaToGeowave.execute((OperationParams)params);

        try {
            kafkaToGeowave.getDriver().waitFutures();
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds the properties for a single test broker (id "0", one partition) that listens on
     * {@code PLAINTEXT://<host>:9092} and stores its logs in {@link #DEFAULT_LOG_DIR}.
     *
     * @param host host name or address placed in the {@code listeners} entry
     * @return a new {@link Properties} instance holding the broker settings
     */
    public static Properties getKafkaBrokerConfig(String host) {
        Properties props = new Properties();
        props.put("log.dirs", DEFAULT_LOG_DIR.getAbsolutePath());
        props.put("broker.id", "0");
        props.put("listeners", "PLAINTEXT://" + host + ":9092");
        props.put("message.max.bytes", MAX_MESSAGE_BYTES);
        props.put("replica.fetch.max.bytes", MAX_MESSAGE_BYTES);
        props.put("num.partitions", "1");
        return props;
    }
}

