/*
 * Decompiled with CFR 0.152.
 */
package org.locationtech.geomesa.kafka.tools.export;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.typesafe.scalalogging.LazyLogging;
import com.typesafe.scalalogging.Logger;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import org.geotools.data.DataUtilities;
import org.geotools.data.FeatureEvent;
import org.geotools.data.FeatureListener;
import org.locationtech.geomesa.kafka.data.KafkaDataStore;
import org.locationtech.geomesa.kafka.tools.ConsumerDataStoreParams;
import org.locationtech.geomesa.kafka.tools.ConsumerDataStoreParams$class;
import org.locationtech.geomesa.kafka.tools.KafkaDataStoreCommand;
import org.locationtech.geomesa.kafka.tools.KafkaDataStoreCommand$class;
import org.locationtech.geomesa.kafka.tools.KafkaDataStoreParams$class;
import org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent;
import org.locationtech.geomesa.tools.Command$;
import org.locationtech.geomesa.tools.DataStoreCommand;
import org.locationtech.geomesa.tools.RequiredTypeNameParam;
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes$;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;
import scala.Function0;
import scala.Function1;
import scala.Option$;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.reflect.ScalaSignature;
import scala.runtime.TraitSetter;

@ScalaSignature(bytes="\u0006\u0001\u0005=c\u0001B\u0001\u0003\u0001=\u0011!cS1gW\u0006d\u0015n\u001d;f]\u000e{W.\\1oI*\u00111\u0001B\u0001\u0007Kb\u0004xN\u001d;\u000b\u0005\u00151\u0011!\u0002;p_2\u001c(BA\u0004\t\u0003\u0015Y\u0017MZ6b\u0015\tI!\"A\u0004hK>lWm]1\u000b\u0005-a\u0011\u0001\u00047pG\u0006$\u0018n\u001c8uK\u000eD'\"A\u0007\u0002\u0007=\u0014xm\u0001\u0001\u0014\t\u0001\u0001bC\u0007\t\u0003#Qi\u0011A\u0005\u0006\u0002'\u0005)1oY1mC&\u0011QC\u0005\u0002\u0007\u0003:L(+\u001a4\u0011\u0005]AR\"\u0001\u0003\n\u0005e!!!F&bM.\fG)\u0019;b'R|'/Z\"p[6\fg\u000e\u001a\t\u00037\tj\u0011\u0001\b\u0006\u0003;y\tAb]2bY\u0006dwnZ4j]\u001eT!a\b\u0011\u0002\u0011QL\b/Z:bM\u0016T\u0011!I\u0001\u0004G>l\u0017BA\u0012\u001d\u0005-a\u0015M_=M_\u001e<\u0017N\\4\t\u000b\u0015\u0002A\u0011\u0001\u0014\u0002\rqJg.\u001b;?)\u00059\u0003C\u0001\u0015\u0001\u001b\u0005\u0011\u0001b\u0002\u0016\u0001\u0005\u0004%\teK\u0001\u0005]\u0006lW-F\u0001-!\ti#'D\u0001/\u0015\ty\u0003'\u0001\u0003mC:<'\"A\u0019\u0002\t)\fg/Y\u0005\u0003g9\u0012aa\u0015;sS:<\u0007BB\u001b\u0001A\u0003%A&A\u0003oC6,\u0007\u0005C\u00048\u0001\t\u0007I\u0011\t\u001d\u0002\rA\f'/Y7t+\u0005I\u0004C\u0001\u001eR\u001d\tY$J\u0004\u0002=\u0013:\u0011Q\b\u0013\b\u0003}\u001ds!a\u0010$\u000f\u0005\u0001+eBA!E\u001b\u0005\u0011%BA\"\u000f\u0003\u0019a$o\\8u}%\tQ\"\u0003\u0002\f\u0019%\u0011\u0011BC\u0005\u0003\u000f!I!!\u0002\u0004\n\u0005\r!q!B&\u0003\u0011\u0003a\u0015AE&bM.\fG*[:uK:\u001cu.\\7b]\u0012\u0004\"\u0001K'\u0007\u000b\u0005\u0011\u0001\u0012\u0001(\u0014\u00055\u0003\u0002\"B\u0013N\t\u0003\u0001F#\u0001'\u0007\tIk\u0005a\u0015\u0002\u0011\u0019&\u001cH/\u001a8QCJ\fW.\u001a;feN\u001cB!\u0015\tU/B\u0011q#V\u0005\u0003-\u0012\u0011qcQ8ogVlWM\u001d#bi\u0006\u001cFo\u001c:f!\u0006\u0014\u0018-\\:\u0011\u0005aSV\"A-\u000b\u0005\u0015A\u0011BA.Z\u0005U\u0011V-];je\u0016$G+\u001f9f\u001d\u0006lW\rU1sC6DQ!J)\u0005\u0002u#\u0012A\u0018\t\u0003?Fk\u0011!\u0014\u0015\u0005#\u0006L'\u000e\u0005\u0002cO6\t1M\u0003\u0
002eK\u0006Q!nY8n[\u0006tG-\u001a:\u000b\u0005\u0019\u0004\u0013!\u00022fkN$\u0018B\u00015d\u0005)\u0001\u0016M]1nKR,'o]\u0001\u0013G>lW.\u00198e\t\u0016\u001c8M]5qi&|g.I\u0001l\u0003}a\u0015n\u001d;f]\u0002\"x\u000eI1!\u000f\u0016|W*Z:bA-\u000bgm[1!i>\u0004\u0018n\u0019\u0004\u0005[6\u0003aN\u0001\nPkR4U-\u0019;ve\u0016d\u0015n\u001d;f]\u0016\u00148c\u00017peB\u0011Q\u0006]\u0005\u0003c:\u0012aa\u00142kK\u000e$\bCA:y\u001b\u0005!(BA;w\u0003\u0011!\u0017\r^1\u000b\u0005]d\u0011\u0001C4f_R|w\u000e\\:\n\u0005e$(a\u0004$fCR,(/\u001a'jgR,g.\u001a:\t\u000b\u0015bG\u0011A>\u0015\u0003q\u0004\"a\u00187\t\u000fyd'\u0019!C\u0005\u007f\u0006Iam\u001c:nCR$XM]\u000b\u0003\u0003\u0003\u0001B!a\u0001\u0002\u000e5\u0011\u0011Q\u0001\u0006\u0005\u0003\u000f\tI!\u0001\u0004g_Jl\u0017\r\u001e\u0006\u0004\u0003\u0017\u0001\u0014\u0001\u0002;j[\u0016LA!a\u0004\u0002\u0006\t\tB)\u0019;f)&lWMR8s[\u0006$H/\u001a:\t\u0011\u0005MA\u000e)A\u0005\u0003\u0003\t!BZ8s[\u0006$H/\u001a:!\u0011\u001d\t9\u0001\u001cC\u0005\u0003/!B!!\u0007\u0002&A!\u00111DA\u0011\u001d\r\t\u0012QD\u0005\u0004\u0003?\u0011\u0012A\u0002)sK\u0012,g-C\u00024\u0003GQ1!a\b\u0013\u0011!\t9#!\u0006A\u0002\u0005%\u0012A\u0001;t!\r\t\u00121F\u0005\u0004\u0003[\u0011\"\u0001\u0002'p]\u001eDq!!\rm\t\u0003\n\u0019$A\u0004dQ\u0006tw-\u001a3\u0015\t\u0005U\u00121\b\t\u0004#\u0005]\u0012bAA\u001d%\t!QK\\5u\u0011!\ti$a\fA\u0002\u0005}\u0012!B3wK:$\bcA:\u0002B%\u0019\u00111\t;\u0003\u0019\u0019+\u0017\r^;sK\u00163XM\u001c;\t\u000f\u0005\u001d\u0003\u0001)A\u0005s\u00059\u0001/\u0019:b[N\u0004\u0003bBA&\u0001\u0011\u0005\u0013QJ\u0001\bKb,7-\u001e;f)\t\t)\u0004")
public class KafkaListenCommand
implements KafkaDataStoreCommand,
LazyLogging {
    /** Command name; assigned the literal "listen" in the constructor. */
    private final String name;
    /** Command-line parameter holder; a new ListenParameters is created in the constructor. */
    private final ListenParameters params;
    /** Lazily created logger; assigned inside logger$lzycompute(). */
    private final Logger logger;
    /** Becomes true once the logger field has been initialized; read by logger() without taking the lock. */
    private volatile boolean bitmap$0;

    /**
     * Slow path of the lazy logger: takes this instance's monitor, re-checks the
     * initialization flag under the lock and creates the logger only if it has not
     * been created yet.
     *
     * @return the (now initialized) logger
     */
    private Logger logger$lzycompute() {
        synchronized (this) {
            if (this.bitmap$0) {
                // Already initialized by the time the lock was acquired.
                return this.logger;
            }
            this.logger = LazyLogging.class.logger((LazyLogging)this);
            // Publish the flag only after the logger field has been written.
            this.bitmap$0 = true;
            return this.logger;
        }
    }

    /**
     * Returns the logger, creating it on first use.
     *
     * @return the cached logger when the initialization flag is set, otherwise the
     *         result of {@code logger$lzycompute()}
     */
    public Logger logger() {
        if (this.bitmap$0) {
            return this.logger;
        }
        return this.logger$lzycompute();
    }

    /**
     * Returns the connection map for this command by delegating to the
     * {@code KafkaDataStoreCommand} trait implementation.
     *
     * @return the map produced by the trait implementation
     */
    @Override
    public Map<String, String> connection() {
        Map<String, String> connectionParams = KafkaDataStoreCommand$class.connection(this);
        return connectionParams;
    }

    /**
     * Applies {@code method} to a data store by delegating to the
     * {@code DataStoreCommand} trait implementation.
     *
     * @param method function to apply to the data store
     * @param <T> result type of {@code method}
     * @return whatever the trait implementation returns for {@code method}
     * @throws ParameterException declared by the original signature
     */
    public <T> T withDataStore(Function1<KafkaDataStore, T> method) throws ParameterException {
        Object result = DataStoreCommand.class.withDataStore((DataStoreCommand)this, method);
        return (T)result;
    }

    /**
     * Returns the name of this command.
     *
     * @return the value of the {@code name} field (set to "listen" in the constructor)
     */
    public String name() {
        return name;
    }

    /**
     * Returns the parameter holder for this command.
     *
     * @return the {@code ListenParameters} instance created in the constructor
     */
    @Override
    public ListenParameters params() {
        return params;
    }

    /**
     * Runs the listen command: inside withDataStore, looks up the schema named by
     * params().featureName(), registers an OutFeatureListener on the corresponding
     * feature source and then blocks, sleeping in one-second intervals, until the
     * thread is interrupted.
     *
     * Throws ParameterException (from the closure) when the data store returns no
     * schema for the requested type name.
     *
     * NOTE(review): the InterruptedException is consumed without restoring the
     * thread's interrupt flag, and the listener is never removed from the feature
     * source - confirm both are intentional.
     */
    public void execute() {
        // Decompiler rendering of a Scala closure passed to withDataStore.
        this.withDataStore((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            // Reference to the enclosing KafkaListenCommand captured by the closure.
            private final /* synthetic */ KafkaListenCommand $outer;

            public final void apply(KafkaDataStore ds) {
                SimpleFeatureType sft = ds.getSchema(this.$outer.params().featureName());
                if (sft == null) {
                    // No schema for the requested type name: report it together with the configured zkPath.
                    throw new ParameterException(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Type ", " does not exist at path ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.$outer.params().featureName(), this.$outer.params().zkPath()})));
                }
                // Tell the user which type (and its encoded spec) is being listened to.
                Command$.MODULE$.user().info(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Listening to '", "' ", " ..."})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{sft.getTypeName(), SimpleFeatureTypes$.MODULE$.encodeType(sft)})));
                ds.getFeatureSource(sft.getTypeName()).addFeatureListener((FeatureListener)new OutFeatureListener());
                try {
                    // Keep the calling thread parked; events are handled by the registered listener.
                    while (true) {
                        Thread.sleep(1000L);
                    }
                }
                catch (InterruptedException interruptedException) {
                    // Interruption is the only way out of the loop; it ends the command quietly.
                    return;
                }
            }
            {
                // Synthetic initializer: rejects a null enclosing instance, then stores it.
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
    }

    /**
     * Creates the command: runs the initializers of the DataStoreCommand,
     * KafkaDataStoreCommand and LazyLogging traits, then sets the command name to
     * "listen" and creates a fresh ListenParameters instance.
     */
    public KafkaListenCommand() {
        // Trait initializers run first, before this class's own fields are assigned.
        DataStoreCommand.class.$init$((DataStoreCommand)this);
        KafkaDataStoreCommand$class.$init$(this);
        LazyLogging.class.$init$((LazyLogging)this);
        this.name = "listen";
        this.params = new ListenParameters();
    }

    /**
     * JCommander parameter holder for the listen command.
     *
     * Fields carrying a {@code @Parameter} annotation are bound to the listed
     * command-line options. {@code replication} and {@code partitions} carry no
     * annotation and are written only through the trait setter methods below.
     */
    @Parameters(commandDescription="Listen to a GeoMesa Kafka topic")
    public static class ListenParameters
    implements ConsumerDataStoreParams,
    RequiredTypeNameParam {
        @Parameter(names={"-f", "--feature-name"}, description="Simple Feature Type name on which to operate", required=true)
        private String featureName;
        @Parameter(names={"--num-consumers"}, description="Number of consumer threads used for reading from Kafka")
        private int numConsumers;
        @Parameter(names={"--from-beginning"}, description="Consume from the beginning or end of the topic")
        private boolean fromBeginning;
        private final int replication;
        private final int partitions;
        @Parameter(names={"-b", "--brokers"}, description="Brokers (host:port, comma separated)", required=true)
        private String brokers;
        @Parameter(names={"-z", "--zookeepers"}, description="Zookeepers (host[:port], comma separated)", required=true)
        private String zookeepers;
        @Parameter(names={"-p", "--zkpath"}, description="Zookeeper path where feature schemas are saved")
        private String zkPath;

        /**
         * Runs the initializers of the parameter traits, in the order
         * KafkaDataStoreParams, ConsumerDataStoreParams, RequiredTypeNameParam.
         */
        public ListenParameters() {
            KafkaDataStoreParams$class.$init$(this);
            ConsumerDataStoreParams$class.$init$(this);
            RequiredTypeNameParam.class.$init$((RequiredTypeNameParam)this);
        }

        /** @return the simple feature type name given with -f / --feature-name */
        public String featureName() {
            return featureName;
        }

        /** @param value new simple feature type name */
        public void featureName_$eq(String value) {
            featureName = value;
        }

        /** @return the value bound to --num-consumers */
        @Override
        public int numConsumers() {
            return numConsumers;
        }

        /** @param value new consumer count */
        @Override
        @TraitSetter
        public void numConsumers_$eq(int value) {
            numConsumers = value;
        }

        /** @return the value bound to --from-beginning */
        @Override
        public boolean fromBeginning() {
            return fromBeginning;
        }

        /** @param value new from-beginning flag */
        @Override
        @TraitSetter
        public void fromBeginning_$eq(boolean value) {
            fromBeginning = value;
        }

        /** @return the replication value stored by the trait setter */
        @Override
        public int replication() {
            return replication;
        }

        /** @return the partitions value stored by the trait setter */
        @Override
        public int partitions() {
            return partitions;
        }

        /** Trait-generated setter for {@code replication}. */
        @Override
        public void org$locationtech$geomesa$kafka$tools$ConsumerDataStoreParams$_setter_$replication_$eq(int value) {
            replication = value;
        }

        /** Trait-generated setter for {@code partitions}. */
        @Override
        public void org$locationtech$geomesa$kafka$tools$ConsumerDataStoreParams$_setter_$partitions_$eq(int value) {
            partitions = value;
        }

        /** @return the broker list given with -b / --brokers */
        @Override
        public String brokers() {
            return brokers;
        }

        /** @param value new broker list */
        @Override
        @TraitSetter
        public void brokers_$eq(String value) {
            brokers = value;
        }

        /** @return the zookeeper list given with -z / --zookeepers */
        @Override
        public String zookeepers() {
            return zookeepers;
        }

        /** @param value new zookeeper list */
        @Override
        @TraitSetter
        public void zookeepers_$eq(String value) {
            zookeepers = value;
        }

        /** @return the zookeeper path given with -p / --zkpath */
        @Override
        public String zkPath() {
            return zkPath;
        }

        /** @param value new zookeeper path */
        @Override
        @TraitSetter
        public void zkPath_$eq(String value) {
            zkPath = value;
        }
    }

    public static class OutFeatureListener
    implements FeatureListener {
        private final DateTimeFormatter formatter = DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(ZoneOffset.UTC);

        private DateTimeFormatter formatter() {
            return this.formatter;
        }

        private String format(long ts) {
            return ZonedDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC).format(this.formatter());
        }

        public void changed(FeatureEvent event) {
            String string;
            FeatureEvent featureEvent = event;
            if (featureEvent instanceof KafkaFeatureEvent.KafkaFeatureChanged) {
                KafkaFeatureEvent.KafkaFeatureChanged kafkaFeatureChanged = (KafkaFeatureEvent.KafkaFeatureChanged)featureEvent;
                string = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", " [add/update] ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.format(kafkaFeatureChanged.time()), DataUtilities.encodeFeature((SimpleFeature)kafkaFeatureChanged.feature())}));
            } else if (featureEvent instanceof KafkaFeatureEvent.KafkaFeatureRemoved) {
                KafkaFeatureEvent.KafkaFeatureRemoved kafkaFeatureRemoved = (KafkaFeatureEvent.KafkaFeatureRemoved)featureEvent;
                string = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", " [delete]     ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.format(kafkaFeatureRemoved.time()), Option$.MODULE$.apply((Object)kafkaFeatureRemoved.feature()).map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final String apply(SimpleFeature x$1) {
                        return DataUtilities.encodeFeature((SimpleFeature)x$1);
                    }
                }).getOrElse((Function0)new Serializable(this, kafkaFeatureRemoved){
                    public static final long serialVersionUID = 0L;
                    private final KafkaFeatureEvent.KafkaFeatureRemoved x3$1;

                    public final String apply() {
                        return this.x3$1.id();
                    }
                    {
                        this.x3$1 = x3$1;
                    }
                })}));
            } else if (featureEvent instanceof KafkaFeatureEvent.KafkaFeatureCleared) {
                KafkaFeatureEvent.KafkaFeatureCleared kafkaFeatureCleared = (KafkaFeatureEvent.KafkaFeatureCleared)featureEvent;
                string = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", " [clear]"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.format(kafkaFeatureCleared.time())}));
            } else {
                string = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Unknown event ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{featureEvent}));
            }
            String msg = string;
            Command$.MODULE$.output().info(msg);
        }
    }
}

