/*
 * Decompiled with CFR 0.152.
 */
package org.smallmind.phalanx.wire.amqp.rabbitmq;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.smallmind.instrument.ChronometerInstrumentAndReturn;
import org.smallmind.instrument.InstrumentAndReturn;
import org.smallmind.instrument.InstrumentationManager;
import org.smallmind.instrument.MetricProperty;
import org.smallmind.instrument.config.MetricConfiguration;
import org.smallmind.instrument.config.MetricConfigurationProvider;
import org.smallmind.nutsnbolts.time.Duration;
import org.smallmind.nutsnbolts.util.SelfDestructive;
import org.smallmind.nutsnbolts.util.SelfDestructiveMap;
import org.smallmind.nutsnbolts.util.SnowflakeId;
import org.smallmind.phalanx.wire.Address;
import org.smallmind.phalanx.wire.AsynchronousTransmissionCallback;
import org.smallmind.phalanx.wire.MetricType;
import org.smallmind.phalanx.wire.RequestTransport;
import org.smallmind.phalanx.wire.ResultSignal;
import org.smallmind.phalanx.wire.SignalCodec;
import org.smallmind.phalanx.wire.SynchronousTransmissionCallback;
import org.smallmind.phalanx.wire.TransmissionCallback;
import org.smallmind.phalanx.wire.TransportException;
import org.smallmind.phalanx.wire.Voice;
import org.smallmind.phalanx.wire.WireContext;
import org.smallmind.phalanx.wire.amqp.rabbitmq.NameConfiguration;
import org.smallmind.phalanx.wire.amqp.rabbitmq.RabbitMQConnector;
import org.smallmind.phalanx.wire.amqp.rabbitmq.RequestMessageRouter;

/**
 * Request-side transport that publishes invocations through a pool of
 * {@link RequestMessageRouter} instances and pairs each response with its caller
 * via a message-id keyed callback map.
 *
 * <p>Routers are handed out through a blocking queue: a router is taken before each
 * publish and handed back afterwards, so the number of queue entries bounds the
 * number of concurrent publishes.
 */
