/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.nephron.testing.benchmark;

import java.util.HashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;
import org.opennms.nephron.testing.flowgen.FlowDocuments;
import org.opennms.nephron.testing.flowgen.FlowGenOptions;
import org.opennms.nephron.testing.flowgen.Limiter;
import org.opennms.nephron.testing.flowgen.SourceConfig;
import org.opennms.netmgt.flows.persistence.model.FlowDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaFlowIngester {
    private static Logger LOG = LoggerFactory.getLogger(KafkaFlowIngester.class);

    public static Producer<String, byte[]> createProducer(FlowGenOptions options) {
        HashMap<String, Object> props = new HashMap<String, Object>();
        props.put("bootstrap.servers", options.getBootstrapServers());
        props.put("client.id", "KafkaExampleProducer");
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", ByteArraySerializer.class);
        return new KafkaProducer(props);
    }

    public static long sendRecordsToKafka(final FlowGenOptions options) {
        final Producer<String, byte[]> producer = KafkaFlowIngester.createProducer(options);
        final SourceConfig sourceConfig = SourceConfig.of(options, null);
        Stream<FlowDocument> stream = FlowDocuments.stream(sourceConfig);
        final Duration logPeriod = Duration.standardSeconds((long)10L);
        final Instant start = Instant.now();
        final AtomicLong total = new AtomicLong(0L);
        stream.forEach(new Consumer<FlowDocument>(){
            Instant nextLog;
            long lastTotal;
            Limiter limiter;
            {
                this.nextLog = start.plus((ReadableDuration)logPeriod);
                this.lastTotal = 0L;
                this.limiter = Limiter.of(sourceConfig.flowsPerSecond);
            }

            @Override
            public void accept(FlowDocument flowDocument) {
                while (!this.limiter.check(1L)) {
                    try {
                        Thread.sleep(10L);
                    }
                    catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                long lTotal = total.incrementAndGet();
                producer.send(new ProducerRecord(options.getFlowSourceTopic(), (Object)flowDocument.toByteArray()));
                if (Instant.now().isAfter((ReadableInstant)this.nextLog)) {
                    LOG.info("duration: " + new Duration((ReadableInstant)start, (ReadableInstant)this.nextLog) + "; total: " + lTotal + "; current rate (1/s): " + String.format("%.2f", (double)(lTotal - this.lastTotal) / (double)logPeriod.getStandardSeconds()));
                    this.nextLog = this.nextLog.plus((ReadableDuration)logPeriod);
                    this.lastTotal = lTotal;
                }
            }
        });
        return total.get();
    }

    public static void main(String[] args) {
        FlowGenOptions options = (FlowGenOptions)PipelineOptionsFactory.fromArgs((String[])args).withValidation().as(FlowGenOptions.class);
        Instant start = Instant.now();
        long total = KafkaFlowIngester.sendRecordsToKafka(options);
        LOG.info("Output " + total + " flows in " + new Duration((ReadableInstant)start, (ReadableInstant)Instant.now()));
    }
}

