/*
 * Decompiled with CFR 0.152.
 */
package ru.tinkoff.kora.jms;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import ru.tinkoff.kora.application.graph.Lifecycle;
import ru.tinkoff.kora.common.Context;
import ru.tinkoff.kora.jms.JmsListenerContainerConfig;
import ru.tinkoff.kora.jms.JmsMessageListener;
import ru.tinkoff.kora.jms.JmsUtils;
import ru.tinkoff.kora.jms.telemetry.JmsConsumerTelemetry;
import ru.tinkoff.kora.jms.telemetry.JmsConsumerTelemetryFactory;
import ru.tinkoff.kora.logging.common.arg.StructuredArgument;

public class JmsMessageListenerContainer
implements Lifecycle {
    private static final ConcurrentHashMap<String, AtomicInteger> threadCounters = new ConcurrentHashMap();
    private final ConnectionFactory connectionFactory;
    private final JmsListenerContainerConfig config;
    private final JmsMessageListener messageListener;
    private final Logger log;
    private final JmsConsumerTelemetry telemetry;
    private final AtomicBoolean isStarted = new AtomicBoolean(false);
    private volatile ExecutorService executorService;

    public JmsMessageListenerContainer(ConnectionFactory connectionFactory, JmsListenerContainerConfig config, JmsMessageListener messageListener, JmsConsumerTelemetryFactory telemetryFactory) {
        this.connectionFactory = connectionFactory;
        this.config = config;
        this.messageListener = messageListener;
        this.log = LoggerFactory.getLogger(JmsMessageListenerContainer.class);
        this.telemetry = telemetryFactory.get(config.telemetry(), config.queueName());
    }

    public void init() {
        if (this.isStarted.compareAndSet(false, true)) {
            if (this.config.threads() == 0) {
                return;
            }
            this.executorService = Executors.newFixedThreadPool(this.config.threads());
            for (int i = 0; i < this.config.threads(); ++i) {
                this.executorService.submit(this::connectLoop);
            }
        }
    }

    public void release() {
        if (this.isStarted.compareAndSet(true, false)) {
            if (this.config.threads() == 0) {
                return;
            }
            this.executorService.shutdownNow();
            try {
                this.executorService.awaitTermination(10L, TimeUnit.SECONDS);
                this.executorService = null;
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    private void connectLoop() {
        AtomicInteger counter = threadCounters.computeIfAbsent(this.config.queueName(), s -> new AtomicInteger());
        Thread.currentThread().setName("jms-" + this.config.queueName() + "-" + counter.getAndIncrement());
        this.log.info("listening...");
        while (this.isStarted.get()) {
            try {
                this.log.trace("Trying new connection");
                Connection connection = this.connectionFactory.createConnection();
                try {
                    Session session = connection.createSession(true, 0);
                    try {
                        connection.start();
                        this.pollLoop(session);
                    }
                    finally {
                        if (session == null) continue;
                        session.close();
                    }
                }
                finally {
                    if (connection == null) continue;
                    connection.close();
                }
            }
            catch (JMSException e) {
                this.log.info("Jms exception caught while processing message: {}", (Object)e.toString(), (Object)e);
                try {
                    Thread.sleep(60000L);
                }
                catch (InterruptedException ex) {
                    this.log.trace("Jms thread interrupted");
                }
            }
            catch (Exception e) {
                this.log.trace("Unknown ex");
                try {
                    Thread.sleep(60000L);
                }
                catch (InterruptedException ex) {
                    this.log.trace("Jms thread interrupted");
                }
            }
        }
        this.log.info("Consumer stopped");
    }

    private void pollLoop(Session session) throws JMSException {
        Queue queue = session.createQueue(this.config.queueName());
        try (MessageConsumer consumer = session.createConsumer((Destination)queue, null);){
            while (this.isStarted.get()) {
                try {
                    Message message = consumer.receiveNoWait();
                    long lastSuccessfullReceive = System.nanoTime();
                    if (message == null) {
                        this.log.trace("No message was received");
                        Thread.sleep(1000L);
                        session.commit();
                        continue;
                    }
                    JmsConsumerTelemetry.JmsConsumerTelemetryContext telemetryCtx = this.telemetry.get(message);
                    try {
                        if (this.log.isDebugEnabled()) {
                            String body = JmsUtils.text(message);
                            String headers = JmsUtils.dumpHeaders(message).toString();
                            this.log.debug(StructuredArgument.marker((String)"jmsInputMessage", gen -> {
                                gen.writeStartObject();
                                gen.writeStringField("headers", headers);
                                gen.writeStringField("body", body);
                                gen.writeEndObject();
                            }), "JmsListener.message");
                        }
                        this.messageListener.onMessage(session, message);
                        session.commit();
                        telemetryCtx.close(null);
                    }
                    catch (Exception e) {
                        telemetryCtx.close(e);
                        throw e;
                    }
                    finally {
                        Context.clear();
                        MDC.clear();
                    }
                }
                catch (InterruptedException e) {
                    this.log.trace("Jms thread interrupted");
                }
                catch (JMSException e) {
                    session.rollback();
                    throw e;
                }
                catch (Exception e) {
                    this.log.debug("Exception caught while processing message", (Throwable)e);
                    session.rollback();
                }
            }
            this.log.trace("Poll loop end");
        }
    }
}