public class RabbitMQRequestTransport
implements MetricConfigurationProvider,
RequestTransport {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final MetricConfiguration metricConfiguration;
    private final SignalCodec signalCodec;
    private final SelfDestructiveMap<String, TransmissionCallback> callbackMap;
    private final LinkedBlockingQueue<RequestMessageRouter> routerQueue;
    private final RequestMessageRouter[] requestMessageRouters;
    private final String callerId = SnowflakeId.newInstance().generateDottedString();

    /**
     * Creates and initializes {@code clusterSize} routers and fills the router pool.
     *
     * @param metricConfiguration   configuration exposed through {@link #getMetricConfiguration()}
     * @param rabbitMQConnector     passed through to every {@link RequestMessageRouter}
     * @param nameConfiguration     passed through to every {@link RequestMessageRouter}
     * @param signalCodec           codec passed to the routers and used to decode results
     * @param clusterSize           number of routers to create; must be at least 1
     * @param concurrencyLimit      desired number of pool entries; the pool holds
     *                              {@code max(clusterSize, concurrencyLimit)} entries, cycling
     *                              through the routers round-robin
     * @param defaultTimeoutSeconds default timeout, in seconds, handed to the callback map
     * @param messageTTLSeconds     passed through to every {@link RequestMessageRouter}
     * @throws IllegalArgumentException if {@code clusterSize} is less than 1
     * @throws IOException          declared by the original constructor; presumably raised by router
     *                              initialization (not verifiable from this file)
     * @throws InterruptedException declared by the original constructor; presumably raised by router
     *                              initialization (not verifiable from this file)
     */
    public RabbitMQRequestTransport(MetricConfiguration metricConfiguration, RabbitMQConnector rabbitMQConnector, NameConfiguration nameConfiguration, SignalCodec signalCodec, int clusterSize, int concurrencyLimit, int defaultTimeoutSeconds, int messageTTLSeconds) throws IOException, InterruptedException {
        // Without at least one router the pool below cannot be populated (index 0 would be out
        // of bounds) and no request could ever be transmitted.
        if (clusterSize < 1) {
            throw new IllegalArgumentException("clusterSize must be at least 1, but was " + clusterSize);
        }

        this.metricConfiguration = metricConfiguration;
        this.signalCodec = signalCodec;
        this.callbackMap = new SelfDestructiveMap<>(new Duration(defaultTimeoutSeconds, TimeUnit.SECONDS));

        this.requestMessageRouters = new RequestMessageRouter[clusterSize];
        for (int index = 0; index < this.requestMessageRouters.length; ++index) {
            this.requestMessageRouters[index] = new RequestMessageRouter(rabbitMQConnector, nameConfiguration, this, signalCodec, this.callerId, index, messageTTLSeconds);
            this.requestMessageRouters[index].initialize();
        }

        // Each router may appear several times in the pool so that up to
        // max(clusterSize, concurrencyLimit) publishes can be in flight at once.
        this.routerQueue = new LinkedBlockingQueue<>();
        int poolSize = Math.max(clusterSize, concurrencyLimit);
        for (int index = 0; index < poolSize; ++index) {
            this.routerQueue.add(this.requestMessageRouters[index % this.requestMessageRouters.length]);
        }
    }

    /**
     * @return the metric configuration supplied at construction
     */
    public MetricConfiguration getMetricConfiguration() {
        return this.metricConfiguration;
    }

    /**
     * @return the identifier generated for this transport instance at construction
     */
    @Override
    public String getCallerId() {
        return this.callerId;
    }

    /**
     * Publishes a request without registering a callback for a response.
     *
     * @throws Exception if a router cannot be acquired or publishing fails
     */
    @Override
    public void transmitInOnly(String serviceGroup, Voice voice, Address address, Map<String, Object> arguments, WireContext ... contexts) throws Exception {
        this.transmit(true, serviceGroup, voice, 0, address, arguments, contexts);
    }

    /**
     * Publishes a request and returns the result obtained from its callback.
     *
     * @param timeoutSeconds callback timeout in seconds; values of 0 or less pass a {@code null}
     *                       duration to the callback map (presumably selecting its default)
     * @return the value produced by {@link TransmissionCallback#getResult}
     * @throws Throwable whatever acquiring a router, publishing, or obtaining the result throws
     */
    @Override
    public Object transmitInOut(String serviceGroup, Voice voice, int timeoutSeconds, Address address, Map<String, Object> arguments, WireContext ... contexts) throws Throwable {
        TransmissionCallback transmissionCallback = this.transmit(false, serviceGroup, voice, timeoutSeconds, address, arguments, contexts);
        if (transmissionCallback != null) {
            return transmissionCallback.getResult(this.signalCodec);
        }
        return null;
    }

    /**
     * Acquires a router, publishes the request and, for in-out calls, registers a callback
     * under the published message id.
     *
     * @return {@code null} for in-only calls; otherwise the callback already present for the
     *         message id (a response was recorded by {@link #completeCallback} before this
     *         method registered its own) or the newly registered asynchronous callback
     * @throws Exception if a router cannot be acquired or publishing fails
     */
    private TransmissionCallback transmit(boolean inOnly, String serviceGroup, Voice voice, int timeoutSeconds, Address address, Map<String, Object> arguments, WireContext ... contexts) throws Exception {
        RequestMessageRouter requestMessageRouter = this.acquireRequestMessageRouter();
        try {
            String messageId = requestMessageRouter.publish(inOnly, serviceGroup, voice, address, arguments, contexts);
            if (inOnly) {
                return null;
            }

            AsynchronousTransmissionCallback asynchronousCallback = new AsynchronousTransmissionCallback(address.getService(), address.getFunction().getName());
            Duration timeout = timeoutSeconds > 0 ? new Duration(timeoutSeconds, TimeUnit.SECONDS) : null;
            // The response may already have been recorded by completeCallback(); in that case
            // the existing entry wins and is handed back to the caller.
            TransmissionCallback previousCallback = this.callbackMap.putIfAbsent(messageId, asynchronousCallback, timeout);

            return previousCallback != null ? previousCallback : asynchronousCallback;
        }
        finally {
            // offer() rather than put(): put() is interruptible, so an interrupted caller would
            // lose the router from the pool and mask any exception thrown above. The queue is
            // unbounded, so offer() cannot be rejected for lack of capacity.
            this.routerQueue.offer(requestMessageRouter);
        }
    }

    /**
     * Takes a router from the pool, polling in one-second intervals until one is available or
     * the transport has been closed. The wait is executed through {@link InstrumentationManager}.
     *
     * @return an available router
     * @throws Exception a {@link TransportException} if the transport is closed and no router
     *                   was obtained, or an {@link InterruptedException} if interrupted while waiting
     */
    private RequestMessageRouter acquireRequestMessageRouter() throws Exception {
        return (RequestMessageRouter)InstrumentationManager.execute((InstrumentAndReturn)new ChronometerInstrumentAndReturn<RequestMessageRouter>((MetricConfigurationProvider)this, new MetricProperty[]{new MetricProperty("event", MetricType.ACQUIRE_REQUEST_DESTINATION.getDisplay())}){

            public RequestMessageRouter withChronometer() throws TransportException, InterruptedException {
                RequestMessageRouter messageTransmitter;
                // Re-check the closed flag after every timed poll so that waiters are released
                // within about a second of close().
                do {
                    messageTransmitter = RabbitMQRequestTransport.this.routerQueue.poll(1L, TimeUnit.SECONDS);
                } while (!RabbitMQRequestTransport.this.closed.get() && messageTransmitter == null);
                if (messageTransmitter == null) {
                    throw new TransportException("Message transmission has been closed", new Object[0]);
                }
                return messageTransmitter;
            }
        });
    }

    /**
     * Records the result for the request identified by {@code correlationId}.
     *
     * <p>If an asynchronous callback is registered for the id, the result is set on it. If no
     * entry exists yet, a {@link SynchronousTransmissionCallback} holding the result is stored
     * so that {@code transmit} finds it when it registers its own callback.
     *
     * @param correlationId message id of the request being completed
     * @param resultSignal  the result to deliver
     */
    public void completeCallback(String correlationId, ResultSignal resultSignal) {
        TransmissionCallback previousCallback = this.callbackMap.get(correlationId);
        if (previousCallback == null) {
            // Nothing registered yet: store the result. A non-null return means a callback was
            // registered between the get() and this call, and it is completed below.
            previousCallback = this.callbackMap.putIfAbsent(correlationId, new SynchronousTransmissionCallback(resultSignal));
        }
        if (previousCallback instanceof AsynchronousTransmissionCallback) {
            ((AsynchronousTransmissionCallback)previousCallback).setResultSignal(resultSignal);
        }
    }

    /**
     * Closes every router and shuts down the callback map. Only the first invocation has any
     * effect; later calls return immediately.
     *
     * @throws IOException          declared by the original method; presumably raised by a router's
     *                              {@code close()} (not verifiable from this file)
     * @throws InterruptedException declared by the original method; presumably raised by a router's
     *                              {@code close()} or the map's {@code shutdown()} (not verifiable here)
     */
    @Override
    public void close() throws IOException, InterruptedException {
        if (this.closed.compareAndSet(false, true)) {
            for (RequestMessageRouter requestMessageRouter : this.requestMessageRouters) {
                requestMessageRouter.close();
            }
            this.callbackMap.shutdown();
        }
    }
}

