/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.endpoint.receiver.aws;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import de.otto.synapse.endpoint.receiver.AbstractMessageReceiverEndpoint;
import de.otto.synapse.endpoint.receiver.MessageQueueReceiverEndpoint;
import de.otto.synapse.message.Header;
import de.otto.synapse.message.Message;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

/**
 * Message-queue receiver endpoint that polls an AWS SQS queue through an
 * {@link SqsAsyncClient}, converts every received SQS message into a synapse
 * {@link Message}, runs it through the interceptors, dispatches it and finally
 * deletes it from the queue.
 *
 * <p>The queue URL is resolved once, in the constructor, from the channel name.
 */
public class SqsMessageQueueReceiverEndpoint
extends AbstractMessageReceiverEndpoint
implements MessageQueueReceiverEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(SqsMessageQueueReceiverEndpoint.class);
    /** Visibility timeout (seconds) sent with every receiveMessage request. */
    private static final int VISIBILITY_TIMEOUT = 5;
    /** Wait time (seconds) sent with every receiveMessage request. */
    private static final int WAIT_TIME_SECONDS = 10;
    /** Fallback attribute used when a message carries no {@link #MSG_KEY_ATTR} attribute. */
    public static final MessageAttributeValue EMPTY_STRING_ATTR = MessageAttributeValue.builder().dataType("String").stringValue("").build();
    /** Name of the SQS message attribute that carries the synapse message key. */
    public static final String MSG_KEY_ATTR = "synapse_msg_key";
    @Nonnull
    private final SqsAsyncClient sqsAsyncClient;
    private final String queueUrl;
    /** Set by {@link #stop()}; checked by the polling loop after every receive round. */
    private final AtomicBoolean stopSignal = new AtomicBoolean(false);

    /**
     * Creates the endpoint and resolves the URL of the queue named {@code channelName}.
     * The constructor blocks until the queue URL lookup has completed.
     *
     * @param channelName    name of the channel; also used as the SQS queue name
     * @param sqsAsyncClient client used for all SQS calls
     * @param objectMapper   passed through to the super class
     * @param eventPublisher passed through to the super class; may be {@code null}
     * @throws RuntimeException if the queue URL cannot be resolved; the original failure is the cause
     */
    public SqsMessageQueueReceiverEndpoint(@Nonnull String channelName, @Nonnull SqsAsyncClient sqsAsyncClient, @Nonnull ObjectMapper objectMapper, @Nullable ApplicationEventPublisher eventPublisher) {
        super(channelName, objectMapper, eventPublisher);
        this.sqsAsyncClient = sqsAsyncClient;
        try {
            GetQueueUrlRequest request = GetQueueUrlRequest.builder()
                    .queueName(channelName)
                    .build();
            this.queueUrl = sqsAsyncClient.getQueueUrl(request).get().queueUrl();
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Do not swallow the interruption: restore the flag for the caller.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /**
     * Starts the polling loop asynchronously. The loop performs at least one
     * receive round and then keeps polling until {@link #stop()} has been called.
     *
     * @return a future that completes when the loop ends, or completes
     *         exceptionally if a receive round fails
     */
    public CompletableFuture<Void> consume() {
        return CompletableFuture.runAsync(() -> {
            do {
                LOG.debug("Sending receiveMessage request...");
                this.receiveAndProcess();
            } while (!this.stopSignal.get());
        });
    }

    /**
     * Sends one receiveMessage request, processes the response and blocks until
     * processing has finished.
     *
     * @throws RuntimeException wrapping any failure of the request or of message processing
     */
    private void receiveAndProcess() {
        try {
            ReceiveMessageRequest request = ReceiveMessageRequest.builder()
                    .queueUrl(this.queueUrl)
                    .visibilityTimeout(VISIBILITY_TIMEOUT)
                    .messageAttributeNames(".*")
                    .waitTimeSeconds(WAIT_TIME_SECONDS)
                    .build();
            this.sqsAsyncClient.receiveMessage(request)
                    .thenAccept(this::processResponse)
                    .get();
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status so the executing thread can react to it.
                Thread.currentThread().interrupt();
            }
            LOG.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    /** Processes every message contained in {@code response}, in order. */
    private void processResponse(ReceiveMessageResponse response) {
        if (response.messages() != null) {
            LOG.debug("Received {} messages from SQS.", response.messages().size());
            response.messages().forEach(this::processMessage);
        }
    }

    /**
     * Converts a single SQS message into a synapse message, intercepts and
     * dispatches it, then deletes it from the queue. Messages dropped by an
     * interceptor (intercept result {@code null}) are deleted without being dispatched.
     */
    private void processMessage(software.amazon.awssdk.services.sqs.model.Message sqsMessage) {
        LOG.debug("Processing message from channel={}: messageId={} receiptHandle={}, messageAttributes={}", this.getChannelName(), sqsMessage.messageId(), sqsMessage.receiptHandle(), sqsMessage.messageAttributes());
        Message<String> message = Message.message(
                this.messageKeyOf(sqsMessage),
                Header.responseHeader(null, Instant.now(), this.messageAttributesOf(sqsMessage)),
                sqsMessage.body());
        Message<String> interceptedMessage = this.intercept(message);
        if (interceptedMessage != null) {
            LOG.debug("Dispatching message {} ", interceptedMessage);
            this.getMessageDispatcher().accept(interceptedMessage);
        }
        this.deleteMessage(sqsMessage);
    }

    /**
     * Returns the string value of the {@link #MSG_KEY_ATTR} attribute, or the
     * empty string if the message has no attributes or no such attribute.
     */
    private String messageKeyOf(software.amazon.awssdk.services.sqs.model.Message sqsMessage) {
        if (sqsMessage.messageAttributes() == null) {
            return "";
        }
        return sqsMessage.messageAttributes().getOrDefault(MSG_KEY_ATTR, EMPTY_STRING_ATTR).stringValue();
    }

    /**
     * Copies all message attributes of data type {@code "String"} into an
     * immutable map. Attributes of any other data type are skipped with a warning.
     */
    private ImmutableMap<String, Object> messageAttributesOf(software.amazon.awssdk.services.sqs.model.Message sqsMessage) {
        if (sqsMessage.messageAttributes() == null) {
            return ImmutableMap.of();
        }
        ImmutableMap.Builder<String, Object> attributeBuilder = ImmutableMap.builder();
        sqsMessage.messageAttributes().forEach((name, value) -> {
            // Constant-first equals keeps a missing data type on the "ignored" path instead of failing.
            if ("String".equals(value.dataType())) {
                attributeBuilder.put(name, value.stringValue());
            } else {
                LOG.warn("Ignoring messageAttribute {} with dataType {}: Not yet implemented this type.", name, value.dataType());
            }
        });
        return attributeBuilder.build();
    }

    /**
     * Requests deletion of the message from the queue. The request is sent
     * asynchronously and not awaited; a failure reported by the returned future
     * is logged.
     *
     * @throws RuntimeException if the request cannot be submitted at all
     */
    private void deleteMessage(software.amazon.awssdk.services.sqs.model.Message sqsMessage) {
        try {
            LOG.debug("Deleting message with receiptHandle={}", sqsMessage.receiptHandle());
            DeleteMessageRequest request = DeleteMessageRequest.builder()
                    .queueUrl(this.queueUrl)
                    .receiptHandle(sqsMessage.receiptHandle())
                    .build();
            // The future's outcome is otherwise unobserved, so log asynchronous failures here.
            this.sqsAsyncClient.deleteMessage(request).whenComplete((response, error) -> {
                if (error != null) {
                    LOG.error("Error deleting message: " + error.getMessage(), error);
                }
            });
        }
        catch (RuntimeException e) {
            LOG.error("Error deleting message: " + e.getMessage(), e);
            throw e;
        }
    }

    /**
     * Signals the polling loop to terminate. The loop ends after the receive
     * round that is currently in progress has finished.
     */
    public void stop() {
        LOG.info("Channel {} received stop signal.", this.getChannelName());
        this.stopSignal.set(true);
    }
}

