/*
 * Decompiled with CFR 0.152.
 */
package org.noear.folkmq.server;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.noear.folkmq.server.MqMessageHolder;
import org.noear.folkmq.server.MqServer;
import org.noear.folkmq.server.MqUserQueue;
import org.noear.folkmq.server.MqUserQueueImpl;
import org.noear.socketd.SocketD;
import org.noear.socketd.transport.core.Entity;
import org.noear.socketd.transport.core.Listener;
import org.noear.socketd.transport.core.Message;
import org.noear.socketd.transport.core.Session;
import org.noear.socketd.transport.core.entity.StringEntity;
import org.noear.socketd.transport.core.listener.BuilderListener;
import org.noear.socketd.transport.server.Server;
import org.noear.socketd.utils.NamedThreadFactory;

/**
 * {@link MqServer} implementation built on a Socket.D {@code sd:tcp} server.
 *
 * <p>It handles two events: {@code mq.cmd.subscribe}, which binds a user (and the
 * subscribing session) to a topic, and {@code mq.cmd.publish}, which pushes the
 * message onto the queue of every user subscribed to the message's topic.
 * Publishing work is handed to the distribute executor.
 */
public class MqServerImpl extends BuilderListener implements MqServer {
    /** Underlying Socket.D server; {@code null} until {@link #start(int)} has run. */
    private Server server;
    /** Executor that runs message distribution; created lazily when none was supplied. */
    private ExecutorService distributeExecutor;
    /**
     * topic -> names of the users subscribed to it. Concurrent because it is read by
     * distribute-executor threads while subscribe handlers add entries.
     */
    private final Map<String, Set<String>> subscribeMap = new ConcurrentHashMap<>();
    /** user name -> that user's queue. Concurrent for the same reason as {@link #subscribeMap}. */
    private final Map<String, MqUserQueueImpl> userMap = new ConcurrentHashMap<>();
    /** accessKey -> accessSecretKey. When empty, connections are not authenticated. */
    private final Map<String, String> accessMap = new HashMap<>();

    /**
     * Registers an access key / secret pair that clients may connect with.
     *
     * @param accessKey       the key a client sends as the {@code accessKey} session parameter
     * @param accessSecretKey the secret expected in the {@code accessSecretKey} session parameter
     * @return this server, for chaining
     */
    @Override
    public MqServer addAccess(String accessKey, String accessSecretKey) {
        this.accessMap.put(accessKey, accessSecretKey);
        return this;
    }

    /**
     * Sets the executor used to distribute published messages.
     *
     * @param distributeExecutor the executor to use; {@code null} is ignored
     * @return this server, for chaining
     */
    @Override
    public MqServer distributeExecutor(ExecutorService distributeExecutor) {
        if (distributeExecutor != null) {
            this.distributeExecutor = distributeExecutor;
        }
        return this;
    }

    /**
     * Creates the default distribute executor if none has been configured: a fixed pool of
     * {@code availableProcessors * 2} threads backed by an unbounded queue.
     */
    private void initDistributeExecutor() {
        if (this.distributeExecutor != null) {
            return;
        }
        int poolSize = Runtime.getRuntime().availableProcessors() * 2;
        this.distributeExecutor = new ThreadPoolExecutor(
                poolSize,
                poolSize,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new NamedThreadFactory("FolkMQ-distributeExecutor-"));
    }

    /**
     * Stops the underlying Socket.D server. Does nothing when the server was never started.
     *
     * @return this server, for chaining
     */
    @Override
    public MqServer stop() {
        // NOTE(review): the distribute executor is left running here, as before; confirm
        // whether an internally created pool should be shut down on stop.
        if (this.server != null) {
            this.server.stop();
        }
        return this;
    }

