/*
 * Decompiled with CFR 0.152.
 */
package org.smallmind.phalanx.wire.amqp.rabbitmq;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.smallmind.instrument.ChronometerInstrumentAndReturn;
import org.smallmind.instrument.InstrumentAndReturn;
import org.smallmind.instrument.InstrumentationManager;
import org.smallmind.instrument.MetricProperty;
import org.smallmind.instrument.config.MetricConfiguration;
import org.smallmind.instrument.config.MetricConfigurationProvider;
import org.smallmind.nutsnbolts.util.SnowflakeId;
import org.smallmind.phalanx.wire.AbstractRequestTransport;
import org.smallmind.phalanx.wire.Address;
import org.smallmind.phalanx.wire.ConversationType;
import org.smallmind.phalanx.wire.MetricInteraction;
import org.smallmind.phalanx.wire.SignalCodec;
import org.smallmind.phalanx.wire.TransportException;
import org.smallmind.phalanx.wire.Voice;
import org.smallmind.phalanx.wire.WireContext;
import org.smallmind.phalanx.wire.amqp.rabbitmq.NameConfiguration;
import org.smallmind.phalanx.wire.amqp.rabbitmq.RabbitMQConnector;
import org.smallmind.phalanx.wire.amqp.rabbitmq.RequestMessageRouter;

public class RabbitMQRequestTransport
extends AbstractRequestTransport
implements MetricConfigurationProvider {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final MetricConfiguration metricConfiguration;
    private final SignalCodec signalCodec;
    private final LinkedBlockingQueue<RequestMessageRouter> routerQueue;
    private final RequestMessageRouter[] requestMessageRouters;
    private final String callerId = SnowflakeId.newInstance().generateDottedString();

    public RabbitMQRequestTransport(MetricConfiguration metricConfiguration, RabbitMQConnector rabbitMQConnector, NameConfiguration nameConfiguration, SignalCodec signalCodec, int clusterSize, int concurrencyLimit, int defaultTimeoutSeconds, int messageTTLSeconds) throws IOException, InterruptedException, TimeoutException {
        super(defaultTimeoutSeconds);
        int index;
        int routerIndex = 0;
        this.metricConfiguration = metricConfiguration;
        this.signalCodec = signalCodec;
        this.requestMessageRouters = new RequestMessageRouter[clusterSize];
        for (index = 0; index < this.requestMessageRouters.length; ++index) {
            this.requestMessageRouters[index] = new RequestMessageRouter(rabbitMQConnector, nameConfiguration, this, signalCodec, this.callerId, index, messageTTLSeconds);
            this.requestMessageRouters[index].initialize();
        }
        this.routerQueue = new LinkedBlockingQueue();
        for (index = 0; index < Math.max(clusterSize, concurrencyLimit); ++index) {
            this.routerQueue.add(this.requestMessageRouters[routerIndex]);
            if (++routerIndex != this.requestMessageRouters.length) continue;
            routerIndex = 0;
        }
    }

    public MetricConfiguration getMetricConfiguration() {
        return this.metricConfiguration;
    }

    @Override
    public String getCallerId() {
        return this.callerId;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Object transmit(Voice voice, Address address, Map<String, Object> arguments, WireContext ... contexts) throws Throwable {
        RequestMessageRouter requestMessageRouter = this.acquireRequestMessageRouter();
        try {
            boolean inOnly = voice.getConversation().getConversationType().equals((Object)ConversationType.IN_ONLY);
            String messageId = requestMessageRouter.publish(inOnly, (String)voice.getServiceGroup(), voice, address, arguments, contexts);
            Object object = this.acquireResult(this.signalCodec, address, voice, messageId, inOnly);
            return object;
        }
        finally {
            this.routerQueue.put(requestMessageRouter);
        }
    }

    private RequestMessageRouter acquireRequestMessageRouter() throws Throwable {
        return (RequestMessageRouter)InstrumentationManager.execute((InstrumentAndReturn)new ChronometerInstrumentAndReturn<RequestMessageRouter>((MetricConfigurationProvider)this, new MetricProperty[]{new MetricProperty("event", MetricInteraction.ACQUIRE_REQUEST_DESTINATION.getDisplay())}){

            public RequestMessageRouter withChronometer() throws TransportException, InterruptedException {
                RequestMessageRouter messageTransmitter;
                do {
                    messageTransmitter = (RequestMessageRouter)RabbitMQRequestTransport.this.routerQueue.poll(1L, TimeUnit.SECONDS);
                } while (!RabbitMQRequestTransport.this.closed.get() && messageTransmitter == null);
                if (messageTransmitter == null) {
                    throw new TransportException("Message transmission has been closed", new Object[0]);
                }
                return messageTransmitter;
            }
        });
    }

    @Override
    public void close() throws IOException, InterruptedException, TimeoutException {
        if (this.closed.compareAndSet(false, true)) {
            for (RequestMessageRouter requestMessageRouter : this.requestMessageRouters) {
                requestMessageRouter.close();
            }
            this.getCallbackMap().shutdown();
        }
    }
}

