/*
 * Decompiled with CFR 0.152.
 */
package org.noear.solon.cloud.extend.water.service;

import java.util.HashMap;
import java.util.Map;
import org.noear.solon.Solon;
import org.noear.solon.Utils;
import org.noear.solon.cloud.CloudEventHandler;
import org.noear.solon.cloud.annotation.EventLevel;
import org.noear.solon.cloud.extend.water.WaterProps;
import org.noear.solon.cloud.model.Event;
import org.noear.solon.cloud.model.Instance;
import org.noear.solon.cloud.service.CloudEventObserverEntity;
import org.noear.solon.cloud.service.CloudEventService;
import org.noear.water.WaterClient;
import org.noear.water.utils.EncryptUtils;

public class CloudEventServiceWaterImp
implements CloudEventService {
    private final String DEFAULT_DEAL = "Pckb6BpGzDE6RUIy";
    private String seal;
    private Map<String, CloudEventObserverEntity> instanceObserverMap = new HashMap<String, CloudEventObserverEntity>();
    private Map<String, CloudEventObserverEntity> clusterObserverMap = new HashMap<String, CloudEventObserverEntity>();
    private boolean unstable = WaterProps.instance.getDiscoveryUnstable() || Solon.cfg().isFilesMode() || Solon.cfg().isDriftMode();
    private String eventChannelName = WaterProps.instance.getEventChannel();

    public CloudEventServiceWaterImp() {
        this.seal = WaterProps.instance.getEventSeal();
        if (Utils.isEmpty((String)this.seal)) {
            this.seal = "Pckb6BpGzDE6RUIy";
        }
    }

    public String getSeal() {
        return this.seal;
    }

    public boolean publish(Event event) {
        if (Utils.isEmpty((String)event.topic())) {
            throw new IllegalArgumentException("Event missing topic");
        }
        if (Utils.isEmpty((String)event.content())) {
            throw new IllegalArgumentException("Event missing content");
        }
        try {
            return WaterClient.Message.sendMessageAndTags(event.key(), event.topic(), event.content(), event.scheduled(), event.tags());
        }
        catch (Throwable ex) {
            throw Utils.throwableWrap((Throwable)ex);
        }
    }

    public void attention(EventLevel level, String channel, String group, String topic, CloudEventHandler observer) {
        if (level == EventLevel.instance) {
            this.instanceObserverMap.putIfAbsent(topic, new CloudEventObserverEntity(level, group, topic, observer));
        } else {
            this.clusterObserverMap.putIfAbsent(topic, new CloudEventObserverEntity(level, group, topic, observer));
        }
    }

    public void subscribe() {
        try {
            this.subscribe0();
        }
        catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    private void subscribe0() throws Exception {
        Instance instance = Instance.local();
        if (this.instanceObserverMap.size() > 0) {
            String instance_receiver_url = "http://" + instance.address() + "/msg/receive";
            String instance_subscriber_Key = EncryptUtils.md5((String)(instance.service() + "_instance_" + instance_receiver_url));
            WaterClient.Message.subscribeTopic(instance_subscriber_Key, instance.service(), instance_receiver_url, this.seal, "", 1, this.unstable, new String[]{String.join((CharSequence)",", this.instanceObserverMap.keySet())});
        }
        if (this.clusterObserverMap.size() > 0) {
            String cluster_hostname = WaterProps.instance.getEventReceive();
            if (Utils.isEmpty((String)cluster_hostname)) {
                cluster_hostname = instance.address();
            }
            String cluster_receiver_url = "http://" + cluster_hostname + "/msg/receive";
            String cluster_subscriber_Key = EncryptUtils.md5((String)(instance.service() + "_cluster_" + cluster_receiver_url));
            WaterClient.Message.subscribeTopic(cluster_subscriber_Key, instance.service(), cluster_receiver_url, this.seal, "", 1, false, new String[]{String.join((CharSequence)",", this.clusterObserverMap.keySet())});
        }
    }

    public boolean onReceive(Event event) throws Throwable {
        boolean isOk = true;
        CloudEventObserverEntity entity = null;
        event.channel(this.eventChannelName);
        entity = this.instanceObserverMap.get(event.topic());
        if (entity != null) {
            isOk = entity.handler(event);
        }
        if ((entity = this.clusterObserverMap.get(event.topic())) != null) {
            isOk = entity.handler(event) && isOk;
        }
        return isOk;
    }
}

