/*
 * Decompiled with CFR 0.152.
 */
package org.smallmind.phalanx.wire.transport.amqp.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.smallmind.claxon.registry.Instrument;
import org.smallmind.claxon.registry.Tag;
import org.smallmind.claxon.registry.meter.MeterBuilder;
import org.smallmind.claxon.registry.meter.MeterFactory;
import org.smallmind.claxon.registry.meter.SpeedometerBuilder;
import org.smallmind.nutsnbolts.util.SnowflakeId;
import org.smallmind.phalanx.wire.VocalMode;
import org.smallmind.phalanx.wire.Voice;
import org.smallmind.phalanx.wire.signal.InvocationSignal;
import org.smallmind.phalanx.wire.signal.ResultSignal;
import org.smallmind.phalanx.wire.signal.Route;
import org.smallmind.phalanx.wire.signal.SignalCodec;
import org.smallmind.phalanx.wire.signal.WireContext;
import org.smallmind.phalanx.wire.transport.ClaxonTag;
import org.smallmind.phalanx.wire.transport.WireProperty;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.MessageRouter;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.NameConfiguration;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.PublisherConfirmationHandler;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.QueueContractor;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.RabbitMQConnector;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.RabbitMQMessage;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.RabbitMQRequestTransport;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.ResponseMessageRouter;
import org.smallmind.scribe.pen.LoggerManager;

/**
 * Message router for the requesting side of the wire transport: it publishes invocation
 * messages to the request exchange and consumes result messages from a queue that is
 * specific to this caller (named from the response queue name and the caller id).
 */
