/*
 * Decompiled with CFR 0.152.
 */
package org.smallmind.phalanx.wire.transport.amqp.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.smallmind.claxon.registry.Instrument;
import org.smallmind.claxon.registry.Tag;
import org.smallmind.claxon.registry.meter.MeterBuilder;
import org.smallmind.claxon.registry.meter.MeterFactory;
import org.smallmind.claxon.registry.meter.SpeedometerBuilder;
import org.smallmind.nutsnbolts.util.SnowflakeId;
import org.smallmind.phalanx.wire.VocalMode;
import org.smallmind.phalanx.wire.signal.ResultSignal;
import org.smallmind.phalanx.wire.signal.SignalCodec;
import org.smallmind.phalanx.wire.transport.ClaxonTag;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.MessageRouter;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.NameConfiguration;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.PublisherConfirmationHandler;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.QueueContractor;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.RabbitMQConnector;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.RabbitMQMessage;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.RabbitMQResponseTransport;
import org.smallmind.phalanx.wire.transport.jms.QueueOperator;
import org.smallmind.scribe.pen.LoggerManager;

/**
 * {@link MessageRouter} for the responding side of the wire transport.
 *
 * <p>It binds three queues (shout, talk and whisper) to the request exchange, installs a consumer on
 * each that hands every delivery to the configured {@link RabbitMQResponseTransport}, and publishes
 * encoded {@link ResultSignal}s to the response exchange.
 */
public class ResponseMessageRouter extends MessageRouter {
    private final QueueContractor enduringQueueContractor;
    private final QueueContractor ephemeralQueueContractor;
    private final RabbitMQResponseTransport responseTransport;
    private final SignalCodec signalCodec;
    private final String serviceGroup;
    private final String instanceId;
    private final boolean autoAcknowledge;
    private final int index;
    private final int ttlSeconds;

    /**
     * Creates a router; no broker interaction happens in this class's constructor body.
     *
     * @param connector                    passed through to {@link MessageRouter}
     * @param enduringQueueContractor      used to declare the talk queue
     * @param ephemeralQueueContractor     used to declare the shout and whisper queues
     * @param nameConfiguration            passed through to {@link MessageRouter}
     * @param responseTransport            receives every request message consumed by this router
     * @param signalCodec                  supplies the content type and encodes result signals
     * @param serviceGroup                 service group name, embedded in queue names and routing keys
     * @param instanceId                   instance identifier, embedded in the shout/whisper queue names
     * @param index                        index of this router, embedded in the consumer tags
     * @param ttlSeconds                   time to live in seconds; published messages expire after three times this
     * @param autoAcknowledge              if {@code true} the broker auto-acks deliveries, otherwise they are acked
     *                                     by the consumer after handling
     * @param publisherConfirmationHandler passed through to {@link MessageRouter}
     */
    public ResponseMessageRouter(RabbitMQConnector connector, QueueContractor enduringQueueContractor, QueueContractor ephemeralQueueContractor, NameConfiguration nameConfiguration, RabbitMQResponseTransport responseTransport, SignalCodec signalCodec, String serviceGroup, String instanceId, int index, int ttlSeconds, boolean autoAcknowledge, PublisherConfirmationHandler publisherConfirmationHandler) {
        super(connector, "wire", nameConfiguration, publisherConfirmationHandler);
        this.enduringQueueContractor = enduringQueueContractor;
        this.ephemeralQueueContractor = ephemeralQueueContractor;
        this.responseTransport = responseTransport;
        this.signalCodec = signalCodec;
        this.serviceGroup = serviceGroup;
        this.instanceId = instanceId;
        this.index = index;
        this.ttlSeconds = ttlSeconds;
        this.autoAcknowledge = autoAcknowledge;
    }

