/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.sender.aws;

import com.google.common.collect.Lists;
import de.otto.synapse.client.aws.RetryPutRecordsKinesisClient;
import de.otto.synapse.message.Message;
import de.otto.synapse.sender.MessageSender;
import de.otto.synapse.translator.MessageTranslator;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;

public class KinesisMessageSender
implements MessageSender {
    private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocateDirect(0);
    private static final int PUT_RECORDS_BATCH_SIZE = 500;
    private final String streamName;
    private final MessageTranslator<ByteBuffer> messageTranslator;
    private final RetryPutRecordsKinesisClient retryPutRecordsKinesisClient;

    public KinesisMessageSender(String streamName, MessageTranslator<ByteBuffer> messageTranslator, KinesisClient kinesisClient) {
        this.streamName = streamName;
        this.messageTranslator = messageTranslator;
        this.retryPutRecordsKinesisClient = new RetryPutRecordsKinesisClient(kinesisClient);
    }

    public <T> void send(Message<T> message) {
        Message byteBufferMessage = this.messageTranslator.translate(message);
        PutRecordsRequestEntry putRecordsRequestEntry = this.requestEntryFor(message.getKey(), (ByteBuffer)byteBufferMessage.getPayload());
        PutRecordsRequest putRecordsRequest = (PutRecordsRequest)PutRecordsRequest.builder().streamName(this.streamName).records(new PutRecordsRequestEntry[]{putRecordsRequestEntry}).build();
        this.retryPutRecordsKinesisClient.putRecords(putRecordsRequest);
    }

    public <T> void sendBatch(Stream<Message<T>> messageStream) {
        List entries = messageStream.map(arg_0 -> this.messageTranslator.translate(arg_0)).map(entry -> this.requestEntryFor(entry.getKey(), (ByteBuffer)entry.getPayload())).collect(Collectors.toCollection(ArrayList::new));
        Lists.partition((List)entries, (int)500).forEach(batch -> {
            PutRecordsRequest putRecordsRequest = (PutRecordsRequest)PutRecordsRequest.builder().streamName(this.streamName).records((Collection)batch).build();
            this.retryPutRecordsKinesisClient.putRecords(putRecordsRequest);
        });
    }

    private PutRecordsRequestEntry requestEntryFor(String key, ByteBuffer byteBuffer) {
        return (PutRecordsRequestEntry)PutRecordsRequestEntry.builder().partitionKey(key).data(byteBuffer != null ? byteBuffer : EMPTY_BYTE_BUFFER).build();
    }
}

