/*
 * Decompiled with CFR 0.152.
 */
package org.smallmind.phalanx.wire.transport.amqp.rabbitmq;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.smallmind.claxon.registry.Instrument;
import org.smallmind.claxon.registry.Tag;
import org.smallmind.claxon.registry.meter.MeterBuilder;
import org.smallmind.claxon.registry.meter.MeterFactory;
import org.smallmind.claxon.registry.meter.SpeedometerBuilder;
import org.smallmind.nutsnbolts.util.SnowflakeId;
import org.smallmind.phalanx.wire.ConversationType;
import org.smallmind.phalanx.wire.TransportException;
import org.smallmind.phalanx.wire.Voice;
import org.smallmind.phalanx.wire.signal.Route;
import org.smallmind.phalanx.wire.signal.SignalCodec;
import org.smallmind.phalanx.wire.signal.WireContext;
import org.smallmind.phalanx.wire.transport.AbstractRequestTransport;
import org.smallmind.phalanx.wire.transport.ClaxonTag;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.NameConfiguration;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.PublisherConfirmationHandler;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.RabbitMQConnector;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.RequestMessageRouter;

public class RabbitMQRequestTransport
extends AbstractRequestTransport {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final SignalCodec signalCodec;
    private final LinkedBlockingQueue<RequestMessageRouter> routerQueue;
    private final RequestMessageRouter[] requestMessageRouters;
    private final String callerId = SnowflakeId.newInstance().generateDottedString();

    public RabbitMQRequestTransport(RabbitMQConnector rabbitMQConnector, NameConfiguration nameConfiguration, SignalCodec signalCodec, int clusterSize, int concurrencyLimit, long defaultTimeoutSeconds, int messageTTLSeconds, boolean autoAcknowledge, PublisherConfirmationHandler publisherConfirmationHandler) throws IOException, TimeoutException {
        super(defaultTimeoutSeconds);
        int index;
        int routerIndex = 0;
        this.signalCodec = signalCodec;
        this.requestMessageRouters = new RequestMessageRouter[clusterSize];
        for (index = 0; index < this.requestMessageRouters.length; ++index) {
            this.requestMessageRouters[index] = new RequestMessageRouter(rabbitMQConnector, nameConfiguration, this, signalCodec, this.callerId, index, messageTTLSeconds, autoAcknowledge, publisherConfirmationHandler);
            this.requestMessageRouters[index].initialize();
        }
        this.routerQueue = new LinkedBlockingQueue();
        for (index = 0; index < Math.max(clusterSize, concurrencyLimit); ++index) {
            this.routerQueue.add(this.requestMessageRouters[routerIndex]);
            if (++routerIndex != this.requestMessageRouters.length) continue;
            routerIndex = 0;
        }
    }

    @Override
    public String getCallerId() {
        return this.callerId;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Object transmit(Voice<?, ?> voice, Route route, Map<String, Object> arguments, WireContext ... contexts) throws Throwable {
        RequestMessageRouter requestMessageRouter = this.acquireRequestMessageRouter();
        try {
            boolean inOnly = voice.getConversation().getConversationType().equals((Object)ConversationType.IN_ONLY);
            String messageId = requestMessageRouter.publish(inOnly, (String)voice.getServiceGroup(), voice, route, arguments, contexts);
            Object object = Instrument.with(RabbitMQRequestTransport.class, (MeterBuilder)MeterFactory.instance(SpeedometerBuilder::new), (Tag[])new Tag[]{new Tag("event", ClaxonTag.ACQUIRE_RESULT.getDisplay())}).on(() -> this.acquireResult(this.signalCodec, route, voice, messageId, inOnly));
            return object;
        }
        finally {
            this.routerQueue.put(requestMessageRouter);
        }
    }

    private RequestMessageRouter acquireRequestMessageRouter() throws Throwable {
        return (RequestMessageRouter)Instrument.with(RabbitMQRequestTransport.class, (MeterBuilder)MeterFactory.instance(SpeedometerBuilder::new), (Tag[])new Tag[]{new Tag("event", ClaxonTag.ACQUIRE_REQUEST_TRANSPORT.getDisplay())}).on(() -> {
            RequestMessageRouter messageTransmitter;
            do {
                messageTransmitter = this.routerQueue.poll(1L, TimeUnit.SECONDS);
            } while (!this.closed.get() && messageTransmitter == null);
            if (messageTransmitter == null) {
                throw new TransportException("Message transmission has been closed", new Object[0]);
            }
            return messageTransmitter;
        });
    }

    @Override
    public void close() throws Exception {
        if (this.closed.compareAndSet(false, true)) {
            for (RequestMessageRouter requestMessageRouter : this.requestMessageRouters) {
                requestMessageRouter.close();
            }
        }
    }
}

