/*
 * Decompiled with CFR 0.152.
 */
package pl.touk.nussknacker.engine.kafka;

import com.typesafe.scalalogging.LazyLogging;
import com.typesafe.scalalogging.Logger;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import pl.touk.nussknacker.engine.kafka.KafkaConfig;
import pl.touk.nussknacker.engine.kafka.KafkaEspUtils$;
import pl.touk.nussknacker.engine.util.ThreadUtils$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.IndexedSeqOptimized;
import scala.collection.IterableLike;
import scala.collection.IterableViewLike;
import scala.collection.JavaConversions$;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.SeqLike;
import scala.collection.SeqView$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.concurrent.Await$;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.Duration$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

public final class KafkaEspUtils$
implements LazyLogging {
    /** Singleton instance of this object; assigned inside the private constructor. */
    public static final KafkaEspUtils$ MODULE$;
    /** Fallback timeout in milliseconds; exposed through {@code defaultTimeoutMillis()}. */
    private final int defaultTimeoutMillis;
    /** Logger instance, populated on first use by {@code logger$lzycompute()}. */
    private final Logger logger;
    /** Becomes {@code true} once {@code logger} has been initialised. */
    private volatile boolean bitmap$0;

    // Creates the single instance when the class is initialised; the constructor stores it in MODULE$.
    static {
        new KafkaEspUtils$();
    }

    /**
     * Slow path of {@code logger()}: while holding this instance's monitor, creates the logger
     * unless the initialisation flag has already been set.
     *
     * @return the logger stored in the {@code logger} field
     */
    private Logger logger$lzycompute() {
        synchronized (this) {
            if (this.bitmap$0) {
                // Already initialised (possibly by another thread while we waited for the monitor).
                return this.logger;
            }
            this.logger = LazyLogging.class.logger((LazyLogging)this);
            this.bitmap$0 = true;
            return this.logger;
        }
    }

    /**
     * Returns the logger, initialising it through {@code logger$lzycompute()} on first access.
     *
     * @return the logger of this object
     */
    public Logger logger() {
        if (!this.bitmap$0) {
            return this.logger$lzycompute();
        }
        return this.logger;
    }

    /**
     * Accessor for the default timeout.
     *
     * @return the value of the {@code defaultTimeoutMillis} field
     */
    public int defaultTimeoutMillis() {
        return defaultTimeoutMillis;
    }

    /**
     * Calls {@code setOffsetToLatest(topic, consumerGroupId, config)} only when the
     * {@code forceLatestRead} entry of {@code config.kafkaEspProperties()} is present and
     * {@link Boolean#parseBoolean(String)} evaluates it to {@code true}; otherwise does nothing.
     *
     * @param config          configuration holding the optional {@code forceLatestRead} flag
     * @param topic           topic forwarded to {@code setOffsetToLatest}
     * @param consumerGroupId consumer group forwarded to {@code setOffsetToLatest}
     */
    public void setToLatestOffsetIfNeeded(KafkaConfig config, String topic, String consumerGroupId) {
        Option<String> flagValue = config.kafkaEspProperties().flatMap((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final Option<String> apply(scala.collection.immutable.Map<String, String> espProps) {
                return espProps.get((Object)"forceLatestRead");
            }
        });
        boolean forceLatest = flagValue.exists((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final boolean apply(String raw) {
                return Boolean.parseBoolean(raw);
            }
        });
        if (!forceLatest) {
            return;
        }
        this.setOffsetToLatest(topic, consumerGroupId, config);
    }

    /**
     * Moves the offsets of {@code groupId} on {@code topic} to the end of every partition.
     *
     * <p>The work runs on the global execution context inside a temporary consumer created for
     * {@code groupId}; the calling thread then waits for it with a timeout taken from
     * {@code readTimeout(config)}.
     * NOTE(review): per the Scala {@code Await.result} contract a timeout surfaces as an exception
     * while the background task keeps running - confirm this is acceptable for callers.
     *
     * @param topic   topic whose partitions are repositioned
     * @param groupId consumer group whose offsets are committed
     * @param config  Kafka configuration used for the temporary consumer and the timeout
     */
    public void setOffsetToLatest(String topic, String groupId, KafkaConfig config) {
        long timeoutMillis = this.readTimeout(config);
        if (this.logger().underlying().isInfoEnabled()) {
            this.logger().underlying().info(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Setting offset to latest for topic: ", ", groupId: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topic, groupId})));
        }
        Future consumerAfterWork = Future$.MODULE$.apply((Function0)new Serializable(topic, groupId, config){
            public static final long serialVersionUID = 0L;
            public final String topic$1;
            private final String groupId$1;
            private final KafkaConfig config$1;

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
                KafkaEspUtils$.MODULE$.pl$touk$nussknacker$engine$kafka$KafkaEspUtils$$doWithTempKafkaConsumer(this.config$1, (Option<String>)new Some((Object)this.groupId$1), new Serializable(this){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ anonfun.1 $outer;

                    public final void apply(KafkaConsumer<byte[], byte[]> consumer) {
                        KafkaEspUtils$.MODULE$.pl$touk$nussknacker$engine$kafka$KafkaEspUtils$$setOffsetToLatest(this.$outer.topic$1, consumer);
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                    }
                });
            }
            {
                this.topic$1 = topic$1;
                this.groupId$1 = groupId$1;
                this.config$1 = config$1;
            }
        }, (ExecutionContext)ExecutionContext.Implicits$.MODULE$.global());
        // Block the caller until the background task has finished or the timeout elapses.
        Await$.MODULE$.result((Awaitable)consumerAfterWork, (Duration)Duration$.MODULE$.apply(timeoutMillis, TimeUnit.MILLISECONDS));
    }

    /*
     * WARNING - void declaration
     */
    public Properties toProperties(KafkaConfig config, Option<String> groupId) {
        void var3_3;
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", config.zkAddress());
        props.setProperty("bootstrap.servers", config.kafkaAddress());
        props.setProperty("auto.offset.reset", "earliest");
        groupId.foreach((Function1)new Serializable(props){
            public static final long serialVersionUID = 0L;
            private final Properties props$1;

            public final Object apply(String x$2) {
                return this.props$1.setProperty("group.id", x$2);
            }
            {
                this.props$1 = props$1;
            }
        });
        config.kafkaProperties().map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final Map<String, String> apply(scala.collection.immutable.Map<String, String> x$3) {
                return (Map)JavaConverters$.MODULE$.mapAsJavaMapConverter(x$3).asJava();
            }
        }).foreach((Function1)new Serializable(props){
            public static final long serialVersionUID = 0L;
            private final Properties props$1;

            public final void apply(Map<?, ?> x$1) {
                this.props$1.putAll(x$1);
            }
            {
                this.props$1 = props$1;
            }
        });
        return var3_3;
    }

    /*
     * WARNING - void declaration
     */
    public Properties toProducerProperties(KafkaConfig config) {
        void var2_2;
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", config.kafkaAddress());
        props.setProperty("key.serializer", ByteArraySerializer.class.getCanonicalName());
        props.setProperty("value.serializer", ByteArraySerializer.class.getCanonicalName());
        props.setProperty("acks", "all");
        props.setProperty("retries", "0");
        props.setProperty("batch.size", "16384");
        props.setProperty("linger.ms", "1");
        props.setProperty("buffer.memory", "33554432");
        ((IterableLike)config.kafkaProperties().getOrElse((Function0)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final scala.collection.immutable.Map<String, Nothing$> apply() {
                return Predef$.MODULE$.Map().empty();
            }
        })).foreach((Function1)new Serializable(props){
            public static final long serialVersionUID = 0L;
            private final Properties props$2;

            public final Object apply(Tuple2<String, String> x0$1) {
                Tuple2<String, String> tuple2 = x0$1;
                if (tuple2 != null) {
                    String k = (String)tuple2._1();
                    String v = (String)tuple2._2();
                    Object object = this.props$2.setProperty(k, v);
                    return object;
                }
                throw new MatchError(tuple2);
            }
            {
                this.props$2 = props$2;
            }
        });
        return var2_2;
    }

    /*
     * WARNING - void declaration
     */
    public Properties pl$touk$nussknacker$engine$kafka$KafkaEspUtils$$toPropertiesForTempConsumer(KafkaConfig config, Option<String> group) {
        void var3_3;
        Properties props = this.toProperties(config, group);
        props.put("value.deserializer", ByteArrayDeserializer.class);
        props.put("key.deserializer", ByteArrayDeserializer.class);
        props.setProperty("session.timeout.ms", ((Object)BoxesRunTime.boxToLong((long)this.readTimeout(config))).toString());
        return var3_3;
    }

    /**
     * Reads up to {@code size} message values from {@code topic} using a temporary consumer that
     * is created without a group id.
     *
     * <p>Partitions are visited one at a time through a view; for each one the consumer is
     * positioned {@code size} records before the end offset and polled. The combined result is
     * truncated to {@code size} elements.
     *
     * @param topic  topic to read from
     * @param size   maximum number of values to return
     * @param config Kafka configuration for the temporary consumer
     * @return at most {@code size} raw message values
     */
    public List<byte[]> readLastMessages(String topic, int size, KafkaConfig config) {
        return (List)this.pl$touk$nussknacker$engine$kafka$KafkaEspUtils$$doWithTempKafkaConsumer(config, (Option<String>)None$.MODULE$, (Function1)new Serializable(topic, size){
            public static final long serialVersionUID = 0L;
            public final String topic$2;
            public final int size$1;

            public final List<byte[]> apply(KafkaConsumer<byte[], byte[]> consumer) {
                try {
                    // Map every PartitionInfo of the topic to a TopicPartition, then read each partition via a view.
                    // NOTE(review): the view is presumably used so partitions are only read until `size` values exist - confirm.
                    return ((IterableViewLike)((SeqLike)JavaConversions$.MODULE$.asScalaBuffer(consumer.partitionsFor(this.topic$2)).map((Function1)new Serializable(this){
                        public static final long serialVersionUID = 0L;
                        private final /* synthetic */ anonfun.readLastMessages.1 $outer;

                        public final TopicPartition apply(PartitionInfo no) {
                            return new TopicPartition(this.$outer.topic$2, no.partition());
                        }
                        {
                            if ($outer == null) {
                                throw null;
                            }
                            this.$outer = $outer;
                        }
                    }, Buffer$.MODULE$.canBuildFrom())).view().flatMap((Function1)new Serializable(this, consumer){
                        public static final long serialVersionUID = 0L;
                        private final /* synthetic */ anonfun.readLastMessages.1 $outer;
                        private final KafkaConsumer consumer$2;

                        public final ArrayBuffer<byte[]> apply(TopicPartition tp) {
                            java.util.List<TopicPartition> partitions = Collections.singletonList(tp);
                            // Assign only this partition and record its end offset.
                            this.consumer$2.assign(partitions);
                            this.consumer$2.seekToEnd(partitions);
                            long lastOffset = this.consumer$2.position(tp);
                            // Start `size` records before the end, clamped to offset 0.
                            long offsetToSearch = Math.max(0L, lastOffset - (long)this.$outer.size$1);
                            this.consumer$2.seek(tp, offsetToSearch);
                            ArrayBuffer result = new ArrayBuffer(this.$outer.size$1);
                            JavaConversions$.MODULE$.bufferAsJavaList((Buffer)result).addAll(this.consumer$2.poll(100L).records(tp));
                            // If the first poll returned nothing, restart from the beginning of the partition.
                            if (result.isEmpty()) {
                                this.consumer$2.seekToBeginning(partitions);
                            }
                            long currentOffset = this.consumer$2.position(tp);
                            // Keep polling until enough records were collected or the recorded end offset is reached.
                            while (JavaConversions$.MODULE$.bufferAsJavaList((Buffer)result).size() < this.$outer.size$1 && currentOffset < lastOffset) {
                                result.appendAll((TraversableOnce)JavaConversions$.MODULE$.asScalaBuffer(this.consumer$2.poll(100L).records(tp)));
                                currentOffset = this.consumer$2.position(tp);
                            }
                            this.consumer$2.unsubscribe();
                            // Keep only the record values, at most `size` of them for this partition.
                            return (ArrayBuffer)((IndexedSeqOptimized)result.map((Function1)new Serializable(this){
                                public static final long serialVersionUID = 0L;

                                public final byte[] apply(ConsumerRecord<byte[], byte[]> x$4) {
                                    return (byte[])x$4.value();
                                }
                            }, ArrayBuffer$.MODULE$.canBuildFrom())).take(this.$outer.size$1);
                        }
                        {
                            if ($outer == null) {
                                throw null;
                            }
                            this.$outer = $outer;
                            this.consumer$2 = consumer$2;
                        }
                    }, SeqView$.MODULE$.canBuildFrom())).take(this.size$1).toList();
                }
                finally {
                    // Always drop the assignment, even when reading failed.
                    consumer.unsubscribe();
                }
            }
            {
                this.topic$2 = topic$2;
                this.size$1 = size$1;
            }
        });
    }

    public <T> T pl$touk$nussknacker$engine$kafka$KafkaEspUtils$$doWithTempKafkaConsumer(KafkaConfig config, Option<String> groupId, Function1<KafkaConsumer<byte[], byte[]>, T> fun) {
        return (T)ThreadUtils$.MODULE$.withThisAsContextClassLoader(KafkaClient.class.getClassLoader(), (Function0)new Serializable(config, groupId, fun){
            public static final long serialVersionUID = 0L;
            private final KafkaConfig config$2;
            private final Option groupId$2;
            private final Function1 fun$1;

            /*
             * WARNING - void declaration
             */
            public final T apply() {
                Object object;
                KafkaConsumer consumer = new KafkaConsumer(KafkaEspUtils$.MODULE$.pl$touk$nussknacker$engine$kafka$KafkaEspUtils$$toPropertiesForTempConsumer(this.config$2, (Option<String>)this.groupId$2));
                try {
                    object = this.fun$1.apply((Object)consumer);
                }
                catch (Throwable throwable) {
                    void var1_1;
                    var1_1.close();
                    throw throwable;
                }
                consumer.close();
                return (T)object;
            }
            {
                this.config$2 = config$2;
                this.groupId$2 = groupId$2;
                this.fun$1 = fun$1;
            }
        });
    }

    /**
     * Resolves the timeout in milliseconds: the {@code session.timeout.ms} entry of
     * {@code config.kafkaProperties()} parsed as a long when present, otherwise
     * {@code defaultTimeoutMillis()}.
     *
     * @param config Kafka configuration that may carry {@code session.timeout.ms}
     * @return the timeout in milliseconds
     */
    private long readTimeout(KafkaConfig config) {
        Option<Object> configured = config.kafkaProperties().flatMap((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final Option<Object> apply(scala.collection.immutable.Map<String, String> kafkaProps) {
                return kafkaProps.get((Object)"session.timeout.ms").map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final long apply(String text) {
                        return new StringOps(Predef$.MODULE$.augmentString(text)).toLong();
                    }
                });
            }
        });
        // Fall back to the object-wide default when the property is not configured.
        Object timeout = configured.getOrElse((Function0)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final long apply() {
                return this.apply$mcJ$sp();
            }

            public long apply$mcJ$sp() {
                return KafkaEspUtils$.MODULE$.defaultTimeoutMillis();
            }
        });
        return BoxesRunTime.unboxToLong(timeout);
    }

    /**
     * Assigns every partition of {@code topic} to {@code consumer}, seeks each to its end and
     * commits the resulting positions synchronously.
     *
     * @param topic    topic whose partitions are repositioned
     * @param consumer consumer used for the assignment, the seek and the commit
     */
    public void pl$touk$nussknacker$engine$kafka$KafkaEspUtils$$setOffsetToLatest(String topic, KafkaConsumer<?, ?> consumer) {
        Buffer partitionInfos = JavaConversions$.MODULE$.asScalaBuffer(consumer.partitionsFor(topic));
        Buffer topicPartitions = (Buffer)partitionInfos.map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final TopicPartition apply(PartitionInfo info) {
                return new TopicPartition(info.topic(), info.partition());
            }
        }, Buffer$.MODULE$.canBuildFrom());
        consumer.assign((Collection)JavaConversions$.MODULE$.bufferAsJavaList(topicPartitions));
        consumer.seekToEnd((Collection)JavaConversions$.MODULE$.bufferAsJavaList(topicPartitions));
        // position() is queried for every partition and the result discarded.
        // NOTE(review): presumably this forces the lazily evaluated seekToEnd to take effect before the commit - confirm.
        topicPartitions.foreach((Function1)new Serializable(consumer){
            public static final long serialVersionUID = 0L;
            private final KafkaConsumer consumer$1;

            public final long apply(TopicPartition tp) {
                return this.consumer$1.position(tp);
            }
            {
                this.consumer$1 = consumer$1;
            }
        });
        consumer.commitSync();
    }

    /**
     * Sends a single record through a producer that is created for this call and closed before
     * the method returns.
     *
     * <p>NOTE(review): the returned future is completed by the producer callback; according to the
     * Kafka producer documentation {@code close()} waits for previously sent records - confirm
     * for the client version in use.
     *
     * @param topic       destination topic
     * @param key         record key
     * @param value       record value
     * @param kafkaConfig configuration used to create the temporary producer
     * @return future completed with the record metadata or failed with the send exception
     */
    public Future<RecordMetadata> sendToKafkaWithTempProducer(String topic, byte[] key, byte[] value, KafkaConfig kafkaConfig) {
        // The producer is created in the resource specification: a resource variable cannot be reassigned.
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(kafkaConfig)) {
            return this.sendToKafka(topic, key, value, producer);
        }
    }

    /**
     * Sends one record with the given producer and exposes the outcome as a Scala future.
     *
     * @param topic    destination topic
     * @param key      record key
     * @param value    record value
     * @param producer producer used for sending
     * @param <K>      key type
     * @param <V>      value type
     * @return future of the promise that the producer callback completes
     */
    public <K, V> Future<RecordMetadata> sendToKafka(String topic, K key, V value, KafkaProducer<K, V> producer) {
        Promise completion = Promise$.MODULE$.apply();
        ProducerRecord record = new ProducerRecord(topic, key, value);
        Callback onCompletion = this.producerCallback((Promise<RecordMetadata>)completion);
        producer.send(record, onCompletion);
        return completion.future();
    }

    /**
     * Creates a byte-array producer configured by {@code toProducerProperties(kafkaConfig)}.
     * The caller is responsible for closing the returned producer.
     *
     * @param kafkaConfig Kafka configuration
     * @return a new producer instance
     */
    public KafkaProducer<byte[], byte[]> createProducer(KafkaConfig kafkaConfig) {
        Properties producerProps = this.toProducerProperties(kafkaConfig);
        return new KafkaProducer(producerProps);
    }

    /**
     * Creates a producer callback that completes {@code promise} with the send outcome:
     * a {@code Success} holding the metadata when no exception is reported, otherwise a
     * {@code Failure} holding that exception.
     *
     * @param promise promise to complete when the send finishes
     * @return callback suitable for {@code KafkaProducer.send}
     */
    public Callback producerCallback(final Promise<RecordMetadata> promise) {
        return new Callback(){

            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                // Typed as Try because the value is either a Success or a Failure.
                Try<RecordMetadata> result = exception == null
                        ? new Success<RecordMetadata>(metadata)
                        : new Failure<RecordMetadata>((Throwable)exception);
                promise.complete(result);
            }
        };
    }

    /**
     * Private constructor; the only visible call site is the static initializer of this class.
     */
    private KafkaEspUtils$() {
        // Publish the singleton before the trait initialisation below runs.
        MODULE$ = this;
        LazyLogging.class.$init$((LazyLogging)this);
        // Default timeout exposed through defaultTimeoutMillis().
        this.defaultTimeoutMillis = 10000;
    }
}