    /**
     * Registers the subscribe/publish handlers and starts listening on the given port.
     *
     * @param port TCP port to listen on
     * @return this server, for chaining
     * @throws Exception if the underlying server fails to start
     */
    @Override
    public MqServer start(int port) throws Exception {
        this.initDistributeExecutor();

        // Handlers are registered before the server starts so that no command can arrive
        // while its handler is still missing.
        this.on("mq.cmd.subscribe", (s, m) -> {
            // Acknowledge first when the client is waiting for a reply.
            if (m.isRequest() || m.isSubscribe()) {
                s.replyEnd(m, new StringEntity(""));
            }
            String topic = m.meta("mq.topic");
            String user = m.meta("mq.user");
            this.onSubscribe(topic, user, s);
        });

        this.on("mq.cmd.publish", (s, m) -> {
            // Acknowledge first when the client is waiting for a reply.
            if (m.isRequest() || m.isSubscribe()) {
                s.replyEnd(m, new StringEntity(""));
            }
            // Any exception thrown inside the task (for example a NumberFormatException from
            // a malformed "mq.scheduled" value) is captured by the Future returned from
            // submit, which is discarded here.
            this.distributeExecutor.submit(() -> {
                String topic = m.meta("mq.topic");
                long scheduled = Long.parseLong(m.metaOrDefault("mq.scheduled", "0"));
                this.distribute(topic, scheduled, m);
            });
        });

        this.server = SocketD.createServer("sd:tcp")
                .config(c -> c.port(port))
                .listen(this)
                .start();
        return this;
    }

    /**
     * Runs the inherited open handling, then closes the session when access keys are
     * configured and the session's {@code accessKey}/{@code accessSecretKey} parameters are
     * missing or do not match a registered pair.
     *
     * @param session the newly opened session
     * @throws IOException if the inherited handling or closing the session fails
     */
    @Override
    public void onOpen(Session session) throws IOException {
        super.onOpen(session);
        if (this.accessMap.isEmpty()) {
            return;
        }
        String accessKey = session.param("accessKey");
        String accessSecretKey = session.param("accessSecretKey");
        boolean authorized = accessKey != null
                && accessSecretKey != null
                && accessSecretKey.equals(this.accessMap.get(accessKey));
        if (!authorized) {
            session.close();
        }
    }

    /**
     * Runs the inherited close handling, then removes the session from the queue of every
     * user whose name appears as an attribute key on the session.
     *
     * @param session the session that was closed
     */
    @Override
    public void onClose(Session session) {
        super.onClose(session);
        for (String user : session.attrMap().keySet()) {
            if (user == null) {
                // The concurrent user map does not accept null keys.
                continue;
            }
            MqUserQueue messageQueue = this.userMap.get(user);
            if (messageQueue != null) {
                messageQueue.removeSession(session);
            }
        }
    }

    /**
     * Subscribes {@code user} to {@code topic} and attaches {@code session} to the user's
     * queue, creating the queue on first use. A subscription without a topic or user is
     * ignored.
     *
     * @param topic   the topic to subscribe to
     * @param user    the subscribing user's name
     * @param session the session the subscription arrived on
     */
    private synchronized void onSubscribe(String topic, String user, Session session) {
        if (topic == null || user == null) {
            return;
        }

        // The user name is stored as a session attribute so onClose can find the queues to
        // detach from; its presence also tells us this session was already attached.
        boolean alreadyAttached = session.attrMap().containsKey(user);
        session.attr(user, "1");

        MqUserQueueImpl queue = this.userMap.get(user);
        if (queue == null) {
            queue = new MqUserQueueImpl(user);
            queue.addSession(session);
            this.userMap.put(user, queue);
        } else if (!alreadyAttached) {
            // Another session of an existing user: it must be attached too, otherwise it
            // would never receive messages.
            queue.addSession(session);
        }

        // Make the queue available before the user becomes visible under the topic, so a
        // concurrent distribute never finds a subscribed user without a queue.
        this.subscribeMap
                .computeIfAbsent(topic, t -> ConcurrentHashMap.<String>newKeySet())
                .add(user);
    }

    /**
     * Pushes {@code message} onto the queue of every user subscribed to {@code topic}.
     *
     * @param topic     the topic the message was published to; {@code null} delivers nothing
     * @param scheduled the value of the message's {@code mq.scheduled} meta (0 when absent)
     * @param message   the published message
     */
    private void distribute(String topic, long scheduled, Message message) {
        if (topic == null) {
            return;
        }
        Set<String> subscribers = this.subscribeMap.get(topic);
        if (subscribers == null) {
            return;
        }
        for (String user : subscribers) {
            MqUserQueue queue = this.userMap.get(user);
            if (queue != null) {
                // Each user gets its own holder wrapping the same message.
                queue.push(new MqMessageHolder(message, scheduled));
            }
        }
    }
}

