/*
 * Decompiled with CFR 0.152.
 */
package org.noear.solon.cloud.extend.rocketmq.service;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.noear.solon.Solon;
import org.noear.solon.Utils;
import org.noear.solon.cloud.CloudEventHandler;
import org.noear.solon.cloud.annotation.EventLevel;
import org.noear.solon.cloud.model.Event;
import org.noear.solon.cloud.service.CloudEventObserverEntity;
import org.noear.solon.cloud.service.CloudEventService;
import org.noear.solon.core.event.EventBus;

public class CloudEventServiceImp
implements CloudEventService,
MessageListenerConcurrently {
    String server;
    String group;
    DefaultMQProducer producer;
    DefaultMQPushConsumer consumer;
    Map<String, CloudEventObserverEntity> observerMap = new HashMap<String, CloudEventObserverEntity>();

    public CloudEventServiceImp(String server) {
        this.server = server;
        this.group = Solon.cfg().appGroup();
        if (Utils.isEmpty((String)this.group)) {
            this.group = "DEFAULT_GROUP";
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void initProducer() {
        if (this.producer != null) {
            return;
        }
        String string = this.group;
        synchronized (string) {
            if (this.producer != null) {
                return;
            }
            this.producer = new DefaultMQProducer(this.group);
            this.producer.setNamesrvAddr(this.server);
            this.producer.setSendMsgTimeout(3000);
            try {
                this.producer.start();
            }
            catch (Exception ex) {
                throw new RuntimeException(ex);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void initConsumer(Set<String> topics) {
        if (this.consumer != null) {
            return;
        }
        String string = this.group;
        synchronized (string) {
            if (this.consumer != null) {
                return;
            }
            this.consumer = new DefaultMQPushConsumer(this.group);
            this.consumer.setNamesrvAddr(this.server);
            this.consumer.setPullBatchSize(1);
            try {
                for (String topic : topics) {
                    this.consumer.subscribe(topic, "*");
                }
                this.consumer.registerMessageListener((MessageListenerConcurrently)this);
                this.consumer.start();
            }
            catch (Exception ex) {
                ex.printStackTrace();
                throw new RuntimeException(ex);
            }
        }
    }

    public boolean publish(Event event) {
        this.initProducer();
        try {
            Message message = new Message(event.topic(), event.tags(), event.key(), event.content().getBytes(StandardCharsets.UTF_8));
            SendResult send = this.producer.send(message);
            return send.getSendStatus().equals((Object)SendStatus.SEND_OK);
        }
        catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    public void attention(EventLevel level, String group, String topic, CloudEventHandler observer) {
        if (this.observerMap.containsKey(topic)) {
            return;
        }
        this.observerMap.put(topic, new CloudEventObserverEntity(level, group, topic, observer));
    }

    public void subscribe() {
        this.initConsumer(this.observerMap.keySet());
    }

    public boolean onReceive(Event event) throws Throwable {
        boolean isOk = true;
        CloudEventObserverEntity entity = null;
        entity = this.observerMap.get(event.topic());
        if (entity != null) {
            isOk = entity.handler(event);
        }
        return isOk;
    }

    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        boolean isOk = true;
        try {
            for (MessageExt message : list) {
                Event event = new Event(message.getTopic(), new String(message.getBody()));
                event.tags(message.getTags());
                event.key(message.getKeys());
                isOk = isOk && this.onReceive(event);
            }
        }
        catch (Throwable ex) {
            EventBus.push((Object)ex);
        }
        if (isOk) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
}

