/*
 * Decompiled with CFR 0.152.
 */
package org.zstacks.zbus.server.mq;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zstacks.zbus.protocol.ConsumerInfo;
import org.zstacks.zbus.server.ServerHelper;
import org.zstacks.zbus.server.mq.MessageQueue;
import org.zstacks.zbus.server.mq.PullSession;
import org.zstacks.znet.Message;
import org.zstacks.znet.nio.Session;

public class RequestQueue
extends MessageQueue {
    private static final long serialVersionUID = -7640938066598234399L;
    private static final Logger log = LoggerFactory.getLogger(RequestQueue.class);
    protected final BlockingQueue<Message> msgQ = new LinkedBlockingQueue<Message>();
    transient BlockingQueue<PullSession> sessQ = new LinkedBlockingQueue<PullSession>();

    public RequestQueue(String broker, String name, ExecutorService executor, int mode) {
        super(broker, name, executor, mode);
    }

    void enqueue(final Message msg) {
        this.msgQ.offer(msg);
        this.executor.submit(new Runnable(){

            @Override
            public void run() {
                try {
                    if (RequestQueue.this.messageStore != null) {
                        RequestQueue.this.messageStore.saveMessage(msg);
                    }
                }
                catch (Exception e) {
                    log.error(e.getMessage(), (Throwable)e);
                }
            }
        });
    }

    Message dequeue() {
        final Message msg = (Message)this.msgQ.poll();
        if (msg != null) {
            this.executor.submit(new Runnable(){

                @Override
                public void run() {
                    try {
                        if (RequestQueue.this.messageStore != null) {
                            RequestQueue.this.messageStore.removeMessage(msg);
                        }
                    }
                    catch (Exception e) {
                        log.error(e.getMessage(), (Throwable)e);
                    }
                }
            });
        }
        return msg;
    }

    @Override
    public void produce(Message msg, Session sess) throws IOException {
        String msgId = msg.getMsgId();
        if (msg.isAck()) {
            ServerHelper.reply200(msgId, sess);
        }
        this.enqueue(msg);
        this.dispatch();
    }

    @Override
    public void consume(Message msg, Session sess) throws IOException {
        for (PullSession pull : this.sessQ) {
            if (pull.getSession() != sess) continue;
            pull.setPullMsg(msg);
            this.dispatch();
            return;
        }
        PullSession pull = new PullSession(sess, msg);
        this.sessQ.offer(pull);
        this.dispatch();
    }

    @Override
    public void cleanSession() {
        Iterator iter = this.sessQ.iterator();
        while (iter.hasNext()) {
            PullSession ps = (PullSession)iter.next();
            if (ps.session.isActive()) continue;
            iter.remove();
        }
    }

    @Override
    void doDispatch() throws IOException {
        while (this.msgQ.peek() != null && this.sessQ.peek() != null) {
            Message msg;
            PullSession pull = (PullSession)this.sessQ.poll();
            if (pull == null || pull.window.get() == 0 || !pull.getSession().isActive() || (msg = this.dequeue()) == null) continue;
            try {
                Message pullMsg = pull.getPullMsg();
                Message writeMsg = Message.copyWithoutBody((Message)msg);
                this.prepareMessageStatus(writeMsg);
                writeMsg.setMsgIdRaw(msg.getMsgId());
                writeMsg.setMsgId(pullMsg.getMsgId());
                pull.getSession().write((Object)writeMsg);
                if (pull.window.get() > 0) {
                    pull.window.decrementAndGet();
                }
            }
            catch (IOException ex) {
                log.error(ex.getMessage(), (Throwable)ex);
                this.enqueue(msg);
            }
            if (pull.window.get() != -1 && pull.window.get() <= 0) continue;
            this.sessQ.offer(pull);
        }
    }

    public void loadMessageList(List<Message> msgs) {
        this.msgQ.clear();
        this.msgQ.addAll(msgs);
    }

    @Override
    public List<ConsumerInfo> getConsumerInfoList() {
        ArrayList<ConsumerInfo> res = new ArrayList<ConsumerInfo>();
        for (PullSession value : this.sessQ) {
            Session sess = value.getSession();
            ConsumerInfo info = new ConsumerInfo();
            info.setStatus(sess.getStatus().toString());
            info.setRemoteAddr(sess.getRemoteAddress());
            res.add(info);
        }
        return res;
    }

    @Override
    public int getMessageQueueSize() {
        return this.msgQ.size();
    }
}

