/*
 * Decompiled with CFR 0.152.
 */
package org.duracloud.common.queue.aws;

import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import com.amazonaws.services.sqs.model.GetQueueUrlRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.QueueAttributeName;
import com.amazonaws.services.sqs.model.ReceiptHandleIsInvalidException;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.duracloud.common.queue.TaskNotFoundException;
import org.duracloud.common.queue.TaskQueue;
import org.duracloud.common.queue.TimeoutException;
import org.duracloud.common.queue.task.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SQSTaskQueue
implements TaskQueue {
    private static Logger log = LoggerFactory.getLogger(SQSTaskQueue.class);
    private AmazonSQSClient sqsClient;
    private String queueName;
    private String queueUrl;
    private Integer visibilityTimeout;

    public SQSTaskQueue(String queueName) {
        this(new AmazonSQSClient(), queueName);
    }

    public SQSTaskQueue(AmazonSQSClient sqsClient, String queueName) {
        this.sqsClient = sqsClient;
        this.queueName = queueName;
        this.queueUrl = this.getQueueUrl();
        this.visibilityTimeout = this.getVisibilityTimeout();
    }

    @Override
    public String getName() {
        return this.queueName;
    }

    protected Task marshallTask(Message msg) {
        Properties props = new Properties();
        Task task = null;
        try {
            props.load(new StringReader(msg.getBody()));
            if (props.containsKey("type")) {
                task = new Task();
                for (String key : props.stringPropertyNames()) {
                    if (key.equals("type")) {
                        task.setType(Task.Type.valueOf(props.getProperty(key)));
                        continue;
                    }
                    task.addProperty(key, props.getProperty(key));
                }
                task.addProperty(MsgProp.MSG_ID.name(), msg.getMessageId());
                task.addProperty(MsgProp.RECEIPT_HANDLE.name(), msg.getReceiptHandle());
            } else {
                log.error("SQS message from queue: " + this.queueName + ", queueUrl: " + this.queueUrl + " does not contain a 'task type'");
            }
        }
        catch (IOException ioe) {
            log.error("Error creating Task", (Throwable)ioe);
        }
        return task;
    }

    protected String unmarshallTask(Task task) {
        Properties props = new Properties();
        props.setProperty("type", task.getType().name());
        for (String key : task.getProperties().keySet()) {
            String value = task.getProperty(key);
            if (null == value) continue;
            props.setProperty(key, value);
        }
        StringWriter sw = new StringWriter();
        String msgBody = null;
        try {
            props.store(sw, null);
            msgBody = sw.toString();
        }
        catch (IOException ioe) {
            log.error("Error unmarshalling Task, queue: " + this.queueName + ", msgBody: " + msgBody, (Throwable)ioe);
        }
        return msgBody;
    }

    @Override
    public void put(Task task) {
        String msgBody = this.unmarshallTask(task);
        this.sqsClient.sendMessage(new SendMessageRequest(this.queueUrl, msgBody));
        log.info("SQS message successfully placed {} on queue - queue: {}", (Object)task, (Object)this.queueName);
    }

    @Override
    public void put(Task ... tasks) {
        HashSet<Task> taskSet = new HashSet<Task>();
        taskSet.addAll(Arrays.asList(tasks));
        this.put(taskSet);
    }

    @Override
    public void put(Set<Task> tasks) {
        String msgBody = null;
        SendMessageBatchRequestEntry msgEntry = null;
        HashSet<SendMessageBatchRequestEntry> msgEntries = new HashSet<SendMessageBatchRequestEntry>();
        for (Task task : tasks) {
            msgBody = this.unmarshallTask(task);
            msgEntry = new SendMessageBatchRequestEntry().withMessageBody(msgBody).withId(msgEntries.size() + "");
            msgEntries.add(msgEntry);
            if (msgEntries.size() != 10) continue;
            this.sendBatchMessages(msgEntries);
            msgEntries.clear();
        }
        if (!msgEntries.isEmpty()) {
            this.sendBatchMessages(msgEntries);
        }
    }

    private void sendBatchMessages(Set<SendMessageBatchRequestEntry> msgEntries) {
        SendMessageBatchRequest sendMessageBatchRequest = new SendMessageBatchRequest().withQueueUrl(this.queueUrl).withEntries(msgEntries);
        this.sqsClient.sendMessageBatch(sendMessageBatchRequest);
        log.debug("{} SQS messages successfully placed on queue: {}", (Object)msgEntries.size(), (Object)this.queueName);
    }

    @Override
    public Task take() throws TimeoutException {
        ReceiveMessageResult result = this.sqsClient.receiveMessage(new ReceiveMessageRequest().withQueueUrl(this.queueUrl).withMaxNumberOfMessages(Integer.valueOf(1)).withAttributeNames(new String[]{"SentTimestamp", "ApproximateReceiveCount"}));
        if (result.getMessages() != null && result.getMessages().size() > 0) {
            Message msg = (Message)result.getMessages().get(0);
            try {
                Long sentTime = Long.parseLong((String)msg.getAttributes().get("SentTimestamp"));
                Long preworkQueueTime = System.currentTimeMillis() - sentTime;
                log.info("SQS message received - queue: {}, queueUrl: {}, msgId: {}, preworkQueueTime: {}, receiveCount: {}", new Object[]{this.queueName, this.queueUrl, msg.getMessageId(), DurationFormatUtils.formatDuration((long)preworkQueueTime, (String)"HH:mm:ss,SSS"), msg.getAttributes().get("ApproximateReceiveCount")});
            }
            catch (NumberFormatException nfe) {
                log.error("Error converting 'SentTimestamp' SQS message attribute to Long, messageId: " + msg.getMessageId(), (Throwable)nfe);
            }
            Task task = this.marshallTask(msg);
            task.setVisibilityTimeout(this.visibilityTimeout);
            return task;
        }
        throw new TimeoutException("No tasks available from queue: " + this.queueName + ", queueUrl: " + this.queueUrl);
    }

    @Override
    public void extendVisibilityTimeout(Task task) throws TaskNotFoundException {
        try {
            this.sqsClient.changeMessageVisibility(new ChangeMessageVisibilityRequest().withQueueUrl(this.queueUrl).withReceiptHandle(task.getProperty(MsgProp.RECEIPT_HANDLE.name())).withVisibilityTimeout(task.getVisibilityTimeout()));
            log.info("extended visibility timeout {} seconds for {}", (Object)task.getVisibilityTimeout(), (Object)task);
        }
        catch (ReceiptHandleIsInvalidException rhe) {
            log.error("failed to extend visibility timeout on task " + task + ": " + rhe.getMessage(), (Throwable)rhe);
            throw new TaskNotFoundException(rhe);
        }
    }

    @Override
    public void deleteTask(Task task) throws TaskNotFoundException {
        try {
            this.sqsClient.deleteMessage(new DeleteMessageRequest().withQueueUrl(this.queueUrl).withReceiptHandle(task.getProperty(MsgProp.RECEIPT_HANDLE.name())));
            log.info("successfully deleted {}", (Object)task);
        }
        catch (ReceiptHandleIsInvalidException rhe) {
            log.error("failed to delete task " + task + ": " + rhe.getMessage(), (Throwable)rhe);
            throw new TaskNotFoundException(rhe);
        }
    }

    @Override
    public void requeue(Task task) {
        int attempts = task.getAttempts();
        task.incrementAttempts();
        try {
            this.deleteTask(task);
        }
        catch (TaskNotFoundException e) {
            log.error("unable to delete " + task + " ignoring - requeuing anyway");
        }
        this.put(task);
        log.warn("requeued {} after {} failed attempts.", (Object)task, (Object)attempts);
    }

    @Override
    public Integer size() {
        GetQueueAttributesResult result = this.queryQueueAttributes(QueueAttributeName.ApproximateNumberOfMessages);
        String sizeStr = (String)result.getAttributes().get(QueueAttributeName.ApproximateNumberOfMessages.name());
        Integer size = Integer.parseInt(sizeStr);
        return size;
    }

    @Override
    public Integer sizeIncludingInvisibleAndDelayed() {
        GetQueueAttributesResult result = this.queryQueueAttributes(QueueAttributeName.ApproximateNumberOfMessages, QueueAttributeName.ApproximateNumberOfMessagesNotVisible, QueueAttributeName.ApproximateNumberOfMessagesDelayed);
        Map attributes = result.getAttributes();
        int size = 0;
        for (String attrKey : attributes.keySet()) {
            String value = (String)attributes.get(attrKey);
            log.debug("retrieved attribute: {}={}", (Object)attrKey, (Object)value);
            int intValue = Integer.parseInt(value);
            size += intValue;
        }
        log.debug("calculated size: {}", (Object)size);
        return size;
    }

    private Integer getVisibilityTimeout() {
        GetQueueAttributesResult result = this.queryQueueAttributes(QueueAttributeName.VisibilityTimeout);
        String visStr = (String)result.getAttributes().get(QueueAttributeName.VisibilityTimeout.name());
        Integer visibilityTimeout = Integer.parseInt(visStr);
        return visibilityTimeout;
    }

    private String getQueueUrl() {
        return this.sqsClient.getQueueUrl(new GetQueueUrlRequest().withQueueName(this.queueName)).getQueueUrl();
    }

    private GetQueueAttributesResult queryQueueAttributes(QueueAttributeName ... attrNames) {
        return this.sqsClient.getQueueAttributes(new GetQueueAttributesRequest().withQueueUrl(this.queueUrl).withAttributeNames(attrNames));
    }

    public static enum MsgProp {
        MSG_ID,
        RECEIPT_HANDLE;

    }
}

