/*
 * Decompiled with CFR 0.152.
 */
package ch.squaredesk.nova.comm.jms;

import ch.squaredesk.nova.comm.jms.IncomingMessageMetaData;
import ch.squaredesk.nova.comm.jms.JmsMessageMetaDataCreator;
import ch.squaredesk.nova.comm.jms.JmsObjectRepository;
import ch.squaredesk.nova.comm.retrieving.IncomingMessage;
import ch.squaredesk.nova.metrics.Metrics;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JMS message receiver that exposes the text messages arriving on a destination as a
 * shared {@link Flowable}.
 *
 * <p>One stream is cached per destination ID (as returned by
 * {@link JmsObjectRepository#idFor}); the cache entry is removed again when the
 * stream's consumer is destroyed.
 */
class MessageReceiver
extends ch.squaredesk.nova.comm.retrieving.MessageReceiver<Destination, String, IncomingMessageMetaData> {
    private static final Logger logger = LoggerFactory.getLogger(MessageReceiver.class);
    private final JmsObjectRepository jmsObjectRepository;
    private final Map<String, Flowable<IncomingMessage<String, IncomingMessageMetaData>>> mapDestinationIdToMessageStream = new ConcurrentHashMap<>();
    private final JmsMessageMetaDataCreator messageDetailsCreator = new JmsMessageMetaDataCreator();

    MessageReceiver(String identifier, JmsObjectRepository jmsObjectRepository, Metrics metrics) {
        super(identifier, metrics);
        this.jmsObjectRepository = jmsObjectRepository;
    }

    /**
     * Returns the stream of incoming text messages for the given destination, creating
     * and caching it on first request.
     *
     * @param destination the JMS destination to read from; must not be {@code null}
     * @return the (shared) stream of messages for that destination
     * @throws NullPointerException if {@code destination} is {@code null}
     */
    public Flowable<IncomingMessage<String, IncomingMessageMetaData>> messages(Destination destination) {
        Objects.requireNonNull(destination, "destination must not be null");
        String destinationId = this.jmsObjectRepository.idFor(destination);
        return this.mapDestinationIdToMessageStream.computeIfAbsent(
                destinationId, key -> createMessageStream(destination, destinationId));
    }

    /**
     * Builds the generator-backed stream for one destination: the consumer is created
     * when the generator state is initialised, one message is emitted per generator
     * call, and the consumer is destroyed (and the cache entry dropped) when the state
     * is disposed.
     */
    private Flowable<IncomingMessage<String, IncomingMessageMetaData>> createMessageStream(
            Destination destination, String destinationId) {
        Flowable<IncomingMessage<String, IncomingMessageMetaData>> stream =
                Flowable.<IncomingMessage<String, IncomingMessageMetaData>, MessageConsumer>generate(
                        () -> {
                            logger.info("Opening connection to destination {}", destinationId);
                            this.metricsCollector.subscriptionCreated(destinationId);
                            return this.jmsObjectRepository.createMessageConsumer(destination);
                        },
                        (consumer, emitter) -> {
                            IncomingMessage<String, IncomingMessageMetaData> next = receiveNext(consumer, destinationId);
                            if (next == null) {
                                // nothing more can be read from this consumer: end the stream
                                emitter.onComplete();
                            } else {
                                emitter.onNext(next);
                            }
                        },
                        consumer -> {
                            this.metricsCollector.subscriptionDestroyed(destinationId);
                            this.jmsObjectRepository.destroyConsumer(consumer);
                            this.mapDestinationIdToMessageStream.remove(destinationId);
                            logger.info("Closed connection to destination {}", destinationId);
                        });
        return stream.subscribeOn(Schedulers.io()).share();
    }

    /**
     * Reads from the consumer until a readable {@link TextMessage} arrives. Messages of
     * any other type, or whose text cannot be read, are logged, counted as unparsable
     * and skipped.
     *
     * @return the next incoming message, or {@code null} if {@code receive()} failed or
     *         returned {@code null}, in which case the caller completes the stream
     */
    private IncomingMessage<String, IncomingMessageMetaData> receiveNext(MessageConsumer consumer, String destinationId) {
        while (true) {
            Message message = null;
            try {
                message = consumer.receive();
            } catch (Exception e) {
                // Previously swallowed silently; record the cause, then fall through to the
                // "no message" handling below so the stream still terminates as before.
                logger.warn("Error receiving message from consumer for destination {}", destinationId, e);
            }
            if (message == null) {
                logger.info("Unable to receive message from consumer for destination {}. Closing the connection...", destinationId);
                return null;
            }
            if (!(message instanceof TextMessage)) {
                logger.error("Unsupported type of incoming message {}", message);
                this.metricsCollector.unparsableMessageReceived(destinationId);
                continue;
            }
            String transportMessage;
            try {
                transportMessage = ((TextMessage) message).getText();
            } catch (Exception e) {
                logger.error("Unable to read incoming message {}", message, e);
                this.metricsCollector.unparsableMessageReceived(destinationId);
                continue;
            }
            IncomingMessageMetaData metaData = this.messageDetailsCreator.createIncomingMessageMetaData(message);
            IncomingMessage<String, IncomingMessageMetaData> incomingMessage = new IncomingMessage<>(transportMessage, metaData);
            this.metricsCollector.messageReceived(destinationId);
            return incomingMessage;
        }
    }
}