    /**
     * Declares the shout, talk and whisper queues and binds each to the request exchange.
     *
     * <p>Shout and talk are bound with a routing key shared by the whole service group; whisper is bound
     * with a routing key that also includes this instance's id.
     *
     * @throws IOException if the underlying channel operation fails
     */
    @Override
    public void bindQueues() throws IOException {
        this.operate(channel -> {
            String requestExchangeName = this.getRequestExchangeName();

            // The boolean passed to declare() is presumably an auto-delete flag - confirm against QueueContractor.
            String shoutQueueName = this.boundShoutQueueName();
            this.ephemeralQueueContractor.declare(channel, shoutQueueName, true);
            channel.queueBind(shoutQueueName, requestExchangeName, VocalMode.SHOUT.getName() + "-" + this.serviceGroup);

            String talkQueueName = this.boundTalkQueueName();
            this.enduringQueueContractor.declare(channel, talkQueueName, false);
            channel.queueBind(talkQueueName, requestExchangeName, VocalMode.TALK.getName() + "-" + this.serviceGroup);

            String whisperQueueName = this.boundWhisperQueueName();
            this.ephemeralQueueContractor.declare(channel, whisperQueueName, true);
            channel.queueBind(whisperQueueName, requestExchangeName, VocalMode.WHISPER.getName() + "-" + this.serviceGroup + "[" + this.instanceId + "]");
        });
    }

    /**
     * Starts consumption; equivalent to {@link #installConsumer()}.
     *
     * @throws IOException if the underlying channel operation fails
     */
    public void play() throws IOException {
        this.installConsumer();
    }

    /**
     * Stops consumption; equivalent to {@link #unInstallConsumer()}.
     *
     * @throws IOException if the underlying channel operation fails
     */
    public void pause() throws IOException {
        this.unInstallConsumer();
    }

    /**
     * (Re)binds the queues and then installs one consumer on each of the shout, talk and whisper queues.
     *
     * @throws IOException if the underlying channel operation fails
     */
    @Override
    public void installConsumer() throws IOException {
        this.operate(channel -> {
            // NOTE(review): bindQueues() itself calls operate(), so this nests one operate() inside another;
            // confirm that MessageRouter.operate() is safe to re-enter.
            this.bindQueues();
            this.installConsumerInternal(channel, this.boundShoutQueueName());
            this.installConsumerInternal(channel, this.boundTalkQueueName());
            this.installConsumerInternal(channel, this.boundWhisperQueueName());
        });
    }

    /**
     * Cancels the three consumers registered by {@link #installConsumer()}, using the same consumer tags.
     *
     * @throws IOException if the underlying channel operation fails
     */
    public void unInstallConsumer() throws IOException {
        this.operate(channel -> {
            channel.basicCancel(this.consumerTagFor(this.boundShoutQueueName()));
            channel.basicCancel(this.consumerTagFor(this.boundTalkQueueName()));
            channel.basicCancel(this.consumerTagFor(this.boundWhisperQueueName()));
        });
    }

    /** Name of this instance's shout queue: {@code <shout>-<serviceGroup>[<instanceId>]}. */
    private String boundShoutQueueName() {
        return this.getShoutQueueName() + "-" + this.serviceGroup + "[" + this.instanceId + "]";
    }

    /** Name of the service group's talk queue: {@code <talk>-<serviceGroup>}. */
    private String boundTalkQueueName() {
        return this.getTalkQueueName() + "-" + this.serviceGroup;
    }

    /** Name of this instance's whisper queue: {@code <whisper>-<serviceGroup>[<instanceId>]}. */
    private String boundWhisperQueueName() {
        return this.getWhisperQueueName() + "-" + this.serviceGroup + "[" + this.instanceId + "]";
    }

    /** Consumer tag for the given queue: {@code <queueName>[<index>]}; shared by consume and cancel. */
    private String consumerTagFor(String queueName) {
        return queueName + "[" + this.index + "]";
    }

