/*
 * Decompiled with CFR 0.152.
 */
package org.noear.luffy.event.message.controller;

import java.util.List;
import org.noear.luffy.dso.JtLock;
import org.noear.luffy.dso.JtMsg;
import org.noear.luffy.dso.LogLevel;
import org.noear.luffy.dso.LogUtil;
import org.noear.luffy.event.message.dso.AMessageDistributionModel;
import org.noear.luffy.event.message.dso.AMessageModel;
import org.noear.luffy.event.message.dso.DbMsgApi;
import org.noear.luffy.event.message.dso.DisttimeUtil;
import org.noear.luffy.event.message.dso.StateTag;
import org.noear.luffy.executor.ExecutorFactory;
import org.noear.luffy.model.AFileModel;
import org.noear.luffy.task.JtTaskBase;
import org.noear.luffy.utils.ExceptionUtils;
import org.noear.luffy.utils.TextUtils;
import org.noear.luffy.utils.Timespan;
import org.noear.solon.core.handle.Context;
import org.noear.solon.core.handle.ContextEmpty;
import org.noear.solon.core.handle.ContextUtil;
import org.noear.wood.ext.Act3;

public class MessageTask
extends JtTaskBase {
    private int rows = 50;
    private Act3<StateTag, AMessageDistributionModel, Boolean> distributeMessage_callback = (tag, dist, isOk) -> {
        ++tag.count;
        if (isOk.booleanValue()) {
            if (DbMsgApi.msgSetDistributionState(tag.msg.msg_id, dist, 2)) {
                ++tag.value;
            }
        } else {
            DbMsgApi.msgSetDistributionState(tag.msg.msg_id, dist, 1);
        }
        if (tag.count == tag.total) {
            if (tag.value == tag.total) {
                DbMsgApi.msgSetState(dist.msg_id, 2);
                if (tag.msg.dist_count >= 4) {
                    System.out.print("\u53d1\u9001\u77ed\u4fe1\u62a5\u8b66---\r\n");
                }
            } else {
                DbMsgApi.msgSetRepet(tag.msg, 0);
                if (tag.msg.dist_count >= 4) {
                    System.out.print("\u53d1\u9001\u77ed\u4fe1\u62a5\u8b66---\r\n");
                }
            }
        }
    };

    public MessageTask() {
        super("_message", 500);
    }

    public void exec() throws Exception {
        if (!this.node_current_can_run()) {
            return;
        }
        this._interval = this._interval_bak;
        int ntime = DisttimeUtil.currTime();
        List<Long> msgList = DbMsgApi.msgGetList(this.rows, ntime);
        for (Long msgID : msgList) {
            if (!JtLock.g.tryLock("luffy.event", this.getName() + "_" + msgID)) continue;
            AMessageModel msg = DbMsgApi.msgGet(msgID);
            this.distribute(msg);
        }
        this._interval_bak = msgList.size() == 0 ? (this._interval = 2000) : (this._interval = 500);
    }

    private void distribute(AMessageModel msg) {
        try {
            this.do_distribute(msg);
        }
        catch (Throwable ex) {
            DbMsgApi.msgSetRepet(msg, 0);
            LogUtil.log((String)this.getName(), (String)"distribute", (String)msg.topic, (String)(msg.msg_id + ""), (LogLevel)LogLevel.ERROR, (String)msg.content, (String)ExceptionUtils.getString((Throwable)ex));
        }
    }

    private void do_distribute(AMessageModel msg) throws Exception {
        if (msg == null) {
            return;
        }
        if (msg.state != 0) {
            return;
        }
        DbMsgApi.msgSetState(msg.msg_id, 1);
        List<AFileModel> subsList = DbMsgApi.msgGetSubs(msg.topic);
        if (subsList.size() == 0) {
            DbMsgApi.msgSetState(msg.msg_id, -2, 0);
            return;
        }
        for (AFileModel m : subsList) {
            DbMsgApi.msgAddDistribution(msg.msg_id, m);
        }
        List<AMessageDistributionModel> distList = DbMsgApi.msgGetDistributionList(msg.msg_id);
        if (distList.size() == 0) {
            DbMsgApi.msgSetState(msg.msg_id, 2);
            return;
        }
        StateTag state = new StateTag();
        state.msg = msg;
        state.total = distList.size();
        for (AMessageDistributionModel m : distList) {
            m._start_time = System.currentTimeMillis();
            this.poolExecute(() -> this.distributeMessage(state, msg, m, this.distributeMessage_callback));
        }
    }

    private void distributeMessage(StateTag tag, AMessageModel msg, AMessageDistributionModel dist, Act3<StateTag, AMessageDistributionModel, Boolean> callback) {
        try {
            AFileModel task = DbMsgApi.fileGet(dist.receive_url);
            if (dist.receive_way == 0) {
                this.do_distributeMessage(task, tag, msg, dist, callback);
            }
        }
        catch (Throwable ex) {
            this.distributeMessage_log_err(msg, dist, ex);
            callback.run((Object)tag, (Object)dist, (Object)false);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void do_distributeMessage(AFileModel task, StateTag tag, AMessageModel msg, AMessageDistributionModel dist, Act3<StateTag, AMessageDistributionModel, Boolean> callback) throws Exception {
        try {
            Context ctx = ContextEmpty.create();
            ContextUtil.currentSet((Context)ctx);
            ctx.attrSet("topic", (Object)msg.topic);
            ctx.attrSet("content", (Object)msg.content);
            Object tmp = ExecutorFactory.execOnly((AFileModel)task, (Context)ctx);
            dist._duration = new Timespan(System.currentTimeMillis(), dist._start_time).seconds();
            if (tmp == null) {
                tmp = "null";
            }
            if ("OK".equals(tmp.toString())) {
                if (!TextUtils.isEmpty((String)msg.topic_source)) {
                    JtMsg.g.forward(msg.topic, (Object)msg.content, msg.topic_source);
                }
                this.distributeMessage_log(msg, dist, "OK");
                callback.run((Object)tag, (Object)dist, (Object)true);
            } else {
                this.distributeMessage_log(msg, dist, tmp.toString());
                callback.run((Object)tag, (Object)dist, (Object)false);
            }
        }
        finally {
            ContextUtil.currentRemove();
        }
    }

    private void distributeMessage_log(AMessageModel msg, AMessageDistributionModel dist, String note) {
        LogUtil.log((String)this.getName(), (String)"distributeMessage", (String)msg.topic, (String)(msg.msg_id + ""), (LogLevel)LogLevel.TRACE, (String)msg.content, (String)(dist.receive_url + ":" + note));
    }

    private void distributeMessage_log_err(AMessageModel msg, AMessageDistributionModel dist, Throwable err) {
        String note = ExceptionUtils.getString((Throwable)err);
        LogUtil.log((String)this.getName(), (String)"distributeMessage", (String)msg.topic, (String)(msg.msg_id + ""), (LogLevel)LogLevel.ERROR, (String)msg.content, (String)(dist.receive_url + ":" + note));
    }
}

