/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.endpoint.receiver.sqs;

import com.google.common.collect.ImmutableMap;
import de.otto.synapse.endpoint.MessageInterceptorRegistry;
import de.otto.synapse.endpoint.receiver.AbstractMessageReceiverEndpoint;
import de.otto.synapse.endpoint.receiver.MessageQueueReceiverEndpoint;
import de.otto.synapse.message.Header;
import de.otto.synapse.message.Key;
import de.otto.synapse.message.TextMessage;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

/**
 * Receiver endpoint that polls an Amazon SQS queue through an {@link SqsAsyncClient}.
 *
 * <p>Every received SQS message is converted into a {@link TextMessage}, passed through the
 * interceptor chain, handed to the message dispatcher and finally deleted from the queue.
 * Polling runs on the {@link ExecutorService} supplied to the constructor and continues until
 * {@link #stop()} is called or a receive request fails.
 */
public class SqsMessageQueueReceiverEndpoint
        extends AbstractMessageReceiverEndpoint
        implements MessageQueueReceiverEndpoint {

    private static final Logger LOG = LoggerFactory.getLogger(SqsMessageQueueReceiverEndpoint.class);

    /** Value sent as {@code visibilityTimeout} with every receive request. */
    private static final int VISIBILITY_TIMEOUT = 5;
    /** Value sent as {@code waitTimeSeconds} with every receive request. */
    private static final int WAIT_TIME_SECONDS = 2;
    /** Maximum time {@link #stop()} waits for the polling loop to terminate. */
    private static final int STOP_TIMEOUT_SECONDS = 3;
    /** API call attempt timeout applied to the queue URL lookup performed in the constructor. */
    private static final Duration GET_QUEUE_URL_TIMEOUT = Duration.ofMillis(2000L);
    /** Fallback attribute used when a message carries no {@link #MSG_KEY_ATTR} attribute. */
    private static final MessageAttributeValue EMPTY_STRING_ATTR = MessageAttributeValue.builder()
            .dataType("String")
            .stringValue("")
            .build();
    /** Name of the SQS message attribute that holds the message key. */
    private static final String MSG_KEY_ATTR = "synapse_msg_key";

    @Nonnull
    private final SqsAsyncClient sqsAsyncClient;
    @Nonnull
    private final ExecutorService executorService;
    private final String queueUrl;
    /** Set by {@link #stop()}; checked by the polling loop after every receive round. */
    private final AtomicBoolean stopSignal = new AtomicBoolean(false);
    /** Completed when the polling loop has ended, or when construction failed. */
    private final CompletableFuture<Void> stopped = new CompletableFuture<>();

    /**
     * Creates the endpoint and resolves the URL of the queue named {@code channelName}.
     *
     * @param channelName         name of the channel; also used as the SQS queue name
     * @param interceptorRegistry registry passed on to the superclass
     * @param sqsAsyncClient      client used for all SQS requests
     * @param executorService     executor on which the polling loop of {@link #consume()} runs
     * @param eventPublisher      optional event publisher passed on to the superclass
     * @throws RuntimeException if the queue URL cannot be resolved; the original failure is the cause
     */
    public SqsMessageQueueReceiverEndpoint(@Nonnull String channelName,
                                           @Nonnull MessageInterceptorRegistry interceptorRegistry,
                                           @Nonnull SqsAsyncClient sqsAsyncClient,
                                           @Nonnull ExecutorService executorService,
                                           @Nullable ApplicationEventPublisher eventPublisher) {
        super(channelName, interceptorRegistry, eventPublisher);
        this.sqsAsyncClient = sqsAsyncClient;
        this.executorService = executorService;
        try {
            this.queueUrl = sqsAsyncClient.getQueueUrl(GetQueueUrlRequest.builder()
                            .queueName(channelName)
                            .overrideConfiguration(AwsRequestOverrideConfiguration.builder()
                                    .apiCallAttemptTimeout(GET_QUEUE_URL_TIMEOUT)
                                    .build())
                            .build())
                    .get()
                    .queueUrl();
        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            // Nothing will ever be polled, so mark the endpoint as already stopped.
            this.stopped.complete(null);
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /**
     * Starts the polling loop on the configured executor.
     *
     * <p>The loop performs at least one receive round and then repeats until the stop signal is
     * set. When the loop ends, normally or because a receive round threw, the internal
     * {@code stopped} future is completed so that {@link #stop()} can return.
     *
     * @return a future that completes when the polling loop has ended; it completes exceptionally
     *         if a receive round failed
     */
    public CompletableFuture<Void> consume() {
        return CompletableFuture.runAsync(() -> {
            try {
                do {
                    receiveAndProcess();
                } while (!this.stopSignal.get());
            } finally {
                this.stopped.complete(null);
            }
        }, this.executorService);
    }

    /**
     * Sends one receive request and blocks until the response has been processed.
     *
     * @throws RuntimeException if the request or the processing of its response fails
     */
    private void receiveAndProcess() {
        try {
            LOG.debug("Sending receiveMessage request...");
            final ReceiveMessageRequest request = ReceiveMessageRequest.builder()
                    .queueUrl(this.queueUrl)
                    .visibilityTimeout(VISIBILITY_TIMEOUT)
                    .messageAttributeNames(".*")
                    .waitTimeSeconds(WAIT_TIME_SECONDS)
                    .build();
            this.sqsAsyncClient.receiveMessage(request)
                    .thenAccept(this::processResponse)
                    .join();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Processes every message contained in the response, in the order returned.
     *
     * @param response the response of a receive request
     */
    private void processResponse(ReceiveMessageResponse response) {
        if (response.messages() == null) {
            return;
        }
        LOG.debug("Received {} messages from SQS.", response.messages().size());
        response.messages().forEach(this::processMessage);
    }

    /**
     * Converts, intercepts and dispatches a single SQS message and then deletes it.
     *
     * <p>A message for which the interceptor chain returns {@code null} is deleted without being
     * dispatched. If anything in this method throws, the failure is logged and the message is
     * not deleted by this call.
     *
     * @param sqsMessage the message as received from SQS
     */
    private void processMessage(Message sqsMessage) {
        LOG.debug("Processing message from channel={}: messageId={} receiptHandle={}, messageAttributes={}",
                getChannelName(), sqsMessage.messageId(), sqsMessage.receiptHandle(), sqsMessage.messageAttributes());
        try {
            final TextMessage message = TextMessage.of(
                    messageKeyOf(sqsMessage),
                    Header.of(null, messageAttributesOf(sqsMessage)),
                    sqsMessage.body());
            final TextMessage interceptedMessage = intercept(message);
            if (interceptedMessage != null) {
                LOG.debug("Dispatching message {} ", interceptedMessage);
                getMessageDispatcher().accept(interceptedMessage);
            }
            deleteMessage(sqsMessage);
        } catch (Exception e) {
            LOG.error("Failed to process SQS message {}", sqsMessage, e);
        }
    }

    /**
     * Derives the message key from the {@link #MSG_KEY_ATTR} message attribute.
     *
     * @param sqsMessage the message as received from SQS
     * @return the key built from the attribute value, a key built from the empty string if the
     *         attribute is missing, or {@link Key#NO_KEY} if the message has no attribute map
     */
    private Key messageKeyOf(Message sqsMessage) {
        if (sqsMessage.messageAttributes() == null) {
            return Key.NO_KEY;
        }
        return Key.of(sqsMessage.messageAttributes()
                .getOrDefault(MSG_KEY_ATTR, EMPTY_STRING_ATTR)
                .stringValue());
    }

    /**
     * Copies all message attributes of data type {@code "String"} into an immutable map.
     *
     * <p>Attributes with any other (or no) data type are skipped with a warning.
     *
     * @param sqsMessage the message as received from SQS
     * @return attribute names mapped to their string values; empty if the message has no attribute map
     */
    private ImmutableMap<String, String> messageAttributesOf(Message sqsMessage) {
        if (sqsMessage.messageAttributes() == null) {
            return ImmutableMap.of();
        }
        final ImmutableMap.Builder<String, String> attributes = ImmutableMap.builder();
        sqsMessage.messageAttributes().forEach((name, value) -> {
            final String dataType = value.dataType();
            // Constant-first equals keeps a missing data type from raising a NullPointerException.
            if ("String".equals(dataType)) {
                attributes.put(name, value.stringValue());
            } else {
                LOG.warn("Ignoring messageAttribute {} with dataType {}: Not yet implemented this type.", name, dataType);
            }
        });
        return attributes.build();
    }

    /**
     * Requests deletion of the message without waiting for the outcome.
     *
     * <p>The result of the asynchronous request is only logged. The surrounding try/catch covers
     * failures raised synchronously while building or submitting the request; asynchronous
     * failures are reported by the completion callback.
     *
     * @param sqsMessage the message to delete, identified by its receipt handle
     * @throws RuntimeException if building or submitting the request fails synchronously
     */
    private void deleteMessage(Message sqsMessage) {
        try {
            LOG.debug("Deleting message with receiptHandle={}", sqsMessage.receiptHandle());
            final DeleteMessageRequest request = DeleteMessageRequest.builder()
                    .queueUrl(this.queueUrl)
                    .receiptHandle(sqsMessage.receiptHandle())
                    .build();
            this.sqsAsyncClient.deleteMessage(request).whenComplete((response, throwable) -> {
                if (throwable != null) {
                    LOG.info("Received exception while deleting message: {}", throwable.getMessage());
                } else {
                    LOG.debug("Received DeleteMessageResponse={}", response);
                }
            });
        } catch (RuntimeException e) {
            LOG.error("Error deleting message: {}", e.getMessage(), e);
            throw e;
        }
    }

    /**
     * Signals the polling loop to stop and waits up to {@link #STOP_TIMEOUT_SECONDS} seconds for
     * it to end.
     *
     * <p>The wait is on the future that is completed by the loop started in {@link #consume()}
     * (or by a failed constructor), so this method times out if no such loop has ended by then.
     *
     * @throws RuntimeException if waiting times out, is interrupted or fails; the original
     *                          failure is the cause
     */
    public void stop() {
        LOG.info("Channel {} received stop signal.", getChannelName());
        this.stopSignal.set(true);
        try {
            this.stopped
                    .thenAccept(v -> LOG.info("SQS channel {} has been stopped", getChannelName()))
                    .get(STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /**
     * Re-asserts the current thread's interrupt status if {@code e} is an
     * {@link InterruptedException}, so that callers further up the stack can still observe it.
     */
    private static void restoreInterruptIfNeeded(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }
}