    /**
     * Registers a consumer on {@code queueName} that records the message's transit time, forwards it to the
     * response transport and, when auto-acknowledge is off, acks it afterwards whether or not handling failed.
     *
     * @param channel   channel to consume on and to ack against
     * @param queueName fully qualified queue name to consume from
     * @throws IOException if the consumer cannot be registered
     */
    private void installConsumerInternal(final Channel channel, String queueName) throws IOException {
        // basicConsume(queue, autoAck, consumerTag, noLocal=false, exclusive=false, arguments=null, callback)
        channel.basicConsume(queueName, this.autoAcknowledge, this.consumerTagFor(queueName), false, false, null, (Consumer)new DefaultConsumer(channel){

            public synchronized void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                try {
                    long timeInQueue = System.currentTimeMillis() - ResponseMessageRouter.this.getTimestamp(properties);
                    // NOTE(review): this debug line is logged under QueueOperator (a JMS class) while the error
                    // paths below use ResponseMessageRouter - looks like a copy/paste leftover; confirm before changing.
                    LoggerManager.getLogger(QueueOperator.class).debug("request message received(%s) in %d ms...", new Object[]{properties.getMessageId(), timeInQueue});
                    // Negative transit times are clamped to zero before being recorded.
                    Instrument.with(ResponseMessageRouter.class, (MeterBuilder)MeterFactory.instance(SpeedometerBuilder::new), (Tag[])new Tag[]{new Tag("queue", ClaxonTag.REQUEST_TRANSIT_TIME.getDisplay())}).update(timeInQueue >= 0L ? timeInQueue : 0L, TimeUnit.MILLISECONDS);
                    ResponseMessageRouter.this.responseTransport.execute(new RabbitMQMessage(properties, body));
                }
                catch (Throwable throwable) {
                    // Failures are logged and not rethrown.
                    LoggerManager.getLogger(ResponseMessageRouter.class).error(throwable);
                }
                finally {
                    if (!ResponseMessageRouter.this.autoAcknowledge) {
                        try {
                            // multiple=true: acks every outstanding delivery on this channel up to and including this tag.
                            channel.basicAck(envelope.getDeliveryTag(), true);
                        }
                        catch (IOException ioException) {
                            LoggerManager.getLogger(ResponseMessageRouter.class).error((Throwable)ioException);
                        }
                    }
                }
            }
        });
    }

    /**
     * Encodes a result and sends it to the response exchange with routing key {@code "response-" + callerId}.
     *
     * @param callerId      identifier of the caller, used to build the routing key
     * @param correlationId correlation id set on the outgoing message
     * @param error         whether the result represents an error
     * @param nativeType    native type name carried in the result signal
     * @param result        the result payload
     * @return the message id of the published message
     * @throws Throwable if message construction or sending fails
     */
    public String publish(String callerId, String correlationId, boolean error, String nativeType, Object result) throws Throwable {
        RabbitMQMessage rabbitMQMessage = this.constructMessage(correlationId, error, nativeType, result);
        this.send("response-" + callerId, this.getResponseExchangeName(), rabbitMQMessage.getProperties(), rabbitMQMessage.getBody());
        return rabbitMQMessage.getProperties().getMessageId();
    }

    /**
     * Builds the outgoing message (properties plus encoded {@link ResultSignal}) inside an instrumented call.
     *
     * @param correlationId correlation id set on the message properties
     * @param error         whether the result represents an error
     * @param nativeType    native type name carried in the result signal
     * @param result        the result payload
     * @return the constructed message
     * @throws Throwable if encoding fails or the instrumented call throws
     */
    private RabbitMQMessage constructMessage(String correlationId, boolean error, String nativeType, Object result) throws Throwable {
        return (RabbitMQMessage)Instrument.with(ResponseMessageRouter.class, (MeterBuilder)MeterFactory.instance(SpeedometerBuilder::new), (Tag[])new Tag[]{new Tag("event", ClaxonTag.CONSTRUCT_MESSAGE.getDisplay())}).on(() -> {
            // Expiration is three times the TTL, in milliseconds; computed as a long so large TTLs
            // (> 715,827 s) cannot overflow int arithmetic into a negative/garbage expiration.
            String expiration = String.valueOf(this.ttlSeconds * 3000L);
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().contentType(this.signalCodec.getContentType()).messageId(SnowflakeId.newInstance().generateDottedString()).correlationId(correlationId).timestamp(new Date()).expiration(expiration).build();
            return new RabbitMQMessage(properties, this.signalCodec.encode(new ResultSignal(error, nativeType, result)));
        });
    }
}

