/*
 * Decompiled with CFR 0.152.
 */
package org.duracloud.common.queue.aws;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
import com.amazonaws.services.sqs.model.DeleteMessageBatchResultEntry;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import com.amazonaws.services.sqs.model.GetQueueUrlRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.QueueAttributeName;
import com.amazonaws.services.sqs.model.ReceiptHandleIsInvalidException;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.duracloud.common.error.DuraCloudRuntimeException;
import org.duracloud.common.queue.TaskException;
import org.duracloud.common.queue.TaskNotFoundException;
import org.duracloud.common.queue.TaskQueue;
import org.duracloud.common.queue.TimeoutException;
import org.duracloud.common.queue.task.Task;
import org.duracloud.common.retry.Retriable;
import org.duracloud.common.retry.Retrier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TaskQueue} implementation backed by an Amazon SQS queue.
 *
 * <p>Tasks are carried in the SQS message body in {@link Properties} text
 * format. The task type is stored under the {@code type} key and every other
 * task property under its own key. When a message is received, its SQS message
 * id and receipt handle are attached to the task under the {@link MsgProp}
 * names so the task can later be deleted or have its visibility extended.
 */
public class SQSTaskQueue implements TaskQueue {
    private static final Logger log = LoggerFactory.getLogger(SQSTaskQueue.class);

    /** Key under which a task's type is stored in the message body. */
    private static final String TYPE_KEY = "type";

    /** Number of entries this class places in a single SQS batch call. */
    private static final int MAX_BATCH_SIZE = 10;

    private final AmazonSQSClient sqsClient;
    private final String queueName;
    private final String queueUrl;
    private final Integer visibilityTimeout;

    /**
     * Creates a queue wrapper using a default-constructed {@link AmazonSQSClient}.
     *
     * @param queueName name of an existing SQS queue
     */
    public SQSTaskQueue(String queueName) {
        this(new AmazonSQSClient(), queueName);
    }

    /**
     * Creates a queue wrapper. The queue URL and the queue's visibility
     * timeout are looked up from SQS during construction.
     *
     * @param sqsClient client used for all SQS calls
     * @param queueName name of an existing SQS queue
     */
    public SQSTaskQueue(AmazonSQSClient sqsClient, String queueName) {
        this.sqsClient = sqsClient;
        this.queueName = queueName;
        // Order matters: the URL lookup needs the name, the attribute lookup needs the URL.
        this.queueUrl = this.getQueueUrl();
        this.visibilityTimeout = this.getVisibilityTimeout();
    }

    /**
     * @return the queue name supplied at construction
     */
    @Override
    public String getName() {
        return queueName;
    }

