/*
 * Decompiled with CFR 0.152.
 */
package org.smallmind.phalanx.wire.transport.amqp.rabbitmq;

import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.smallmind.nutsnbolts.util.SnowflakeId;
import org.smallmind.phalanx.wire.TransportException;
import org.smallmind.phalanx.wire.signal.SignalCodec;
import org.smallmind.phalanx.wire.transport.ResponseTransmitter;
import org.smallmind.phalanx.wire.transport.ResponseTransport;
import org.smallmind.phalanx.wire.transport.TransportState;
import org.smallmind.phalanx.wire.transport.WireInvocationCircuit;
import org.smallmind.phalanx.wire.transport.WiredService;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.InvocationWorker;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.NameConfiguration;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.PublisherConfirmationHandler;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.RabbitMQConnector;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.RabbitMQMessage;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.ResponseMessageRouter;
import org.smallmind.phalanx.worker.WorkManager;
import org.smallmind.phalanx.worker.WorkQueue;
import org.smallmind.phalanx.worker.WorkerFactory;

/**
 * Response-side transport that owns a fixed set of {@link ResponseMessageRouter}
 * instances and hands them out, one per call, to publish invocation results.
 *
 * <p>Routers are created and initialized in the constructor. References to them are
 * kept in a {@link ConcurrentLinkedQueue} that acts as a pool: {@link #transmit} takes
 * a reference, publishes through it and puts the reference back.
 */
