/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.endpoint.sender.aws;

import com.google.common.collect.ImmutableMap;
import de.otto.synapse.endpoint.sender.AbstractMessageSenderEndpoint;
import de.otto.synapse.message.Message;
import de.otto.synapse.translator.MessageTranslator;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.SQSAsyncClient;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

public class SqsMessageSender
extends AbstractMessageSenderEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(SqsMessageSender.class);
    public static final String MSG_KEY_ATTR = "synapse_msg_key";
    public static final String MSG_SENDER_ATTR = "synapse_msg_sender";
    private final String queueUrl;
    private final String messageSender;
    private final SQSAsyncClient sqsAsyncClient;

    public SqsMessageSender(String channelName, String queueUrl, MessageTranslator<String> messageTranslator, SQSAsyncClient sqsAsyncClient, String messageSender) {
        super(channelName, messageTranslator);
        this.queueUrl = queueUrl;
        this.sqsAsyncClient = sqsAsyncClient;
        this.messageSender = messageSender;
    }

    protected void doSend(@Nonnull Message<String> message) {
        this.sqsAsyncClient.sendMessage((SendMessageRequest)SendMessageRequest.builder().queueUrl(this.queueUrl).messageAttributes((Map)ImmutableMap.of((Object)MSG_KEY_ATTR, (Object)MessageAttributeValue.builder().dataType("String").stringValue(message.getKey()).build(), (Object)MSG_SENDER_ATTR, (Object)MessageAttributeValue.builder().dataType("String").stringValue(this.messageSender).build())).messageBody((String)message.getPayload()).build()).whenComplete((result, exception) -> {
            if (exception != null) {
                LOG.error(String.format("Failed to send message %s", message), exception);
            }
            if (result != null) {
                LOG.debug("Successfully sent message ", result);
            }
        });
    }

    protected void doSendBatch(@Nonnull Stream<Message<String>> messageStream) {
        AtomicInteger id = new AtomicInteger(0);
        this.sqsAsyncClient.sendMessageBatch((SendMessageBatchRequest)SendMessageBatchRequest.builder().queueUrl(this.queueUrl).entries((Collection)messageStream.map(message -> (SendMessageBatchRequestEntry)SendMessageBatchRequestEntry.builder().id(String.valueOf(id.getAndIncrement())).messageAttributes((Map)ImmutableMap.of((Object)MSG_KEY_ATTR, (Object)MessageAttributeValue.builder().dataType("String").stringValue(message.getKey()).build())).messageBody((String)message.getPayload()).build()).collect(Collectors.toList())).build()).whenComplete((result, exception) -> {
            if (exception != null) {
                LOG.error("Failed to send batch of messages: " + exception.getMessage(), exception);
            }
            if (result != null) {
                if (!result.successful().isEmpty()) {
                    LOG.debug("Successfully sent {} messages in a batch", (Object)result.successful().size());
                }
                if (!result.failed().isEmpty()) {
                    LOG.error("Failed to sent {} messages in a batch: {}", (Object)result.failed().size(), (Object)result.failed());
                }
            }
        });
    }
}