    /**
     * Builds a {@link Task} from a received SQS message. (Despite the name,
     * this is the deserializing direction; the name is kept for compatibility.)
     *
     * @param msg the received message
     * @return the task, or {@code null} when the body cannot be read, carries
     *         no {@code type} entry, or names a type that is not a
     *         {@link Task.Type} constant
     */
    protected Task marshallTask(Message msg) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(msg.getBody()));
        } catch (IOException ioe) {
            log.error("Error creating Task", ioe);
            return null;
        }

        String typeName = props.getProperty(TYPE_KEY);
        if (typeName == null) {
            log.error("SQS message from queue: " + queueName + ", queueUrl: " + queueUrl + " does not contain a 'task type'");
            return null;
        }

        Task.Type type;
        try {
            type = Task.Type.valueOf(typeName);
        } catch (IllegalArgumentException iae) {
            // An unknown type is a malformed message like any other; report it
            // the same way instead of letting the exception abort the caller.
            log.error("SQS message from queue: " + queueName + ", queueUrl: " + queueUrl
                      + ", msgId: " + msg.getMessageId() + " has unknown task type '" + typeName + "'", iae);
            return null;
        }

        Task task = new Task();
        task.setType(type);
        for (String key : props.stringPropertyNames()) {
            if (!TYPE_KEY.equals(key)) {
                task.addProperty(key, props.getProperty(key));
            }
        }
        task.addProperty(MsgProp.MSG_ID.name(), msg.getMessageId());
        task.addProperty(MsgProp.RECEIPT_HANDLE.name(), msg.getReceiptHandle());
        return task;
    }

    /**
     * Serializes a {@link Task} into the text used as the SQS message body.
     * (Despite the name, this is the serializing direction; the name is kept
     * for compatibility.) Properties whose value is {@code null} are omitted.
     *
     * @param task the task to serialize
     * @return the message body, or {@code null} if writing the properties failed
     */
    protected String unmarshallTask(Task task) {
        Properties props = new Properties();
        props.setProperty(TYPE_KEY, task.getType().name());
        for (String key : task.getProperties().keySet()) {
            String value = task.getProperty(key);
            if (value != null) {
                props.setProperty(key, value);
            }
        }

        StringWriter writer = new StringWriter();
        String msgBody = null;
        try {
            props.store(writer, null);
            msgBody = writer.toString();
        } catch (IOException ioe) {
            log.error("Error unmarshalling Task, queue: " + queueName + ", msgBody: " + msgBody, ioe);
        }
        return msgBody;
    }

    /**
     * Sends a single task to the queue, going through a {@link Retrier}.
     *
     * @param task the task to enqueue
     * @throws DuraCloudRuntimeException wrapping the last failure if the send
     *         does not succeed
     */
    @Override
    public void put(Task task) {
        try {
            final String msgBody = unmarshallTask(task);
            // NOTE(review): Retrier arguments are presumably (max retries, wait ms,
            // backoff multiplier) - confirm against Retrier.
            new Retrier(4, 10000, 2).execute(new Retriable() {
                @Override
                public Object retry() throws Exception {
                    sqsClient.sendMessage(new SendMessageRequest(queueUrl, msgBody));
                    return null;
                }
            });
            log.info("SQS message successfully placed {} on queue - queue: {}", task, queueName);
        } catch (Exception ex) {
            log.error("failed to place {} on {} due to {}", new Object[]{task, queueName, ex.getMessage()});
            throw new DuraCloudRuntimeException(ex);
        }
    }

    /**
     * Sends the given tasks in batches. Tasks that are equal to one another
     * are sent once, because they are collected into a set first.
     *
     * @param tasks the tasks to enqueue
     */
    @Override
    public void put(Task... tasks) {
        put(new HashSet<Task>(Arrays.asList(tasks)));
    }

    /**
     * Sends the given tasks to the queue in batches of up to
     * {@value #MAX_BATCH_SIZE} messages.
     *
     * @param tasks the tasks to enqueue; an empty set results in no SQS call
     * @throws DuraCloudRuntimeException if a batch cannot be sent; tasks in
     *         later batches are then not attempted
     */
    @Override
    public void put(Set<Task> tasks) {
        List<SendMessageBatchRequestEntry> batch =
            new ArrayList<SendMessageBatchRequestEntry>(MAX_BATCH_SIZE);
        for (Task task : tasks) {
            // Entry ids only need to be unique inside one batch, so the
            // position within the current batch is sufficient.
            String entryId = batch.size() + "";
            batch.add(new SendMessageBatchRequestEntry()
                          .withMessageBody(unmarshallTask(task))
                          .withId(entryId));
            if (batch.size() == MAX_BATCH_SIZE) {
                sendBatchMessages(batch);
                batch.clear();
            }
        }
        if (!batch.isEmpty()) {
            sendBatchMessages(batch);
        }
    }

    /**
     * Sends one batch of messages, going through a {@link Retrier}.
     *
     * <p>A batch call can succeed as a whole while individual entries are
     * reported as failed in the result. Those entries are detected here and
     * only they are resent on the next attempt; if some are still rejected
     * once the retrier gives up, the failure is surfaced to the caller.
     *
     * @param msgEntries the entries to send; not modified by this method
     * @throws DuraCloudRuntimeException wrapping the last failure if any entry
     *         could not be sent
     */
    private void sendBatchMessages(List<SendMessageBatchRequestEntry> msgEntries) {
        // Entries not yet accepted by SQS; narrowed to the rejected ones between attempts.
        final List<SendMessageBatchRequestEntry> pending =
            new ArrayList<SendMessageBatchRequestEntry>(msgEntries);
        try {
            // NOTE(review): relies on Retrier re-invoking retry() when it throws - confirm.
            new Retrier(4, 5000, 2).execute(new Retriable() {
                @Override
                public Object retry() throws Exception {
                    SendMessageBatchRequest request = new SendMessageBatchRequest()
                        .withQueueUrl(queueUrl)
                        .withEntries(pending);
                    List<BatchResultErrorEntry> failed = sqsClient.sendMessageBatch(request).getFailed();
                    if (failed == null || failed.isEmpty()) {
                        return null;
                    }

                    Set<String> failedIds = new HashSet<String>();
                    for (BatchResultErrorEntry error : failed) {
                        failedIds.add(error.getId());
                    }
                    List<SendMessageBatchRequestEntry> rejected =
                        new ArrayList<SendMessageBatchRequestEntry>(failed.size());
                    for (SendMessageBatchRequestEntry entry : pending) {
                        if (failedIds.contains(entry.getId())) {
                            rejected.add(entry);
                        }
                    }
                    // Only narrow when the reported ids matched something; otherwise
                    // keep the full list rather than retrying with an empty batch.
                    if (!rejected.isEmpty()) {
                        pending.clear();
                        pending.addAll(rejected);
                    }
                    throw new IllegalStateException(
                        failed.size() + " SQS batch entries were rejected: " + failed);
                }
            });
            log.info("{} SQS messages successfully placed on queue: {}", msgEntries.size(), queueName);
        } catch (Exception ex) {
            log.error("failed to place {} on {} due to {}", new Object[]{pending, queueName, ex.getMessage()});
            throw new DuraCloudRuntimeException(ex);
        }
    }

    /**
     * Receives up to {@code maxTasks} messages and converts them to tasks.
     * Each returned task has its visibility timeout set to the value read
     * from the queue at construction.
     *
     * <p>Messages that cannot be converted into a task are logged and skipped
     * so one malformed message does not discard the rest of the batch.
     *
     * @param maxTasks maximum number of messages to request
     * @return a non-empty set of tasks
     * @throws TimeoutException if no message was received, or none of the
     *         received messages could be converted into a task
     */
    @Override
    public Set<Task> take(int maxTasks) throws TimeoutException {
        ReceiveMessageResult result = sqsClient.receiveMessage(
            new ReceiveMessageRequest()
                .withQueueUrl(queueUrl)
                .withMaxNumberOfMessages(maxTasks)
                .withAttributeNames("SentTimestamp", "ApproximateReceiveCount"));

        List<Message> messages = result.getMessages();
        if (messages != null && !messages.isEmpty()) {
            Set<Task> tasks = new HashSet<Task>();
            for (Message msg : messages) {
                logReceipt(msg);
                Task task = marshallTask(msg);
                if (task == null) {
                    // marshallTask has already logged the reason.
                    log.warn("skipping unparsable SQS message - queue: {}, msgId: {}", queueName, msg.getMessageId());
                    continue;
                }
                task.setVisibilityTimeout(visibilityTimeout);
                tasks.add(task);
            }
            if (!tasks.isEmpty()) {
                return tasks;
            }
        }
        throw new TimeoutException("No tasks available from queue: " + queueName + ", queueUrl: " + queueUrl);
    }

    /**
     * Logs receipt of a message together with the time elapsed since its
     * {@code SentTimestamp} attribute; a missing or non-numeric timestamp is
     * logged as an error and otherwise ignored.
     */
    private void logReceipt(Message msg) {
        try {
            long sentTime = Long.parseLong(msg.getAttributes().get("SentTimestamp"));
            long preworkQueueTime = System.currentTimeMillis() - sentTime;
            log.info("SQS message received - queue: {}, queueUrl: {}, msgId: {}, preworkQueueTime: {}, receiveCount: {}",
                     new Object[]{queueName, queueUrl, msg.getMessageId(),
                                  DurationFormatUtils.formatDuration(preworkQueueTime, "HH:mm:ss,SSS"),
                                  msg.getAttributes().get("ApproximateReceiveCount")});
        } catch (NumberFormatException nfe) {
            log.error("Error converting 'SentTimestamp' SQS message attribute to Long, messageId: " + msg.getMessageId(), nfe);
        }
    }

    /**
     * Receives a single task.
     *
     * @return the received task
     * @throws TimeoutException if no usable task is available
     */
    @Override
    public Task take() throws TimeoutException {
        return take(1).iterator().next();
    }

    /**
     * Sets the message's visibility timeout to the task's own visibility
     * timeout value, identifying the message by the task's receipt handle.
     *
     * @param task a task previously obtained from this queue
     * @throws TaskNotFoundException if SQS reports the receipt handle as invalid
     */
    @Override
    public void extendVisibilityTimeout(Task task) throws TaskNotFoundException {
        try {
            sqsClient.changeMessageVisibility(
                new ChangeMessageVisibilityRequest()
                    .withQueueUrl(queueUrl)
                    .withReceiptHandle(task.getProperty(MsgProp.RECEIPT_HANDLE.name()))
                    .withVisibilityTimeout(task.getVisibilityTimeout()));
            log.info("extended visibility timeout {} seconds for {}", task.getVisibilityTimeout(), task);
        } catch (ReceiptHandleIsInvalidException rhe) {
            log.error("failed to extend visibility timeout on task " + task + ": " + rhe.getMessage(), rhe);
            throw new TaskNotFoundException(rhe);
        }
    }

    /**
     * Deletes the message behind the given task, identified by its receipt handle.
     *
     * @param task a task previously obtained from this queue
     * @throws TaskNotFoundException if SQS reports the receipt handle as invalid
     */
    @Override
    public void deleteTask(Task task) throws TaskNotFoundException {
        try {
            sqsClient.deleteMessage(
                new DeleteMessageRequest()
                    .withQueueUrl(queueUrl)
                    .withReceiptHandle(task.getProperty(MsgProp.RECEIPT_HANDLE.name())));
            log.info("successfully deleted {}", task);
        } catch (ReceiptHandleIsInvalidException rhe) {
            log.error("failed to delete task " + task + ": " + rhe.getMessage(), rhe);
            throw new TaskNotFoundException(rhe);
        }
    }

    /**
     * Deletes the messages behind the given tasks in one batch call.
     * Per-entry failures reported in the batch result are logged but do not
     * raise an exception.
     *
     * @param tasks at most {@value #MAX_BATCH_SIZE} tasks; an empty set is a no-op
     * @throws IllegalArgumentException if more than {@value #MAX_BATCH_SIZE}
     *         tasks are given
     * @throws TaskException if the batch call itself fails
     */
    @Override
    public void deleteTasks(Set<Task> tasks) throws TaskException {
        if (tasks.size() > MAX_BATCH_SIZE) {
            throw new IllegalArgumentException("task set must contain 10 or fewer tasks");
        }
        if (tasks.isEmpty()) {
            // Nothing to delete; avoid a remote call with an empty batch.
            return;
        }
        try {
            List<DeleteMessageBatchRequestEntry> entries =
                new ArrayList<DeleteMessageBatchRequestEntry>(tasks.size());
            for (Task task : tasks) {
                entries.add(new DeleteMessageBatchRequestEntry()
                                .withId(task.getProperty(MsgProp.MSG_ID.name()))
                                .withReceiptHandle(task.getProperty(MsgProp.RECEIPT_HANDLE.name())));
            }

            DeleteMessageBatchResult result = sqsClient.deleteMessageBatch(
                new DeleteMessageBatchRequest().withQueueUrl(queueUrl).withEntries(entries));

            List<BatchResultErrorEntry> failed = result.getFailed();
            if (failed != null) {
                for (BatchResultErrorEntry error : failed) {
                    // A failed delete is an error condition, so log it as one.
                    log.error("failed to delete message: " + error);
                }
            }
            for (DeleteMessageBatchResultEntry deleted : result.getSuccessful()) {
                log.info("successfully deleted {}", deleted);
            }
        } catch (AmazonServiceException se) {
            log.error("failed to batch delete tasks " + tasks + ": " + se.getMessage(), se);
            throw new TaskException(se);
        }
    }

    /**
     * Increments the task's attempt counter, deletes its current message and
     * puts the task back on the queue. A failure to delete because the task
     * is not found is logged and does not prevent the re-send.
     *
     * @param task the task to requeue
     */
    @Override
    public void requeue(Task task) {
        // Captured before incrementing: this is the count reported in the log line below.
        int attempts = task.getAttempts();
        task.incrementAttempts();
        try {
            deleteTask(task);
        } catch (TaskNotFoundException e) {
            log.error("unable to delete " + task + " ignoring - requeuing anyway", e);
        }
        put(task);
        log.warn("requeued {} after {} failed attempts.", task, attempts);
    }

    /**
     * @return the queue's {@code ApproximateNumberOfMessages} attribute
     */
    @Override
    public Integer size() {
        GetQueueAttributesResult result = queryQueueAttributes(QueueAttributeName.ApproximateNumberOfMessages);
        String sizeStr = result.getAttributes().get(QueueAttributeName.ApproximateNumberOfMessages.name());
        return Integer.valueOf(sizeStr);
    }

    /**
     * @return the sum of the queue's {@code ApproximateNumberOfMessages},
     *         {@code ApproximateNumberOfMessagesNotVisible} and
     *         {@code ApproximateNumberOfMessagesDelayed} attributes
     */
    @Override
    public Integer sizeIncludingInvisibleAndDelayed() {
        GetQueueAttributesResult result = queryQueueAttributes(
            QueueAttributeName.ApproximateNumberOfMessages,
            QueueAttributeName.ApproximateNumberOfMessagesNotVisible,
            QueueAttributeName.ApproximateNumberOfMessagesDelayed);

        int size = 0;
        for (Map.Entry<String, String> attribute : result.getAttributes().entrySet()) {
            log.debug("retrieved attribute: {}={}", attribute.getKey(), attribute.getValue());
            size += Integer.parseInt(attribute.getValue());
        }
        log.debug("calculated size: {}", size);
        return size;
    }

    /** Reads the queue's {@code VisibilityTimeout} attribute from SQS. */
    private Integer getVisibilityTimeout() {
        GetQueueAttributesResult result = queryQueueAttributes(QueueAttributeName.VisibilityTimeout);
        String visStr = result.getAttributes().get(QueueAttributeName.VisibilityTimeout.name());
        return Integer.valueOf(visStr);
    }

    /** Looks up the queue URL for {@code queueName} from SQS. */
    private String getQueueUrl() {
        return sqsClient.getQueueUrl(new GetQueueUrlRequest().withQueueName(queueName)).getQueueUrl();
    }

    /** Fetches the named attributes of this queue from SQS. */
    private GetQueueAttributesResult queryQueueAttributes(QueueAttributeName... attrNames) {
        return sqsClient.getQueueAttributes(
            new GetQueueAttributesRequest().withQueueUrl(queueUrl).withAttributeNames(attrNames));
    }

    /**
     * Names of the task properties under which SQS message metadata is stored
     * when a message is converted into a {@link Task}.
     */
    public enum MsgProp {
        /** The SQS message id. */
        MSG_ID,
        /** The SQS receipt handle used to delete or extend the message. */
        RECEIPT_HANDLE;
    }
}

