/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.endpoint.sender.aws;

import com.google.common.collect.Lists;
import de.otto.synapse.client.aws.RetryPutRecordsKinesisClient;
import de.otto.synapse.endpoint.sender.AbstractMessageSenderEndpoint;
import de.otto.synapse.message.Message;
import de.otto.synapse.translator.MessageTranslator;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;

public class KinesisMessageSender
extends AbstractMessageSenderEndpoint {
    private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocateDirect(0);
    private static final int PUT_RECORDS_BATCH_SIZE = 500;
    private final RetryPutRecordsKinesisClient retryPutRecordsKinesisClient;

    public KinesisMessageSender(String channelName, MessageTranslator<String> messageTranslator, KinesisClient kinesisClient) {
        super(channelName, messageTranslator);
        this.retryPutRecordsKinesisClient = new RetryPutRecordsKinesisClient(kinesisClient);
    }

    protected void doSend(@Nonnull Message<String> message) {
        this.retryPutRecordsKinesisClient.putRecords(() -> this.createPutRecordRequest(message));
    }

    protected void doSendBatch(@Nonnull Stream<Message<String>> messageStream) {
        ArrayList<PutRecordsRequestEntry> entries = this.createPutRecordRequestEntries(messageStream);
        Lists.partition(entries, (int)500).forEach(batch -> this.retryPutRecordsKinesisClient.putRecords(() -> this.createPutRecordRequest((List<PutRecordsRequestEntry>)batch)));
    }

    private PutRecordsRequest createPutRecordRequest(List<PutRecordsRequestEntry> batch) {
        return (PutRecordsRequest)PutRecordsRequest.builder().streamName(this.getChannelName()).records(batch).build();
    }

    private ArrayList<PutRecordsRequestEntry> createPutRecordRequestEntries(@Nonnull Stream<Message<String>> messageStream) {
        return messageStream.map(entry -> this.requestEntryFor(entry.getKey(), (String)entry.getPayload())).collect(Collectors.toCollection(ArrayList::new));
    }

    private PutRecordsRequestEntry requestEntryFor(String key, String payload) {
        ByteBuffer byteBufferPayload = payload != null ? ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)) : EMPTY_BYTE_BUFFER;
        return (PutRecordsRequestEntry)PutRecordsRequestEntry.builder().partitionKey(key).data(byteBufferPayload).build();
    }

    private PutRecordsRequest createPutRecordRequest(@Nonnull Message<String> message) {
        return (PutRecordsRequest)PutRecordsRequest.builder().streamName(this.getChannelName()).records(new PutRecordsRequestEntry[]{this.requestEntryFor(message.getKey(), (String)message.getPayload())}).build();
    }
}

