/*
 * Decompiled with CFR 0.152.
 */
package org.smallmind.phalanx.wire.transport.amqp.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.ChannelOperation;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.NameConfiguration;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.PublisherConfirmationHandler;
import org.smallmind.phalanx.wire.transport.amqp.rabbitmq.RabbitMQConnector;
import org.smallmind.scribe.pen.LoggerManager;

public abstract class MessageRouter {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicStampedReference<ConnectionAndChannel> connectionAndChannelRef = new AtomicStampedReference<Object>(null, 0);
    private final AtomicInteger version = new AtomicInteger(0);
    private final RabbitMQConnector connector;
    private final NameConfiguration nameConfiguration;
    private final PublisherConfirmationHandler publisherConfirmationHandler;
    private final String prefix;

    public MessageRouter(RabbitMQConnector connector, String prefix, NameConfiguration nameConfiguration, PublisherConfirmationHandler publisherConfirmationHandler) {
        this.connector = connector;
        this.prefix = prefix;
        this.nameConfiguration = nameConfiguration;
        this.publisherConfirmationHandler = publisherConfirmationHandler;
    }

    public abstract void bindQueues() throws IOException;

    public abstract void installConsumer() throws IOException;

    public void initialize() throws IOException, TimeoutException {
        this.ensureChannel(0);
    }

    public String getRequestExchangeName() {
        return this.prefix + "-" + this.nameConfiguration.getRequestExchange();
    }

    public String getResponseExchangeName() {
        return this.prefix + "-" + this.nameConfiguration.getResponseExchange();
    }

    public String getResponseQueueName() {
        return this.prefix + "-" + this.nameConfiguration.getResponseQueue();
    }

    public String getShoutQueueName() {
        return this.prefix + "-" + this.nameConfiguration.getShoutQueue();
    }

    public String getTalkQueueName() {
        return this.prefix + "-" + this.nameConfiguration.getTalkQueue();
    }

    public String getWhisperQueueName() {
        return this.prefix + "-" + this.nameConfiguration.getWhisperQueue();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void ensureChannel(int stamp) throws IOException, TimeoutException {
        AtomicStampedReference<ConnectionAndChannel> atomicStampedReference = this.connectionAndChannelRef;
        synchronized (atomicStampedReference) {
            if (!this.closed.get() && this.connectionAndChannelRef.getStamp() == stamp) {
                Connection connection;
                Channel channel;
                ConnectionAndChannel previousConnectionAndChannel = this.connectionAndChannelRef.getReference();
                if (previousConnectionAndChannel != null && previousConnectionAndChannel.getConnection().isOpen()) {
                    try {
                        previousConnectionAndChannel.close();
                    }
                    catch (IOException ioException) {
                        LoggerManager.getLogger(MessageRouter.class).error((Throwable)ioException);
                    }
                }
                if ((channel = (connection = this.connector.getConnection()).createChannel()) == null) {
                    throw new IOException("No channel is available");
                }
                if (this.publisherConfirmationHandler != null) {
                    channel.confirmSelect();
                    channel.addConfirmListener(this.publisherConfirmationHandler.generateConfirmListener());
                }
                channel.basicQos(0, 1, false);
                channel.exchangeDeclare(this.getRequestExchangeName(), "direct", false, false, null);
                channel.exchangeDeclare(this.getResponseExchangeName(), "direct", false, false, null);
                int nextStamp = this.version.incrementAndGet();
                this.connectionAndChannelRef.set(new ConnectionAndChannel(connection, channel), nextStamp);
                channel.addShutdownListener(cause -> {
                    try {
                        if (!this.closed.get()) {
                            this.ensureChannel(nextStamp);
                        }
                    }
                    catch (IOException | TimeoutException exception) {
                        LoggerManager.getLogger(MessageRouter.class).error((Throwable)exception);
                    }
                });
                this.bindQueues();
                this.installConsumer();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void operate(ChannelOperation channelOperation) throws IOException {
        AtomicStampedReference<ConnectionAndChannel> atomicStampedReference = this.connectionAndChannelRef;
        synchronized (atomicStampedReference) {
            channelOperation.execute(this.connectionAndChannelRef.getReference().getChannel());
        }
    }

    public void send(String routingKey, String exchangeName, AMQP.BasicProperties properties, byte[] body) throws IOException, TimeoutException {
        if (!this.closed.get()) {
            boolean sent = false;
            do {
                int[] stampHolder = new int[1];
                Channel channel = this.connectionAndChannelRef.get(stampHolder).getChannel();
                try {
                    channel.basicPublish(exchangeName, routingKey, true, false, properties, body);
                    sent = true;
                }
                catch (AlreadyClosedException exception) {
                    this.ensureChannel(stampHolder[0]);
                }
            } while (!sent && !this.closed.get());
        }
    }

    public long getTimestamp(AMQP.BasicProperties properties) {
        Date date = properties.getTimestamp();
        if (date != null) {
            return date.getTime();
        }
        return Long.MAX_VALUE;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() throws IOException, TimeoutException {
        if (this.closed.compareAndSet(false, true)) {
            AtomicStampedReference<ConnectionAndChannel> atomicStampedReference = this.connectionAndChannelRef;
            synchronized (atomicStampedReference) {
                ConnectionAndChannel connectionAndChannel = this.connectionAndChannelRef.getReference();
                if (connectionAndChannel != null) {
                    connectionAndChannel.close();
                }
            }
        }
    }

    private static class ConnectionAndChannel {
        private final Connection connection;
        private final Channel channel;

        public ConnectionAndChannel(Connection connection, Channel channel) {
            this.connection = connection;
            this.channel = channel;
        }

        public Connection getConnection() {
            return this.connection;
        }

        public Channel getChannel() {
            return this.channel;
        }

        public void close() throws IOException {
            this.connection.close();
        }
    }
}

