/*
 * Decompiled with CFR 0.152.
 */
package org.atmosphere.kafka;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.atmosphere.config.managed.ManagedAtmosphereHandler;
import org.atmosphere.cpr.AtmosphereConfig;
import org.atmosphere.cpr.AtmosphereFramework;
import org.atmosphere.cpr.Broadcaster;
import org.atmosphere.util.AbstractBroadcasterProxy;
import org.atmosphere.util.ExecutorsFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaBroadcaster
extends AbstractBroadcasterProxy {
    private final Logger logger = LoggerFactory.getLogger(KafkaBroadcaster.class);
    public static final String PROPERTIES_FILE = "org.atmosphere.kafka.propertiesFile";
    private String topic;
    private KafkaProducer producer;
    private ConsumerConnector consumer;
    private final Serializer stringSerializer = new StringSerializer();
    private Map<String, Integer> topicCountMap;
    private Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap;

    public Broadcaster initialize(String id, URI uri, final AtmosphereConfig config) {
        super.initialize(id, uri, config);
        this.topic = id.equals("/*") ? "atmosphere" : id.replaceAll("[^a-zA-Z0-9\\s]", "");
        this.producer = (KafkaProducer)config.properties().get(KafkaProducer.class.getName());
        this.consumer = (ConsumerConnector)config.properties().get(ConsumerConnector.class.getName());
        this.topicCountMap = (Map)config.properties().get("topicCountMap");
        if (this.producer == null) {
            String load = config.getInitParameter(PROPERTIES_FILE, null);
            Properties props = new Properties();
            if (load == null) {
                props.put("bootstrap.servers", "10.0.1.10:9092");
                props.put("zk.connect", "127.0.0.1:9092");
                props.put("group.id", "kafka.atmosphere");
                props.put("partition.assignment.strategy", "roundrobin");
                props.put("zookeeper.connect", "localhost:2181");
            } else {
                try {
                    props.load(config.getServletContext().getResourceAsStream(load));
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            this.producer = new KafkaProducer(props, this.stringSerializer, this.stringSerializer);
            this.consumer = Consumer.createJavaConsumerConnector((ConsumerConfig)new ConsumerConfig(props));
            this.topicCountMap = new HashMap<String, Integer>();
            config.properties().put("producer", this.producer);
            config.properties().put(ConsumerConnector.class.getName(), this.consumer);
            config.properties().put("topicCountMap", this.topicCountMap);
        }
        this.topicCountMap.put(this.topic, new Integer(1));
        config.startupHook(new AtmosphereConfig.StartupHook(){

            public void started(AtmosphereFramework framework) {
                if (config.properties().get("started") != null) {
                    return;
                }
                config.properties().put("started", "true");
                KafkaBroadcaster.this.consumerMap = KafkaBroadcaster.this.consumer.createMessageStreams(KafkaBroadcaster.this.topicCountMap);
                for (final String t : KafkaBroadcaster.this.topicCountMap.keySet()) {
                    ExecutorsFactory.getMessageDispatcher((AtmosphereConfig)config, (String)"kafka").execute(new Runnable(){

                        @Override
                        public void run() {
                            try {
                                KafkaStream stream = (KafkaStream)((List)KafkaBroadcaster.this.consumerMap.get(t)).get(0);
                                ConsumerIterator it = stream.iterator();
                                while (it.hasNext()) {
                                    String message = new String((byte[])it.next().message());
                                    KafkaBroadcaster.this.logger.trace("{} incomingBroadcast {}", (Object)t, (Object)message);
                                    KafkaBroadcaster.this.broadcastReceivedMessage(message);
                                }
                            }
                            catch (Exception ex) {
                                if (InterruptedException.class.isAssignableFrom(ex.getClass())) {
                                    KafkaBroadcaster.this.logger.trace("", (Throwable)ex);
                                }
                                KafkaBroadcaster.this.logger.warn("", (Throwable)ex);
                            }
                        }
                    });
                }
            }
        });
        return this;
    }

    public void incomingBroadcast() {
    }

    public void outgoingBroadcast(Object message) {
        this.logger.trace("{} outgoingBroadcast {}", (Object)this.topic, message);
        if (ManagedAtmosphereHandler.Managed.class.isAssignableFrom(message.getClass())) {
            message = ((ManagedAtmosphereHandler.Managed)ManagedAtmosphereHandler.Managed.class.cast(message)).object();
        }
        this.producer.send(new ProducerRecord(this.topic, message));
    }
}

