/*
 * Decompiled with CFR 0.152.
 */
package de.iip_ecosphere.platform.transport.connectors.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Envelope;
import de.iip_ecosphere.platform.transport.connectors.ReceptionCallback;
import de.iip_ecosphere.platform.transport.connectors.TransportParameter;
import de.iip_ecosphere.platform.transport.connectors.impl.AbstractTransportConnector;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import org.slf4j.LoggerFactory;

public class RabbitMqAmqpTransportConnector
extends AbstractTransportConnector {
    public static final String NAME = "AMQP";
    private Connection connection;
    private Channel channel;
    private boolean tlsEnabled = false;
    private Map<String, String> tags = Collections.synchronizedMap(new HashMap());
    private boolean closing = false;
    private Map<String, String> queueStream = Collections.synchronizedMap(new HashMap());

    public void syncSend(String stream, Object data) throws IOException {
        this.send(stream, data, true);
    }

    public void asyncSend(String stream, Object data) throws IOException {
        this.send(stream, data, false);
    }

    private void checkStream(String stream, boolean send) throws IOException {
        if (!this.isStreamKnown(stream)) {
            this.channel.exchangeDeclare(stream, BuiltinExchangeType.FANOUT, false, true, null);
            this.registerStream(stream);
        }
        if (!send && this.queueStream.get(stream) == null) {
            AMQP.Queue.DeclareOk qRes = this.channel.queueDeclare();
            this.queueStream.put(stream, qRes.getQueue());
            this.channel.queueBind(qRes.getQueue(), stream, "");
        }
    }

    private void send(String stream, Object data, boolean block) throws IOException {
        try {
            this.checkStream(stream, true);
            byte[] payload = this.serialize(stream, data);
            this.channel.basicPublish(stream, "", null, payload);
        }
        catch (IOException e) {
            if (!this.closing) {
                throw e;
            }
        }
        catch (AlreadyClosedException alreadyClosedException) {
            // empty catch block
        }
    }

    public void setReceptionCallback(String stream, ReceptionCallback<?> callback) throws IOException {
        this.checkStream(stream, false);
        super.setReceptionCallback(stream, callback);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            Envelope env = delivery.getEnvelope();
            String st = env.getExchange();
            if (null == st || st.length() == 0) {
                st = env.getRoutingKey();
            }
            this.notifyCallback(st, delivery.getBody());
        };
        String tag = UUID.randomUUID().toString();
        this.channel.basicConsume(this.queueStream.get(stream), true, tag, deliverCallback, consumerTag -> {});
        this.tags.put(stream, tag);
    }

    public void unsubscribe(String stream, boolean delete) throws IOException {
        super.unsubscribe(stream, delete);
        String tag = this.tags.remove(stream);
        if (null != tag) {
            this.channel.basicCancel(tag);
        }
        if (delete) {
            this.channel.queueDeleteNoWait(stream, true, false);
        }
    }

    public String composeStreamName(String parent, String name) {
        String streamName;
        String string = streamName = parent != null && parent.length() > 0 ? parent + "-" + name : name;
        if (streamName.length() > 256) {
            streamName = streamName.substring(0, 254);
            throw new IllegalArgumentException("stream name length > 256");
        }
        return streamName;
    }

    public void connect(TransportParameter params) throws IOException {
        super.connect(params);
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(params.getHost());
        factory.setPort(params.getPort());
        factory.setAutomaticRecoveryEnabled(true);
        RabbitMqAmqpTransportConnector.applyAuthenticationKey((String)params.getAuthenticationKey(), (user, pwd, enc) -> {
            factory.setUsername(user);
            factory.setPassword(pwd);
            return true;
        });
        if (this.useTls(params)) {
            try {
                factory.useSslProtocol(this.createTlsContext(params));
                this.tlsEnabled = true;
            }
            catch (IOException e) {
                LoggerFactory.getLogger(((Object)((Object)this)).getClass()).error("AMQP: Loading keystore " + e.getMessage() + ". Trying with no TLS.");
            }
        }
        this.configureFactory(factory);
        try {
            LoggerFactory.getLogger(((Object)((Object)this)).getClass()).info("AMQP: Connecting to " + params.getHost() + " " + params.getPort());
            this.connection = factory.newConnection();
            this.channel = this.connection.createChannel();
        }
        catch (TimeoutException e) {
            throw new IOException(e.getMessage(), e);
        }
    }

    protected void configureFactory(ConnectionFactory factory) {
    }

    public void disconnect() throws IOException {
        if (!this.closing) {
            this.closing = true;
            super.disconnect();
            try {
                this.channel.close();
            }
            catch (TimeoutException timeoutException) {
            }
            catch (AlreadyClosedException alreadyClosedException) {
                // empty catch block
            }
            try {
                this.connection.close();
            }
            catch (AlreadyClosedException alreadyClosedException) {
                // empty catch block
            }
        }
    }

    public String getName() {
        return NAME;
    }

    public String supportedEncryption() {
        return "TLS";
    }

    public String enabledEncryption() {
        return this.tlsEnabled ? "TLS" : null;
    }
}

