/*
 * Decompiled with CFR 0.152.
 */
package pl.touk.nussknacker.engine.kafka;

import com.typesafe.scalalogging.LazyLogging;
import com.typesafe.scalalogging.Logger;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import pl.touk.nussknacker.engine.api.namespaces.KafkaUsageKey$;
import pl.touk.nussknacker.engine.api.namespaces.NamingContext;
import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies;
import pl.touk.nussknacker.engine.kafka.KafkaConfig;
import pl.touk.nussknacker.engine.kafka.PreparedKafkaTopic;
import pl.touk.nussknacker.engine.util.ThreadUtils$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.IterableViewLike;
import scala.collection.JavaConverters$;
import scala.collection.SeqLike;
import scala.collection.SeqView$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.concurrent.Await$;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.Duration$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

public final class KafkaUtils$
implements LazyLogging {
    public static KafkaUtils$ MODULE$;
    private final int defaultTimeoutMillis;
    private final NamingContext KafkaTopicUsageKey;
    private transient Logger logger;
    private volatile transient boolean bitmap$trans$0;

    static {
        new KafkaUtils$();
    }

    private Logger logger$lzycompute() {
        KafkaUtils$ kafkaUtils$ = this;
        synchronized (kafkaUtils$) {
            if (!this.bitmap$trans$0) {
                this.logger = LazyLogging.logger$(this);
                this.bitmap$trans$0 = true;
            }
        }
        return this.logger;
    }

    @Override
    public Logger logger() {
        return !this.bitmap$trans$0 ? this.logger$lzycompute() : this.logger;
    }

    public int defaultTimeoutMillis() {
        return this.defaultTimeoutMillis;
    }

    public final NamingContext KafkaTopicUsageKey() {
        return this.KafkaTopicUsageKey;
    }

    public void setClientId(Properties props, String id) {
        props.setProperty("client.id", this.sanitizeClientId(id));
    }

    public PreparedKafkaTopic prepareKafkaTopic(String topic, ProcessObjectDependencies processObjectDependencies) {
        return new PreparedKafkaTopic(topic, processObjectDependencies.objectNaming().prepareName(topic, processObjectDependencies.config(), this.KafkaTopicUsageKey()));
    }

    public String sanitizeClientId(String originalId) {
        return originalId.replaceAll("[^a-zA-Z0-9\\._\\-]", "_");
    }

    public void setToLatestOffsetIfNeeded(KafkaConfig config, String topic, String consumerGroupId) {
        block0: {
            boolean setToLatestOffset = config.forceLatestRead().contains((Object)BoxesRunTime.boxToBoolean((boolean)true));
            if (!setToLatestOffset) break block0;
            this.setOffsetToLatest(topic, consumerGroupId, config);
        }
    }

    public void setOffsetToLatest(String topic, String groupId, KafkaConfig config) {
        BoxedUnit boxedUnit;
        long timeoutMillis = this.readTimeout(config);
        if (this.logger().underlying().isInfoEnabled()) {
            this.logger().underlying().info("Setting offset to latest for topic: {}, groupId: {}", new String[]{topic, groupId});
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
        Future consumerAfterWork = Future$.MODULE$.apply((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> MODULE$.doWithTempKafkaConsumer(config, (Option<String>)new Some((Object)groupId), (Function1 & Serializable & scala.Serializable)consumer -> {
            KafkaUtils$.MODULE$.setOffsetToLatest(topic, consumer);
            return BoxedUnit.UNIT;
        }), ExecutionContext.Implicits$.MODULE$.global());
        Await$.MODULE$.result((Awaitable)consumerAfterWork, (Duration)Duration$.MODULE$.apply(timeoutMillis, TimeUnit.MILLISECONDS));
    }

    public Properties toProperties(KafkaConfig config, Option<String> groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", config.kafkaAddress());
        props.setProperty("auto.offset.reset", "earliest");
        groupId.foreach((Function1 & Serializable & scala.Serializable)x$1 -> props.setProperty("group.id", (String)x$1));
        return this.withPropertiesFromConfig(props, config);
    }

    public Properties toProducerProperties(KafkaConfig config, String clientId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", config.kafkaAddress());
        props.setProperty("key.serializer", ByteArraySerializer.class.getCanonicalName());
        props.setProperty("value.serializer", ByteArraySerializer.class.getCanonicalName());
        props.setProperty("acks", "all");
        props.setProperty("retries", "0");
        props.setProperty("batch.size", "16384");
        props.setProperty("linger.ms", "1");
        props.setProperty("buffer.memory", "33554432");
        this.setClientId(props, clientId);
        return this.withPropertiesFromConfig(props, config);
    }

    public Properties withPropertiesFromConfig(Properties props, KafkaConfig kafkaConfig) {
        ((IterableLike)kafkaConfig.kafkaProperties().getOrElse((Function0 & Serializable & scala.Serializable)() -> Predef$.MODULE$.Map().empty())).foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            String k = (String)tuple2._1();
            String v = (String)tuple2._2();
            Object object = props.setProperty(k, v);
            return object;
        });
        return props;
    }

    /*
     * WARNING - void declaration
     */
    private Properties toPropertiesForTempConsumer(KafkaConfig config, Option<String> group) {
        void var3_3;
        Properties props = this.toProperties(config, group);
        props.put("value.deserializer", ByteArrayDeserializer.class);
        props.put("key.deserializer", ByteArrayDeserializer.class);
        props.setProperty("session.timeout.ms", ((Object)BoxesRunTime.boxToLong((long)this.readTimeout(config))).toString());
        return var3_3;
    }

    public List<ConsumerRecord<byte[], byte[]>> readLastMessages(String topic, int size, KafkaConfig config) {
        return (List)this.doWithTempKafkaConsumer(config, (Option<String>)None$.MODULE$, (Function1 & Serializable & scala.Serializable)consumer -> {
            List list;
            try {
                list = ((IterableViewLike)((SeqLike)((TraversableLike)JavaConverters$.MODULE$.asScalaBufferConverter(consumer.partitionsFor(topic)).asScala()).map((Function1 & Serializable & scala.Serializable)no -> new TopicPartition(topic, no.partition()), Buffer$.MODULE$.canBuildFrom())).view().flatMap((Function1 & Serializable & scala.Serializable)tp -> {
                    java.util.List<TopicPartition> partitions = Collections.singletonList(tp);
                    consumer.assign(partitions);
                    consumer.seekToEnd(partitions);
                    long lastOffset = consumer.position((TopicPartition)tp);
                    long offsetToSearch = Math.max(0L, lastOffset - (long)size);
                    consumer.seek((TopicPartition)tp, offsetToSearch);
                    ArrayBuffer result = new ArrayBuffer(size);
                    result.appendAll((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(consumer.poll(java.time.Duration.ofMillis(100L)).records((TopicPartition)tp)).asScala());
                    if (result.isEmpty()) {
                        consumer.seekToBeginning(partitions);
                    }
                    long currentOffset = consumer.position((TopicPartition)tp);
                    while (result.size() < size && currentOffset < lastOffset) {
                        result.appendAll((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(consumer.poll(java.time.Duration.ofMillis(100L)).records((TopicPartition)tp)).asScala());
                        currentOffset = consumer.position((TopicPartition)tp);
                    }
                    consumer.unsubscribe();
                    return (ArrayBuffer)result.take(size);
                }, SeqView$.MODULE$.canBuildFrom())).take(size).toList();
            }
            finally {
                consumer.unsubscribe();
            }
            return list;
        });
    }

    private <T> T doWithTempKafkaConsumer(KafkaConfig config, Option<String> groupId, Function1<KafkaConsumer<byte[], byte[]>, T> fun) {
        return ThreadUtils$.MODULE$.withThisAsContextClassLoader(KafkaClient.class.getClassLoader(), (Function0 & Serializable & scala.Serializable)() -> {
            Object object;
            try (KafkaConsumer consumer = new KafkaConsumer(MODULE$.toPropertiesForTempConsumer(config, groupId));){
                object = fun.apply(consumer);
            }
            return object;
        });
    }

    private long readTimeout(KafkaConfig config) {
        return BoxesRunTime.unboxToLong((Object)config.kafkaProperties().flatMap((Function1 & Serializable & scala.Serializable)props -> props.get((Object)"session.timeout.ms").map((Function1 & Serializable & scala.Serializable)x$2 -> BoxesRunTime.boxToLong((long)KafkaUtils$.$anonfun$readTimeout$2(x$2)))).getOrElse((Function0)(JFunction0.mcJ.sp & Serializable & scala.Serializable)() -> MODULE$.defaultTimeoutMillis()));
    }

    private void setOffsetToLatest(String topic, KafkaConsumer<?, ?> consumer) {
        Buffer partitions = (Buffer)((TraversableLike)JavaConverters$.MODULE$.asScalaBufferConverter(consumer.partitionsFor(topic)).asScala()).map((Function1 & Serializable & scala.Serializable)partition -> new TopicPartition(partition.topic(), partition.partition()), Buffer$.MODULE$.canBuildFrom());
        consumer.assign((Collection)JavaConverters$.MODULE$.bufferAsJavaListConverter(partitions).asJava());
        consumer.seekToEnd((Collection)JavaConverters$.MODULE$.bufferAsJavaListConverter(partitions).asJava());
        partitions.foreach((Function1 & Serializable & scala.Serializable)p -> BoxesRunTime.boxToLong((long)consumer.position(p)));
        consumer.commitSync();
    }

    public Future<RecordMetadata> sendToKafkaWithTempProducer(String topic, byte[] key, byte[] value, KafkaConfig kafkaConfig) {
        Future<RecordMetadata> future;
        try (KafkaProducer<byte[], byte[]> producer = null;){
            producer = this.createProducer(kafkaConfig, new StringBuilder(5).append("temp-").append(topic).toString());
            future = this.sendToKafka(topic, key, value, producer);
        }
        return future;
    }

    public <K, V> Future<RecordMetadata> sendToKafka(String topic, K key, V value, KafkaProducer<K, V> producer) {
        Promise promise = Promise$.MODULE$.apply();
        producer.send(new ProducerRecord<K, V>(topic, key, value), this.producerCallback((Promise<RecordMetadata>)promise));
        return promise.future();
    }

    public KafkaProducer<byte[], byte[]> createProducer(KafkaConfig kafkaConfig, String clientId) {
        return new KafkaProducer<byte[], byte[]>(this.toProducerProperties(kafkaConfig, clientId));
    }

    public Callback producerCallback(Promise<RecordMetadata> promise) {
        return new Callback(promise){
            private final Promise promise$1;

            public void onCompletion(RecordMetadata metadata, Exception exception) {
                Success result = exception == null ? new Success((Object)metadata) : new Failure((Throwable)exception);
                this.promise$1.complete((Try)result);
            }
            {
                this.promise$1 = promise$1;
            }
        };
    }

    public static final /* synthetic */ long $anonfun$readTimeout$2(String x$2) {
        return new StringOps(Predef$.MODULE$.augmentString(x$2)).toLong();
    }

    private KafkaUtils$() {
        MODULE$ = this;
        LazyLogging.$init$(this);
        this.defaultTimeoutMillis = 10000;
        this.KafkaTopicUsageKey = new NamingContext(KafkaUsageKey$.MODULE$);
    }
}

