/*
 * Decompiled with CFR 0.152.
 */
package org.zstacks.zbus.server;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zstacks.zbus.protocol.MessageMode;
import org.zstacks.zbus.server.AdminHandler;
import org.zstacks.zbus.server.ServerHelper;
import org.zstacks.zbus.server.TrackReport;
import org.zstacks.zbus.server.mq.MessageQueue;
import org.zstacks.zbus.server.mq.ReplyQueue;
import org.zstacks.zbus.server.mq.store.MessageStore;
import org.zstacks.zbus.server.mq.store.MessageStoreFactory;
import org.zstacks.znet.Helper;
import org.zstacks.znet.Message;
import org.zstacks.znet.MessageHandler;
import org.zstacks.znet.RemotingServer;
import org.zstacks.znet.nio.Dispatcher;
import org.zstacks.znet.nio.Session;

/**
 * Broker server that extends {@link RemotingServer} and routes incoming
 * {@link Message}s to named {@link MessageQueue}s held in an in-memory table.
 *
 * <p>Handlers are registered for the {@code produce}, {@code consume},
 * {@code request} and {@code admin} commands, plus a global handler that
 * stamps every message with routing metadata before dispatch. On
 * {@link #start()} the queue table is reloaded from the configured
 * {@link MessageStore} and a periodic task calls
 * {@link MessageQueue#cleanSession()} on every queue.
 */
public class ZbusServer extends RemotingServer {
    private static final Logger log = LoggerFactory.getLogger(ZbusServer.class);

    /** All queues known to this server, keyed by queue name. */
    private final ConcurrentMap<String, MessageQueue> mqTable = new ConcurrentHashMap<String, MessageQueue>();
    /** When true, non-heartbeat messages are logged at INFO instead of DEBUG. */
    private boolean verbose = true;
    private MessageStore messageStore;
    private String messageStoreType = "dummy";
    private String adminToken = "";
    private final AdminHandler adminHandler;
    private final TrackReport trackReport;
    private final ExecutorService executorService;
    private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
    /** Period, in milliseconds, between two session-cleaning passes. */
    private long mqCleanInterval = 3000L;

    /**
     * Creates a server bound to {@code 0.0.0.0} on the given port.
     *
     * @param serverPort port to listen on
     * @param dispatcher dispatcher to use; started here if not already started
     * @throws IOException if the underlying server cannot be set up
     */
    public ZbusServer(int serverPort, Dispatcher dispatcher) throws IOException {
        this("0.0.0.0", serverPort, dispatcher);
    }

    /**
     * Creates a server bound to the given host and port and registers all
     * message handlers.
     *
     * @param serverHost host/interface to bind to
     * @param serverPort port to listen on
     * @param dispatcher dispatcher to use; started here if not already started
     * @throws IOException if the underlying server cannot be set up
     */
    public ZbusServer(String serverHost, int serverPort, Dispatcher dispatcher) throws IOException {
        super(serverHost, serverPort, dispatcher);
        if (!dispatcher.isStarted()) {
            dispatcher.start();
        }
        this.executorService = dispatcher.executorService();
        this.serverName = "ZbusServer";
        this.trackReport = new TrackReport(this.mqTable, this.serverAddr);
        this.adminHandler = new AdminHandler(this.mqTable, this.executorService, this.serverAddr, this.trackReport);
        this.adminHandler.setAccessToken(this.adminToken);
        this.initHandlers();
    }

    /**
     * Resolves the queue a message is addressed to and checks its access token.
     *
     * <p>The queue name is taken from {@code msg.getMq()}, falling back to
     * {@code msg.getPath()}. When the queue is unknown (or no name is given) a
     * 404 reply is requested from {@link ServerHelper}; when the token does not
     * match, a 403 reply is requested. Replies are only sent for messages with
     * the ack flag set.
     *
     * @return the queue, or {@code null} if it is missing or access is denied
     */
    private MessageQueue findMQ(Message msg, Session sess) throws IOException {
        String mqName = msg.getMq();
        if (mqName == null) {
            mqName = msg.getPath();
        }
        // ConcurrentHashMap rejects null keys, so a nameless message is simply "not found".
        MessageQueue mq = mqName == null ? null : this.mqTable.get(mqName);
        boolean ack = msg.isAck();
        if (mq == null) {
            if (ack) {
                ServerHelper.reply404(msg, sess);
            }
            return null;
        }
        // A null token is treated like the empty token: the queue is unprotected.
        String requiredToken = mq.getAccessToken();
        boolean tokenRequired = requiredToken != null && !"".equals(requiredToken);
        if (tokenRequired && !requiredToken.equals(msg.getToken())) {
            if (ack) {
                ServerHelper.reply403(msg, sess);
            }
            return null;
        }
        return mq;
    }

