/*
 * Decompiled with CFR 0.152.
 */
package org.locationtech.geomesa.kafka.tools.export;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.typesafe.scalalogging.LazyLogging;
import com.typesafe.scalalogging.Logger;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import org.geotools.data.DataStore;
import org.geotools.data.DataUtilities;
import org.geotools.data.FeatureEvent;
import org.geotools.data.FeatureListener;
import org.locationtech.geomesa.kafka.data.KafkaDataStore;
import org.locationtech.geomesa.kafka.tools.ConsumerDataStoreParams;
import org.locationtech.geomesa.kafka.tools.ConsumerDataStoreParams$class;
import org.locationtech.geomesa.kafka.tools.KafkaDataStoreCommand;
import org.locationtech.geomesa.kafka.tools.KafkaDataStoreCommand$class;
import org.locationtech.geomesa.kafka.tools.KafkaDataStoreParams$class;
import org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent;
import org.locationtech.geomesa.tools.RequiredTypeNameParam;
import org.locationtech.geomesa.tools.package;
import org.locationtech.geomesa.tools.utils.ParameterConverters;
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes$;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.concurrent.duration.Duration;
import scala.reflect.ScalaSignature;
import scala.runtime.TraitSetter;

@ScalaSignature(bytes="\u0006\u0001\u0005=c\u0001B\u0001\u0003\u0001=\u0011!cS1gW\u0006d\u0015n\u001d;f]\u000e{W.\\1oI*\u00111\u0001B\u0001\u0007Kb\u0004xN\u001d;\u000b\u0005\u00151\u0011!\u0002;p_2\u001c(BA\u0004\t\u0003\u0015Y\u0017MZ6b\u0015\tI!\"A\u0004hK>lWm]1\u000b\u0005-a\u0011\u0001\u00047pG\u0006$\u0018n\u001c8uK\u000eD'\"A\u0007\u0002\u0007=\u0014xm\u0001\u0001\u0014\t\u0001\u0001\u0002\u0004\b\t\u0003#Yi\u0011A\u0005\u0006\u0003'Q\tA\u0001\\1oO*\tQ#\u0001\u0003kCZ\f\u0017BA\f\u0013\u0005\u0019y%M[3diB\u0011\u0011DG\u0007\u0002\t%\u00111\u0004\u0002\u0002\u0016\u0017\u000647.\u0019#bi\u0006\u001cFo\u001c:f\u0007>lW.\u00198e!\tiB%D\u0001\u001f\u0015\ty\u0002%\u0001\u0007tG\u0006d\u0017\r\\8hO&twM\u0003\u0002\"E\u0005AA/\u001f9fg\u00064WMC\u0001$\u0003\r\u0019w.\\\u0005\u0003Ky\u00111\u0002T1{s2{wmZ5oO\")q\u0005\u0001C\u0001Q\u00051A(\u001b8jiz\"\u0012!\u000b\t\u0003U\u0001i\u0011A\u0001\u0005\bY\u0001\u0011\r\u0011\"\u0011.\u0003\u0011q\u0017-\\3\u0016\u00039\u0002\"!E\u0018\n\u0005A\u0012\"AB*ue&tw\r\u0003\u00043\u0001\u0001\u0006IAL\u0001\u0006]\u0006lW\r\t\u0005\bi\u0001\u0011\r\u0011\"\u00116\u0003\u0019\u0001\u0018M]1ngV\ta\u0007\u0005\u00028):\u0011\u0001h\u0012\b\u0003s\u0019s!AO#\u000f\u0005m\"eB\u0001\u001fD\u001d\ti$I\u0004\u0002?\u00036\tqH\u0003\u0002A\u001d\u00051AH]8pizJ\u0011!D\u0005\u0003\u00171I!!\u0003\u0006\n\u0005\u001dA\u0011BA\u0003\u0007\u0013\t\u0019AaB\u0003I\u0005!\u0005\u0011*\u0001\nLC\u001a\\\u0017\rT5ti\u0016t7i\\7nC:$\u0007C\u0001\u0016K\r\u0015\t!\u0001#\u0001L'\tQE\n\u0005\u0002N!6\taJC\u0001P\u0003\u0015\u00198-\u00197b\u0013\t\tfJ\u0001\u0004B]f\u0014VM\u001a\u0005\u0006O)#\ta\u0015\u000b\u0002\u0013\u001a!QK\u0013\u0001W\u0005Aa\u0015n\u001d;f]B\u000b'/Y7fi\u0016\u00148o\u0005\u0003U\u0019^S\u0006CA\rY\u0013\tIFAA\fD_:\u001cX/\\3s\t\u0006$\u0018m\u0015;pe\u0016\u0004\u0016M]1ngB\u00111,X\u0007\u00029*\u0011Q\u0001C\u0005\u0003=r\u0013QCU3rk&\u0014X\r\u001a+za\u0016t\u0015-\\3QCJ\fW\u000eC\u0003()\u0012\u0005\u0001\rF\u000
1b!\t\u0011G+D\u0001KQ\u0011!F\r\\7\u0011\u0005\u0015TW\"\u00014\u000b\u0005\u001dD\u0017A\u00036d_6l\u0017M\u001c3fe*\u0011\u0011NI\u0001\u0006E\u0016,8\u000f^\u0005\u0003W\u001a\u0014!\u0002U1sC6,G/\u001a:t\u0003I\u0019w.\\7b]\u0012$Um]2sSB$\u0018n\u001c8\"\u00039\fq\u0004T5ti\u0016t\u0007\u0005^8!C\u0002:Um\\'fg\u0006\u00043*\u00194lC\u0002\"x\u000e]5d\r\u0011\u0001(\nA9\u0003%=+HOR3biV\u0014X\rT5ti\u0016tWM]\n\u0004_B\u0011\bCA:y\u001b\u0005!(BA;w\u0003\u0011!\u0017\r^1\u000b\u0005]d\u0011\u0001C4f_R|w\u000e\\:\n\u0005e$(a\u0004$fCR,(/\u001a'jgR,g.\u001a:\t\u000b\u001dzG\u0011A>\u0015\u0003q\u0004\"AY8\t\u000fy|'\u0019!C\u0005\u007f\u0006Iam\u001c:nCR$XM]\u000b\u0003\u0003\u0003\u0001B!a\u0001\u0002\u000e5\u0011\u0011Q\u0001\u0006\u0005\u0003\u000f\tI!\u0001\u0004g_Jl\u0017\r\u001e\u0006\u0004\u0003\u0017!\u0012\u0001\u0002;j[\u0016LA!a\u0004\u0002\u0006\t\tB)\u0019;f)&lWMR8s[\u0006$H/\u001a:\t\u0011\u0005Mq\u000e)A\u0005\u0003\u0003\t!BZ8s[\u0006$H/\u001a:!\u0011\u001d\t9a\u001cC\u0005\u0003/!B!!\u0007\u0002&A!\u00111DA\u0011\u001d\ri\u0015QD\u0005\u0004\u0003?q\u0015A\u0002)sK\u0012,g-C\u00021\u0003GQ1!a\bO\u0011!\t9#!\u0006A\u0002\u0005%\u0012A\u0001;t!\ri\u00151F\u0005\u0004\u0003[q%\u0001\u0002'p]\u001eDq!!\rp\t\u0003\n\u0019$A\u0004dQ\u0006tw-\u001a3\u0015\t\u0005U\u00121\b\t\u0004\u001b\u0006]\u0012bAA\u001d\u001d\n!QK\\5u\u0011!\ti$a\fA\u0002\u0005}\u0012!B3wK:$\bcA:\u0002B%\u0019\u00111\t;\u0003\u0019\u0019+\u0017\r^;sK\u00163XM\u001c;\t\u000f\u0005\u001d\u0003\u0001)A\u0005m\u00059\u0001/\u0019:b[N\u0004\u0003bBA&\u0001\u0011\u0005\u0013QJ\u0001\bKb,7-\u001e;f)\t\t)\u0004")
public class KafkaListenCommand
implements KafkaDataStoreCommand,
LazyLogging {
    private final String name;
    private final ListenParameters params;
    private final Logger logger;
    private volatile boolean bitmap$0;

    /**
     * Slow path of the lazily initialised {@code logger} field.
     *
     * <p>Takes this instance's monitor and re-checks {@code bitmap$0}; the
     * logger is created only if the flag is still unset, after which the flag
     * is raised so that {@link #logger()} can skip this method.
     *
     * @return the logger stored in the {@code logger} field
     */
    private Logger logger$lzycompute() {
        synchronized (this) {
            if (this.bitmap$0) {
                // Initialisation already happened before we acquired the lock.
                return this.logger;
            }
            this.logger = LazyLogging.class.logger((LazyLogging)this);
            this.bitmap$0 = true;
            return this.logger;
        }
    }

    /**
     * Returns this command's logger, creating it on first access.
     *
     * @return the stored logger when {@code bitmap$0} is already set,
     *         otherwise the result of {@code logger$lzycompute()}
     */
    public Logger logger() {
        if (this.bitmap$0) {
            return this.logger;
        }
        return this.logger$lzycompute();
    }

    /**
     * Returns the connection settings for this command by delegating to the
     * {@code KafkaDataStoreCommand} trait implementation.
     *
     * @return the map produced by {@code KafkaDataStoreCommand$class.connection}
     */
    @Override
    public Map<String, String> connection() {
        final Map<String, String> settings = KafkaDataStoreCommand$class.connection(this);
        return settings;
    }

    /**
     * Hands {@code method} to the {@code DataStoreCommand} trait implementation
     * and returns whatever that call produces.
     *
     * @param method function taking a {@link KafkaDataStore}
     * @param <T> result type of {@code method}
     * @return the value returned by the trait implementation, cast to {@code T}
     * @throws ParameterException declared by this method; presumably raised when
     *         the data store cannot be loaded from the parameters (confirm in the trait)
     */
    public <T> T withDataStore(Function1<KafkaDataStore, T> method) throws ParameterException {
        final Object outcome = package.DataStoreCommand.class.withDataStore((package.DataStoreCommand)this, method);
        return (T)outcome;
    }

    /**
     * Loads the data store by delegating to the {@code DataStoreCommand} trait
     * implementation.
     *
     * @return the data store returned by the trait implementation
     * @throws ParameterException declared by this method; presumably raised when
     *         no data store matches the parameters (confirm in the trait)
     */
    public DataStore loadDataStore() throws ParameterException {
        final DataStore store = package.DataStoreCommand.class.loadDataStore((package.DataStoreCommand)this);
        return store;
    }

    /**
     * Returns this command's sub-commands as provided by the {@code Command}
     * trait implementation.
     *
     * @return the sequence returned by the trait implementation
     */
    public Seq<package.Command> subCommands() {
        final Seq<package.Command> children = package.Command.class.subCommands((package.Command)this);
        return children;
    }

    /**
     * Validates this command by delegating to the {@code Command} trait
     * implementation.
     *
     * @return the optional {@link ParameterException} returned by the trait
     *         implementation
     */
    public Option<ParameterException> validate() {
        final Option<ParameterException> problem = package.Command.class.validate((package.Command)this);
        return problem;
    }

    public void run() {
        package.Command.class.run((package.Command)this);
    }

    /**
     * Returns the command name held in the {@code name} field (the constructor
     * sets it to {@code "listen"}).
     *
     * @return the command name
     */
    public String name() {
        final String commandName = this.name;
        return commandName;
    }

    /**
     * Returns the parameter holder created for this command in the constructor.
     *
     * @return this command's {@link ListenParameters} instance
     */
    @Override
    public ListenParameters params() {
        final ListenParameters parameters = this.params;
        return parameters;
    }

    /**
     * Attaches an {@link OutFeatureListener} to the configured feature type and
     * then blocks until the executing thread is interrupted.
     *
     * <p>The work is passed to {@code withDataStore} as a function of the
     * {@link KafkaDataStore}. That function throws a {@link ParameterException}
     * when the data store has no schema for the requested feature name.
     */
    public void execute() {
        // Decompiled Scala closure: the anonymous class captures the enclosing command as $outer.
        this.withDataStore((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ KafkaListenCommand $outer;

            /**
             * Looks up the schema, registers the listener and sleeps until interrupted.
             *
             * @param ds data store to read the schema and feature source from
             */
            public final void apply(KafkaDataStore ds) {
                SimpleFeatureType sft = ds.getSchema(this.$outer.params().featureName());
                if (sft == null) {
                    // A null schema is reported to the user as a parameter problem,
                    // naming both the feature type and the zookeeper path.
                    throw new ParameterException(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Type ", " does not exist at path ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.$outer.params().featureName(), this.$outer.params().zkPath()})));
                }
                // Announce the type name and its encoded schema on the user logger.
                package.Command$.MODULE$.user().info(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Listening to '", "' ", " ..."})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{sft.getTypeName(), SimpleFeatureTypes$.MODULE$.encodeType(sft)})));
                ds.getFeatureSource(sft.getTypeName()).addFeatureListener((FeatureListener)new OutFeatureListener());
                try {
                    // Keep this thread parked in one-second sleeps; the listener does the
                    // actual output. The loop only ends through an interrupt.
                    while (true) {
                        Thread.sleep(1000L);
                    }
                }
                catch (InterruptedException interruptedException) {
                    // Interruption is the normal way out of the loop.
                    // NOTE(review): the interrupt status is not restored here; confirm whether
                    // cleanup performed by withDataStore relies on a cleared flag before changing this.
                    return;
                }
            }
            {
                // Null check and capture of the enclosing instance (compiler-generated shape).
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
    }

    /**
     * Creates the command: runs the initializers of the mixed-in traits, then
     * sets the command name to {@code "listen"} and creates a fresh
     * {@link ListenParameters} instance.
     */
    public KafkaListenCommand() {
        // Trait initializers, in the order they appear in the decompiled output.
        // NOTE(review): presumably this order mirrors Scala trait linearization; keep it unchanged.
        package.Command.class.$init$((package.Command)this);
        package.DataStoreCommand.class.$init$((package.DataStoreCommand)this);
        KafkaDataStoreCommand$class.$init$(this);
        LazyLogging.class.$init$((LazyLogging)this);
        // Field values are assigned only after the trait initializers have run.
        this.name = "listen";
        this.params = new ListenParameters();
    }

    /**
     * Command-line parameters for the listen command.
     *
     * <p>Fields annotated with {@code @Parameter} are the JCommander options;
     * each has a Scala-style getter ({@code x()}) and setter ({@code x_$eq}).
     * {@code replication} and {@code partitions} carry no {@code @Parameter}
     * annotation and are written only through the mangled
     * {@code ConsumerDataStoreParams} trait setters.
     */
    @Parameters(commandDescription="Listen to a GeoMesa Kafka topic")
    public static class ListenParameters
    implements ConsumerDataStoreParams,
    RequiredTypeNameParam {
        // Required: name of the simple feature type to listen to.
        @Parameter(names={"-f", "--feature-name"}, description="Simple Feature Type name on which to operate", required=true)
        private String featureName;
        // Optional consumer options.
        // NOTE(review): defaults are presumably applied by the trait initializers in the constructor — confirm.
        @Parameter(names={"--num-consumers"}, description="Number of consumer threads used for reading from Kafka")
        private int numConsumers;
        @Parameter(names={"--from-beginning"}, description="Consume from the beginning or end of the topic")
        private boolean fromBeginning;
        @Parameter(names={"--read-back"}, description="Consume messages written within this time frame, e.g. '1 hour'", converter=ParameterConverters.DurationConverter.class)
        private Duration readBack;
        // Not command-line options (no @Parameter annotation).
        // NOTE(review): declared final yet assigned in the trait setters below; presumably a
        // decompilation artifact of Scala trait vals, since plain Java rejects such assignments.
        private final int replication;
        private final int partitions;
        // Required connection options.
        @Parameter(names={"-b", "--brokers"}, description="Brokers (host:port, comma separated)", required=true)
        private String brokers;
        @Parameter(names={"-z", "--zookeepers"}, description="Zookeepers (host[:port], comma separated)", required=true)
        private String zookeepers;
        // Optional zookeeper path option.
        @Parameter(names={"-p", "--zkpath"}, description="Zookeeper path where feature schemas are saved")
        private String zkPath;

        /** @return the value of the {@code --feature-name} option */
        public String featureName() {
            return this.featureName;
        }

        /** @param x$1 new feature type name */
        public void featureName_$eq(String x$1) {
            this.featureName = x$1;
        }

        /** @return the value of the {@code --num-consumers} option */
        @Override
        public int numConsumers() {
            return this.numConsumers;
        }

        /** @param x$1 new consumer count */
        @Override
        @TraitSetter
        public void numConsumers_$eq(int x$1) {
            this.numConsumers = x$1;
        }

        /** @return the value of the {@code --from-beginning} flag */
        @Override
        public boolean fromBeginning() {
            return this.fromBeginning;
        }

        /** @param x$1 new value of the from-beginning flag */
        @Override
        @TraitSetter
        public void fromBeginning_$eq(boolean x$1) {
            this.fromBeginning = x$1;
        }

        /** @return the value of the {@code --read-back} option */
        @Override
        public Duration readBack() {
            return this.readBack;
        }

        /** @param x$1 new read-back duration */
        @Override
        @TraitSetter
        public void readBack_$eq(Duration x$1) {
            this.readBack = x$1;
        }

        /** @return the stored {@code replication} value */
        @Override
        public int replication() {
            return this.replication;
        }

        /** @return the stored {@code partitions} value */
        @Override
        public int partitions() {
            return this.partitions;
        }

        /**
         * Name-mangled setter through which the {@code ConsumerDataStoreParams}
         * trait assigns {@code replication}.
         *
         * @param x$1 value to store
         */
        @Override
        public void org$locationtech$geomesa$kafka$tools$ConsumerDataStoreParams$_setter_$replication_$eq(int x$1) {
            this.replication = x$1;
        }

        /**
         * Name-mangled setter through which the {@code ConsumerDataStoreParams}
         * trait assigns {@code partitions}.
         *
         * @param x$1 value to store
         */
        @Override
        public void org$locationtech$geomesa$kafka$tools$ConsumerDataStoreParams$_setter_$partitions_$eq(int x$1) {
            this.partitions = x$1;
        }

        /** @return the value of the {@code --brokers} option */
        @Override
        public String brokers() {
            return this.brokers;
        }

        /** @param x$1 new broker list */
        @Override
        @TraitSetter
        public void brokers_$eq(String x$1) {
            this.brokers = x$1;
        }

        /** @return the value of the {@code --zookeepers} option */
        @Override
        public String zookeepers() {
            return this.zookeepers;
        }

        /** @param x$1 new zookeeper list */
        @Override
        @TraitSetter
        public void zookeepers_$eq(String x$1) {
            this.zookeepers = x$1;
        }

        /** @return the value of the {@code --zkpath} option */
        @Override
        public String zkPath() {
            return this.zkPath;
        }

        /** @param x$1 new zookeeper path */
        @Override
        @TraitSetter
        public void zkPath_$eq(String x$1) {
            this.zkPath = x$1;
        }

        /**
         * Runs the initializers of the three parameter traits on this instance.
         */
        public ListenParameters() {
            // Keep the initializer order as decompiled.
            KafkaDataStoreParams$class.$init$(this);
            ConsumerDataStoreParams$class.$init$(this);
            RequiredTypeNameParam.class.$init$((RequiredTypeNameParam)this);
        }
    }

    /**
     * Feature listener that turns every received event into a single line of
     * text and writes it to the command's output logger.
     */
    public static class OutFeatureListener
    implements FeatureListener {
        /** ISO offset date-time formatter with its zone overridden to UTC. */
        private final DateTimeFormatter formatter = DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(ZoneOffset.UTC);

        /**
         * Renders an epoch-millisecond timestamp as an ISO offset date-time in UTC.
         *
         * @param ts milliseconds since the epoch
         * @return the formatted timestamp
         */
        private String format(long ts) {
            Instant instant = Instant.ofEpochMilli(ts);
            return this.formatter.format(instant.atZone(ZoneOffset.UTC));
        }

        /**
         * Writes one line describing {@code event} to the command output.
         *
         * @param event the feature event that was received
         */
        public void changed(FeatureEvent event) {
            // Build the message first so the output logger is only looked up
            // once the line is ready.
            String line = this.describe(event);
            package.Command$.MODULE$.output().info(line);
        }

        /**
         * Builds the output line for an event.
         *
         * <p>Kafka change, removal and clear events are prefixed with their
         * formatted timestamp; any other event is reported as unknown.
         *
         * @param event the event to describe
         * @return the text to print
         */
        private String describe(FeatureEvent event) {
            if (event instanceof KafkaFeatureEvent.KafkaFeatureChanged) {
                KafkaFeatureEvent.KafkaFeatureChanged upsert = (KafkaFeatureEvent.KafkaFeatureChanged)event;
                String when = this.format(upsert.time());
                return when + " [add/update] " + DataUtilities.encodeFeature((SimpleFeature)upsert.feature());
            }
            if (event instanceof KafkaFeatureEvent.KafkaFeatureRemoved) {
                KafkaFeatureEvent.KafkaFeatureRemoved removal = (KafkaFeatureEvent.KafkaFeatureRemoved)event;
                String when = this.format(removal.time());
                SimpleFeature last = (SimpleFeature)removal.feature();
                // A removal may carry no feature; fall back to the feature id then.
                String what = last != null ? DataUtilities.encodeFeature(last) : removal.id();
                return when + " [delete]     " + what;
            }
            if (event instanceof KafkaFeatureEvent.KafkaFeatureCleared) {
                KafkaFeatureEvent.KafkaFeatureCleared wipe = (KafkaFeatureEvent.KafkaFeatureCleared)event;
                return this.format(wipe.time()) + " [clear]";
            }
            return "Unknown event " + event;
        }
    }
}