public class RabbitMQResponseTransport
extends WorkManager<InvocationWorker, RabbitMQMessage>
implements WorkerFactory<InvocationWorker, RabbitMQMessage>,
ResponseTransport,
ResponseTransmitter {
    /** Ensures the body of {@link #close()} runs at most once. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    /** Current transport state; the reference object also serves as the lock for state changes. */
    private final AtomicReference<TransportState> transportStateRef = new AtomicReference<>(TransportState.PLAYING);
    private final WireInvocationCircuit invocationCircuit = new WireInvocationCircuit();
    private final SignalCodec signalCodec;
    /** Pool of router references available to {@link #transmit}. */
    private final ConcurrentLinkedQueue<ResponseMessageRouter> responseQueue;
    /** Every router created by this transport, used for play/pause/close fan-out. */
    private final ResponseMessageRouter[] responseMessageRouters;
    private final String instanceId = SnowflakeId.newInstance().generateDottedString();

    /**
     * Creates and initializes {@code clusterSize} routers, fills the router pool and
     * starts the worker machinery via {@code startUp(this)}.
     *
     * <p>The pool receives {@code max(clusterSize, concurrencyLimit)} router references,
     * assigned by cycling through the router array, so a router may appear in the pool
     * more than once when {@code concurrencyLimit} exceeds {@code clusterSize}.
     *
     * @param rabbitMQConnector            passed through to each router
     * @param nameConfiguration            passed through to each router
     * @param workerClass                  worker class handed to the superclass constructor
     * @param signalCodec                  codec passed to each router and to each created worker
     * @param serviceGroup                 passed through to each router
     * @param clusterSize                  number of routers to create
     * @param concurrencyLimit             handed to the superclass constructor; also a lower bound on the pool size
     * @param messageTTLSeconds            passed through to each router
     * @param autoAcknowledge              passed through to each router
     * @param publisherConfirmationHandler passed through to each router
     * @throws IOException          declared by this constructor; NOTE(review): presumably raised by router initialization - confirm
     * @throws InterruptedException declared by this constructor; NOTE(review): presumably raised by router initialization or startUp - confirm
     * @throws TimeoutException     declared by this constructor; NOTE(review): presumably raised by router initialization - confirm
     */
    public RabbitMQResponseTransport(RabbitMQConnector rabbitMQConnector, NameConfiguration nameConfiguration, Class<InvocationWorker> workerClass, SignalCodec signalCodec, String serviceGroup, int clusterSize, int concurrencyLimit, int messageTTLSeconds, boolean autoAcknowledge, PublisherConfirmationHandler publisherConfirmationHandler) throws IOException, InterruptedException, TimeoutException {
        super(workerClass, concurrencyLimit);
        this.signalCodec = signalCodec;
        this.responseMessageRouters = new ResponseMessageRouter[clusterSize];
        for (int index = 0; index < this.responseMessageRouters.length; ++index) {
            this.responseMessageRouters[index] = new ResponseMessageRouter(rabbitMQConnector, nameConfiguration, this, signalCodec, serviceGroup, this.instanceId, index, messageTTLSeconds, autoAcknowledge, publisherConfirmationHandler);
            this.responseMessageRouters[index].initialize();
        }
        this.responseQueue = new ConcurrentLinkedQueue<>();
        // Fill the pool by cycling through the routers, wrapping back to the first one
        // each time the end of the array is reached.
        int poolSize = Math.max(clusterSize, concurrencyLimit);
        int routerIndex = 0;
        for (int index = 0; index < poolSize; ++index) {
            this.responseQueue.add(this.responseMessageRouters[routerIndex]);
            if (++routerIndex == this.responseMessageRouters.length) {
                routerIndex = 0;
            }
        }
        this.startUp(this);
    }

    /**
     * @return the identifier generated for this transport instance at construction
     */
    @Override
    public String getInstanceId() {
        return this.instanceId;
    }

    /**
     * Registers a service with this transport's invocation circuit.
     *
     * @param serviceInterface interface under which the service is registered
     * @param targetService    the service implementation
     * @return this transport's instance id
     * @throws Exception if the invocation circuit rejects the registration
     */
    @Override
    public String register(Class<?> serviceInterface, WiredService targetService) throws Exception {
        this.invocationCircuit.register(serviceInterface, targetService);
        return this.instanceId;
    }

    /**
     * Builds a worker bound to the given queue, this transport, the shared invocation
     * circuit and the signal codec.
     *
     * @param transferQueue queue handed to the new worker
     * @return a new {@link InvocationWorker}
     */
    @Override
    public InvocationWorker createWorker(WorkQueue<RabbitMQMessage> transferQueue) {
        return new InvocationWorker(transferQueue, this, this.invocationCircuit, this.signalCodec);
    }

    /**
     * @return the current transport state
     */
    @Override
    public TransportState getState() {
        return this.transportStateRef.get();
    }

    /**
     * Moves the transport from {@code PAUSED} to {@code PLAYING} and calls
     * {@code play()} on every router. Does nothing in any other state.
     *
     * <p>The state is switched before the routers are touched, so if a router throws,
     * the state remains {@code PLAYING} and the remaining routers are not resumed.
     *
     * @throws Exception if a router fails to resume
     */
    @Override
    public void play() throws Exception {
        synchronized (this.transportStateRef) {
            if (this.transportStateRef.compareAndSet(TransportState.PAUSED, TransportState.PLAYING)) {
                for (ResponseMessageRouter responseMessageRouter : this.responseMessageRouters) {
                    responseMessageRouter.play();
                }
            }
        }
    }

    /**
     * Moves the transport from {@code PLAYING} to {@code PAUSED} and calls
     * {@code pause()} on every router. Does nothing in any other state.
     *
     * <p>The state is switched before the routers are touched, so if a router throws,
     * the state remains {@code PAUSED} and the remaining routers are not paused.
     *
     * @throws Exception if a router fails to pause
     */
    @Override
    public void pause() throws Exception {
        synchronized (this.transportStateRef) {
            if (this.transportStateRef.compareAndSet(TransportState.PLAYING, TransportState.PAUSED)) {
                for (ResponseMessageRouter responseMessageRouter : this.responseMessageRouters) {
                    responseMessageRouter.pause();
                }
            }
        }
    }

    /**
     * Publishes a result through a router taken from the pool.
     *
     * <p>The router reference is returned to the pool whether or not publishing
     * succeeds; otherwise every failed publish would permanently shrink the pool until
     * no router could be obtained at all.
     *
     * @param callerId      passed through to the router
     * @param correlationId passed through to the router
     * @param error         passed through to the router
     * @param nativeType    passed through to the router
     * @param result        passed through to the router
     * @throws TransportException if the pool is empty
     * @throws Throwable          anything raised by the router while publishing
     */
    @Override
    public void transmit(String callerId, String correlationId, boolean error, String nativeType, Object result) throws Throwable {
        ResponseMessageRouter responseMessageRouter = this.responseQueue.poll();
        if (responseMessageRouter == null) {
            throw new TransportException("Unable to take a ResponseMessageRouter, which should never happen - please contact your system administrator", new Object[0]);
        }
        try {
            responseMessageRouter.publish(callerId, correlationId, error, nativeType, result);
        } finally {
            // Always hand the router back, even when publish fails.
            this.responseQueue.add(responseMessageRouter);
        }
    }

    /**
     * Closes the transport. Only the first call has any effect: it sets the state to
     * {@code CLOSED}, closes every router in array order and then calls
     * {@code shutDown()}. An exception from a router aborts the remaining steps.
     *
     * @throws IOException          declared by this method; NOTE(review): presumably from router close - confirm
     * @throws InterruptedException declared by this method; NOTE(review): presumably from router close or shutDown - confirm
     * @throws TimeoutException     declared by this method; NOTE(review): presumably from router close - confirm
     */
    @Override
    public void close() throws IOException, InterruptedException, TimeoutException {
        if (this.closed.compareAndSet(false, true)) {
            synchronized (this.transportStateRef) {
                this.transportStateRef.set(TransportState.CLOSED);
                for (ResponseMessageRouter responseMessageRouter : this.responseMessageRouters) {
                    responseMessageRouter.close();
                }
                this.shutDown();
            }
        }
    }
}

