/*
 * Decompiled with CFR 0.152.
 */
package org.noear.folkmq.server;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import org.noear.folkmq.server.MqMessageHolder;
import org.noear.folkmq.server.MqMessageQueue;
import org.noear.folkmq.server.MqNextTime;
import org.noear.socketd.transport.core.Entity;
import org.noear.socketd.transport.core.Session;
import org.noear.socketd.utils.RunUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Message queue bound to a single identity: holds pending messages and hands each one to a
 * randomly chosen subscriber session, re-scheduling it when it is not yet due, when the send
 * fails, or when the subscriber does not acknowledge it.
 *
 * <p>Messages are pushed from caller threads and from delayed re-push tasks, while draining
 * happens on a task started through {@code RunUtils.async}. The queue and the subscriber list
 * are therefore concurrent collections, and at most one drain task is tracked at a time.
 */
public class MqMessageQueueImpl implements MqMessageQueue {
    private static final Logger log = LoggerFactory.getLogger(MqMessageQueueImpl.class);

    /** Milliseconds after which an in-flight message is re-pushed unless a non-zero ack arrives. */
    private static final long ACK_TIMEOUT_MILLIS = 300L;

    /** Pending messages; written by push() and drained by the distribution task. */
    private final Queue<MqMessageHolder> queue = new ConcurrentLinkedQueue<>();
    private final String identity;
    /** Sessions eligible to receive messages; mutated by callers while the drain task reads it. */
    private final List<Session> subscriberSet;
    /** Handle of the running drain task, or null when none is active. Guarded by distributeFutureLock. */
    private CompletableFuture<?> distributeFuture;
    /** Private monitor; a dedicated object so no other code can contend on it. */
    private final Object distributeFutureLock = new Object();

    /**
     * Creates an empty queue with no subscribers.
     *
     * @param identity the identity reported by {@link #getIdentity()}
     */
    public MqMessageQueueImpl(String identity) {
        this.identity = identity;
        this.subscriberSet = new CopyOnWriteArrayList<>();
    }

    /**
     * Registers a session as a delivery candidate.
     *
     * @param session the session to add
     */
    @Override
    public void addSubscriber(Session session) {
        this.subscriberSet.add(session);
    }

    /**
     * Removes a session from the delivery candidates.
     *
     * @param session the session to remove
     */
    @Override
    public void removeSubscriber(Session session) {
        this.subscriberSet.remove(session);
    }

    /**
     * @return the identity this queue was created with
     */
    @Override
    public String getIdentity() {
        return this.identity;
    }

    /**
     * Enqueues a message and makes sure a drain task is running.
     *
     * @param messageHolder the message to enqueue
     */
    @Override
    public void push(MqMessageHolder messageHolder) {
        this.queue.add(messageHolder);
        this.distributeFutureInit();
    }

    /** Starts a drain task unless one is already tracked. */
    private void distributeFutureInit() {
        synchronized (this.distributeFutureLock) {
            if (this.distributeFuture == null) {
                this.distributeFuture = RunUtils.async(() -> {
                    this.runDistribution();
                });
            }
        }
    }

    /**
     * Body of the drain task: drains the queue, releases the task slot, then re-checks the queue.
     */
    private void runDistribution() {
        try {
            this.distribute();
        } finally {
            // Always release the slot; otherwise one failure would block every later drain.
            this.distributeFutureAsNull();
        }
        // A push that raced with the end of the drain loop saw the slot still occupied and did
        // not start a task. Pick that message up now. The subscriber check avoids re-spawning
        // endlessly while nobody is connected (distribute() leaves the queue untouched then).
        if (!this.queue.isEmpty() && !this.subscriberSet.isEmpty()) {
            this.distributeFutureInit();
        }
    }

    /** Marks the drain task slot as free. */
    private void distributeFutureAsNull() {
        synchronized (this.distributeFutureLock) {
            this.distributeFuture = null;
        }
    }

    /**
     * Schedules a re-push of the message for the time given by its {@code getNextTime()}.
     */
    private void addDelayed(MqMessageHolder messageHolder) {
        this.addDelayed(messageHolder, messageHolder.getNextTime() - System.currentTimeMillis());
    }

    /**
     * Schedules a re-push of the message after the given delay, replacing any re-push that is
     * already scheduled for it.
     *
     * @param messageHolder the message to re-push
     * @param millisDelay   delay in milliseconds passed to {@code RunUtils.delay}
     */
    private void addDelayed(MqMessageHolder messageHolder, long millisDelay) {
        // The holder is the monitor so scheduling and cancelling for one message are serialized.
        synchronized (messageHolder) {
            if (messageHolder.deferredFuture != null) {
                messageHolder.deferredFuture.cancel(true);
            }
            messageHolder.deferredFuture = RunUtils.delay(() -> this.push(messageHolder), millisDelay);
        }
    }

    /**
     * Cancels the pending re-push of the message, if any.
     *
     * @param messageHolder the message whose scheduled re-push should be dropped
     */
    public void clearDelayed(MqMessageHolder messageHolder) {
        synchronized (messageHolder) {
            if (messageHolder.deferredFuture != null) {
                messageHolder.deferredFuture.cancel(true);
                messageHolder.deferredFuture = null;
            }
        }
    }

    /**
     * Drains the queue: messages not yet allowed by {@code MqNextTime.allowDistribute} are
     * re-scheduled, the rest are sent to a subscriber. When there are no subscribers the queue
     * is left untouched and a warning is logged.
     */
    private void distribute() {
        if (this.subscriberSet.isEmpty()) {
            log.warn("No sessions!");
            return;
        }

        MqMessageHolder messageHolder;
        while ((messageHolder = this.queue.poll()) != null) {
            if (!MqNextTime.allowDistribute(messageHolder)) {
                this.addDelayed(messageHolder);
                continue;
            }
            try {
                this.distributeDo(messageHolder, this.subscriberSet);
            } catch (Throwable e) {
                // Deliberately broad: any delivery failure must not lose the message.
                log.warn("Distribute failed, message deferred: identity={}", this.identity, e);
                this.addDelayed(messageHolder.deferred());
            }
        }
    }

    /**
     * Sends one message to a randomly chosen session and handles its acknowledgement.
     *
     * @param messageHolder the message to send
     * @param sessions      candidate sessions; a snapshot is taken before choosing
     * @throws IOException           if the session fails to send
     * @throws IllegalStateException if no session is available at the time of the snapshot
     */
    private void distributeDo(MqMessageHolder messageHolder, List<Session> sessions) throws IOException {
        // Snapshot first so a concurrent removeSubscriber cannot invalidate the chosen index.
        Session[] candidates = sessions.toArray(new Session[0]);
        if (candidates.length == 0) {
            throw new IllegalStateException("No sessions!");
        }
        int idx = candidates.length > 1 ? ThreadLocalRandom.current().nextInt(candidates.length) : 0;
        Session target = candidates[idx];

        messageHolder.getContent().meta("mq.times", String.valueOf(messageHolder.getTimes()));

        // Arm the ack timeout before sending: without a positive ack the message is re-pushed.
        this.addDelayed(messageHolder, ACK_TIMEOUT_MILLIS);

        target.sendAndSubscribe("mq.cmd.distribute", (Entity) messageHolder.getContent(), m -> {
            int ack = Integer.parseInt(m.metaOrDefault("mq.ack", "0"));
            if (ack == 0) {
                // Not acknowledged: replace the timeout with the holder's own retry schedule.
                this.addDelayed(messageHolder.deferred());
            } else {
                this.clearDelayed(messageHolder);
            }
        });
    }
}

