/*
 * Decompiled with CFR 0.152.
 */
package org.locationtech.geomesa.kafka.jstreams;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.geotools.data.DataStoreFinder;
import org.geotools.data.FeatureWriter;
import org.geotools.data.Query;
import org.geotools.data.Transaction;
import org.geotools.data.simple.SimpleFeatureWriter;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.locationtech.geomesa.features.ScalaSimpleFeature;
import org.locationtech.geomesa.index.geotools.GeoMesaFeatureReader;
import org.locationtech.geomesa.kafka.EmbeddedKafka;
import org.locationtech.geomesa.kafka.data.KafkaDataStore;
import org.locationtech.geomesa.kafka.jstreams.GeoMesaStreamsBuilder;
import org.locationtech.geomesa.kafka.streams.GeoMesaMessage;
import org.locationtech.geomesa.utils.geotools.FeatureUtils;
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes;
import org.locationtech.geomesa.utils.geotools.converters.FastConverter;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GeoMesaStreamsBuilderTest {
    /** Logger used for the embedded kafka/zk lifecycle messages. */
    private static final Logger logger = LoggerFactory.getLogger(GeoMesaStreamsBuilderTest.class);

    /** Embedded broker shared by the tests; assigned in {@code init()} and closed in {@code destroy()}. */
    static EmbeddedKafka kafka = null;

    /** Feature type used by every test: a name, an age, a date and a default point geometry. */
    static final SimpleFeatureType sft =
            SimpleFeatureTypes.createImmutableType("streams", "name:String,age:Int,dtg:Date,*geom:Point:srid=4326");

    /** Test features, populated once in {@code init()}. */
    static final List<SimpleFeature> features = new ArrayList<>();

    /** Zookeeper paths already handed out by {@code getParams}, used to reject reuse between tests. */
    static final Set<String> zkPaths = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

    /**
     * Builds the data store parameters for a single test.
     *
     * <p>The given path is registered in {@code zkPaths}; asking for the same path a second
     * time is rejected so that two tests cannot share a zookeeper path.
     *
     * @param zkPath value for the {@code kafka.zk.path} parameter, unique per test
     * @return a new mutable map of data store parameters pointing at the embedded kafka/zk
     * @throws IllegalArgumentException if {@code zkPath} was already requested
     */
    public Map<String, String> getParams(String zkPath) {
        final boolean firstUse = zkPaths.add(zkPath);
        if (firstUse) {
            HashMap<String, String> config = new HashMap<>();
            config.put("kafka.brokers", kafka.brokers());
            config.put("kafka.zookeepers", kafka.zookeepers());
            config.put("kafka.topic.partitions", "1");
            config.put("kafka.topic.replication", "1");
            config.put("kafka.consumer.read-back", "Inf");
            config.put("kafka.zk.path", zkPath);
            return config;
        }
        throw new IllegalArgumentException("zk path '" + zkPath + "' is reused between tests, may cause conflicts");
    }

    /**
     * Starts the embedded kafka/zk instance and fills {@code features} with ten features
     * ({@code id0} .. {@code id9}) whose attributes are derived from the feature index.
     */
    @BeforeClass
    public static void init() {
        logger.info("Starting embedded kafka/zk");
        kafka = new EmbeddedKafka();
        logger.info("Started embedded kafka/zk");

        int n = 0;
        while (n < 10) {
            final String name = "name" + n;
            final Integer age = n % 2;
            // single-digit seconds: the index is appended directly after the leading zero
            final String dtg = "2022-04-27T00:00:0" + n + ".00Z";
            final String geom = "POINT(1 " + n + ")";

            ScalaSimpleFeature feature = new ScalaSimpleFeature(sft, "id" + n, null, null);
            feature.setAttribute(0, name);
            feature.setAttribute(1, age);
            feature.setAttribute(2, dtg);
            feature.setAttribute(3, geom);
            features.add(feature);
            n++;
        }
    }

    /**
     * Shuts down the embedded kafka/zk instance, if one was started.
     */
    @AfterClass
    public static void destroy() {
        logger.info("Stopping embedded kafka/zk");
        final EmbeddedKafka running = kafka;
        if (running != null) {
            running.close();
        }
        logger.info("Stopped embedded kafka/zk");
    }

    /**
     * Checks that features written through a {@link KafkaDataStore} can be read by a streams
     * topology created with {@link GeoMesaStreamsBuilder}.
     *
     * <p>The test writes the shared features to the store, consumes the raw records from the
     * backing topic, pipes them through a word-count topology in a {@link TopologyTestDriver},
     * and then verifies both the word counts and the record timestamps seen by the topology.
     *
     * @throws Exception if the data store, the consumer or the topology fails
     */
    @Test
    public void testRead() throws Exception {
        Map<String, String> params = this.getParams("word/count");

        // write the features to the data store and remember which topic they went to
        String kryoTopic;
        KafkaDataStore ds = (KafkaDataStore) DataStoreFinder.getDataStore(params);
        try {
            ds.createSchema(sft);
            try (SimpleFeatureWriter writer = ds.getFeatureWriterAppend(sft.getTypeName(), Transaction.AUTO_COMMIT)) {
                features.forEach(f -> FeatureUtils.write(writer, f, true));
            }
            kryoTopic = KafkaDataStore.topic(ds.getSchema(sft.getTypeName()));
        } finally {
            ds.dispose();
        }

        // read the serialized features back off the topic as raw byte records
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", kafka.brokers());
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("key.deserializer", ByteArrayDeserializer.class.getName());
        consumerProps.put("value.deserializer", ByteArrayDeserializer.class.getName());
        consumerProps.put("group.id", "consume-kryo-topic");

        List<ConsumerRecord<byte[], byte[]>> messages = new ArrayList<>();
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singleton(kryoTopic));
            long start = System.currentTimeMillis();
            // poll until one record per written feature has arrived, or give up after 10 seconds
            while (messages.size() < features.size() && System.currentTimeMillis() - start < 10000L) {
                consumer.poll(Duration.ofMillis(100L)).forEach(messages::add);
            }
        }
        // fail here, with a clear reason, instead of on the word-count comparison further down
        Assert.assertEquals(
                "Unexpected number of records consumed from topic " + kryoTopic,
                features.size(),
                messages.size());

        TimestampExtractingTransformer timestampExtractor = new TimestampExtractingTransformer();

        // word count over the attribute values of each feature message
        GeoMesaStreamsBuilder builder = GeoMesaStreamsBuilder.create(params);
        KStream<String, GeoMesaMessage> streamFeatures =
                builder.stream(sft.getTypeName()).transform(() -> timestampExtractor);
        KStream<String, String> textLines = streamFeatures.mapValues(
                v -> v.asJava().stream()
                        .map(a -> a.toString().replaceAll(" ", "_"))
                        .collect(Collectors.joining(" ")));
        KTable<String, Long> wordCounts = textLines
                .flatMapValues(textLine -> Arrays.asList(textLine.split(" +")))
                .groupBy((k, word) -> word)
                .count(Materialized.as("counts-store"));
        wordCounts.toStream().to("word-count", Produced.with(Serdes.String(), Serdes.Long()));

        Properties streamsProps = new Properties();
        streamsProps.put("application.id", "java-word-count-test");
        streamsProps.put("bootstrap.servers", "dummy:1234");
        streamsProps.put("default.key.serde", Serdes.String().getClass());
        streamsProps.put("default.value.serde", Serdes.String().getClass());

        // later records for the same word overwrite earlier ones, leaving the final count
        Map<String, Long> output = new HashMap<>();
        try (TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), streamsProps)) {
            messages.forEach(testDriver::pipeInput);
            StringDeserializer keyDeserializer = new StringDeserializer();
            LongDeserializer valueDeserializer = new LongDeserializer();
            ProducerRecord<String, Long> out;
            while ((out = testDriver.readOutput("word-count", keyDeserializer, valueDeserializer)) != null) {
                output.put(out.key(), out.value());
            }
        }

        Map<String, Long> expected = new HashMap<>();
        features.stream()
                .flatMap(f -> f.getAttributes().stream().map(a -> a.toString().replaceAll(" ", "_")))
                .forEach(w -> expected.merge(w, 1L, Long::sum));
        Assert.assertEquals(expected, output);

        // each key must have been seen exactly once, with the feature's dtg as record timestamp
        List<Long> expectedTimestamps = features.stream()
                .map(f -> ((Date) f.getAttribute("dtg")).getTime())
                .sorted()
                .collect(Collectors.toList());
        timestampExtractor.timestamps.values().forEach(c -> Assert.assertEquals(1, c.size()));
        List<Long> timestamps = timestampExtractor.timestamps.values().stream()
                .map(c -> c.get(0))
                .sorted()
                .collect(Collectors.toList());
        Assert.assertEquals(expectedTimestamps, timestamps);
    }

    /**
     * Checks that a streams topology can write features to a {@link KafkaDataStore} through
     * {@link GeoMesaStreamsBuilder#to}.
     *
     * <p>Comma-separated attribute lines are piped into a {@link TopologyTestDriver}, the
     * records the topology emits for the feature topic are forwarded to the real broker, and
     * the data store is then polled until it returns the original features.
     *
     * @throws Exception if the data store, the producer or the topology fails
     */
    @Test
    public void testWrite() throws Exception {
        Map<String, String> params = this.getParams("write/test");

        // create the schema up front so the output topic name is known
        String kryoTopic;
        KafkaDataStore ds = (KafkaDataStore) DataStoreFinder.getDataStore(params);
        try {
            ds.createSchema(sft);
            kryoTopic = KafkaDataStore.topic(ds.getSchema(sft.getTypeName()));
        } finally {
            ds.dispose();
        }

        // one input record per feature: key is the feature id, value is the attributes joined by commas
        List<ConsumerRecord<byte[], byte[]>> testInput = features.stream().map(sf -> {
            long offset = Long.parseLong(sf.getID().replace("id", ""));
            byte[] key = sf.getID().getBytes(StandardCharsets.UTF_8);
            String value = sf.getAttributes().stream()
                    .map(a -> (String) FastConverter.convert(a, String.class))
                    .collect(Collectors.joining(","));
            return new ConsumerRecord<byte[], byte[]>(
                    "input-topic", 0, offset, key, value.getBytes(StandardCharsets.UTF_8));
        }).collect(Collectors.toList());

        GeoMesaStreamsBuilder builder = GeoMesaStreamsBuilder.create(params);
        KStream<String, String> input = builder.wrapped().stream(
                "input-topic",
                Consumed.with(Serdes.String(), Serdes.String())
                        .withTimestampExtractor(new WallclockTimestampExtractor()));
        KStream<String, GeoMesaMessage> output =
                input.mapValues(lines -> GeoMesaMessage.upsert(Arrays.asList(lines.split(","))));
        builder.to(sft.getTypeName(), output);

        Properties streamsProps = new Properties();
        streamsProps.put("application.id", "java-write-test");
        streamsProps.put("bootstrap.servers", "dummy:1234");
        streamsProps.put("default.key.serde", Serdes.String().getClass());
        streamsProps.put("default.value.serde", Serdes.String().getClass());

        // collect everything the topology emits for the feature topic
        List<ProducerRecord<byte[], byte[]>> kryoMessages = new ArrayList<>();
        try (TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), streamsProps)) {
            testInput.forEach(testDriver::pipeInput);
            ByteArrayDeserializer keyDeserializer = new ByteArrayDeserializer();
            ByteArrayDeserializer valueDeserializer = new ByteArrayDeserializer();
            ProducerRecord<byte[], byte[]> out;
            while ((out = testDriver.readOutput(kryoTopic, keyDeserializer, valueDeserializer)) != null) {
                kryoMessages.add(out);
            }
        }

        List<SimpleFeature> result = new ArrayList<>();
        ds = (KafkaDataStore) DataStoreFinder.getDataStore(params);
        try {
            // NOTE(review): a reader is opened and closed before producing, presumably to start
            // the store's consumer for this type first - confirm against KafkaDataStore
            ds.getFeatureReader(new Query(sft.getTypeName()), Transaction.AUTO_COMMIT).close();
            // forward the captured records to the real broker; closing the producer ends the try block
            try (Producer<byte[], byte[]> producer = KafkaDataStore.producer(ds.config())) {
                kryoMessages.forEach(producer::send);
            }
            // poll the store for up to 4 seconds until it returns the expected features
            long end = System.currentTimeMillis() + 4000L;
            while (System.currentTimeMillis() < end) {
                List<SimpleFeature> snapshot = new ArrayList<>();
                try (GeoMesaFeatureReader reader = ds.getFeatureReader(new Query(sft.getTypeName()), Transaction.AUTO_COMMIT)) {
                    while (reader.hasNext()) {
                        snapshot.add(reader.next());
                    }
                }
                snapshot.sort(Comparator.comparing(SimpleFeature::getID));
                // keep the most recent read so a timeout reports what the store actually held
                result = snapshot;
                if (result.equals(features)) {
                    break;
                }
                Thread.sleep(100L);
            }
        } finally {
            ds.dispose();
        }
        Assert.assertEquals(features, result);
    }

    static class TimestampExtractingTransformer
    implements Transformer<String, GeoMesaMessage, KeyValue<String, GeoMesaMessage>> {
        private ProcessorContext context = null;
        Map<String, List<Long>> timestamps = new HashMap<String, List<Long>>();

        TimestampExtractingTransformer() {
        }

        public void init(ProcessorContext context) {
            this.context = context;
        }

        public KeyValue<String, GeoMesaMessage> transform(String key, GeoMesaMessage value) {
            this.timestamps.computeIfAbsent(key, k -> new ArrayList()).add(this.context.timestamp());
            return new KeyValue((Object)key, (Object)value);
        }

        public void close() {
        }
    }
}

