/*
 * Decompiled with CFR 0.152.
 */
package de.iip_ecosphere.platform.transport.connectors.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import de.iip_ecosphere.platform.transport.connectors.ReceptionCallback;
import de.iip_ecosphere.platform.transport.connectors.TransportParameter;
import de.iip_ecosphere.platform.transport.connectors.impl.AbstractTransportConnector;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Transport connector that sends and receives stream data via AMQP using the
 * RabbitMQ Java client. Each stream is mapped to a queue of the same name;
 * messages are published through the default exchange with the stream name
 * as routing key.
 *
 * <p>Stream bookkeeping, serialization and callback dispatch
 * ({@code isStreamKnown}, {@code registerStream}, {@code serialize},
 * {@code notifyCallback}) are inherited from the superclass.
 */
public class RabbitMqAmqpTransportConnector
extends AbstractTransportConnector {
    /** Name of the protocol implemented by this connector. */
    public static final String NAME = "AMQP";

    /** Maximum number of characters accepted by {@link #composeStreamName(String, String)}. */
    // NOTE(review): the RabbitMQ documentation states a queue name limit of 255 bytes
    // (UTF-8); the historic limit of 256 characters is kept here -- confirm before tightening.
    private static final int MAX_STREAM_NAME_LENGTH = 256;

    private Connection connection;
    private Channel channel;

    /**
     * Sends {@code data} on {@code stream}.
     *
     * <p>Currently behaves exactly like {@link #asyncSend(String, Object)}.
     *
     * @param stream the stream (queue) name
     * @param data the data to serialize and send
     * @throws IOException if declaring the queue, serializing or publishing fails
     */
    public void syncSend(String stream, Object data) throws IOException {
        this.send(stream, data, true);
    }

    /**
     * Sends {@code data} on {@code stream}.
     *
     * @param stream the stream (queue) name
     * @param data the data to serialize and send
     * @throws IOException if declaring the queue, serializing or publishing fails
     */
    public void asyncSend(String stream, Object data) throws IOException {
        this.send(stream, data, false);
    }

    /**
     * Declares the queue for {@code stream} on first use, serializes
     * {@code data} and publishes it via the default exchange.
     *
     * @param stream the stream (queue) name, also used as routing key
     * @param data the data to serialize and send
     * @param block requested blocking mode; not evaluated at present, the
     *     publish call is the same for both modes
     * @throws IOException if declaring the queue, serializing or publishing fails
     */
    private void send(String stream, Object data, boolean block) throws IOException {
        if (!this.isStreamKnown(stream)) {
            this.declareQueue(stream);
            this.registerStream(stream);
        }
        byte[] payload = this.serialize(stream, data);
        // empty exchange name = default exchange; no message properties
        this.channel.basicPublish("", stream, null, payload);
    }

    /**
     * Registers {@code callback} for {@code stream} and starts consuming the
     * corresponding queue with automatic acknowledgement. Received messages
     * are forwarded to {@code notifyCallback} using the routing key of the
     * delivery as stream name.
     *
     * @param stream the stream (queue) name
     * @param callback the callback to be informed about received data
     * @throws IOException if declaring the queue or starting the consumer fails
     */
    public void setReceptionCallback(String stream, ReceptionCallback<?> callback) throws IOException {
        if (!this.isStreamKnown(stream)) {
            this.declareQueue(stream);
        }
        super.setReceptionCallback(stream, callback);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> this.notifyCallback(delivery.getEnvelope().getRoutingKey(), delivery.getBody());
        // autoAck = true; consumer cancellation is intentionally not handled
        this.channel.basicConsume(stream, true, deliverCallback, consumerTag -> {});
    }

    /**
     * Declares the queue backing {@code stream} as non-durable, non-exclusive
     * and auto-delete, without additional arguments.
     *
     * @param stream the stream (queue) name
     * @throws IOException if the declaration fails
     */
    private void declareQueue(String stream) throws IOException {
        this.channel.queueDeclare(stream, false, false, true, null);
    }

    /**
     * Composes a stream name from an optional parent name and a name,
     * joined by {@code "-"}.
     *
     * @param parent the parent name; may be {@code null} or empty, in which
     *     case {@code name} is used unchanged
     * @param name the name of the stream
     * @return the composed stream name
     * @throws IllegalArgumentException if the composed name is longer than
     *     256 characters
     */
    public String composeStreamName(String parent, String name) {
        String streamName = parent != null && parent.length() > 0 ? parent + "-" + name : name;
        if (streamName.length() > MAX_STREAM_NAME_LENGTH) {
            // over-long names are rejected rather than silently truncated
            throw new IllegalArgumentException("stream name length > 256");
        }
        return streamName;
    }

    /**
     * Opens the connection to the broker and creates the channel used for
     * all subsequent operations. Automatic connection recovery is enabled.
     * User name and password are only applied if both are given.
     *
     * @param params the transport parameters (host, port, optional credentials)
     * @throws IOException if the connection or the channel cannot be
     *     established; connection timeouts are reported as {@code IOException}
     *     with the {@link TimeoutException} as cause
     */
    public void connect(TransportParameter params) throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(params.getHost());
        factory.setPort(params.getPort());
        factory.setAutomaticRecoveryEnabled(true);
        if (null != params.getUser() && null != params.getPassword()) {
            factory.setUsername(params.getUser());
            factory.setPassword(params.getPassword());
        }
        this.configureFactory(factory);

        Connection newConnection;
        try {
            newConnection = factory.newConnection();
        }
        catch (TimeoutException e) {
            throw new IOException(e.getMessage(), e);
        }

        Channel newChannel;
        try {
            newChannel = newConnection.createChannel();
            if (null == newChannel) {
                // createChannel() is documented to return null if no channel is available
                throw new IOException("no AMQP channel available on connection");
            }
        }
        catch (IOException | RuntimeException e) {
            // do not leak the already opened connection if no channel can be obtained
            try {
                newConnection.close();
            }
            catch (IOException | RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        this.connection = newConnection;
        this.channel = newChannel;
    }

    /**
     * Hook for subclasses to further configure the connection factory, called
     * by {@link #connect(TransportParameter)} before the connection is
     * created. Does nothing by default.
     *
     * @param factory the factory to configure
     */
    protected void configureFactory(ConnectionFactory factory) {
    }

    /**
     * Closes the channel and the connection. The connection is closed even
     * if closing the channel fails. Does nothing for parts that were never
     * established.
     *
     * @throws IOException if closing the channel or the connection fails
     */
    public void disconnect() throws IOException {
        try {
            if (null != this.channel) {
                this.channel.close();
            }
        }
        catch (TimeoutException ignored) {
            // best effort: closing the connection below also disposes the channel
        }
        finally {
            if (null != this.connection) {
                this.connection.close();
            }
        }
    }

    /**
     * Returns the name of the protocol implemented by this connector.
     *
     * @return {@link #NAME}
     */
    public String getName() {
        return NAME;
    }
}

