/*
 * Decompiled with CFR 0.152.
 */
package org.noear.folkmq.server;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.noear.folkmq.server.MqMessageHolder;
import org.noear.folkmq.server.MqServerDefault;
import org.noear.folkmq.server.MqServiceInternal;
import org.noear.folkmq.server.MqTopicConsumerQueue;
import org.noear.folkmq.server.MqTopicConsumerQueueDefault;
import org.noear.folkmq.server.MqWatcher;
import org.noear.folkmq.server.MqWatcherDefault;
import org.noear.folkmq.server.OnStart;
import org.noear.socketd.exception.SocketdAlarmException;
import org.noear.socketd.transport.core.Entity;
import org.noear.socketd.transport.core.Message;
import org.noear.socketd.transport.core.Session;
import org.noear.socketd.transport.core.entity.StringEntity;
import org.noear.socketd.transport.core.listener.EventListener;
import org.noear.socketd.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqServiceListener
extends EventListener
implements MqServiceInternal {
    private static final Logger log = LoggerFactory.getLogger(MqServerDefault.class);
    private Object SUBSCRIBE_LOCK = new Object();
    private Map<String, String> serverAccessMap = new ConcurrentHashMap<String, String>();
    private MqWatcher watcher;
    private Map<String, Set<String>> subscribeMap = new ConcurrentHashMap<String, Set<String>>();
    private Map<String, MqTopicConsumerQueue> topicConsumerMap = new ConcurrentHashMap<String, MqTopicConsumerQueue>();
    private boolean brokerMode;

    public MqServiceListener(boolean brokerMode) {
        this.brokerMode = brokerMode;
        this.watcher = new MqWatcherDefault();
        this.watcher.init(this);
        this.on("mq.event.subscribe", (s, m) -> {
            String topic = m.meta("mq.topic");
            String consumer = m.meta("mq.consumer");
            this.watcher.onSubscribe(topic, consumer, (Session)s);
            this.subscribeDo(topic, consumer, (Session)s);
            if (m.isRequest() || m.isSubscribe()) {
                s.replyEnd(m, (Entity)new StringEntity("").meta("mq.confirm", "1"));
            }
        });
        this.on("mq.event.unsubscribe", (s, m) -> {
            String topic = m.meta("mq.topic");
            String consumer = m.meta("mq.consumer");
            this.watcher.onUnSubscribe(topic, consumer, (Session)s);
            this.unsubscribeDo(topic, consumer, (Session)s);
            if (m.isRequest() || m.isSubscribe()) {
                s.replyEnd(m, (Entity)new StringEntity("").meta("mq.confirm", "1"));
            }
        });
        this.on("mq.event.publish", (s, m) -> {
            this.watcher.onPublish((Message)m);
            this.exchangeDo((Message)m);
            if (m.isRequest() || m.isSubscribe()) {
                s.replyEnd(m, (Entity)new StringEntity("").meta("mq.confirm", "1"));
            }
        });
        this.on("mq.event.save", (s, m) -> {
            this.save();
            if (m.isRequest() || m.isSubscribe()) {
                s.replyEnd(m, (Entity)new StringEntity("").meta("mq.confirm", "1"));
            }
        });
    }

    public MqServiceListener watcher(MqWatcher watcher) {
        if (watcher != null) {
            this.watcher = watcher;
            this.watcher.init(this);
        }
        return this;
    }

    public MqServiceListener addAccess(String accessKey, String accessSecretKey) {
        this.serverAccessMap.put(accessKey, accessSecretKey);
        return this;
    }

    public MqServiceListener addAccessAll(Map<String, String> accessMap) {
        if (accessMap != null) {
            this.serverAccessMap.putAll(accessMap);
        }
        return this;
    }

    public void start(OnStart onStart) throws Exception {
        this.watcher.onStartBefore();
        onStart.run();
        this.watcher.onStartAfter();
    }

    @Override
    public void save() {
        this.watcher.onSave();
    }

    public void stop(Runnable onStop) {
        this.watcher.onStopBefore();
        onStop.run();
        this.watcher.onStopAfter();
        ArrayList<MqTopicConsumerQueue> queueList = new ArrayList<MqTopicConsumerQueue>(this.topicConsumerMap.values());
        for (MqTopicConsumerQueue queue : queueList) {
            queue.close();
        }
    }

    public void onOpen(Session session) throws IOException {
        super.onOpen(session);
        if (this.brokerMode) {
            return;
        }
        if (this.serverAccessMap.size() > 0) {
            String accessKey = session.param("ak");
            String accessSecretKey = session.param("sk");
            if (accessKey == null || accessSecretKey == null) {
                session.close();
                return;
            }
            if (!accessSecretKey.equals(this.serverAccessMap.get(accessKey))) {
                session.close();
                return;
            }
        }
        log.info("Server channel opened, sessionId={}", (Object)session.sessionId());
    }

    public void onClose(Session session) {
        super.onClose(session);
        if (this.brokerMode) {
            return;
        }
        log.info("Server channel closed, sessionId={}", (Object)session.sessionId());
        ArrayList topicConsumerList = new ArrayList(session.attrMap().keySet());
        for (String topicConsumer : topicConsumerList) {
            MqTopicConsumerQueue topicConsumerQueue = this.topicConsumerMap.get(topicConsumer);
            if (topicConsumerQueue == null) continue;
            topicConsumerQueue.removeSession(session);
        }
    }

    public void onError(Session session, Throwable error) {
        super.onError(session, error);
        if (log.isWarnEnabled()) {
            if (error instanceof SocketdAlarmException) {
                SocketdAlarmException alarmException = (SocketdAlarmException)error;
                log.warn("Server channel error, sessionId={}, from={}", new Object[]{session.sessionId(), alarmException.getFrom(), error});
            } else {
                log.warn("Server channel error, sessionId={}", (Object)session.sessionId(), (Object)error);
            }
        }
    }

    @Override
    public Map<String, Set<String>> getSubscribeMap() {
        return Collections.unmodifiableMap(this.subscribeMap);
    }

    @Override
    public Map<String, MqTopicConsumerQueue> getTopicConsumerMap() {
        return Collections.unmodifiableMap(this.topicConsumerMap);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void subscribeDo(String topic, String consumer, Session session) {
        String topicConsumer = topic + "#" + consumer;
        Object object = this.SUBSCRIBE_LOCK;
        synchronized (object) {
            Set topicConsumerSet = this.subscribeMap.computeIfAbsent(topic, n -> Collections.newSetFromMap(new ConcurrentHashMap()));
            topicConsumerSet.add(topicConsumer);
            MqTopicConsumerQueue topicConsumerQueue = this.topicConsumerMap.get(topicConsumer);
            if (topicConsumerQueue == null) {
                topicConsumerQueue = new MqTopicConsumerQueueDefault(this.watcher, topic, consumer);
                this.topicConsumerMap.put(topicConsumer, topicConsumerQueue);
            }
            if (session != null) {
                log.info("Server channel subscribe topic={}, consumer={}, sessionId={}", new Object[]{topic, consumer, session.sessionId()});
                session.attr(topicConsumer, (Object)"1");
                topicConsumerQueue.addSession(session);
            }
        }
    }

    @Override
    public void unsubscribeDo(String topic, String consumer, Session session) {
        if (session == null) {
            return;
        }
        log.info("Server channel unsubscribe topic={}, consumer={}, sessionId={}", new Object[]{topic, consumer, session.sessionId()});
        String topicConsumer = topic + "#" + consumer;
        MqTopicConsumerQueue topicConsumerQueue = this.topicConsumerMap.get(topicConsumer);
        session.attrMap().remove(topicConsumer);
        if (topicConsumerQueue != null) {
            topicConsumerQueue.removeSession(session);
        }
    }

    @Override
    public void exchangeDo(Message message) {
        Set<String> topicConsumerSet;
        String tid = message.meta("mq.tid");
        if (Utils.isEmpty((String)tid)) {
            log.warn("The tid cannot be null, sid={}", (Object)message.sid());
            return;
        }
        String topic = message.meta("mq.topic");
        int qos = "0".equals(message.meta("mq.qos")) ? 0 : 1;
        int times = Integer.parseInt(message.metaOrDefault("mq.times", "0"));
        long scheduled = 0L;
        String scheduledStr = message.meta("mq.scheduled");
        if (Utils.isNotEmpty((String)scheduledStr)) {
            scheduled = Long.parseLong(scheduledStr);
        }
        if ((topicConsumerSet = this.subscribeMap.get(topic)) != null) {
            ArrayList<String> topicConsumerList = new ArrayList<String>(topicConsumerSet);
            for (String topicConsumer : topicConsumerList) {
                this.exchangeDo(topicConsumer, message, tid, qos, times, scheduled);
            }
        }
    }

    @Override
    public void exchangeDo(String topicConsumer, Message message, String tid, int qos, int times, long scheduled) {
        MqTopicConsumerQueue topicConsumerQueue = this.topicConsumerMap.get(topicConsumer);
        if (topicConsumerQueue != null) {
            MqMessageHolder messageHolder = new MqMessageHolder(topicConsumerQueue.getConsumer(), message, tid, qos, times, scheduled);
            topicConsumerQueue.add(messageHolder);
        }
    }
}

