package io.scalecube.gateway.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import io.scalecube.gateway.rabbitmq.Rmq;
import java.io.IOException;
import java.util.Map;
import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;

/* loaded from: input_file:io/scalecube/gateway/rabbitmq/RabbitListener.class */
public class RabbitListener extends RmqChannel {
    private final Subject<byte[], byte[]> incomingMessagesSubject;
    private final MessageSerialization serialization;

    public RabbitListener(Rmq.Builder builder) throws Exception {
        super(builder);
        this.serialization = builder.serialization();
        this.incomingMessagesSubject = PublishSubject.create().toSerialized();
    }

    public void subscribe(Exchange exchange, Topic topic, String str) throws Exception {
        this.channel.exchangeDeclare(exchange.exchange(), exchange.type(), exchange.durable(), exchange.autoDelete(), exchange.autoDelete(), exchange.properties());
        this.channel.queueDeclare(topic.name(), topic.durable(), topic.exclusive(), topic.autoDelete(), (Map) null);
        this.channel.queueBind(topic.name(), exchange.exchange(), str);
        this.channel.basicConsume(topic.name(), true, createConsumer(this.channel));
    }

    public void subscribe(Topic topic) throws Exception {
        this.channel.queueDeclare(topic.name(), topic.durable(), topic.exclusive(), topic.autoDelete(), (Map) null);
        this.channel.basicConsume(topic.name(), true, createConsumer(this.channel));
    }

    private Consumer createConsumer(Channel channel) {
        return new DefaultConsumer(channel) { // from class: io.scalecube.gateway.rabbitmq.RabbitListener.1
            public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                RabbitListener.this.incomingMessagesSubject.onNext(bArr);
            }
        };
    }

    public <T> Observable<T> listen(Class<T> cls) {
        return this.incomingMessagesSubject.onBackpressureBuffer().map(bArr -> {
            return deserialize(bArr, cls);
        });
    }

    public <T> Observable<byte[]> listen() {
        return this.incomingMessagesSubject.onBackpressureBuffer();
    }

    private <T> T deserialize(byte[] bArr, Class<T> cls) {
        try {
            return (T) this.serialization.deserialize(bArr, cls);
        } catch (Exception e) {
            return null;
        }
    }

    public Channel channel() {
        return this.channel;
    }
}
