/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.endpoint.receiver.aws;

import com.fasterxml.jackson.databind.ObjectMapper;
import de.otto.synapse.endpoint.receiver.AbstractMessageReceiverEndpoint;
import de.otto.synapse.endpoint.receiver.MessageQueueReceiverEndpoint;
import de.otto.synapse.message.Message;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
import software.amazon.awssdk.services.sqs.SQSAsyncClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

public class SqsMessageQueueReceiverEndpoint
extends AbstractMessageReceiverEndpoint
implements MessageQueueReceiverEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(SqsMessageQueueReceiverEndpoint.class);
    private static final int VISIBILITY_TIMEOUT = 5;
    private static final int WAIT_TIME_SECONDS = 10;
    @Nonnull
    private final SQSAsyncClient sqsAsyncClient;
    private final String queueUrl;
    private final AtomicBoolean stopSignal = new AtomicBoolean(false);

    public SqsMessageQueueReceiverEndpoint(@Nonnull String channelName, @Nonnull SQSAsyncClient sqsAsyncClient, @Nonnull ObjectMapper objectMapper, @Nullable ApplicationEventPublisher eventPublisher) {
        super(channelName, objectMapper, eventPublisher);
        this.sqsAsyncClient = sqsAsyncClient;
        try {
            this.queueUrl = ((GetQueueUrlResponse)sqsAsyncClient.getQueueUrl((GetQueueUrlRequest)GetQueueUrlRequest.builder().queueName(channelName).build()).get()).queueUrl();
        }
        catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    public CompletableFuture<Void> consume() {
        this.getMessageDispatcher().getAll().forEach(messageConsumer -> {
            if (!messageConsumer.keyPattern().pattern().equals(".*")) {
                throw new IllegalStateException("Unable to select messages using key pattern");
            }
        });
        return CompletableFuture.runAsync(() -> {
            do {
                LOG.debug("Sending receiveMessage request...");
                this.receiveAndProcess();
            } while (!this.stopSignal.get());
        });
    }

    private void receiveAndProcess() {
        try {
            ((CompletableFuture)this.sqsAsyncClient.receiveMessage((ReceiveMessageRequest)ReceiveMessageRequest.builder().queueUrl(this.queueUrl).visibilityTimeout(Integer.valueOf(5)).waitTimeSeconds(Integer.valueOf(10)).build()).thenAccept(this::processResponse)).get();
        }
        catch (Exception e) {
            LOG.error(e.getMessage(), (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    private void processResponse(ReceiveMessageResponse response) {
        LOG.debug("Received {} messages from SQS.", (Object)response.messages().size());
        if (response.messages() != null) {
            response.messages().forEach(this::processMessage);
        }
    }

    private void processMessage(software.amazon.awssdk.services.sqs.model.Message sqsMessage) {
        LOG.debug("Processing message from channel={}: messageId={} receiptHandle={}, attributes={}, messageAttributes={}", new Object[]{this.getChannelName(), sqsMessage.messageId(), sqsMessage.receiptHandle(), sqsMessage.attributesAsStrings()});
        Message message = Message.message((String)"", (Object)sqsMessage.body());
        Message interceptedMessage = this.intercept(message);
        if (interceptedMessage != null) {
            LOG.debug("Dispatching message {} ", (Object)interceptedMessage);
            this.getMessageDispatcher().accept(interceptedMessage);
        }
        this.deleteMessage(sqsMessage);
    }

    private void deleteMessage(software.amazon.awssdk.services.sqs.model.Message sqsMessage) {
        try {
            LOG.debug("Deleting message with receiptHandle={}", (Object)sqsMessage.receiptHandle());
            this.sqsAsyncClient.deleteMessage((DeleteMessageRequest)DeleteMessageRequest.builder().queueUrl(this.queueUrl).receiptHandle(sqsMessage.receiptHandle()).build());
        }
        catch (RuntimeException e) {
            LOG.error("Error deleting message: " + e.getMessage(), (Throwable)e);
            throw e;
        }
    }

    public void stop() {
        LOG.info("Channel {} received stop signal.", (Object)this.getChannelName());
        this.stopSignal.set(true);
    }
}