    /** Registers the global pre-processing handler and the per-command handlers. */
    private void initHandlers() {
        // Global handler: fills in missing reply queue / message id and stamps
        // the remote address and broker address on every message.
        this.registerGlobalHandler(new MessageHandler(){

            public void handleMessage(Message msg, Session sess) throws IOException {
                String mqReply = msg.getMqReply();
                if (mqReply == null || mqReply.equals("")) {
                    msg.setMqReply(sess.id());
                }
                if (msg.getMsgId() == null) {
                    msg.setMsgId(UUID.randomUUID().toString());
                }
                msg.setHead("remote-addr", sess.getRemoteAddress());
                msg.setHead("broker", ZbusServer.this.serverAddr);
                if (!"heartbeat".equals(msg.getCommand())) {
                    if (ZbusServer.this.verbose) {
                        log.info("{}", msg);
                    } else {
                        log.debug("{}", msg);
                    }
                }
            }
        });
        this.registerHandler("produce", new MessageHandler(){

            public void handleMessage(Message msg, Session sess) throws IOException {
                MessageQueue mq = ZbusServer.this.findMQ(msg, sess);
                if (mq == null) {
                    return;
                }
                mq.produce(msg, sess);
            }
        });
        this.registerHandler("consume", new MessageHandler(){

            public void handleMessage(Message msg, Session sess) throws IOException {
                MessageQueue mq = ZbusServer.this.findMQ(msg, sess);
                if (mq == null) {
                    return;
                }
                mq.consume(msg, sess);
            }
        });
        // "request": produce to the target queue, then consume from the reply
        // queue named by the message (created on demand).
        this.registerHandler("request", new MessageHandler(){

            public void handleMessage(Message requestMsg, Session sess) throws IOException {
                MessageQueue requestMq = ZbusServer.this.findMQ(requestMsg, sess);
                if (requestMq == null) {
                    return;
                }
                String replyMqName = requestMsg.getMqReply();
                MessageQueue replyMq = ZbusServer.this.mqTable.get(replyMqName);
                if (replyMq == null) {
                    int mode = MessageMode.intValue(MessageMode.MQ, MessageMode.Temp);
                    MessageQueue created = new ReplyQueue(ZbusServer.this.serverAddr, replyMqName, ZbusServer.this.executorService, mode);
                    created.setCreator(sess.getRemoteAddress());
                    // Another thread may have registered the queue in the meantime;
                    // always use the instance that is actually in the table.
                    MessageQueue existing = ZbusServer.this.mqTable.putIfAbsent(replyMqName, created);
                    replyMq = existing != null ? existing : created;
                }
                requestMsg.setAck(false);
                // Copy taken before produce() so the consume request is independent
                // of whatever produce() does with the original message.
                Message msgConsume = Message.copyWithoutBody(requestMsg);
                requestMq.produce(requestMsg, sess);
                replyMq.consume(msgConsume, sess);
            }
        });
        this.registerHandler("admin", this.adminHandler);
    }

    /**
     * Sets the token required for admin commands and forwards it to the
     * admin handler so that it takes effect immediately.
     *
     * @param adminToken the token; {@code null} is treated as the empty string
     */
    public void setAdminToken(String adminToken) {
        this.adminToken = adminToken == null ? "" : adminToken;
        this.adminHandler.setAccessToken(this.adminToken);
    }

