/*
 * Decompiled with CFR 0.152.
 */
package org.noear.solon.cloud.extend.rabbitmq.impl;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
import org.noear.solon.cloud.CloudProps;
import org.noear.solon.cloud.extend.rabbitmq.RabbitmqProps;
import org.noear.solon.cloud.extend.rabbitmq.impl.RabbitChannelFactory;
import org.noear.solon.cloud.extend.rabbitmq.impl.RabbitConfig;
import org.noear.solon.cloud.extend.rabbitmq.impl.RabbitConsumeHandler;
import org.noear.solon.cloud.extend.rabbitmq.impl.RabbitProducer;
import org.noear.solon.cloud.service.CloudEventObserverManger;

public class RabbitConsumer {
    private CloudProps cloudProps;
    private RabbitConfig cfg;
    private Channel channel;
    private RabbitChannelFactory factory;
    private RabbitConsumeHandler handler;
    private RabbitProducer producer;

    public RabbitConsumer(CloudProps cloudProps, RabbitProducer producer, RabbitChannelFactory factory) {
        this.cloudProps = cloudProps;
        this.cfg = factory.getConfig();
        this.factory = factory;
        this.producer = producer;
    }

    public void init(CloudEventObserverManger observerManger) throws IOException, TimeoutException {
        this.channel = this.factory.getChannel();
        this.handler = new RabbitConsumeHandler(this.cloudProps, this.producer, this.cfg, this.channel, observerManger);
        int prefetchCount = RabbitmqProps.instance.getEventPrefetchCount();
        if (prefetchCount < 1) {
            prefetchCount = 10;
        }
        this.channel.basicQos(prefetchCount);
        this.queueDeclareNormal(observerManger);
        this.queueDeclareReady();
        this.queueDeclareRetry();
    }

    private void queueDeclareNormal(CloudEventObserverManger observerManger) throws IOException {
        HashMap args = new HashMap();
        this.channel.queueDeclare(this.cfg.queue_normal, this.cfg.durable, this.cfg.exclusive, this.cfg.autoDelete, args);
        for (String topic : observerManger.topicAll()) {
            this.channel.queueBind(this.cfg.queue_normal, this.cfg.exchangeName, topic, args);
        }
        this.channel.basicConsume(this.cfg.queue_normal, (Consumer)this.handler);
    }

    private void queueDeclareReady() throws IOException {
        HashMap<String, String> args = new HashMap<String, String>();
        args.put("x-dead-letter-exchange", this.cfg.exchangeName);
        args.put("x-dead-letter-routing-key", this.cfg.queue_retry);
        this.channel.queueDeclare(this.cfg.queue_ready, this.cfg.durable, this.cfg.exclusive, this.cfg.autoDelete, args);
        this.channel.queueBind(this.cfg.queue_ready, this.cfg.exchangeName, this.cfg.queue_ready, args);
    }

    private void queueDeclareRetry() throws IOException {
        HashMap args = new HashMap();
        this.channel.queueDeclare(this.cfg.queue_retry, this.cfg.durable, this.cfg.exclusive, this.cfg.autoDelete, args);
        this.channel.queueBind(this.cfg.queue_retry, this.cfg.exchangeName, this.cfg.queue_retry, args);
        this.channel.basicConsume(this.cfg.queue_retry, (Consumer)this.handler);
    }
}

