/*
 * Decompiled with CFR 0.152.
 */
package com.jdon.async;

import com.jdon.annotation.model.Send;
import com.jdon.async.disruptor.DisruptorFactory;
import com.jdon.async.disruptor.EventDisruptor;
import com.jdon.async.future.EventResultFuture;
import com.jdon.async.future.FutureDirector;
import com.jdon.async.future.FutureListener;
import com.jdon.container.pico.Startable;
import com.jdon.domain.message.DomainMessage;
import com.jdon.util.Debug;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class EventMessageFirer
implements Startable {
    public static final String module = EventMessageFirer.class.getName();
    private ScheduledExecutorService scheduExecStatic = Executors.newScheduledThreadPool(1);
    private DisruptorFactory disruptorFactory;
    private FutureDirector futureDirector;
    private Map<String, Disruptor> topicDisruptors;

    public EventMessageFirer(DisruptorFactory disruptorFactory, FutureDirector futureDirector) {
        this.disruptorFactory = disruptorFactory;
        this.futureDirector = futureDirector;
        this.topicDisruptors = new ConcurrentHashMap<String, Disruptor>();
    }

    @Override
    public void start() {
        Runnable task = new Runnable(){

            @Override
            public void run() {
                EventMessageFirer.this.stopDisruptor();
            }
        };
        this.scheduExecStatic.scheduleAtFixedRate(task, 3600L, 3600L, TimeUnit.SECONDS);
    }

    @Override
    public void stop() {
        if (this.topicDisruptors != null) {
            this.stopDisruptor();
            this.topicDisruptors.clear();
            this.topicDisruptors = null;
        }
        if (this.futureDirector != null) {
            this.futureDirector.stop();
            this.futureDirector = null;
        }
        this.disruptorFactory = null;
        this.scheduExecStatic.shutdownNow();
    }

    private void stopDisruptor() {
        HashMap<String, Disruptor> mydisruptors = new HashMap<String, Disruptor>(this.topicDisruptors);
        this.topicDisruptors.clear();
        for (Object key : mydisruptors.keySet()) {
            Disruptor disruptor = (Disruptor)mydisruptors.get(key);
            try {
                disruptor.halt();
            }
            catch (Exception e) {}
        }
    }

    public void fire(DomainMessage domainMessage, Send send, FutureListener futureListener) {
        EventResultFuture eventMessageFuture = new EventResultFuture(send.value(), futureListener, domainMessage);
        eventMessageFuture.setAsyn(send.asyn());
        domainMessage.setEventResultHandler(eventMessageFuture);
        this.futureDirector.fire(domainMessage);
    }

    private Disruptor getDisruptor(String topic) {
        Disruptor disruptor = this.topicDisruptors.get(topic);
        if (disruptor == null) {
            disruptor = this.disruptorFactory.createDisruptor(topic);
            if (disruptor == null) {
                Debug.logWarning("not create disruptor for " + topic, module);
                return null;
            }
            this.topicDisruptors.put(topic, disruptor);
        }
        return disruptor;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void fire(DomainMessage domainMessage, Send send) {
        String topic = send.value();
        Disruptor disruptor = this.getDisruptor(topic);
        if (disruptor == null) {
            Debug.logWarning("not create disruptor for " + topic, module);
            return;
        }
        try {
            RingBuffer ringBuffer = disruptor.getRingBuffer();
            long sequence = ringBuffer.next();
            EventDisruptor eventDisruptor = (EventDisruptor)ringBuffer.get(sequence);
            if (eventDisruptor == null) {
                return;
            }
            eventDisruptor.setTopic(topic);
            eventDisruptor.setDomainMessage(domainMessage);
            ringBuffer.publish(sequence);
        }
        catch (Exception e) {
            Debug.logError("fire error: " + send.value() + " domainMessage:" + domainMessage.getEventSource(), module);
        }
    }
}

