/*
 * Decompiled with CFR 0.152.
 */
package org.atmosphere.kafka;

import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.atmosphere.cpr.AtmosphereConfig;
import org.atmosphere.cpr.Broadcaster;
import org.atmosphere.util.AbstractBroadcasterProxy;
import org.atmosphere.util.ExecutorsFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaBroadcaster
extends AbstractBroadcasterProxy {
    private final Logger logger = LoggerFactory.getLogger(KafkaBroadcaster.class);
    public static final String PROPERTIES_FILE = "org.atmosphere.kafka.propertiesFile";
    private String topic;
    private KafkaProducer producer;
    private KafkaConsumer consumer;
    private final Serializer stringSerializer = new StringSerializer();
    private final Deserializer stringDeserializer = new StringDeserializer();
    private final AtomicBoolean closed = new AtomicBoolean(false);

    public Broadcaster initialize(String id, URI uri, AtmosphereConfig config) {
        super.initialize(id, uri, config);
        this.topic = id.equals("/*") ? "atmosphere" : id.replaceAll("[^a-zA-Z0-9\\s]", "");
        this.producer = (KafkaProducer)config.properties().get("producer");
        HashSet<String> topics = (HashSet<String>)config.properties().get("topics");
        if (topics == null) {
            topics = new HashSet<String>();
            config.properties().put("topics", topics);
        }
        if (topics.isEmpty() || !topics.contains(this.topic)) {
            String load = config.getInitParameter(PROPERTIES_FILE, null);
            Properties props = new Properties();
            UUID uuid = UUID.randomUUID();
            String defaultGroupId = "atmosphere-consumer-" + Long.toHexString(uuid.getMostSignificantBits() ^ uuid.getLeastSignificantBits());
            props.put("group.id", defaultGroupId);
            props.put("bootstrap.servers", "127.0.0.1:9092");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            if (load != null) {
                try {
                    props.load(config.getServletContext().getResourceAsStream(load));
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            if (topics.isEmpty()) {
                this.producer = new KafkaProducer(props, this.stringSerializer, this.stringSerializer);
                config.properties().put("producer", this.producer);
            }
            this.consumer = new KafkaConsumer(props, this.stringDeserializer, this.stringDeserializer);
            topics.add(this.topic);
            this.startConsumer();
        }
        return this;
    }

    public synchronized void destroy() {
        this.closed.set(true);
        super.destroy();
    }

    void startConsumer() {
        this.consumer.subscribe(Arrays.asList(this.topic));
        ExecutorsFactory.getMessageDispatcher((AtmosphereConfig)this.config, (String)"kafka").execute(new Runnable(){

            @Override
            public void run() {
                while (!KafkaBroadcaster.this.closed.get()) {
                    ConsumerRecords records = KafkaBroadcaster.this.consumer.poll(1000L);
                    for (ConsumerRecord record : records) {
                        KafkaBroadcaster.this.broadcastReceivedMessage(record.value());
                    }
                }
                KafkaBroadcaster.this.consumer.close();
                ((Set)KafkaBroadcaster.this.config.properties().get("topics")).remove(KafkaBroadcaster.this.topic);
            }
        });
    }

    public void incomingBroadcast() {
    }

    public void outgoingBroadcast(Object message) {
        this.logger.trace("{} outgoingBroadcast {}", (Object)this.topic, message);
        this.producer.send(new ProducerRecord(this.topic, (Object)message.toString()));
    }
}

