/*
 * Decompiled with CFR 0.152.
 */
package com.googlecode.jmxtrans.model.output.kafka;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.googlecode.jmxtrans.model.Query;
import com.googlecode.jmxtrans.model.Result;
import com.googlecode.jmxtrans.model.Server;
import com.googlecode.jmxtrans.model.ValidationException;
import com.googlecode.jmxtrans.model.output.BaseOutputWriter;
import com.googlecode.jmxtrans.model.output.ResultSerializer;
import com.googlecode.jmxtrans.model.output.Settings;
import com.googlecode.jmxtrans.model.output.kafka.DefaultResultSerializer;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Output writer that serializes each query {@link Result} with a
 * {@link ResultSerializer} and publishes the resulting message to one or more
 * Kafka topics.
 *
 * <p>Topics are given as a comma-separated list, either through the
 * {@code topics} constructor argument or through the {@code "topics"} entry of
 * the settings map. The Kafka producer is configured from the
 * {@code "bootstrap.servers"}, {@code "key.serializer"} and
 * {@code "value.serializer"} settings.
 */
@NotThreadSafe
public class KafkaWriter
extends BaseOutputWriter {
    private static final Logger log = LoggerFactory.getLogger(KafkaWriter.class);
    private static final String DEFAULT_ROOT_PREFIX = "servers";
    private final Producer<String, String> producer;
    private final Iterable<String> topics;
    private final ResultSerializer resultSerializer;

    /**
     * Creates a writer around an externally supplied producer (used by tests).
     *
     * @param typeNames       type names forwarded to the base writer and the serializer
     * @param booleanAsNumber forwarded to the base writer and the serializer
     * @param rootPrefix      root prefix; falls back to the {@code "rootPrefix"} setting,
     *                        then to {@code "servers"}
     * @param debugEnabled    forwarded to the base writer
     * @param topics          comma-separated topic names; when {@code null} or blank the
     *                        {@code "topics"} setting is used instead
     * @param tags            tags handed to the serializer; falls back to the
     *                        {@code "tags"} setting, then to an empty map
     * @param settings        free-form settings map
     * @param producer        producer used to publish messages
     */
    @VisibleForTesting
    KafkaWriter(ImmutableList<String> typeNames, boolean booleanAsNumber, String rootPrefix, Boolean debugEnabled, String topics, Map<String, String> tags, Map<String, Object> settings, Producer<String, String> producer) {
        super(typeNames, booleanAsNumber, debugEnabled, settings);
        this.producer = producer;

        ImmutableList<String> resolvedTopics = KafkaWriter.resolveTopics(topics, settings);
        if (resolvedTopics.isEmpty()) {
            // Without a topic nothing can be published; make the misconfiguration visible.
            log.warn("No Kafka topics configured; results will not be published");
        }
        this.topics = resolvedTopics;

        String aRootPrefix = (String)this.firstNonNull(rootPrefix, (String)this.getSettings().get("rootPrefix"), DEFAULT_ROOT_PREFIX);
        // The settings map is untyped, so the cast of its "tags" entry cannot be checked.
        @SuppressWarnings("unchecked")
        Map<String, String> aTags = (Map<String, String>)this.firstNonNull(tags, (Map<String, String>)this.getSettings().get("tags"), ImmutableMap.<String, String>of());
        this.resultSerializer = new DefaultResultSerializer(typeNames, booleanAsNumber, aRootPrefix, aTags, ImmutableList.<String>of());
    }

    /**
     * Creates a writer whose producer is built from {@code settings}.
     *
     * @throws IllegalArgumentException if the {@code "bootstrap.servers"} setting is missing
     */
    @JsonCreator
    public KafkaWriter(@JsonProperty(value="typeNames") ImmutableList<String> typeNames, @JsonProperty(value="booleanAsNumber") boolean booleanAsNumber, @JsonProperty(value="rootPrefix") String rootPrefix, @JsonProperty(value="debug") Boolean debugEnabled, @JsonProperty(value="topics") String topics, @JsonProperty(value="tags") Map<String, String> tags, @JsonProperty(value="settings") Map<String, Object> settings) {
        this(typeNames, booleanAsNumber, rootPrefix, debugEnabled, topics, tags, settings, KafkaWriter.createProducer(settings));
    }

    /**
     * Resolves the list of target topics.
     *
     * <p>The explicit {@code topics} argument wins when it is non-blank; otherwise the
     * {@code "topics"} setting is used. Entries are trimmed and blank entries are
     * dropped, so an unset value yields an empty list rather than a single
     * empty-named topic.
     */
    private static ImmutableList<String> resolveTopics(String topics, Map<String, Object> settings) {
        String raw = topics;
        if ((raw == null || raw.trim().isEmpty()) && settings != null) {
            raw = Settings.getStringSetting(settings, (String)"topics", (String)"");
        }
        ImmutableList.Builder<String> resolved = ImmutableList.builder();
        if (raw != null) {
            for (String candidate : raw.split(",")) {
                String topic = candidate.trim();
                if (!topic.isEmpty()) {
                    resolved.add(topic);
                }
            }
        }
        return resolved.build();
    }

    /**
     * Builds a Kafka producer from the writer settings.
     *
     * <p>Both serializers default to {@link StringSerializer} when not set.
     *
     * @throws IllegalArgumentException if {@code settings} is {@code null} or has no
     *                                  {@code "bootstrap.servers"} entry
     */
    private static Producer<String, String> createProducer(Map<String, Object> settings) {
        String bootstrapServers = settings == null ? null : Settings.getStringSetting(settings, (String)"bootstrap.servers", null);
        if (bootstrapServers == null) {
            // Properties.setProperty rejects null values with a message-less
            // NullPointerException; fail with an actionable message instead.
            throw new IllegalArgumentException("KafkaWriter requires the 'bootstrap.servers' setting");
        }
        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("bootstrap.servers", bootstrapServers);
        kafkaProperties.setProperty("key.serializer", Settings.getStringSetting(settings, (String)"key.serializer", (String)StringSerializer.class.getName()));
        kafkaProperties.setProperty("value.serializer", Settings.getStringSetting(settings, (String)"value.serializer", (String)StringSerializer.class.getName()));
        return new KafkaProducer<String, String>(kafkaProperties);
    }

    /** Performs no validation. */
    public void validateSetup(Server server, Query query) throws ValidationException {
    }

    /**
     * Serializes every result once and sends the message to each configured topic.
     * Records are sent without a key.
     */
    protected void internalWrite(Server server, Query query, ImmutableList<Result> results) throws Exception {
        for (Result result : results) {
            log.debug("Query result: [{}]", (Object)result);
            String message = this.resultSerializer.serialize(server, query, result);
            for (String topic : this.topics) {
                log.debug("Topic: [{}] ; Kafka Message: [{}]", (Object)topic, (Object)message);
                this.producer.send(new ProducerRecord<String, String>(topic, message));
            }
        }
    }

    /** Closes the underlying Kafka producer. */
    public void close() {
        this.producer.close();
    }

    /** Returns the producer used by this writer. */
    @SuppressFBWarnings(justification="generated code")
    Producer<String, String> getProducer() {
        return this.producer;
    }
}

