/*
 * Decompiled with CFR 0.152.
 */
package org.noear.folkmq.server;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import org.noear.folkmq.server.MqMessageHolder;
import org.noear.folkmq.server.MqNextTime;
import org.noear.folkmq.server.MqTopicConsumerQueue;
import org.noear.folkmq.server.MqTopicConsumerQueueBase;
import org.noear.folkmq.server.MqWatcher;
import org.noear.socketd.transport.core.Entity;
import org.noear.socketd.transport.core.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default consumer queue for one topic/consumer pair.
 *
 * <p>Pending messages are tracked in two structures: a map keyed by the
 * message tid, and a {@link DelayQueue} from which a dedicated worker thread
 * polls messages whose delay has expired and hands them to
 * {@link #distribute(MqMessageHolder)}.
 *
 * <p>The worker thread is started by the constructor and is asked to stop by
 * {@link #close()}, which interrupts it.
 */
public class MqTopicConsumerQueueDefault
        extends MqTopicConsumerQueueBase
        implements MqTopicConsumerQueue {
    private static final Logger log = LoggerFactory.getLogger(MqTopicConsumerQueueDefault.class);

    private final String topic;
    private final String consumer;
    private final MqWatcher watcher;
    /** Messages known to this queue, keyed by tid; entries are removed once a message is done. */
    private final Map<String, MqMessageHolder> messageMap;
    /** Messages waiting for their next distribution attempt. */
    private final DelayQueue<MqMessageHolder> messageQueue;
    /** Worker thread running {@link #queueTake()}. */
    private final Thread messageQueueThread;
    /** Number of consecutive empty polls; only touched by the worker thread. */
    private long queueTakeRef = 0L;

    /**
     * Creates the queue and immediately starts its worker thread.
     *
     * @param watcher  callback notified on distribute and acknowledge events
     * @param topic    topic this queue belongs to
     * @param consumer consumer this queue belongs to
     */
    public MqTopicConsumerQueueDefault(MqWatcher watcher, String topic, String consumer) {
        this.watcher = watcher;
        this.topic = topic;
        this.consumer = consumer;
        this.messageMap = new ConcurrentHashMap<>();
        this.messageQueue = new DelayQueue<>();
        // The thread field is assigned before start(), so the worker can safely read it.
        this.messageQueueThread = new Thread(this::queueTake);
        this.messageQueueThread.start();
    }

    /**
     * Worker loop: polls the delay queue and distributes each message that is due.
     *
     * <p>When nothing is due the loop sleeps 100 ms before polling again. The loop
     * ends when the worker thread is interrupted, whether the interrupt arrives
     * while working or while sleeping.
     */
    private void queueTake() {
        while (!this.messageQueueThread.isInterrupted()) {
            try {
                MqMessageHolder messageHolder = this.messageQueue.poll();
                if (messageHolder != null) {
                    this.queueTakeRef = 0L;
                    this.messageCounterSub(messageHolder);
                    this.distribute(messageHolder);
                    continue;
                }
                // Emit a debug trace after more than 1000 consecutive empty polls, then restart the count.
                if (this.queueTakeRef++ > 1000L) {
                    if (log.isDebugEnabled()) {
                        log.debug("MqConsumerQueue queueTake as null *1000, queue={}#{}", this.topic, this.consumer);
                    }
                    this.queueTakeRef = 0L;
                }
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                // Thread.sleep clears the interrupt status when it throws. Restore it and
                // leave the loop; otherwise an interrupt delivered during the sleep (the
                // usual case for close()) would be swallowed and the thread would never stop.
                Thread.currentThread().interrupt();
                break;
            } catch (Throwable e) {
                // Keep the worker alive on any other failure; just report it.
                if (log.isWarnEnabled()) {
                    log.warn("MqConsumerQueue queueTake error, queue={}#{}", this.topic, this.consumer, e);
                }
            }
        }
        if (log.isWarnEnabled()) {
            log.warn("MqConsumerQueue queueTake stoped!");
        }
    }

    /** @return the topic this queue belongs to */
    @Override
    public String getTopic() {
        return this.topic;
    }

    /** @return the consumer this queue belongs to */
    @Override
    public String getConsumer() {
        return this.consumer;
    }

    /** @return whether the worker thread is still alive */
    public boolean isAlive() {
        return this.messageQueueThread.isAlive();
    }

    /** @return the current state of the worker thread */
    public Thread.State state() {
        return this.messageQueueThread.getState();
    }

    /** @return a read-only view of the tid-to-message map */
    public Map<String, MqMessageHolder> getMessageMap() {
        return Collections.unmodifiableMap(this.messageMap);
    }

    /**
     * Registers a message under its tid and queues it for distribution.
     *
     * @param messageHolder the message to add
     */
    @Override
    public void add(MqMessageHolder messageHolder) {
        this.messageMap.put(messageHolder.getTid(), messageHolder);
        this.messageQueue.add(messageHolder);
        this.messageCounterAdd(messageHolder);
    }

    /** Puts a message (back) on the delay queue and updates the counter; the map is not touched. */
    private void internalAdd(MqMessageHolder mh) {
        this.messageQueue.add(mh);
        this.messageCounterAdd(mh);
    }

    /** Takes a message off the delay queue; the counter is only updated if it was actually present. */
    private void internalRemove(MqMessageHolder mh) {
        if (this.messageQueue.remove(mh)) {
            this.messageCounterSub(mh);
        }
    }

    /** @return the number of messages currently in the delay queue */
    @Override
    public int messageTotal() {
        return this.messageQueue.size();
    }

    /** @return the number of messages currently in the tid map */
    @Override
    public int messageTotal2() {
        return this.messageMap.size();
    }

    /**
     * Handles one message taken from the delay queue.
     *
     * <ul>
     *   <li>A message already marked done is only dropped from the map.</li>
     *   <li>With at least one session, the message is sent via {@code distributeDo};
     *       on any failure it is re-queued via {@code messageHolder.delayed()}.</li>
     *   <li>With no session, the message is re-queued via {@code messageHolder.delayed()}.</li>
     * </ul>
     *
     * @param messageHolder the message to distribute
     */
    protected void distribute(MqMessageHolder messageHolder) {
        if (messageHolder.isDone()) {
            this.messageMap.remove(messageHolder.getTid());
            return;
        }
        if (this.sessionCount() > 0) {
            try {
                this.distributeDo(messageHolder);
            } catch (Throwable e) {
                // distributeDo may already have re-queued the message (qos > 0 path),
                // so remove any queued copy before adding the delayed one.
                this.internalRemove(messageHolder);
                this.internalAdd(messageHolder.delayed());
                if (log.isWarnEnabled()) {
                    log.warn("MqConsumerQueue distribute error, tid={}", messageHolder.getTid(), e);
                }
            }
        } else {
            this.internalAdd(messageHolder.delayed());
            if (log.isDebugEnabled()) {
                log.debug("MqConsumerQueue distribute: @{} no sessions, times={}, tid={}",
                        this.consumer, messageHolder.getDistributeCount(), messageHolder.getTid());
            }
        }
    }

    /**
     * Sends one message to a session obtained from {@code getSession()}.
     *
     * <p>For qos &gt; 0 the message is first re-queued with a distribute time of
     * now + {@code MqNextTime.getMaxDelayMillis()} and then sent as a request; the
     * reply's {@code mq.ack} meta (default {@code "0"}) is passed to
     * {@code acknowledgeDo}. For qos 0 the message is sent one-way and immediately
     * treated as acknowledged.
     *
     * @param messageHolder the message to send
     * @throws IOException if sending fails
     */
    private void distributeDo(MqMessageHolder messageHolder) throws IOException {
        Session s1 = this.getSession();
        this.watcher.onDistribute(this.topic, this.consumer, messageHolder);
        if (messageHolder.getQos() > 0) {
            // Re-queue before sending: presumably so the message is attempted again
            // if no reply ever arrives (the ack callback removes it from the queue).
            messageHolder.setDistributeTime(System.currentTimeMillis() + MqNextTime.getMaxDelayMillis());
            this.internalAdd(messageHolder);
            s1.sendAndRequest("mq.event.distribute", (Entity) messageHolder.getContent(), m -> {
                int ack = Integer.parseInt(m.metaOrDefault("mq.ack", "0"));
                this.acknowledgeDo(messageHolder, ack);
            });
        } else {
            s1.send("mq.event.distribute", (Entity) messageHolder.getContent());
            this.watcher.onAcknowledge(this.topic, this.consumer, messageHolder, true);
            this.messageMap.remove(messageHolder.getTid());
            messageHolder.setDone(true);
        }
    }

    /**
     * Applies a consumer reply to a message.
     *
     * @param messageHolder the message the reply refers to
     * @param ack           value greater than 0 means acknowledged: the message is
     *                      removed and marked done; otherwise it is re-queued via
     *                      {@code messageHolder.delayed()}
     */
    private void acknowledgeDo(MqMessageHolder messageHolder, int ack) {
        this.watcher.onAcknowledge(this.topic, this.consumer, messageHolder, ack > 0);
        if (ack > 0) {
            this.messageMap.remove(messageHolder.getTid());
            this.internalRemove(messageHolder);
            messageHolder.setDone(true);
        } else {
            this.internalRemove(messageHolder);
            this.internalAdd(messageHolder.delayed());
        }
    }

    /**
     * Interrupts the worker thread, discards all pending messages and then
     * delegates to the superclass {@code close()}.
     */
    @Override
    public void close() {
        if (this.messageQueueThread != null) {
            this.messageQueueThread.interrupt();
        }
        this.messageQueue.clear();
        this.messageMap.clear();
        super.close();
    }
}

