/*
 * Decompiled with CFR 0.152.
 */
package org.smallmind.phalanx.wire.transport.jms;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import org.smallmind.claxon.registry.Instrument;
import org.smallmind.claxon.registry.Tag;
import org.smallmind.claxon.registry.meter.LazyBuilder;
import org.smallmind.claxon.registry.meter.MeterBuilder;
import org.smallmind.claxon.registry.meter.SpeedometerBuilder;
import org.smallmind.phalanx.wire.TransportException;
import org.smallmind.phalanx.wire.signal.ResultSignal;
import org.smallmind.phalanx.wire.signal.SignalCodec;
import org.smallmind.phalanx.wire.transport.ClaxonTag;
import org.smallmind.phalanx.wire.transport.WireProperty;
import org.smallmind.phalanx.wire.transport.jms.ConnectionManager;
import org.smallmind.phalanx.wire.transport.jms.JmsRequestTransport;
import org.smallmind.phalanx.wire.transport.jms.SessionEmployer;
import org.smallmind.scribe.pen.LoggerManager;

/**
 * JMS listener that receives response messages from the response topic and hands the
 * decoded {@link ResultSignal} back to the owning {@link JmsRequestTransport}.
 *
 * <p>The listener registers itself as a consumer on the supplied {@link ConnectionManager}
 * at construction time, using a message selector that matches the caller id property
 * ({@link WireProperty#CALLER_ID}) against the id given to the constructor.
 *
 * <p>A single byte buffer, sized by {@code maximumMessageLength}, is reused for every message.
 * NOTE(review): this assumes {@link #onMessage(Message)} is never invoked concurrently on the
 * same instance - confirm against how {@code ConnectionManager} creates its sessions.
 */
public class ResponseListener
implements SessionEmployer,
MessageListener {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final JmsRequestTransport requestTransport;
    private final ConnectionManager responseConnectionManager;
    private final Topic responseTopic;
    private final SignalCodec signalCodec;
    private final String selector;
    private final byte[] buffer;

    /**
     * Creates the listener and registers it as a consumer on the response connection.
     *
     * @param requestTransport          transport whose callbacks are completed as responses arrive
     * @param responseConnectionManager connection manager on which the consumer is created
     * @param responseTopic             topic returned from {@link #getDestination()}
     * @param signalCodec               codec used to decode the message body into a {@link ResultSignal}
     * @param callerId                  id matched by the message selector; embedded single quotes are escaped
     * @param maximumMessageLength      size of the read buffer; longer messages are rejected in {@link #onMessage(Message)}
     * @throws JMSException if the consumer can not be created
     */
    public ResponseListener(JmsRequestTransport requestTransport, ConnectionManager responseConnectionManager, Topic responseTopic, SignalCodec signalCodec, String callerId, int maximumMessageLength) throws JMSException {
        this.requestTransport = requestTransport;
        this.responseConnectionManager = responseConnectionManager;
        this.responseTopic = responseTopic;
        this.signalCodec = signalCodec;
        this.buffer = new byte[maximumMessageLength];
        this.selector = WireProperty.CALLER_ID.getKey() + "=" + ResponseListener.quoteSelectorLiteral(callerId);
        // Registered last so that every field is assigned before this instance is handed out.
        responseConnectionManager.createConsumer(this);
    }

    /**
     * Renders a value as a JMS message selector string literal. The selector syntax encloses
     * string literals in single quotes and represents an embedded single quote by doubling it,
     * so a value containing an apostrophe can neither break nor alter the selector expression.
     * A {@code null} value is rendered as the text {@code null}, exactly as plain string
     * concatenation would have produced.
     */
    private static String quoteSelectorLiteral(String value) {
        return "'" + String.valueOf(value).replace("'", "''") + "'";
    }

    /**
     * @return the response topic this listener consumes from
     */
    @Override
    public Destination getDestination() {
        return this.responseTopic;
    }

    /**
     * @return the selector expression matching the caller id property to this listener's caller id
     */
    @Override
    public String getMessageSelector() {
        return this.selector;
    }

    /**
     * Stops and then closes the response connection manager. Only the first call has any
     * effect; subsequent calls return immediately.
     *
     * @throws JMSException if stopping or closing the connection manager fails
     */
    public void close() throws JMSException {
        if (this.closed.compareAndSet(false, true)) {
            try {
                this.responseConnectionManager.stop();
            } finally {
                // The closed flag is already set, so a retry would be a no-op; close must
                // therefore run even when stop() fails, or the connection is never released.
                this.responseConnectionManager.close();
            }
        }
    }

    /**
     * Handles a single response message: records how long ago the message's clock property
     * was stamped, then reads the body into the shared buffer, decodes it as a
     * {@link ResultSignal} and completes the callback registered under the message's JMS
     * correlation id.
     *
     * <p>Nothing is propagated to the JMS provider; any failure, including a body longer than
     * the buffer or a message that is not a {@link BytesMessage}, is logged as an error.
     *
     * @param message the received message
     */
    @Override
    public void onMessage(Message message) {
        try {
            long timeInTopic = System.currentTimeMillis() - message.getLongProperty(WireProperty.CLOCK.getKey());
            LoggerManager.getLogger(ResponseListener.class).debug("response message received(%s) in %d ms...", new Object[]{message.getJMSMessageID(), timeInTopic});
            // The difference can be negative (e.g. the stamp is ahead of the local clock), so it is clamped at zero.
            Instrument.with(ResponseListener.class, (MeterBuilder)LazyBuilder.instance(SpeedometerBuilder::new), new Tag[]{new Tag("queue", ClaxonTag.RESPONSE_TRANSIT_TIME.getDisplay())}).update(Math.max(timeInTopic, 0L), TimeUnit.MILLISECONDS);
            Instrument.with(ResponseListener.class, (MeterBuilder)LazyBuilder.instance(SpeedometerBuilder::new), new Tag[]{new Tag("event", ClaxonTag.COMPLETE_CALLBACK.getDisplay())}).on(() -> {
                // Cast and length lookup stay inside the instrumented action, as before, but are done only once.
                BytesMessage bytesMessage = (BytesMessage)message;
                long bodyLength = bytesMessage.getBodyLength();
                if (bodyLength > (long)this.buffer.length) {
                    throw new TransportException("Message length exceeds maximum capacity %d > %d", bodyLength, this.buffer.length);
                }
                bytesMessage.readBytes(this.buffer);
                // Only the first bodyLength bytes are decoded; anything beyond is left over from earlier messages.
                this.requestTransport.completeCallback(message.getJMSCorrelationID(), this.signalCodec.decode(this.buffer, 0, (int)bodyLength, ResultSignal.class));
            });
        }
        catch (Throwable throwable) {
            LoggerManager.getLogger(ResponseListener.class).error(throwable);
        }
    }
}