public class RequestMessageRouter
extends MessageRouter {
    /** AMQP header key under which the caller id is sent on requests that expect a response. */
    private static final String CALLER_ID_AMQP_KEY = "x-opt-" + WireProperty.CALLER_ID.getKey();
    private final QueueContractor ephemeralQueueContractor;
    private final RabbitMQRequestTransport requestTransport;
    private final SignalCodec signalCodec;
    private final String callerId;
    private final boolean autoAcknowledge;
    private final int index;
    private final int ttlSeconds;

    /**
     * @param connector                    connector handed to the parent router
     * @param ephemeralQueueContractor     used to declare the caller specific response queue
     * @param nameConfiguration            naming configuration handed to the parent router
     * @param requestTransport             transport whose callback is completed when a response arrives
     * @param signalCodec                  codec used to encode invocations and decode results
     * @param callerId                     id of this caller; part of the response queue name and routing key
     * @param index                        index of this router, used in the consumer tag
     * @param ttlSeconds                   time-to-live of published messages, in seconds
     * @param autoAcknowledge              whether the consumer is registered in auto-ack mode
     * @param publisherConfirmationHandler confirmation handler handed to the parent router
     */
    public RequestMessageRouter(RabbitMQConnector connector, QueueContractor ephemeralQueueContractor, NameConfiguration nameConfiguration, RabbitMQRequestTransport requestTransport, SignalCodec signalCodec, String callerId, int index, int ttlSeconds, boolean autoAcknowledge, PublisherConfirmationHandler publisherConfirmationHandler) {
        super(connector, "wire", nameConfiguration, publisherConfirmationHandler);
        this.ephemeralQueueContractor = ephemeralQueueContractor;
        this.requestTransport = requestTransport;
        this.signalCodec = signalCodec;
        this.callerId = callerId;
        this.index = index;
        this.ttlSeconds = ttlSeconds;
        this.autoAcknowledge = autoAcknowledge;
    }

    /**
     * Declares the caller specific response queue and binds it to the response exchange
     * with the routing key {@code response-<callerId>}.
     *
     * @throws IOException if the channel operation fails
     */
    @Override
    public final void bindQueues() throws IOException {
        this.operate(channel -> {
            String queueName = this.callerResponseQueueName();
            this.ephemeralQueueContractor.declare(channel, queueName, true);
            channel.queueBind(queueName, this.getResponseExchangeName(), "response-" + this.callerId);
        });
    }

    /**
     * Registers a consumer on the caller specific response queue. Each delivery is decoded
     * as a {@link ResultSignal} and passed to the request transport's callback, keyed by the
     * message's correlation id.
     *
     * @throws IOException if the channel operation fails
     */
    @Override
    public void installConsumer() throws IOException {
        this.operate(channel -> {
            String queueName = this.callerResponseQueueName();
            String consumerTag = queueName + "[" + this.index + "]";
            Consumer consumer = new DefaultConsumer(channel) {

                /**
                 * Records transit time, completes the pending callback, and (when not in
                 * auto-ack mode) acknowledges the delivery. Failures are logged rather than
                 * propagated, and the acknowledgement is attempted even when processing failed.
                 */
                @Override
                public synchronized void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                    // NOTE(review): log category is ResponseMessageRouter, not this class - kept
                    // as-is to preserve existing log routing; confirm whether that is intended.
                    try {
                        long timeInTopic = System.currentTimeMillis() - RequestMessageRouter.this.getTimestamp(properties);
                        LoggerManager.getLogger(ResponseMessageRouter.class).debug("response message received(%s) in %d ms...", new Object[]{properties.getMessageId(), timeInTopic});
                        // Clock skew between hosts can yield a negative transit time; report it as zero.
                        Instrument.with(RequestMessageRouter.class, (MeterBuilder)MeterFactory.instance(SpeedometerBuilder::new), new Tag[]{new Tag("queue", ClaxonTag.RESPONSE_TRANSIT_TIME.getDisplay())}).update(Math.max(timeInTopic, 0L), TimeUnit.MILLISECONDS);
                        Instrument.with(RequestMessageRouter.class, (MeterBuilder)MeterFactory.instance(SpeedometerBuilder::new), new Tag[]{new Tag("event", ClaxonTag.COMPLETE_CALLBACK.getDisplay())}).on(() -> RequestMessageRouter.this.requestTransport.completeCallback(properties.getCorrelationId(), RequestMessageRouter.this.signalCodec.decode(body, 0, body.length, ResultSignal.class)));
                    }
                    catch (Throwable throwable) {
                        // Deliberate best-effort: a bad message must not kill the consumer.
                        LoggerManager.getLogger(ResponseMessageRouter.class).error(throwable);
                    }
                    finally {
                        if (!RequestMessageRouter.this.autoAcknowledge) {
                            try {
                                // Second argument is the 'multiple' flag of basicAck.
                                channel.basicAck(envelope.getDeliveryTag(), true);
                            }
                            catch (IOException ioException) {
                                LoggerManager.getLogger(ResponseMessageRouter.class).error(ioException);
                            }
                        }
                    }
                }
            };
            channel.basicConsume(queueName, this.autoAcknowledge, consumerTag, false, false, null, consumer);
        });
    }

    /**
     * Builds an invocation message and sends it to the request exchange. The routing key is
     * {@code <mode name>-<serviceGroup>}, with {@code [<instanceId>]} appended when the voice
     * mode is {@link VocalMode#WHISPER}.
     *
     * @param inOnly       when {@code true} the caller id header is omitted from the message
     * @param serviceGroup service group used in the routing key
     * @param voice        supplies the mode (and, for whispers, the instance id) for the routing key
     * @param route        route carried in the invocation signal
     * @param arguments    arguments carried in the invocation signal
     * @param contexts     contexts carried in the invocation signal
     * @return the message id of the published message
     * @throws Throwable if message construction or sending fails
     */
    public String publish(boolean inOnly, String serviceGroup, Voice<?, ?> voice, Route route, Map<String, Object> arguments, WireContext ... contexts) throws Throwable {
        RabbitMQMessage rabbitMQMessage = this.constructMessage(inOnly, route, arguments, contexts);
        StringBuilder routingKeyBuilder = new StringBuilder(voice.getMode().getName()).append("-").append(serviceGroup);
        if (voice.getMode().equals(VocalMode.WHISPER)) {
            routingKeyBuilder.append('[').append(voice.getInstanceId()).append(']');
        }
        this.send(routingKeyBuilder.toString(), this.getRequestExchangeName(), rabbitMQMessage.getProperties(), rabbitMQMessage.getBody());
        return rabbitMQMessage.getProperties().getMessageId();
    }

    /**
     * Encodes an {@link InvocationSignal} and pairs it with AMQP properties (content type,
     * generated message id, current timestamp, expiration and headers). The work is timed
     * under the {@code CONSTRUCT_MESSAGE} instrument tag.
     */
    private RabbitMQMessage constructMessage(boolean inOnly, Route route, Map<String, Object> arguments, WireContext ... contexts) throws Throwable {
        return (RabbitMQMessage)Instrument.with(RequestMessageRouter.class, (MeterBuilder)MeterFactory.instance(SpeedometerBuilder::new), new Tag[]{new Tag("event", ClaxonTag.CONSTRUCT_MESSAGE.getDisplay())}).on(() -> {
            Map<String, Object> headerMap = new HashMap<>();
            if (!inOnly) {
                // Only requests that expect a response carry the caller id.
                headerMap.put(CALLER_ID_AMQP_KEY, this.callerId);
            }
            // Convert in long arithmetic: 'ttlSeconds * 1000' as int overflows for TTLs beyond ~24.8 days.
            String expirationMillis = String.valueOf(TimeUnit.SECONDS.toMillis(this.ttlSeconds));
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().contentType(this.signalCodec.getContentType()).messageId(SnowflakeId.newInstance().generateDottedString()).timestamp(new Date()).expiration(expirationMillis).headers(headerMap).build();
            return new RabbitMQMessage(properties, this.signalCodec.encode(new InvocationSignal(inOnly, route, arguments, contexts)));
        });
    }

    /** Name of the response queue dedicated to this caller: {@code <responseQueueName>-<callerId>}. */
    private String callerResponseQueueName() {
        return this.getResponseQueueName() + "-" + this.callerId;
    }
}

