/*
 * Decompiled with CFR 0.152.
 */
package org.noear.folkmq.server;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.noear.folkmq.server.MqConsumerQueue;
import org.noear.folkmq.server.MqConsumerQueueBase;
import org.noear.folkmq.server.MqMessageHolder;
import org.noear.folkmq.server.MqNextTime;
import org.noear.socketd.transport.core.Entity;
import org.noear.socketd.transport.core.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Consumer queue that hands each pushed message to one of the sessions
 * registered for a single consumer.
 *
 * <p>{@link #addSession(Session)}, {@link #removeSession(Session)} and
 * {@link #push(MqMessageHolder)} all synchronize on this instance, so the
 * session list is never mutated while a dispatch is reading it. The
 * acknowledgement callback registered in {@code distributeDo} is invoked by
 * the session and does not touch the session list.
 */
public class MqConsumerQueueImpl
extends MqConsumerQueueBase
implements MqConsumerQueue {
    private static final Logger log = LoggerFactory.getLogger(MqConsumerQueueImpl.class);

    /** Shared generator used to pick a target session; avoids allocating one per dispatch. */
    private static final Random RANDOM = new Random();

    /** Name of the consumer this queue serves. */
    private final String consumer;

    /** Sessions currently registered for this consumer; guarded by {@code this}. */
    private final List<Session> userSessionSet;

    /**
     * Creates an empty queue for the given consumer.
     *
     * @param consumer the consumer name returned by {@link #getConsumer()}
     */
    public MqConsumerQueueImpl(String consumer) {
        this.consumer = consumer;
        this.userSessionSet = new ArrayList<Session>();
    }

    /**
     * @return the consumer name supplied at construction time
     */
    @Override
    public String getConsumer() {
        return this.consumer;
    }

    /**
     * Registers a session as a possible delivery target.
     *
     * <p>Synchronized on the same monitor as {@link #push(MqMessageHolder)} so the
     * backing {@link ArrayList} is not modified while a dispatch is reading it.
     *
     * @param session the session to add
     */
    @Override
    public synchronized void addSession(Session session) {
        this.userSessionSet.add(session);
    }

    /**
     * Unregisters a session so it is no longer picked as a delivery target.
     *
     * <p>Synchronized on the same monitor as {@link #push(MqMessageHolder)} so a
     * session cannot disappear between the size check and the indexed lookup
     * performed during a dispatch.
     *
     * @param session the session to remove
     */
    @Override
    public synchronized void removeSession(Session session) {
        this.userSessionSet.remove(session);
    }

    /**
     * Attempts to deliver the message to one of the registered sessions.
     *
     * @param messageHolder the message to deliver
     */
    @Override
    public synchronized void push(MqMessageHolder messageHolder) {
        this.distribute(messageHolder);
    }

    /**
     * Decides whether the message is sent now or handed back to the delayed
     * handling of the base class.
     *
     * <ul>
     *   <li>No sessions registered: the message is re-queued via
     *       {@code addDelayed(messageHolder.delayed())} and a warning is logged.</li>
     *   <li>{@code MqNextTime.chkNextTime} returns {@code false}: the message is
     *       re-queued unchanged via {@code addDelayed(messageHolder)}.</li>
     *   <li>Otherwise the message is sent; any failure while sending is logged and
     *       the message is re-queued via {@code addDelayed(messageHolder.delayed())}.</li>
     * </ul>
     *
     * @param messageHolder the message to deliver
     */
    private void distribute(MqMessageHolder messageHolder) {
        if (this.userSessionSet.isEmpty()) {
            this.addDelayed(messageHolder.delayed());
            log.warn("No sessions!");
            return;
        }

        if (!MqNextTime.chkNextTime(messageHolder)) {
            this.addDelayed(messageHolder);
            return;
        }

        try {
            this.distributeDo(messageHolder, this.userSessionSet);
        }
        catch (Throwable e) {
            // Report the failure instead of dropping it silently, then keep the
            // message scheduled so it is not lost.
            log.warn("Message distribute failed, consumer={}", this.consumer, e);
            this.addDelayed(messageHolder.delayed());
        }
    }

    /**
     * Sends the message to one session and registers a callback for the reply.
     *
     * <p>When more than one session is available the target is chosen at random;
     * with a single session that session is used. The reply's {@code mq.ack} meta
     * value (defaulting to {@code "0"}) decides the outcome: {@code 0} re-queues the
     * message via {@code addDelayed(messageHolder.delayed())}, any other value
     * calls {@code clearDelayed(messageHolder)}.
     *
     * @param messageHolder the message to send
     * @param sessions the candidate sessions; must not be empty
     * @throws IOException if the session fails to send the message
     */
    private void distributeDo(MqMessageHolder messageHolder, List<Session> sessions) throws IOException {
        int idx = sessions.size() > 1 ? RANDOM.nextInt(sessions.size()) : 0;
        Session target = sessions.get(idx);

        // Expose the holder's current distribute count to the receiver.
        messageHolder.getContent().meta("mq.times", String.valueOf(messageHolder.getDistributeCount()));

        // Schedule the message before sending so it stays tracked if no reply
        // arrives. NOTE(review): 300L is presumably milliseconds - confirm against
        // MqConsumerQueueBase.addDelayed.
        this.addDelayed(messageHolder, 300L);

        target.sendAndSubscribe("mq.cmd.distribute", (Entity)messageHolder.getContent(), m -> {
            int ack = Integer.parseInt(m.metaOrDefault("mq.ack", "0"));
            if (ack == 0) {
                this.addDelayed(messageHolder.delayed());
            } else {
                this.clearDelayed(messageHolder);
            }
        });
    }
}

