/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.endpoint.sender.sqs;

import com.google.common.collect.ImmutableMap;
import de.otto.synapse.endpoint.MessageInterceptorRegistry;
import de.otto.synapse.endpoint.sender.AbstractMessageSenderEndpoint;
import de.otto.synapse.message.Message;
import de.otto.synapse.translator.MessageTranslator;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;

public class SqsMessageSender
extends AbstractMessageSenderEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(SqsMessageSender.class);
    public static final String MSG_KEY_ATTR = "synapse_msg_key";
    private final String queueUrl;
    private final SqsAsyncClient sqsAsyncClient;

    public SqsMessageSender(String channelName, String queueUrl, MessageInterceptorRegistry interceptorRegistry, MessageTranslator<String> messageTranslator, SqsAsyncClient sqsAsyncClient) {
        super(channelName, interceptorRegistry, messageTranslator);
        this.queueUrl = queueUrl;
        this.sqsAsyncClient = sqsAsyncClient;
    }

    protected CompletableFuture<Void> doSend(@Nonnull Message<String> message) {
        CompletionStage futureResponse = this.sqsAsyncClient.sendMessage(this.toSendMessageRequest(message)).whenComplete(this.logResponse(message));
        return CompletableFuture.allOf(new CompletableFuture[]{futureResponse});
    }

    protected CompletableFuture<Void> doSendBatch(@Nonnull Stream<Message<String>> messageStream) {
        CompletionStage futureResponse = this.sqsAsyncClient.sendMessageBatch(this.toSendMessageBatchRequest(messageStream)).whenComplete(this.logBatchResponse());
        return CompletableFuture.allOf(new CompletableFuture[]{futureResponse});
    }

    private SendMessageBatchRequest toSendMessageBatchRequest(@Nonnull Stream<Message<String>> messageStream) {
        AtomicInteger id = new AtomicInteger(0);
        return (SendMessageBatchRequest)SendMessageBatchRequest.builder().queueUrl(this.queueUrl).entries((Collection)messageStream.map(message -> (SendMessageBatchRequestEntry)SendMessageBatchRequestEntry.builder().id(String.valueOf(id.getAndIncrement())).messageAttributes(this.of((Message<String>)message)).messageBody((String)message.getPayload()).build()).collect(Collectors.toList())).build();
    }

    private SendMessageRequest toSendMessageRequest(@Nonnull Message<String> message) {
        return (SendMessageRequest)SendMessageRequest.builder().queueUrl(this.queueUrl).messageAttributes(this.of(message)).messageBody((String)message.getPayload()).build();
    }

    private ImmutableMap<String, MessageAttributeValue> of(@Nonnull Message<String> message) {
        ImmutableMap.Builder messageAttributes = ImmutableMap.builder();
        message.getHeader().getAll().entrySet().forEach(entry -> messageAttributes.put(entry.getKey(), MessageAttributeValue.builder().dataType("String").stringValue((String)entry.getValue()).build()));
        messageAttributes.put((Object)MSG_KEY_ATTR, MessageAttributeValue.builder().dataType("String").stringValue(message.getKey().partitionKey()).build());
        return messageAttributes.build();
    }

    private BiConsumer<SendMessageResponse, Throwable> logResponse(@Nonnull Message<String> message) {
        return (result, exception) -> {
            if (exception != null) {
                LOG.error(String.format("Failed to send message %s", message), exception);
            }
            if (result != null) {
                LOG.debug("Successfully sent message {}", result);
            }
        };
    }

    private BiConsumer<SendMessageBatchResponse, Throwable> logBatchResponse() {
        return (result, exception) -> {
            if (exception != null) {
                LOG.error("Failed to send batch of messages: " + exception.getMessage(), exception);
            }
            if (result != null) {
                if (!result.successful().isEmpty()) {
                    LOG.debug("Successfully sent {} messages in a batch", (Object)result.successful().size());
                }
                if (!result.failed().isEmpty()) {
                    LOG.error("Failed to sent {} messages in a batch: {}", (Object)result.failed().size(), (Object)result.failed());
                }
            }
        };
    }
}

