/*
 * Decompiled with CFR 0.152.
 */
package org.smallmind.phalanx.wire.amqp.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.smallmind.instrument.ChronometerInstrumentAndReturn;
import org.smallmind.instrument.InstrumentAndReturn;
import org.smallmind.instrument.InstrumentationManager;
import org.smallmind.instrument.MetricProperty;
import org.smallmind.instrument.config.MetricConfigurationProvider;
import org.smallmind.nutsnbolts.util.SnowflakeId;
import org.smallmind.phalanx.wire.MetricType;
import org.smallmind.phalanx.wire.ResultSignal;
import org.smallmind.phalanx.wire.SignalCodec;
import org.smallmind.phalanx.wire.amqp.rabbitmq.MessageRouter;
import org.smallmind.phalanx.wire.amqp.rabbitmq.NameConfiguration;
import org.smallmind.phalanx.wire.amqp.rabbitmq.RabbitMQConnector;
import org.smallmind.phalanx.wire.amqp.rabbitmq.RabbitMQMessage;
import org.smallmind.phalanx.wire.amqp.rabbitmq.RabbitMQResponseTransport;
import org.smallmind.phalanx.wire.jms.QueueOperator;
import org.smallmind.scribe.pen.LoggerManager;

public class ResponseMessageRouter
extends MessageRouter {
    private final RabbitMQResponseTransport responseTransport;
    private final SignalCodec signalCodec;
    private final String serviceGroup;
    private final String instanceId;
    private final int index;
    private final int ttlSeconds;

    public ResponseMessageRouter(RabbitMQConnector connector, NameConfiguration nameConfiguration, RabbitMQResponseTransport responseTransport, SignalCodec signalCodec, String serviceGroup, String instanceId, int index, int ttlSeconds) {
        super(connector, nameConfiguration);
        this.responseTransport = responseTransport;
        this.signalCodec = signalCodec;
        this.serviceGroup = serviceGroup;
        this.instanceId = instanceId;
        this.index = index;
        this.ttlSeconds = ttlSeconds;
    }

    @Override
    public final void bindQueues(Channel channel) throws IOException {
        String talkQueueName = this.getTalkQueueName() + "-" + this.serviceGroup;
        channel.queueDeclare(talkQueueName, false, false, false, null);
        channel.queueBind(talkQueueName, this.getRequestExchangeName(), this.serviceGroup);
        String whisperQueueName = this.getWhisperQueueName() + "-" + this.serviceGroup + "[" + this.instanceId + "]";
        channel.queueDeclare(whisperQueueName, false, false, true, null);
        channel.queueBind(whisperQueueName, this.getRequestExchangeName(), this.serviceGroup + "[" + this.instanceId + "]");
    }

    @Override
    public void installConsumer(Channel channel) throws IOException {
        this.installConsumerInternal(channel, this.getTalkQueueName() + "-" + this.serviceGroup);
        this.installConsumerInternal(channel, this.getWhisperQueueName() + "-" + this.serviceGroup + "[" + this.instanceId + "]");
    }

    private void installConsumerInternal(Channel channel, String queueName) throws IOException {
        channel.basicConsume(queueName, true, queueName + "[" + this.index + "]", false, false, null, (Consumer)new DefaultConsumer(channel){

            public synchronized void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                try {
                    long timeInQueue = System.currentTimeMillis() - ResponseMessageRouter.this.getTimestamp(properties);
                    LoggerManager.getLogger(QueueOperator.class).debug("request message received(%s) in %d ms...", new Object[]{properties.getMessageId(), timeInQueue});
                    InstrumentationManager.instrumentWithChronometer((MetricConfigurationProvider)ResponseMessageRouter.this.responseTransport, (long)(timeInQueue >= 0L ? timeInQueue : 0L), (TimeUnit)TimeUnit.MILLISECONDS, (MetricProperty[])new MetricProperty[]{new MetricProperty("queue", MetricType.REQUEST_DESTINATION_TRANSIT.getDisplay())});
                    ResponseMessageRouter.this.responseTransport.execute(new RabbitMQMessage(properties, body));
                }
                catch (Exception exception) {
                    LoggerManager.getLogger(ResponseMessageRouter.class).error((Throwable)exception);
                }
            }
        });
    }

    public String publish(String callerId, String correlationId, boolean error, String nativeType, Object result) throws Exception {
        RabbitMQMessage rabbitMQMessage = this.constructMessage(correlationId, error, nativeType, result);
        this.send(callerId, this.getResponseExchangeName(), rabbitMQMessage.getProperties(), rabbitMQMessage.getBody());
        return rabbitMQMessage.getProperties().getMessageId();
    }

    private RabbitMQMessage constructMessage(final String correlationId, final boolean error, final String nativeType, final Object result) throws Exception {
        return (RabbitMQMessage)InstrumentationManager.execute((InstrumentAndReturn)new ChronometerInstrumentAndReturn<RabbitMQMessage>((MetricConfigurationProvider)this.responseTransport, new MetricProperty[]{new MetricProperty("event", MetricType.CONSTRUCT_MESSAGE.getDisplay())}){

            public RabbitMQMessage withChronometer() throws Exception {
                AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().contentType(ResponseMessageRouter.this.signalCodec.getContentType()).messageId(SnowflakeId.newInstance().generateDottedString()).correlationId(correlationId).timestamp(new Date()).expiration(String.valueOf(ResponseMessageRouter.this.ttlSeconds * 1000 * 3)).build();
                return new RabbitMQMessage(properties, ResponseMessageRouter.this.signalCodec.encode(new ResultSignal(error, nativeType, result)));
            }
        });
    }
}

