/*
 * Decompiled with CFR 0.152.
 */
package org.smallmind.phalanx.wire.amqp.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.smallmind.instrument.ChronometerInstrument;
import org.smallmind.instrument.ChronometerInstrumentAndReturn;
import org.smallmind.instrument.Instrument;
import org.smallmind.instrument.InstrumentAndReturn;
import org.smallmind.instrument.InstrumentationManager;
import org.smallmind.instrument.MetricProperty;
import org.smallmind.instrument.config.MetricConfigurationProvider;
import org.smallmind.nutsnbolts.util.SnowflakeId;
import org.smallmind.phalanx.wire.Address;
import org.smallmind.phalanx.wire.InvocationSignal;
import org.smallmind.phalanx.wire.MetricType;
import org.smallmind.phalanx.wire.ResultSignal;
import org.smallmind.phalanx.wire.SignalCodec;
import org.smallmind.phalanx.wire.VocalMode;
import org.smallmind.phalanx.wire.Voice;
import org.smallmind.phalanx.wire.WireContext;
import org.smallmind.phalanx.wire.WireProperty;
import org.smallmind.phalanx.wire.amqp.rabbitmq.ChannelOperation;
import org.smallmind.phalanx.wire.amqp.rabbitmq.MessageRouter;
import org.smallmind.phalanx.wire.amqp.rabbitmq.NameConfiguration;
import org.smallmind.phalanx.wire.amqp.rabbitmq.RabbitMQConnector;
import org.smallmind.phalanx.wire.amqp.rabbitmq.RabbitMQMessage;
import org.smallmind.phalanx.wire.amqp.rabbitmq.RabbitMQRequestTransport;
import org.smallmind.phalanx.wire.amqp.rabbitmq.ResponseMessageRouter;
import org.smallmind.scribe.pen.LoggerManager;

public class RequestMessageRouter
extends MessageRouter {
    private static final String CALLER_ID_AMQP_KEY = "x-opt-" + WireProperty.CALLER_ID.getKey();
    private final RabbitMQRequestTransport requestTransport;
    private final SignalCodec signalCodec;
    private final String callerId;
    private final int index;
    private final int ttlSeconds;

    public RequestMessageRouter(RabbitMQConnector connector, NameConfiguration nameConfiguration, RabbitMQRequestTransport requestTransport, SignalCodec signalCodec, String callerId, int index, int ttlSeconds) {
        super(connector, nameConfiguration);
        this.requestTransport = requestTransport;
        this.signalCodec = signalCodec;
        this.callerId = callerId;
        this.index = index;
        this.ttlSeconds = ttlSeconds;
    }

    @Override
    public final void bindQueues() throws IOException {
        this.operate(new ChannelOperation(){

            @Override
            public void execute(Channel channel) throws IOException {
                String queueName = RequestMessageRouter.this.getResponseQueueName() + "-" + RequestMessageRouter.this.callerId;
                channel.queueDeclare(queueName, false, false, true, null);
                channel.queueBind(queueName, RequestMessageRouter.this.getResponseExchangeName(), "response-" + RequestMessageRouter.this.callerId);
            }
        });
    }

    @Override
    public void installConsumer() throws IOException {
        this.operate(new ChannelOperation(){

            @Override
            public void execute(Channel channel) throws IOException {
                channel.basicConsume(RequestMessageRouter.this.getResponseQueueName() + "-" + RequestMessageRouter.this.callerId, true, RequestMessageRouter.this.getResponseQueueName() + "-" + RequestMessageRouter.this.callerId + "[" + RequestMessageRouter.this.index + "]", false, false, null, (Consumer)new DefaultConsumer(channel){

                    public synchronized void handleDelivery(String consumerTag, Envelope envelope, final AMQP.BasicProperties properties, final byte[] body) {
                        try {
                            long timeInTopic = System.currentTimeMillis() - RequestMessageRouter.this.getTimestamp(properties);
                            LoggerManager.getLogger(ResponseMessageRouter.class).debug("response message received(%s) in %d ms...", new Object[]{properties.getMessageId(), timeInTopic});
                            InstrumentationManager.instrumentWithChronometer((MetricConfigurationProvider)RequestMessageRouter.this.requestTransport, (long)(timeInTopic >= 0L ? timeInTopic : 0L), (TimeUnit)TimeUnit.MILLISECONDS, (MetricProperty[])new MetricProperty[]{new MetricProperty("queue", MetricType.RESPONSE_TOPIC_TRANSIT.getDisplay())});
                            InstrumentationManager.execute((Instrument)new ChronometerInstrument(RequestMessageRouter.this.requestTransport, new MetricProperty[]{new MetricProperty("event", MetricType.COMPLETE_CALLBACK.getDisplay())}){

                                public void withChronometer() throws Exception {
                                    RequestMessageRouter.this.requestTransport.completeCallback(properties.getCorrelationId(), RequestMessageRouter.this.signalCodec.decode(body, 0, body.length, ResultSignal.class));
                                }
                            });
                        }
                        catch (Exception exception) {
                            LoggerManager.getLogger(ResponseMessageRouter.class).error((Throwable)exception);
                        }
                    }
                });
            }
        });
    }

    public String publish(boolean inOnly, String serviceGroup, Voice voice, Address address, Map<String, Object> arguments, WireContext ... contexts) throws Exception {
        RabbitMQMessage rabbitMQMessage = this.constructMessage(inOnly, address, arguments, contexts);
        StringBuilder routingKeyBuilder = new StringBuilder(voice.getMode().getName()).append("-").append(serviceGroup);
        if (voice.getMode().equals((Object)VocalMode.WHISPER)) {
            routingKeyBuilder.append('[').append(voice.getInstanceId()).append(']');
        }
        this.send(routingKeyBuilder.toString(), this.getRequestExchangeName(), rabbitMQMessage.getProperties(), rabbitMQMessage.getBody());
        return rabbitMQMessage.getProperties().getMessageId();
    }

    private RabbitMQMessage constructMessage(final boolean inOnly, final Address address, final Map<String, Object> arguments, final WireContext ... contexts) throws Exception {
        return (RabbitMQMessage)InstrumentationManager.execute((InstrumentAndReturn)new ChronometerInstrumentAndReturn<RabbitMQMessage>((MetricConfigurationProvider)this.requestTransport, new MetricProperty[]{new MetricProperty("event", MetricType.CONSTRUCT_MESSAGE.getDisplay())}){

            public RabbitMQMessage withChronometer() throws Exception {
                HashMap<String, String> headerMap = new HashMap<String, String>();
                if (!inOnly) {
                    headerMap.put(CALLER_ID_AMQP_KEY, RequestMessageRouter.this.callerId);
                }
                AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().contentType(RequestMessageRouter.this.signalCodec.getContentType()).messageId(SnowflakeId.newInstance().generateDottedString()).timestamp(new Date()).expiration(String.valueOf(RequestMessageRouter.this.ttlSeconds * 1000)).headers(headerMap).build();
                return new RabbitMQMessage(properties, RequestMessageRouter.this.signalCodec.encode(new InvocationSignal(inOnly, address, arguments, contexts)));
            }
        });
    }
}

