/*
 * Decompiled with CFR 0.152.
 */
package org.noear.solon.cloud.extend.water.service;

import java.util.HashMap;
import java.util.Map;
import org.noear.solon.Solon;
import org.noear.solon.Utils;
import org.noear.solon.cloud.CloudEventHandler;
import org.noear.solon.cloud.annotation.EventLevel;
import org.noear.solon.cloud.exception.CloudEventException;
import org.noear.solon.cloud.extend.water.WaterProps;
import org.noear.solon.cloud.model.Event;
import org.noear.solon.cloud.model.Instance;
import org.noear.solon.cloud.service.CloudEventObserverEntity;
import org.noear.solon.cloud.service.CloudEventServicePlus;
import org.noear.water.WaterClient;
import org.noear.water.utils.EncryptUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link CloudEventServicePlus} implementation that publishes events and registers
 * subscriptions through {@code WaterClient.Message}.
 *
 * <p>Observers are kept in two registries: one for {@link EventLevel#instance} observers,
 * keyed by the bare topic, and one for every other level, keyed by
 * {@code group + "::" + topic} (or just the topic when the group is empty).
 *
 * <p>The registries are plain {@link HashMap}s and no synchronization is performed here.
 * NOTE(review): presumably all {@link #attention} calls happen during start-up, before
 * {@link #subscribe()} and {@link #onReceive} are used - confirm against the callers.
 */
public class CloudEventServiceWaterImp implements CloudEventServicePlus {
    static final Logger log = LoggerFactory.getLogger(CloudEventServiceWaterImp.class);

    /** Fallback seal used when {@code WaterProps.getEventSeal()} yields an empty value. */
    private static final String DEFAULT_SEAL = "Pckb6BpGzDE6RUIy";

    /** Separator placed between the group and the topic when building a composite topic. */
    private static final String GROUP_TOPIC_SEPARATOR = "::";

    /** Seal handed to the Water message service on every subscription. */
    private String seal;

    /** Observers registered with {@link EventLevel#instance}, keyed by the bare topic. */
    private final Map<String, CloudEventObserverEntity> instanceObserverMap = new HashMap<String, CloudEventObserverEntity>();

    /** Observers registered with any other level, keyed by the composite topic. */
    private final Map<String, CloudEventObserverEntity> clusterObserverMap = new HashMap<String, CloudEventObserverEntity>();

    /** Flag forwarded to the instance-level subscription; the cluster subscription always sends {@code false}. */
    private final boolean unstable = WaterProps.instance.getDiscoveryUnstable() || Solon.cfg().isFilesMode() || Solon.cfg().isDriftMode();

    /** Channel name stamped on every received event before it is dispatched. */
    private final String eventChannelName = WaterProps.instance.getEventChannel();

    /** Lazily resolved by {@link #getChannel()}. */
    private String channel;

    /** Lazily resolved by {@link #getGroup()}. */
    private String group;

    /**
     * Reads the seal from {@code WaterProps.getEventSeal()} and falls back to
     * the built-in default when that value is empty.
     */
    public CloudEventServiceWaterImp() {
        String configuredSeal = WaterProps.getEventSeal();
        this.seal = Utils.isEmpty(configuredSeal) ? DEFAULT_SEAL : configuredSeal;
    }

    /**
     * @return the seal in effect (configured value, or the built-in default)
     */
    public String getSeal() {
        return this.seal;
    }

    /**
     * Sends the event through {@code WaterClient.Message.sendMessageAndTags}.
     *
     * <p>When the event carries a non-empty group, the topic sent is
     * {@code group + "::" + topic}; otherwise the event topic is sent unchanged.
     *
     * @param event the event to publish; its topic and content must be non-empty
     * @return the value returned by the Water client
     * @throws IllegalArgumentException if the event topic or content is empty
     * @throws CloudEventException wrapping any {@link Throwable} raised while sending
     */
    public boolean publish(Event event) throws CloudEventException {
        if (Utils.isEmpty(event.topic())) {
            throw new IllegalArgumentException("Event missing topic");
        }
        if (Utils.isEmpty(event.content())) {
            throw new IllegalArgumentException("Event missing content");
        }

        String topicNew;
        if (Utils.isEmpty(event.group())) {
            topicNew = event.topic();
        } else {
            topicNew = event.group() + GROUP_TOPIC_SEPARATOR + event.topic();
        }

        try {
            return WaterClient.Message.sendMessageAndTags(event.key(), topicNew, event.content(), event.scheduled(), event.tags());
        } catch (Throwable ex) {
            throw new CloudEventException(ex);
        }
    }

    /**
     * Registers an observer for a topic.
     *
     * <p>Registration uses {@code putIfAbsent}, so the first observer registered under a
     * given key is kept and later ones for the same key are ignored. Instance-level
     * observers are keyed by {@code topic} alone (the group is not part of the key);
     * all other levels are keyed by the composite {@code group::topic}.
     *
     * @param level    observer level; {@link EventLevel#instance} selects the instance registry
     * @param channel  not used by this implementation
     * @param group    event group; may be empty
     * @param topic    event topic
     * @param observer handler to invoke when a matching event is received
     */
    public void attention(EventLevel level, String channel, String group, String topic, CloudEventHandler observer) {
        CloudEventObserverEntity entity = new CloudEventObserverEntity(level, group, topic, observer);

        if (level == EventLevel.instance) {
            this.instanceObserverMap.putIfAbsent(topic, entity);
            return;
        }

        String topicNew = Utils.isEmpty(group) ? topic : group + GROUP_TOPIC_SEPARATOR + topic;
        this.clusterObserverMap.putIfAbsent(topicNew, entity);
    }

    /**
     * Registers the collected topics with the Water message service.
     *
     * @throws RuntimeException wrapping any checked exception raised while subscribing
     */
    public void subscribe() {
        try {
            this.subscribe0();
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    /**
     * Performs up to two {@code subscribeTopic} calls: one for the instance-level topics
     * and one for the cluster-level topics. A registry with no entries is skipped.
     */
    private void subscribe0() throws Exception {
        Instance instance = Instance.local();

        if (!this.instanceObserverMap.isEmpty()) {
            String receiverUrl = "http://" + instance.address() + "/msg/receive";
            String subscriberKey = EncryptUtils.md5(instance.service() + "_instance_" + receiverUrl);
            String topics = String.join(",", this.instanceObserverMap.keySet());
            // NOTE(review): the "" and 1 arguments are passed as-is from the original code;
            // their meaning is defined by WaterClient.Message.subscribeTopic - confirm there.
            WaterClient.Message.subscribeTopic(subscriberKey, instance.service(), receiverUrl, this.seal, "", 1, this.unstable, new String[]{topics});
        }

        if (!this.clusterObserverMap.isEmpty()) {
            // Prefer the configured receive host; fall back to this instance's own address.
            String hostname = WaterProps.getEventReceive();
            if (Utils.isEmpty(hostname)) {
                hostname = instance.address();
            }
            String receiverUrl = "http://" + hostname + "/msg/receive";
            String subscriberKey = EncryptUtils.md5(instance.service() + "_cluster_" + receiverUrl);
            String topics = String.join(",", this.clusterObserverMap.keySet());
            // The cluster subscription always passes false for the unstable flag.
            WaterClient.Message.subscribeTopic(subscriberKey, instance.service(), receiverUrl, this.seal, "", 1, false, new String[]{topics});
        }
    }

    /**
     * Dispatches a received event to the observers registered under {@code topicNew}.
     *
     * <p>The event's channel is set to this service's channel name first. Both registries
     * are consulted; if both contain an observer, both handlers are invoked (instance
     * first) and the result is the logical AND of their return values. A warning is
     * logged when neither registry has an observer.
     *
     * @param topicNew the key to look up in both registries
     * @param event    the received event
     * @return {@code true} if every invoked handler returned {@code true}, or if no
     *         handler was found
     * @throws Throwable whatever a handler throws
     */
    public boolean onReceive(String topicNew, Event event) throws Throwable {
        boolean isOk = true;
        boolean isHandled = false;

        event.channel(this.eventChannelName);

        CloudEventObserverEntity instanceEntity = this.instanceObserverMap.get(topicNew);
        if (instanceEntity != null) {
            isHandled = true;
            isOk = instanceEntity.handler(event);
        }

        CloudEventObserverEntity clusterEntity = this.clusterObserverMap.get(topicNew);
        if (clusterEntity != null) {
            isHandled = true;
            // The handler call stays on the left so it runs even when isOk is already false.
            isOk = clusterEntity.handler(event) && isOk;
        }

        if (!isHandled) {
            log.warn("There is no observer for this event topic[{}]", event.topic());
        }
        return isOk;
    }

    /**
     * @return the event channel from {@code WaterProps}, read on first call and then cached
     */
    public String getChannel() {
        if (this.channel == null) {
            this.channel = WaterProps.instance.getEventChannel();
        }
        return this.channel;
    }

    /**
     * @return the event group from {@code WaterProps}, read on first call and then cached
     */
    public String getGroup() {
        if (this.group == null) {
            this.group = WaterProps.instance.getEventGroup();
        }
        return this.group;
    }
}

