/*
 * Decompiled with CFR 0.152.
 */
package org.noear.solon.cloud.extend.water.service;

import org.noear.solon.Solon;
import org.noear.solon.Utils;
import org.noear.solon.cloud.CloudEventHandler;
import org.noear.solon.cloud.CloudProps;
import org.noear.solon.cloud.annotation.EventLevel;
import org.noear.solon.cloud.exception.CloudEventException;
import org.noear.solon.cloud.model.Event;
import org.noear.solon.cloud.model.Instance;
import org.noear.solon.cloud.service.CloudEventObserverManger;
import org.noear.solon.cloud.service.CloudEventServicePlus;
import org.noear.water.WaterClient;
import org.noear.water.utils.EncryptUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CloudEventServiceWaterImp
implements CloudEventServicePlus {
    static Logger log = LoggerFactory.getLogger(CloudEventServiceWaterImp.class);
    private final CloudProps cloudProps;
    private final String DEFAULT_SEAL = "Pckb6BpGzDE6RUIy";
    private String seal;
    private boolean unstable;
    private String eventChannelName;
    private String eventBroker;
    private CloudEventObserverManger instanceObserverManger = new CloudEventObserverManger();
    private CloudEventObserverManger clusterObserverManger = new CloudEventObserverManger();
    private String channel;
    private String group;

    public CloudEventServiceWaterImp(CloudProps cloudProps) {
        this.cloudProps = cloudProps;
        this.unstable = cloudProps.getDiscoveryUnstable() || Solon.cfg().isFilesMode() || Solon.cfg().isDriftMode();
        this.eventChannelName = cloudProps.getEventChannel();
        this.eventBroker = cloudProps.getEventBroker();
        this.seal = this.getEventSeal();
        if (Utils.isEmpty((String)this.seal)) {
            this.seal = "Pckb6BpGzDE6RUIy";
        }
    }

    public String getSeal() {
        return this.seal;
    }

    public boolean publish(Event event) throws CloudEventException {
        if (Utils.isEmpty((String)event.topic())) {
            throw new IllegalArgumentException("Event missing topic");
        }
        if (Utils.isEmpty((String)event.content())) {
            throw new IllegalArgumentException("Event missing content");
        }
        String topicNew = Utils.isEmpty((String)event.group()) ? event.topic() : event.group() + ":" + event.topic();
        try {
            return WaterClient.Message.sendMessageAndTags(this.eventBroker, event.key(), topicNew, event.content(), event.scheduled(), event.tags());
        }
        catch (Throwable ex) {
            throw new CloudEventException(ex);
        }
    }

    public void attention(EventLevel level, String channel, String group, String topic, CloudEventHandler observer) {
        String topicNew = Utils.isEmpty((String)group) ? topic : group + ":" + topic;
        if (level == EventLevel.instance) {
            this.instanceObserverManger.add(topicNew, level, group, topic, observer);
        } else {
            this.clusterObserverManger.add(topicNew, level, group, topic, observer);
        }
    }

    public void subscribe() {
        try {
            this.subscribe0();
        }
        catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    private void subscribe0() throws Exception {
        Instance instance = Instance.local();
        if (this.instanceObserverManger.topicSize() > 0) {
            String instance_receiver_url = "http://" + instance.address() + "/_run/msg";
            String instance_subscriber_Key = EncryptUtils.md5((String)(instance.service() + "_instance_" + instance_receiver_url));
            WaterClient.Message.subscribeTopic(this.eventBroker, instance_subscriber_Key, instance.service(), Solon.cfg().appGroup(), instance_receiver_url, this.seal, "", 1, this.unstable, this.instanceObserverManger.topicAll().toArray(new String[this.instanceObserverManger.topicAll().size()]));
        }
        if (this.clusterObserverManger.topicSize() > 0) {
            String cluster_hostname = this.getEventReceive();
            String cluster_receiver_url = Utils.isEmpty((String)cluster_hostname) ? "@" + Solon.cfg().appName() + "/_run/msg" : (cluster_hostname.indexOf("://") > 0 ? cluster_hostname + "/_run/msg" : "http://" + cluster_hostname + "/_run/msg");
            String cluster_subscriber_Key = EncryptUtils.md5((String)(instance.service() + "_cluster_" + cluster_receiver_url));
            WaterClient.Message.subscribeTopic(this.eventBroker, cluster_subscriber_Key, instance.service(), Solon.cfg().appGroup(), cluster_receiver_url, this.seal, "", 1, false, this.clusterObserverManger.topicAll().toArray(new String[this.clusterObserverManger.topicSize()]));
        }
    }

    public boolean onReceive(String topicNew, Event event) throws Throwable {
        boolean isOk = true;
        boolean isHandled = false;
        CloudEventHandler entity = null;
        event.channel(this.eventChannelName);
        entity = this.instanceObserverManger.get(topicNew);
        if (entity != null) {
            isHandled = true;
            isOk = entity.handler(event);
        }
        if ((entity = this.clusterObserverManger.get(topicNew)) != null) {
            isHandled = true;
            boolean bl = isOk = isOk && entity.handler(event);
        }
        if (!isHandled) {
            log.warn("There is no observer for this event topic[{}]", (Object)event.topic());
        }
        return isOk;
    }

    public String getChannel() {
        if (this.channel == null) {
            this.channel = this.cloudProps.getEventChannel();
        }
        return this.channel;
    }

    public String getGroup() {
        if (this.group == null) {
            this.group = this.cloudProps.getEventGroup();
        }
        return this.group;
    }

    public String getEventSeal() {
        return this.cloudProps.getProp("event.seal");
    }

    public String getEventReceive() {
        return this.cloudProps.getProp("event.receive");
    }
}

