/*
 * Decompiled with CFR 0.152.
 */
package org.noear.solon.cloud.extend.rabbitmq.service;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.noear.solon.Solon;
import org.noear.solon.Utils;
import org.noear.solon.cloud.CloudEventHandler;
import org.noear.solon.cloud.annotation.EventLevel;
import org.noear.solon.cloud.extend.rabbitmq.RabbitmqProps;
import org.noear.solon.cloud.extend.rabbitmq.service.RabbitConsumer;
import org.noear.solon.cloud.extend.rabbitmq.service.RabbitMQX;
import org.noear.solon.cloud.model.Event;
import org.noear.solon.cloud.service.CloudEventObserverEntity;
import org.noear.solon.cloud.service.CloudEventService;
import org.noear.solon.core.event.EventBus;

public class CloudEventServiceImp
implements CloudEventService {
    RabbitMQX rabbitMQX;
    String rabbit_exchangeName;
    String rabbit_routingKey;
    String rabbit_queueName;
    final BuiltinExchangeType rabbit_type = BuiltinExchangeType.DIRECT;
    final boolean rabbit_durable = true;
    final boolean rabbit_autoDelete = false;
    final boolean rabbit_internal = false;
    final AMQP.BasicProperties rabbit_msgProps;
    Channel channel;
    Map<String, CloudEventObserverEntity> observerMap = new HashMap<String, CloudEventObserverEntity>();

    public CloudEventServiceImp(String server) {
        this.rabbit_exchangeName = Solon.cfg().appGroup();
        if (Utils.isEmpty((String)this.rabbit_exchangeName)) {
            this.rabbit_exchangeName = "DEFAULT_GROUP";
        }
        this.rabbit_routingKey = this.rabbit_exchangeName;
        this.rabbit_queueName = RabbitmqProps.instance.getEventQueue();
        this.rabbit_msgProps = new AMQP.BasicProperties().builder().deliveryMode(Integer.valueOf(2)).contentType("UTF-8").build();
        try {
            this.rabbitMQX = new RabbitMQX(server, RabbitmqProps.instance.getUsername(), RabbitmqProps.instance.getPassword());
            this.channel = this.rabbitMQX.createChannel();
            this.initDeclareAndBind(this.channel);
        }
        catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    private void initDeclareAndBind(Channel channel) throws IOException {
        channel.exchangeDeclare(this.rabbit_exchangeName, this.rabbit_type, true, false, false, new HashMap());
        channel.queueDeclare(this.rabbit_queueName, true, false, false, new HashMap());
        channel.queueBind(this.rabbit_queueName, this.rabbit_exchangeName, this.rabbit_routingKey, new HashMap());
    }

    public boolean publish(Event event) {
        try {
            this.channel.basicPublish(this.rabbit_exchangeName, this.rabbit_routingKey, false, this.rabbit_msgProps, event.content().getBytes(StandardCharsets.UTF_8));
            return true;
        }
        catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    public void attention(EventLevel level, String group, String topic, CloudEventHandler observer) {
        if (this.observerMap.containsKey(topic)) {
            return;
        }
        this.observerMap.put(topic, new CloudEventObserverEntity(level, group, topic, observer));
    }

    public boolean onReceive(Event event) throws Throwable {
        boolean isOk = true;
        CloudEventObserverEntity entity = null;
        entity = this.observerMap.get(event.topic());
        if (entity != null) {
            isOk = entity.handler(event);
        }
        return isOk;
    }

    public void subscribe() {
        try {
            for (String topic : this.observerMap.keySet()) {
                RabbitConsumer consumer = new RabbitConsumer(this.channel, topic, this);
                this.channel.basicConsume(topic, (Consumer)consumer);
            }
        }
        catch (IOException ex) {
            EventBus.push((Object)ex);
        }
    }
}

