/*
 * Decompiled with CFR 0.152.
 */
package org.noear.folkmq.server;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.noear.folkmq.server.MqMessageHolder;
import org.noear.folkmq.server.MqNextTime;
import org.noear.folkmq.server.MqUserQueue;
import org.noear.socketd.transport.core.Entity;
import org.noear.socketd.transport.core.Session;
import org.noear.socketd.utils.RunUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqUserQueueImpl
implements MqUserQueue {
    private static final Logger log = LoggerFactory.getLogger(MqUserQueueImpl.class);
    private final String user;
    private final List<Session> userSessionSet;

    public MqUserQueueImpl(String user) {
        this.user = user;
        this.userSessionSet = new ArrayList<Session>();
    }

    @Override
    public void addSession(Session session) {
        this.userSessionSet.add(session);
    }

    @Override
    public void removeSession(Session session) {
        this.userSessionSet.remove(session);
    }

    @Override
    public String getUser() {
        return this.user;
    }

    @Override
    public synchronized void push(MqMessageHolder messageHolder) {
        this.distribute(messageHolder);
    }

    private void addDelayed(MqMessageHolder messageHolder) {
        this.addDelayed(messageHolder, messageHolder.getNextTime() - System.currentTimeMillis());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addDelayed(MqMessageHolder messageHolder, long millisDelay) {
        MqMessageHolder mqMessageHolder = messageHolder;
        synchronized (mqMessageHolder) {
            if (messageHolder.deferredFuture != null) {
                messageHolder.deferredFuture.cancel(false);
            }
            messageHolder.deferredFuture = RunUtils.delay(() -> this.push(messageHolder), (long)millisDelay);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void clearDelayed(MqMessageHolder messageHolder) {
        MqMessageHolder mqMessageHolder = messageHolder;
        synchronized (mqMessageHolder) {
            if (messageHolder.deferredFuture != null) {
                messageHolder.deferredFuture.cancel(false);
            }
        }
    }

    private void distribute(MqMessageHolder messageHolder) {
        if (this.userSessionSet.size() > 0) {
            if (!MqNextTime.allowDistribute(messageHolder)) {
                this.addDelayed(messageHolder);
            } else {
                try {
                    this.distributeDo(messageHolder, this.userSessionSet);
                }
                catch (Throwable e) {
                    this.addDelayed(messageHolder.deferred());
                }
            }
        } else {
            log.warn("No sessions!");
        }
    }

    private void distributeDo(MqMessageHolder messageHolder, List<Session> sessions) throws IOException {
        int idx = 0;
        if (sessions.size() > 1) {
            idx = new Random().nextInt(sessions.size());
        }
        Session s1 = sessions.get(idx);
        messageHolder.getContent().meta("mq.times", String.valueOf(messageHolder.getTimes()));
        this.addDelayed(messageHolder, 300L);
        s1.sendAndSubscribe("mq.cmd.distribute", (Entity)messageHolder.getContent(), m -> {
            int ack = Integer.parseInt(m.metaOrDefault("mq.ack", "0"));
            if (ack == 0) {
                this.addDelayed(messageHolder.deferred());
            } else {
                this.clearDelayed(messageHolder);
            }
        });
    }
}

