/*
 * Decompiled with CFR 0.152.
 */
package org.duracloud.common.queue.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.duracloud.common.error.DuraCloudRuntimeException;
import org.duracloud.common.queue.TaskException;
import org.duracloud.common.queue.TaskNotFoundException;
import org.duracloud.common.queue.TaskQueue;
import org.duracloud.common.queue.TimeoutException;
import org.duracloud.common.queue.task.Task;
import org.duracloud.common.retry.Retriable;
import org.duracloud.common.retry.Retrier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RabbitmqTaskQueue
implements TaskQueue {
    private static Logger log = LoggerFactory.getLogger(RabbitmqTaskQueue.class);
    private Channel mqChannel;
    private String queueName;
    private Integer visibilityTimeout = -1;
    private Integer unAcknowlededMesageCount = 0;
    private String queueUrl;
    private String exchangeName;

    public RabbitmqTaskQueue(String host, Integer port, String vhost, String exchange, String username, String password, String queueName) {
        try {
            this.exchangeName = exchange;
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername(username);
            factory.setPassword(password);
            factory.setVirtualHost(vhost);
            factory.setHost(host);
            factory.setPort(port);
            Connection conn = factory.newConnection();
            this.mqChannel = conn.createChannel();
            this.mqChannel.queueBind(queueName, this.exchangeName, queueName);
            this.queueUrl = "(RabbitMQ) " + conn.getAddress();
            this.queueName = queueName;
        }
        catch (Exception ex) {
            log.error("Failed to estabilish connection to RabbitMQ with queue name {} and URL {} because {}", queueName, this.queueUrl, ex.getMessage());
            throw new DuraCloudRuntimeException(ex);
        }
    }

    public RabbitmqTaskQueue(Connection conn, String exchange, String queueName) {
        try {
            this.exchangeName = exchange;
            this.mqChannel = conn.createChannel();
            this.mqChannel.queueBind(queueName, exchange, queueName);
            this.queueUrl = "(RabbitMQ) " + conn.getAddress();
            this.queueName = queueName;
        }
        catch (Exception ex) {
            log.error("Failed to estabilish connection to RabbitMQ with queue name {} and URL {} because {}", queueName, this.queueUrl, ex.getMessage());
            throw new DuraCloudRuntimeException(ex);
        }
    }

    @Override
    public String getName() {
        return this.queueName;
    }

    protected Task marshallTask(byte[] msgBody, long deliveryTag, String routingKey, String exchange) {
        Properties props = new Properties();
        Task task = null;
        String msg = new String(msgBody);
        try {
            props.load(new StringReader(msg));
            if (props.containsKey("type")) {
                task = new Task();
                for (String key : props.stringPropertyNames()) {
                    if (key.equals("type")) {
                        task.setType(Task.Type.valueOf(props.getProperty(key)));
                        continue;
                    }
                    task.addProperty(key, props.getProperty(key));
                }
                task.addProperty(MsgProp.DELIVERY_TAG.name(), String.valueOf(deliveryTag));
                task.addProperty(MsgProp.ROUTING_KEY.name(), routingKey);
                task.addProperty(MsgProp.EXCHANGE.name(), exchange);
            } else {
                log.error("RabbitMQ message from queue: " + this.queueName + " at " + this.queueUrl + ", does not contain a 'task type'");
            }
        }
        catch (IOException ioe) {
            log.error("Error creating Task", ioe);
        }
        return task;
    }

    protected String unmarshallTask(Task task) {
        Properties props = new Properties();
        props.setProperty("type", task.getType().name());
        for (String key : task.getProperties().keySet()) {
            String value = task.getProperty(key);
            if (null == value) continue;
            props.setProperty(key, value);
        }
        StringWriter sw = new StringWriter();
        String msgBody = null;
        try {
            props.store(sw, null);
            msgBody = sw.toString();
        }
        catch (IOException ioe) {
            log.error("Error unmarshalling Task, queue: " + this.queueName + ", msgBody: " + msgBody, ioe);
        }
        return msgBody;
    }

    @Override
    public void put(Task task) {
        try {
            final String queueName = this.queueName;
            String msgBody = this.unmarshallTask(task);
            final byte[] messageBodyBytes = msgBody.getBytes();
            new Retrier(4, 10000, 2).execute(new Retriable(){

                @Override
                public Object retry() throws Exception {
                    RabbitmqTaskQueue.this.mqChannel.basicPublish(RabbitmqTaskQueue.this.exchangeName, queueName, null, messageBodyBytes);
                    return null;
                }
            });
            Integer n = this.unAcknowlededMesageCount;
            Integer n2 = this.unAcknowlededMesageCount = Integer.valueOf(this.unAcknowlededMesageCount + 1);
            log.info("RabbitMQ message successfully placed {} on queue - queue: {}", (Object)task, (Object)queueName);
        }
        catch (Exception ex) {
            log.error("failed to place {} on {} at {} due to {}", task, this.queueName, this.queueUrl, ex.getMessage());
            throw new DuraCloudRuntimeException(ex);
        }
    }

    @Override
    public void put(Task ... tasks) {
        for (Task task : tasks) {
            this.put(task);
        }
    }

    @Override
    public void put(Set<Task> tasks) {
        for (Task task : tasks) {
            this.put(task);
        }
    }

    @Override
    public Set<Task> take(int maxTasks) throws TimeoutException {
        Integer size = this.size();
        if (size > 0) {
            if (size < maxTasks) {
                size = maxTasks;
            }
            HashSet<Task> tasks = new HashSet<Task>();
            try {
                for (int i = 0; i < size; ++i) {
                    Task task = this.take();
                    tasks.add(task);
                }
                return tasks;
            }
            catch (Exception e) {
                for (Task task : tasks) {
                    this.requeue(task);
                }
                throw new TimeoutException("Failed to get at least one message from queue: " + this.queueName + ", queueUrl: " + this.queueUrl);
            }
        }
        throw new TimeoutException("No tasks available from queue: " + this.queueName + ", queueUrl: " + this.queueUrl);
    }

    @Override
    public Task take() throws TimeoutException {
        Integer size = this.size();
        if (size > 0) {
            try {
                GetResponse response = this.mqChannel.basicGet(this.queueName, false);
                if (response == null) {
                    throw new TimeoutException("No tasks available from queue: " + this.queueName + ", queueUrl: " + this.queueUrl);
                }
                AMQP.BasicProperties properties = response.getProps();
                Envelope envelope = response.getEnvelope();
                byte[] body = response.getBody();
                String routingKey = envelope.getRoutingKey();
                String exchange = envelope.getExchange();
                long deliveryTag = envelope.getDeliveryTag();
                Long sentTime = properties.getTimestamp().getTime();
                Long preworkQueueTime = System.currentTimeMillis() - sentTime;
                log.info("RabbitMQ message received - queue: {}, queueUrl: {}, deliveryTag: {}, preworkQueueTime: {}", this.queueName, this.queueUrl, deliveryTag, DurationFormatUtils.formatDuration(preworkQueueTime, "HH:mm:ss,SSS"));
                Task task = this.marshallTask(body, deliveryTag, routingKey, exchange);
                task.setVisibilityTimeout(this.visibilityTimeout);
                return task;
            }
            catch (Exception ex) {
                log.error("failed to take task from " + this.queueName + " due to " + ex.getMessage(), ex);
                throw new TimeoutException("No tasks available from queue: " + this.queueName + ", queueUrl: " + this.queueUrl);
            }
        }
        throw new TimeoutException("No tasks available from queue: " + this.queueName + ", queueUrl: " + this.queueUrl);
    }

    @Override
    public void extendVisibilityTimeout(Task task) throws TaskNotFoundException {
    }

    @Override
    public void deleteTask(Task task) throws TaskNotFoundException {
        try {
            this.mqChannel.basicAck(Long.parseLong(task.getProperty(MsgProp.DELIVERY_TAG.name())), false);
            log.info("successfully deleted {}", (Object)task);
            Integer n = this.unAcknowlededMesageCount;
            Integer n2 = this.unAcknowlededMesageCount = Integer.valueOf(this.unAcknowlededMesageCount - 1);
        }
        catch (Exception e) {
            log.error("failed to delete task " + task + ": " + e.getMessage(), e);
            throw new TaskNotFoundException(e);
        }
    }

    @Override
    public void deleteTasks(Set<Task> tasks) throws TaskException {
        if (tasks.size() > 10) {
            throw new IllegalArgumentException("task set must contain 10 or fewer tasks");
        }
        try {
            for (Task task : tasks) {
                this.deleteTask(task);
            }
        }
        catch (Exception e) {
            log.error("failed to batch delete tasks " + tasks + ": " + e.getMessage(), e);
            throw new TaskException(e);
        }
    }

    @Override
    public void requeue(Task task) {
        int attempts = task.getAttempts();
        task.incrementAttempts();
        try {
            this.mqChannel.basicReject(Long.parseLong(task.getProperty(MsgProp.DELIVERY_TAG.name())), true);
            Integer n = this.unAcknowlededMesageCount;
            Integer n2 = this.unAcknowlededMesageCount = Integer.valueOf(this.unAcknowlededMesageCount - 1);
        }
        catch (Exception e) {
            log.error("unable to reject message {}, re-put message instead ", (Object)task);
            this.put(task);
        }
        log.warn("requeued {} after {} failed attempts.", (Object)task, (Object)attempts);
    }

    @Override
    public Integer size() {
        try {
            Long sizeLong = this.mqChannel.messageCount(this.queueName);
            return sizeLong.intValue();
        }
        catch (Exception e) {
            return 0;
        }
    }

    @Override
    public Integer sizeIncludingInvisibleAndDelayed() {
        return this.size() + this.unAcknowlededMesageCount;
    }

    private Integer getVisibilityTimeout() {
        return this.visibilityTimeout;
    }

    private String getQueueUrl() {
        return this.queueUrl;
    }

    public static enum MsgProp {
        DELIVERY_TAG,
        ROUTING_KEY,
        EXCHANGE;

    }
}

