/*
 * Decompiled with CFR 0.152.
 */
package org.noear.folkmq.server;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.noear.folkmq.server.MqConsumerQueue;
import org.noear.folkmq.server.MqConsumerQueueImpl;
import org.noear.folkmq.server.MqMessageHolder;
import org.noear.folkmq.server.MqServer;
import org.noear.socketd.SocketD;
import org.noear.socketd.transport.core.Entity;
import org.noear.socketd.transport.core.Listener;
import org.noear.socketd.transport.core.Message;
import org.noear.socketd.transport.core.Session;
import org.noear.socketd.transport.core.entity.StringEntity;
import org.noear.socketd.transport.core.listener.BuilderListener;
import org.noear.socketd.transport.server.Server;
import org.noear.socketd.transport.server.ServerConfigHandler;
import org.noear.socketd.utils.RunUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqServerImpl
extends BuilderListener
implements MqServer {
    private static final Logger log = LoggerFactory.getLogger(MqServerImpl.class);
    private Server server;
    private ServerConfigHandler serverConfigHandler;
    private Map<String, Set<String>> subscribeMap = new HashMap<String, Set<String>>();
    private Map<String, MqConsumerQueue> consumerMap = new HashMap<String, MqConsumerQueue>();
    private Map<String, String> accessMap = new HashMap<String, String>();

    public MqServerImpl() {
        this.on("mq.cmd.subscribe", (s, m) -> {
            if (m.isRequest() || m.isSubscribe()) {
                s.replyEnd(m, (Entity)new StringEntity(""));
            }
            String topic = m.meta("mq.topic");
            String consumer = m.meta("mq.consumer");
            this.onSubscribe(topic, consumer, (Session)s);
        });
        this.on("mq.cmd.publish", (s, m) -> {
            if (m.isRequest() || m.isSubscribe()) {
                s.replyEnd(m, (Entity)new StringEntity(""));
            }
            RunUtils.async(() -> {
                String topic = m.meta("mq.topic");
                long scheduled = Long.parseLong(m.metaOrDefault("mq.scheduled", "0"));
                this.distributeDo(topic, scheduled, (Message)m);
            });
        });
    }

    @Override
    public MqServer config(ServerConfigHandler configHandler) {
        this.serverConfigHandler = configHandler;
        return this;
    }

    @Override
    public MqServer addAccess(String accessKey, String accessSecretKey) {
        this.accessMap.put(accessKey, accessSecretKey);
        return this;
    }

    @Override
    public MqServer start(int port) throws Exception {
        this.server = SocketD.createServer((String)"sd:tcp");
        if (this.serverConfigHandler != null) {
            this.server.config(this.serverConfigHandler);
        }
        this.server.config(c -> c.port(port)).listen((Listener)this).start();
        return this;
    }

    @Override
    public void stop() {
        this.server.stop();
    }

    public void onOpen(Session session) throws IOException {
        super.onOpen(session);
        if (this.accessMap.size() > 0) {
            String accessKey = session.param("ak");
            String accessSecretKey = session.param("sk");
            if (accessKey == null || accessSecretKey == null) {
                session.close();
                return;
            }
            if (!accessSecretKey.equals(this.accessMap.get(accessKey))) {
                session.close();
                return;
            }
        }
        log.info("Channel session opened, session={}", (Object)session.sessionId());
    }

    public void onClose(Session session) {
        super.onClose(session);
        log.info("Channel session closed, session={}", (Object)session.sessionId());
        for (String consumer : session.attrMap().keySet()) {
            MqConsumerQueue messageQueue = this.consumerMap.get(consumer);
            if (messageQueue == null) continue;
            messageQueue.removeSession(session);
        }
    }

    public void onError(Session session, Throwable error) {
        super.onError(session, error);
        if (log.isWarnEnabled()) {
            log.warn("Channel error, session={}", (Object)session.sessionId(), (Object)error);
        }
    }

    private synchronized void onSubscribe(String topic, String consumer, Session session) {
        log.info("Channel subscribe topic={}, consumer={}, session={}", new Object[]{topic, consumer, session.sessionId()});
        session.attr(consumer, (Object)"1");
        Set<String> consumerSet = this.subscribeMap.get(topic);
        if (consumerSet == null) {
            consumerSet = new HashSet<String>();
            this.subscribeMap.put(topic, consumerSet);
        }
        consumerSet.add(consumer);
        MqConsumerQueue consumerQueue = this.consumerMap.get(consumer);
        if (consumerQueue == null) {
            consumerQueue = new MqConsumerQueueImpl(consumer);
            this.consumerMap.put(consumer, consumerQueue);
        }
        consumerQueue.addSession(session);
    }

    private void distributeDo(String topic, long scheduled, Message message) {
        Set<String> consumerSet = this.subscribeMap.get(topic);
        if (consumerSet != null) {
            for (String consumer : consumerSet) {
                MqConsumerQueue queue = this.consumerMap.get(consumer);
                if (queue == null) continue;
                MqMessageHolder messageHolder = new MqMessageHolder(message, scheduled);
                queue.push(messageHolder);
            }
        }
    }
}

