/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.conductor.contribs.queue.amqp;

import com.netflix.conductor.contribs.queue.amqp.config.AMQPRetryPattern;
import com.netflix.conductor.contribs.queue.amqp.util.ConnectionType;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Singleton-style holder of the AMQP publisher and subscriber connections and of the
 * channels opened on them.
 *
 * <p>Two static pools are kept: {@code availableChannelPool} holds channels that were handed
 * back through {@link #returnChannel(ConnectionType, Channel)} and may be borrowed again, and
 * {@code subscriberReservedChannelPool} maps {@code "<connectionType>;<queueOrExchangeName>"}
 * to the channel most recently handed out for that subscriber.
 *
 * <p>Connection creation, borrowing and returning are guarded by this instance's monitor;
 * {@link #close()} is not.
 */
public class AMQPConnection {
    private static final Logger LOGGER = LoggerFactory.getLogger(AMQPConnection.class);
    private static final String PUBLISHER = "Publisher";
    private static final String SUBSCRIBER = "Subscriber";
    private static final Map<ConnectionType, Set<Channel>> availableChannelPool = new ConcurrentHashMap<>();
    private static final Map<String, Channel> subscriberReservedChannelPool = new ConcurrentHashMap<>();
    private static AMQPConnection amqpConnection = null;
    private static AMQPRetryPattern retrySettings = null;

    private final ConnectionFactory factory;
    private final Address[] addresses;
    private volatile Connection publisherConnection = null;
    private volatile Connection subscriberConnection = null;

    private AMQPConnection() {
        this(null, null);
    }

    private AMQPConnection(ConnectionFactory factory, Address[] address) {
        this.factory = factory;
        this.addresses = address;
    }

    /**
     * Returns the shared instance, creating it from {@code factory} and {@code address} on the
     * first call. On later calls those two arguments are ignored, but {@code retrySettings}
     * always replaces the previously stored retry settings.
     *
     * @param factory       connection factory used by a newly created instance
     * @param address       broker addresses used by a newly created instance
     * @param retrySettings retry settings consulted when opening a connection or channel fails;
     *                      {@code null} means the first failure is fatal
     * @return the shared instance
     */
    public static synchronized AMQPConnection getInstance(ConnectionFactory factory, Address[] address, AMQPRetryPattern retrySettings) {
        if (amqpConnection == null) {
            amqpConnection = new AMQPConnection(factory, address);
        }
        AMQPConnection.retrySettings = retrySettings;
        return amqpConnection;
    }

    /**
     * Replaces the shared instance returned by {@link #getInstance}.
     *
     * @param amqpConnection the instance to install; {@code null} makes the next
     *                       {@code getInstance} call create a fresh one
     */
    public static void setAMQPConnection(AMQPConnection amqpConnection) {
        AMQPConnection.amqpConnection = amqpConnection;
    }

    /**
     * @return the address array this instance was constructed with (not a copy)
     */
    public Address[] getAddresses() {
        return this.addresses;
    }

    /**
     * Opens a connection named {@code <HOSTNAME env value>-<connectionPrefix>} and registers
     * logging shutdown/blocked listeners on it. {@link IOException} and
     * {@link TimeoutException} are handed to the retry settings; any other failure propagates
     * immediately.
     *
     * @param connectionPrefix suffix of the client-provided connection name
     * @return an open connection
     * @throws RuntimeException if the connection cannot be opened
     */
    private Connection createConnection(String connectionPrefix) {
        int retryIndex = 1;
        while (true) {
            try {
                final Connection connection = this.factory.newConnection(this.addresses, System.getenv("HOSTNAME") + "-" + connectionPrefix);
                if (connection == null || !connection.isOpen()) {
                    throw new RuntimeException("Failed to open connection");
                }
                connection.addShutdownListener(cause -> LOGGER.error("Received a shutdown exception for the connection {}. reason {} cause{}", connection.getClientProvidedName(), cause.getMessage(), cause));
                connection.addBlockedListener(new BlockedListener() {
                    @Override
                    public void handleUnblocked() throws IOException {
                        LOGGER.info("Connection {} is unblocked", connection.getClientProvidedName());
                    }

                    @Override
                    public void handleBlocked(String reason) throws IOException {
                        LOGGER.error("Connection {} is blocked. reason: {}", connection.getClientProvidedName(), reason);
                    }
                });
                return connection;
            } catch (IOException e) {
                this.retryOrFail(e, retryIndex, "IO error while connecting to ", true);
            } catch (TimeoutException e) {
                this.retryOrFail(e, retryIndex, "Timeout while connecting to ", true);
            }
            // retryOrFail returned normally, so another attempt is allowed.
            ++retryIndex;
        }
    }

    /**
     * Returns a channel for the given connection type, opening the underlying connection first
     * when it is missing or closed.
     *
     * <p>For {@code SUBSCRIBER} the channel last handed out for {@code queueOrExchangeName} is
     * reused while it is still open; otherwise a channel is borrowed and remembered under that
     * name. For {@code PUBLISHER} a channel is borrowed on every call.
     *
     * @param connectionType      which connection the channel should belong to
     * @param queueOrExchangeName name used to key reserved subscriber channels
     * @return the channel, or {@code null} for a connection type other than the two above
     * @throws Exception declared for callers; failures surface as {@link RuntimeException}
     */
    public Channel getOrCreateChannel(ConnectionType connectionType, String queueOrExchangeName) throws Exception {
        LOGGER.debug("Accessing the channel for queueOrExchange {} with type {} ", queueOrExchangeName, connectionType);
        switch (connectionType) {
            case SUBSCRIBER: {
                String subChnName = connectionType + ";" + queueOrExchangeName;
                Channel reserved = subscriberReservedChannelPool.get(subChnName);
                if (reserved != null && reserved.isOpen()) {
                    return reserved;
                }
                Channel subChn = this.borrowChannel(connectionType, this.openSubscriberConnection());
                subscriberReservedChannelPool.put(subChnName, subChn);
                return subChn;
            }
            case PUBLISHER: {
                return this.borrowChannel(connectionType, this.openPublisherConnection());
            }
            default:
                return null;
        }
    }

    /**
     * Returns the subscriber connection, (re)creating it when absent or closed. The result is
     * captured under the lock so a concurrent {@link #close()} cannot turn it into {@code null}
     * before the caller uses it.
     */
    private synchronized Connection openSubscriberConnection() {
        Connection connection = this.subscriberConnection;
        if (connection == null || !connection.isOpen()) {
            connection = this.createConnection(SUBSCRIBER);
            this.subscriberConnection = connection;
        }
        return connection;
    }

    /**
     * Returns the publisher connection, (re)creating it when absent or closed. The result is
     * captured under the lock for the same reason as in {@link #openSubscriberConnection()}.
     */
    private synchronized Connection openPublisherConnection() {
        Connection connection = this.publisherConnection;
        if (connection == null || !connection.isOpen()) {
            connection = this.createConnection(PUBLISHER);
            this.publisherConnection = connection;
        }
        return connection;
    }

    /**
     * Opens a new channel on {@code rmqConnection} and registers a logging shutdown listener.
     * Every failure, including a channel that comes back null or closed, is handed to the retry
     * settings.
     *
     * @param connType      connection type, used only in log and error messages
     * @param rmqConnection connection to open the channel on
     * @return an open channel
     * @throws RuntimeException if the channel cannot be opened
     */
    private Channel getOrCreateChannel(ConnectionType connType, Connection rmqConnection) {
        int retryIndex = 1;
        while (true) {
            try {
                LOGGER.debug("Creating a channel for {}", connType);
                Channel locChn = rmqConnection.createChannel();
                if (locChn == null || !locChn.isOpen()) {
                    throw new RuntimeException("Fail to open " + connType + " channel");
                }
                locChn.addShutdownListener(cause -> LOGGER.error("{} Channel has been shutdown: {}", connType, cause.getMessage(), cause));
                return locChn;
            } catch (Exception e) {
                this.retryOrFail(e, retryIndex, "Cannot open " + connType + " channel on ", false);
            }
            // retryOrFail returned normally, so another attempt is allowed.
            ++retryIndex;
        }
    }

    /**
     * Decides whether a failed attempt may be repeated. Returns normally when the configured
     * retry settings accept another attempt; otherwise throws a {@link RuntimeException} whose
     * message is {@code description} followed by the broker addresses and whose cause is
     * {@code failure}.
     *
     * @param failure     the exception raised by the failed attempt
     * @param retryIndex  1-based number of the attempt that failed
     * @param description message prefix describing what failed
     * @param logFailure  whether to log the final error before throwing
     */
    private void retryOrFail(Exception failure, int retryIndex, String description, boolean logFailure) {
        String prefix = "";
        AMQPRetryPattern retry = retrySettings;
        if (retry != null) {
            try {
                // NOTE(review): presumably waits and returns while retries remain, and throws
                // once they are exhausted - confirm against AMQPRetryPattern.
                retry.continueOrPropogate(failure, retryIndex);
                return;
            } catch (Exception retriesExhausted) {
                prefix = "Retries completed. ";
                if (retriesExhausted != failure) {
                    // Keep the retry pattern's own exception instead of dropping it.
                    failure.addSuppressed(retriesExhausted);
                }
            }
        }
        String error = prefix + description + this.describeAddresses();
        if (logFailure) {
            LOGGER.error(error, failure);
        }
        throw new RuntimeException(error, failure);
    }

    /**
     * @return the configured addresses joined with commas, or an empty string when none are
     *         set, so building an error message can never fail itself
     */
    private String describeAddresses() {
        if (this.addresses == null) {
            return "";
        }
        return Arrays.stream(this.addresses).map(Address::toString).collect(Collectors.joining(","));
    }

    /**
     * Closes the pooled channels and both connections, then empties both channel pools and
     * forgets the connections. Reserved subscriber channels are not closed one by one; their
     * entries are dropped after the subscriber connection has been closed.
     */
    public void close() {
        LOGGER.info("Closing all connections and channels");
        try {
            this.closeChannelsInMap(ConnectionType.PUBLISHER);
            this.closeChannelsInMap(ConnectionType.SUBSCRIBER);
            this.closeConnection(this.publisherConnection);
            this.closeConnection(this.subscriberConnection);
        } finally {
            availableChannelPool.clear();
            // Do not keep references to channels of the connection that was just closed.
            subscriberReservedChannelPool.clear();
            this.publisherConnection = null;
            this.subscriberConnection = null;
        }
    }

    /** Closes and removes every pooled channel of the given connection type. */
    private void closeChannelsInMap(ConnectionType conType) {
        Set<Channel> channels = availableChannelPool.get(conType);
        if (channels != null && !channels.isEmpty()) {
            for (Channel channel : channels) {
                this.closeChannel(channel);
            }
            channels.clear();
        }
    }

    /** Closes the connection if it is open; a failure to close is logged, not rethrown. */
    private void closeConnection(Connection connection) {
        if (connection == null || !connection.isOpen()) {
            LOGGER.warn("Connection is null or closed already. Not closing it again");
            return;
        }
        try {
            connection.close();
        } catch (Exception e) {
            LOGGER.warn("Fail to close connection: {}", e.getMessage(), e);
        }
    }

    /** Closes the channel if it is open; a failure to close is logged, not rethrown. */
    private void closeChannel(Channel channel) {
        if (channel == null || !channel.isOpen()) {
            LOGGER.warn("Channel is null or closed already. Not closing it again");
            return;
        }
        try {
            channel.close();
        } catch (Exception e) {
            LOGGER.warn("Fail to close channel: {}", e.getMessage(), e);
        }
    }

    /**
     * Takes an open channel out of the pool for {@code connectionType}, discarding null or
     * closed entries on the way, and opens a new channel on {@code rmqConnection} when the pool
     * has nothing usable.
     *
     * @param connectionType pool to borrow from
     * @param rmqConnection  connection used when a new channel has to be opened
     * @return an open channel that is no longer in the pool
     */
    private synchronized Channel borrowChannel(ConnectionType connectionType, Connection rmqConnection) {
        // Single lookup: close() may clear the pool concurrently, so the set can be absent.
        Set<Channel> channels = availableChannelPool.get(connectionType);
        if (channels == null || channels.isEmpty()) {
            Channel created = this.getOrCreateChannel(connectionType, rmqConnection);
            LOGGER.info("Channels are not available in the pool. Created a channel for the connection type [{}]", connectionType);
            return created;
        }
        Iterator<Channel> itr = channels.iterator();
        while (itr.hasNext()) {
            Channel pooled = itr.next();
            // Usable or not, the entry leaves the pool.
            itr.remove();
            if (pooled != null && pooled.isOpen()) {
                LOGGER.info("Borrowed the channel object from the channel pool for the connection type [{}]", connectionType);
                return pooled;
            }
        }
        Channel created = this.getOrCreateChannel(connectionType, rmqConnection);
        LOGGER.info("No proper channels available in the pool. Created a channel for the connection type [{}]", connectionType);
        return created;
    }

    /**
     * Hands a borrowed channel back to the pool of {@code connectionType}. A null or closed
     * channel is dropped instead of being pooled, because it could never be borrowed again.
     *
     * @param connectionType pool the channel belongs to
     * @param channel        the channel being returned; may be {@code null}
     * @throws Exception declared for callers; not thrown by this implementation
     */
    public synchronized void returnChannel(ConnectionType connectionType, Channel channel) throws Exception {
        if (channel == null || !channel.isOpen()) {
            LOGGER.debug("Discarding a null or closed channel returned for the connection type [{}]", connectionType);
            return;
        }
        availableChannelPool.computeIfAbsent(connectionType, type -> new HashSet<>()).add(channel);
        LOGGER.info("Returned the borrowed channel object to the pool for the connection type [{}]", connectionType);
    }
}

