/*
 * Decompiled with CFR 0.152.
 */
package org.noear.solon.cloud.extend.folkmq.service;

import java.io.IOException;
import org.noear.folkmq.FolkMQ;
import org.noear.folkmq.client.MqClient;
import org.noear.folkmq.client.MqConsumeHandler;
import org.noear.folkmq.client.MqConsumeListener;
import org.noear.folkmq.client.MqMessage;
import org.noear.folkmq.client.MqTransaction;
import org.noear.folkmq.client.MqTransactionCheckback;
import org.noear.socketd.transport.client.ClientConfig;
import org.noear.solon.Solon;
import org.noear.solon.Utils;
import org.noear.solon.cloud.CloudEventHandler;
import org.noear.solon.cloud.CloudProps;
import org.noear.solon.cloud.annotation.EventLevel;
import org.noear.solon.cloud.exception.CloudEventException;
import org.noear.solon.cloud.extend.folkmq.FolkmqProps;
import org.noear.solon.cloud.extend.folkmq.impl.FolkmqConsumeHandler;
import org.noear.solon.cloud.extend.folkmq.impl.FolkmqTransactionListener;
import org.noear.solon.cloud.model.Event;
import org.noear.solon.cloud.model.EventObserver;
import org.noear.solon.cloud.model.EventTran;
import org.noear.solon.cloud.model.EventTranListener;
import org.noear.solon.cloud.model.Instance;
import org.noear.solon.cloud.service.CloudEventObserverManger;
import org.noear.solon.cloud.service.CloudEventServicePlus;
import org.noear.solon.core.bean.LifecycleBean;
import org.noear.solon.core.event.EventBus;

public class CloudEventServiceFolkMqImpl
implements CloudEventServicePlus,
LifecycleBean {
    protected final MqClient client;
    private final CloudProps cloudProps;
    private final FolkmqConsumeHandler folkmqConsumeHandler;
    private final CloudEventObserverManger observerManger;
    private final long publishTimeout;
    private String channel;
    private String group;

    public CloudEventServiceFolkMqImpl(CloudProps cloudProps) {
        this.cloudProps = cloudProps;
        this.observerManger = new CloudEventObserverManger();
        this.folkmqConsumeHandler = new FolkmqConsumeHandler(this.observerManger);
        this.publishTimeout = cloudProps.getEventPublishTimeout();
        this.client = FolkMQ.createClient((String[])new String[]{cloudProps.getEventServer()}).nameAs(Solon.cfg().appName()).namespaceAs(cloudProps.getNamespace()).autoAcknowledge(false);
        if (this.publishTimeout > 0L) {
            this.client.config(c -> {
                ClientConfig cfr_ignored_0 = (ClientConfig)c.requestTimeout(this.publishTimeout);
            });
        }
        EventBus.publish((Object)this.client);
        Solon.context().wrapAndPut(MqClient.class, (Object)this.client);
        Solon.context().getBeanAsync(MqTransactionCheckback.class, bean -> this.client.transactionCheckback(bean));
        Solon.context().getBeanAsync(MqConsumeListener.class, bean -> this.client.listen((MqConsumeHandler)bean));
        try {
            this.client.connect();
        }
        catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    private void beginTransaction(EventTran transaction) {
        if (transaction.getListener(FolkmqTransactionListener.class) != null) {
            return;
        }
        transaction.setListener((EventTranListener)new FolkmqTransactionListener(this.client.newTransaction()));
    }

    public boolean publish(Event event) throws CloudEventException {
        if (Utils.isEmpty((String)event.topic())) {
            throw new IllegalArgumentException("Event missing topic");
        }
        if (Utils.isEmpty((String)event.content())) {
            throw new IllegalArgumentException("Event missing content");
        }
        if (event.tran() != null) {
            this.beginTransaction(event.tran());
        }
        String topicNew = FolkmqProps.getTopicNew(event);
        try {
            MqMessage message = new MqMessage(event.content(), event.key()).scheduled(event.scheduled()).broadcast(event.broadcast()).tag(event.tags()).qos(event.qos());
            if (event.tran() != null) {
                MqTransaction transaction = ((FolkmqTransactionListener)event.tran().getListener(FolkmqTransactionListener.class)).getTransaction();
                message.transaction(transaction);
            }
            if (this.publishTimeout > 0L) {
                this.client.publish(topicNew, message);
            } else {
                this.client.publishAsync(topicNew, message);
            }
        }
        catch (Throwable ex) {
            throw new CloudEventException(ex);
        }
        return true;
    }

    public void attention(EventLevel level, String channel, String group, String topic, String tag, int qos, CloudEventHandler observer) {
        String topicNew = Utils.isEmpty((String)group) ? topic : group + "--" + topic;
        this.observerManger.add(topicNew, level, group, topic, tag, qos, observer);
    }

    public void postStart() throws Throwable {
        this.subscribe();
    }

    private void subscribe() throws IOException {
        if (this.observerManger.topicSize() > 0) {
            Instance instance = Instance.local();
            for (String topicNew : this.observerManger.topicAll()) {
                EventObserver observer = this.observerManger.getByTopic(topicNew);
                if (observer.getLevel() == EventLevel.instance) {
                    String instanceName = Instance.local().serviceAndAddress().replace("@", "-").replace(".", "_").replace(":", "_");
                    this.client.subscribe(topicNew, instanceName, (MqConsumeHandler)this.folkmqConsumeHandler);
                    continue;
                }
                this.client.subscribe(topicNew, instance.service(), (MqConsumeHandler)this.folkmqConsumeHandler);
            }
        }
    }

    public String getChannel() {
        if (this.channel == null) {
            this.channel = this.cloudProps.getEventChannel();
        }
        return this.channel;
    }

    public String getGroup() {
        if (this.group == null) {
            this.group = this.cloudProps.getEventGroup();
        }
        return this.group;
    }
}

