/*
 * Decompiled with CFR 0.152.
 */
package org.noear.folkmq.server;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.noear.folkmq.server.MqMessageHolder;
import org.noear.folkmq.server.MqMessageQueue;
import org.noear.folkmq.server.MqMessageQueueImpl;
import org.noear.folkmq.server.MqServer;
import org.noear.socketd.SocketD;
import org.noear.socketd.transport.core.Entity;
import org.noear.socketd.transport.core.Listener;
import org.noear.socketd.transport.core.Message;
import org.noear.socketd.transport.core.Session;
import org.noear.socketd.transport.core.entity.StringEntity;
import org.noear.socketd.transport.core.listener.BuilderListener;
import org.noear.socketd.transport.server.Server;
import org.noear.socketd.utils.RunUtils;

public class MqServerImpl
extends BuilderListener
implements MqServer {
    private Server server;
    private Map<String, Set<String>> subscribeMap = new HashMap<String, Set<String>>();
    private Map<String, MqMessageQueue> identityMap = new HashMap<String, MqMessageQueue>();
    private Map<String, String> accessMap = new HashMap<String, String>();

    @Override
    public MqServer addAccess(String accessKey, String accessSecretKey) {
        this.accessMap.put(accessKey, accessSecretKey);
        return this;
    }

    @Override
    public MqServer stop() {
        this.server.stop();
        return this;
    }

    @Override
    public MqServer start(int port) throws Exception {
        this.server = SocketD.createServer((String)"sd:tcp").config(c -> c.port(port)).listen((Listener)this).start();
        this.on("mq.cmd.subscribe", (s, m) -> {
            if (m.isRequest() || m.isSubscribe()) {
                s.replyEnd(m, (Entity)new StringEntity(""));
            }
            String topic = m.meta("mq.topic");
            String identity = m.meta("mq.identity");
            this.onSubscribe(topic, identity, (Session)s);
        });
        this.on("mq.cmd.publish", (s, m) -> {
            if (m.isRequest() || m.isSubscribe()) {
                RunUtils.asyncAndTry(() -> s.replyEnd(m, (Entity)new StringEntity("")));
            }
            String topic = m.meta("mq.topic");
            this.onPublish(topic, (Message)m);
        });
        return this;
    }

    public void onOpen(Session session) throws IOException {
        super.onOpen(session);
        if (this.accessMap.size() > 0) {
            String accessKey = session.param("accessKey");
            String accessSecretKey = session.param("accessSecretKey");
            if (accessKey == null || accessSecretKey == null) {
                session.close();
                return;
            }
            if (!accessSecretKey.equals(this.accessMap.get(accessKey))) {
                session.close();
                return;
            }
        }
    }

    public void onClose(Session session) {
        super.onClose(session);
        for (String identity : session.attrMap().keySet()) {
            MqMessageQueue messageQueue = this.identityMap.get(identity);
            if (messageQueue == null) continue;
            messageQueue.removeSubscriber(session);
        }
    }

    private synchronized void onSubscribe(String topic, String identity, Session session) {
        session.attr(identity, (Object)"1");
        Set<String> identitySet = this.subscribeMap.get(topic);
        if (identitySet == null) {
            identitySet = new HashSet<String>();
            this.subscribeMap.put(topic, identitySet);
        }
        identitySet.add(identity);
        if (!this.identityMap.containsKey(identity)) {
            MqMessageQueueImpl messageQueue = new MqMessageQueueImpl(identity);
            messageQueue.addSubscriber(session);
            this.identityMap.put(identity, messageQueue);
        }
    }

    private void onPublish(String topic, Message message) throws IOException {
        Set<String> identitySet = this.subscribeMap.get(topic);
        if (identitySet != null) {
            for (String identity : identitySet) {
                MqMessageQueue queue = this.identityMap.get(identity);
                if (queue == null) continue;
                MqMessageHolder messageHolder = new MqMessageHolder(message);
                queue.push(messageHolder);
            }
        }
    }
}