    /**
     * Starts the server, reloads the queue table from the message store and
     * schedules the periodic session-cleaning task.
     *
     * <p>A failure while loading the store is logged and does not prevent the
     * server from running.
     *
     * @throws IOException if the underlying server fails to start
     */
    public void start() throws IOException {
        super.start();
        this.messageStore = MessageStoreFactory.getMessageStore(this.serverAddr, this.messageStoreType);
        this.adminHandler.setMessageStore(this.messageStore);
        log.info("message store loading ....");
        this.mqTable.clear();
        try {
            ConcurrentMap<String, MessageQueue> mqs = this.messageStore.loadMqTable();
            for (MessageQueue mq : mqs.values()) {
                mq.setExecutor(this.executorService);
            }
            this.mqTable.putAll(mqs);
            log.info("message store loaded");
        }
        catch (Exception e) {
            log.error("message store loading error: {}", e.getMessage(), e);
        }
        this.scheduledExecutor.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                for (MessageQueue mq : ZbusServer.this.mqTable.values()) {
                    try {
                        mq.cleanSession();
                    }
                    catch (RuntimeException ex) {
                        // An exception escaping run() would cancel every future
                        // execution of this periodic task, so contain it here.
                        log.error("clean session error: {}", ex.getMessage(), ex);
                    }
                }
            }
        }, 1000L, this.mqCleanInterval, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops the cleaning task and the track report, then closes the server.
     * The underlying server is closed even if stopping the helpers fails.
     *
     * @throws IOException if closing fails
     */
    public void close() throws IOException {
        try {
            this.scheduledExecutor.shutdown();
            this.trackReport.close();
        }
        finally {
            super.close();
        }
    }

    /**
     * Starts reporting to the given track server address(es). An
     * {@link IOException} raised while starting is logged, not propagated.
     *
     * @param trackServerAddr track server address string passed to {@link TrackReport}
     */
    public void startTrackReport(String trackServerAddr) {
        try {
            this.trackReport.startTrackReport(trackServerAddr, this.dispatcher);
        }
        catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Sets the message store type used by the next {@link #start()}.
     *
     * @param messageStoreType store type name understood by {@link MessageStoreFactory}
     */
    public void setMessageStoreType(String messageStoreType) {
        this.messageStoreType = messageStoreType;
    }

    /**
     * Handles a session error: non-I/O errors are delegated to the superclass,
     * and in every case the temporary queues created by the session are removed.
     */
    public void onException(Throwable e, Session sess) throws IOException {
        if (!(e instanceof IOException)) {
            super.onException(e, sess);
        }
        this.cleanMQ(sess);
    }

    /** Removes the temporary queues created by a session that has gone away. */
    public void onSessionDestroyed(Session sess) throws IOException {
        this.cleanMQ(sess);
    }

    /**
     * Picks the handler key for a message: its command, else its path, else
     * {@code "admin"} when neither is present or the value is blank.
     *
     * @param msg incoming message
     * @return the handler key, never {@code null}
     */
    public String findHandlerKey(Message msg) {
        String cmd = msg.getCommand();
        if (cmd == null) {
            cmd = msg.getPath();
        }
        if (cmd == null || "".equals(cmd.trim())) {
            cmd = "admin";
        }
        return cmd;
    }

    /**
     * Removes every queue that has the Temp mode flag and whose creator equals
     * the remote address of the given session.
     */
    private void cleanMQ(Session sess) {
        // Kept from the original: guards against a call before field initialisation.
        if (this.mqTable == null) {
            return;
        }
        String creator = sess.getRemoteAddress();
        if (creator == null) {
            // No address to match against; nothing can belong to this session.
            return;
        }
        Iterator<Map.Entry<String, MessageQueue>> iter = this.mqTable.entrySet().iterator();
        while (iter.hasNext()) {
            MessageQueue mq = iter.next().getValue();
            // creator is non-null here, so a queue without a creator never matches
            // (and no longer triggers a NullPointerException).
            if (MessageMode.isEnabled(mq.getMode(), MessageMode.Temp) && creator.equals(mq.getCreator())) {
                iter.remove();
            }
        }
    }

    /** @return the address of this server as held by the superclass */
    public String getServerAddress() {
        return this.serverAddr;
    }

    /**
     * @param verbose {@code true} to log non-heartbeat messages at INFO,
     *                {@code false} to log them at DEBUG
     */
    public void setVerbose(boolean verbose) {
        this.verbose = verbose;
    }

    /**
     * Command-line entry point. Recognised options: {@code -p}, {@code -admin},
     * {@code -track}, {@code -store}, {@code -selector}, {@code -executor},
     * {@code -verbose}.
     */
    public static void main(String[] args) throws Exception {
        ZbusServerConfig config = new ZbusServerConfig();
        config.serverPort = Helper.option(args, "-p", 15555);
        config.adminToken = Helper.option(args, "-admin", "");
        config.trackServerAddr = Helper.option(args, "-track", "127.0.0.1:16666;127.0.0.1:16667");
        config.storeType = Helper.option(args, "-store", "dummy");
        config.selectorCount = Helper.option(args, "-selector", 1);
        config.executorCount = Helper.option(args, "-executor", 16);
        config.verbose = Helper.option(args, "-verbose", true);
        Dispatcher dispatcher = new Dispatcher().selectorCount(config.selectorCount).executorCount(config.executorCount);
        ZbusServer zbus = new ZbusServer(config.serverPort, dispatcher);
        zbus.setAdminToken(config.adminToken);
        zbus.setMessageStoreType(config.storeType);
        zbus.setVerbose(config.verbose);
        if (config.trackServerAddr != null && !config.trackServerAddr.equals("")) {
            zbus.startTrackReport(config.trackServerAddr);
        }
        zbus.start();
    }

    /**
     * Plain holder for server settings. {@link ZbusServer#main(String[])}
     * overwrites every field from the command line using its own fallbacks,
     * which for {@code executorCount} and {@code trackServerAddr} differ from
     * the defaults declared here.
     */
    public static class ZbusServerConfig {
        public int serverPort = 15555;
        public String adminToken = "";
        public String trackServerAddr;
        public String storeType = "dummy";
        public int selectorCount = 1;
        public int executorCount = 4;
        public boolean verbose = true;
    }
}

