/*
 * Decompiled with CFR 0.152.
 */
package com.googlecode.jmxtrans.model.output.kafka;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Closer;
import com.googlecode.jmxtrans.model.Query;
import com.googlecode.jmxtrans.model.Result;
import com.googlecode.jmxtrans.model.Server;
import com.googlecode.jmxtrans.model.ValidationException;
import com.googlecode.jmxtrans.model.naming.KeyUtils;
import com.googlecode.jmxtrans.model.output.BaseOutputWriter;
import com.googlecode.jmxtrans.model.output.Settings;
import com.googlecode.jmxtrans.util.NumberUtils;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.annotation.concurrent.NotThreadSafe;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Output writer that publishes every numeric query result value to one or more
 * Kafka topics as a small JSON document.
 *
 * <p>Each message has the shape
 * {@code {"keyspace": ..., "value": ..., "timestamp": ..., "tags": {...}}}, where
 * {@code keyspace} is the key built by {@link KeyUtils#getKeyString} with
 * parentheses replaced by underscores, {@code value} is the value's
 * {@code toString()}, and {@code timestamp} is the result epoch divided by 1000.
 *
 * <p>Instances are not thread safe.
 */
@NotThreadSafe
public class KafkaWriter
extends BaseOutputWriter {
    private static final Logger log = LoggerFactory.getLogger(KafkaWriter.class);
    private static final String DEFAULT_ROOT_PREFIX = "servers";

    /** Kafka producer properties that are copied verbatim from the writer settings. */
    private static final String[] KAFKA_PROPERTY_KEYS = {"metadata.broker.list", "zk.connect", "serializer.class"};

    private final JsonFactory jsonFactory;
    private Producer<String, String> producer;
    private final Iterable<String> topics;
    private final String rootPrefix;
    private final ImmutableMap<String, String> tags;

    /**
     * Creates the writer and its Kafka producer.
     *
     * @param typeNames       type names forwarded to {@link BaseOutputWriter}
     * @param booleanAsNumber forwarded to {@link BaseOutputWriter}
     * @param rootPrefix      key prefix; falls back to the {@code rootPrefix} setting, then {@code "servers"}
     * @param debugEnabled    forwarded to {@link BaseOutputWriter}
     * @param topics          comma-separated topic names; falls back to the {@code topics} setting
     * @param tags            tags added to every message; falls back to the {@code tags} setting, then empty
     * @param settings        writer settings, also the source of the Kafka producer properties
     */
    @JsonCreator
    public KafkaWriter(@JsonProperty(value="typeNames") ImmutableList<String> typeNames, @JsonProperty(value="booleanAsNumber") boolean booleanAsNumber, @JsonProperty(value="rootPrefix") String rootPrefix, @JsonProperty(value="debug") Boolean debugEnabled, @JsonProperty(value="topics") String topics, @JsonProperty(value="tags") Map<String, String> tags, @JsonProperty(value="settings") Map<String, Object> settings) {
        super(typeNames, booleanAsNumber, debugEnabled, settings);
        // Read every setting through the same accessor the rootPrefix/tags lookups use,
        // instead of mixing it with the raw constructor argument.
        Map<String, Object> effectiveSettings = this.getSettings();

        this.rootPrefix = this.firstNonNull(rootPrefix, (String)effectiveSettings.get("rootPrefix"), DEFAULT_ROOT_PREFIX);

        Properties kafkaProperties = new Properties();
        for (String key : KAFKA_PROPERTY_KEYS) {
            copySettingIfPresent(effectiveSettings, kafkaProperties, key);
        }
        this.producer = new Producer<String, String>(new ProducerConfig(kafkaProperties));

        // The explicit "topics" JSON property wins over the value nested in settings.
        String configuredTopics = this.firstNonNull(topics, Settings.getStringSetting(effectiveSettings, (String)"topics", null), "");
        ImmutableList<String> parsedTopics = parseTopics(configuredTopics);
        if (parsedTopics.isEmpty()) {
            log.warn("No Kafka topics configured; results will not be published");
        }
        this.topics = parsedTopics;

        @SuppressWarnings("unchecked") // settings are untyped; "tags" is expected to be a String-to-String map
        Map<String, String> settingsTags = (Map<String, String>)effectiveSettings.get("tags");
        this.tags = ImmutableMap.copyOf(this.firstNonNull(tags, settingsTags, ImmutableMap.<String, String>of()));

        this.jsonFactory = new JsonFactory();
    }

    /**
     * Copies one setting into the Kafka properties, skipping it when absent.
     * {@link Properties#setProperty} rejects null values with a NullPointerException,
     * so unset optional settings must not be forwarded.
     */
    private static void copySettingIfPresent(Map<String, Object> settings, Properties target, String key) {
        String value = Settings.getStringSetting(settings, key, null);
        if (value != null) {
            target.setProperty(key, value);
        }
    }

    /**
     * Splits a comma-separated topic list, trimming whitespace and dropping empty
     * entries so that an unset or blank value produces no topics rather than a
     * single topic named {@code ""}.
     */
    private static ImmutableList<String> parseTopics(String commaSeparated) {
        ImmutableList.Builder<String> parsed = ImmutableList.builder();
        for (String candidate : commaSeparated.split(",")) {
            String topic = candidate.trim();
            if (!topic.isEmpty()) {
                parsed.add(topic);
            }
        }
        return parsed.build();
    }

    /** Performs no validation. */
    public void validateSetup(Server server, Query query) throws ValidationException {
    }

    /**
     * Sends one Kafka message per numeric value and per configured topic.
     * Non-numeric values are logged at warn level and skipped.
     *
     * @param server  server the results were collected from
     * @param query   query that produced the results
     * @param results results to publish
     * @throws Exception if building a message or sending it fails
     */
    protected void internalWrite(Server server, Query query, ImmutableList<Result> results) throws Exception {
        List<String> typeNames = this.getTypeNames();
        for (Result result : results) {
            log.debug("Query result: [{}]", (Object)result);
            Map<String, Object> resultValues = result.getValues();
            for (Map.Entry<String, Object> values : resultValues.entrySet()) {
                Object value = values.getValue();
                if (!NumberUtils.isNumeric(value)) {
                    log.warn("Unable to submit non-numeric value to Kafka: [{}] from result [{}]", value, (Object)result);
                    continue;
                }
                // The message does not depend on the topic, so build it once per value.
                String message = this.createJsonMessage(server, query, typeNames, result, values, value);
                for (String topic : this.topics) {
                    log.debug("Topic: [{}] ; Kafka Message: [{}]", (Object)topic, (Object)message);
                    this.producer.send(new KeyedMessage<String, String>(topic, message));
                }
            }
        }
    }

    /**
     * Serializes a single value to the JSON message format described on the class.
     *
     * @return the UTF-8 JSON document as a string
     * @throws IOException if JSON generation fails
     */
    private String createJsonMessage(Server server, Query query, List<String> typeNames, Result result, Map.Entry<String, Object> values, Object value) throws IOException {
        String keyString = KeyUtils.getKeyString(server, query, result, values, typeNames, this.rootPrefix);
        // Same effect as replaceAll("[()]", "_") without compiling a regex for every message.
        String cleanKeyString = keyString.replace('(', '_').replace(')', '_');

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Closing the generator flushes it into 'out'; the buffer is only read afterwards.
        try (JsonGenerator generator = this.jsonFactory.createGenerator(out, JsonEncoding.UTF8)) {
            generator.writeStartObject();
            generator.writeStringField("keyspace", cleanKeyString);
            generator.writeStringField("value", value.toString());
            generator.writeNumberField("timestamp", result.getEpoch() / 1000L);
            generator.writeObjectFieldStart("tags");
            for (Map.Entry<String, String> tag : this.tags.entrySet()) {
                generator.writeStringField(tag.getKey(), tag.getValue());
            }
            generator.writeEndObject();
            generator.writeEndObject();
        }
        return out.toString("UTF-8");
    }

    /** Replaces the Kafka producer; intended for tests only. */
    @VisibleForTesting
    void setProducer(Producer<String, String> producer) {
        this.producer = producer;
    }
}

