/*
 * Decompiled with CFR 0.152.
 */
package ru.tinkoff.kora.kafka.common.consumer.containers.handlers.impl;

import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import ru.tinkoff.kora.application.graph.ValueOf;
import ru.tinkoff.kora.kafka.common.consumer.containers.handlers.BaseKafkaRecordsHandler;
import ru.tinkoff.kora.kafka.common.consumer.containers.handlers.KafkaRecordHandler;
import ru.tinkoff.kora.kafka.common.consumer.telemetry.KafkaConsumerTelemetry;
import ru.tinkoff.kora.kafka.common.exceptions.KafkaSkipRecordException;
import ru.tinkoff.kora.kafka.common.exceptions.SkippableRecordException;

/**
 * Records handler that passes every record of a polled batch to a {@link KafkaRecordHandler},
 * one record at a time, and optionally commits the offset of each record right after it has
 * been handled.
 *
 * <p>Telemetry contexts are obtained per batch and per record from {@link KafkaConsumerTelemetry}
 * and are closed with the failure (or {@code null} on success) that ended the corresponding step.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class RecordHandler<K, V>
implements BaseKafkaRecordsHandler<K, V> {
    private final KafkaConsumerTelemetry<K, V> telemetry;
    private final ValueOf<KafkaRecordHandler<K, V>> handler;
    private final boolean shouldCommit;

    /**
     * @param telemetry    source of the per-batch and per-record telemetry contexts
     * @param shouldCommit whether this handler commits the offset after each handled record
     *                     (a commit additionally requires {@code commitAllowed} in {@link #handle})
     * @param handler      holder of the per-record handler; it is resolved once per non-empty batch
     */
    public RecordHandler(KafkaConsumerTelemetry<K, V> telemetry, boolean shouldCommit, ValueOf<KafkaRecordHandler<K, V>> handler) {
        this.telemetry = telemetry;
        this.handler = handler;
        this.shouldCommit = shouldCommit;
    }

    /**
     * Handles the records of one poll in iteration order.
     *
     * <p>A {@link KafkaSkipRecordException} or {@link SkippableRecordException} thrown by the record
     * handler does not abort the batch: the record is treated as processed (and committed when commits
     * are enabled), and the skipped failure is passed to the record telemetry context. Any other failure
     * is rethrown and stops processing of the remaining records.
     *
     * @param records       polled records; an empty batch is ignored without touching telemetry
     * @param consumer      consumer the records were polled from, also used for offset commits
     * @param commitAllowed per-call switch; offsets are committed only when both this flag and the
     *                      {@code shouldCommit} constructor flag are {@code true}
     */
    @Override
    public void handle(ConsumerRecords<K, V> records, Consumer<K, V> consumer, boolean commitAllowed) {
        if (records.isEmpty()) {
            return;
        }
        KafkaConsumerTelemetry.KafkaConsumerRecordsTelemetryContext<K, V> ctx = this.telemetry.get(records);
        try {
            KafkaRecordHandler<K, V> recordHandler = this.handler.get();
            for (ConsumerRecord<K, V> record : records) {
                KafkaConsumerTelemetry.KafkaConsumerRecordTelemetryContext<K, V> recordCtx = ctx.get(record);
                try {
                    // Failure that caused the record to be skipped, or null when it was handled normally.
                    Throwable skippedException;
                    try {
                        recordHandler.handle(consumer, recordCtx, record);
                        skippedException = null;
                    }
                    catch (Throwable e) {
                        if (e instanceof KafkaSkipRecordException) {
                            // The wrapper only signals "skip"; its cause is what gets reported.
                            skippedException = e.getCause();
                        } else if (e instanceof SkippableRecordException) {
                            skippedException = e;
                        } else {
                            // Not a skip signal: propagate to the error handling below.
                            throw e;
                        }
                    }
                    if (this.shouldCommit && commitAllowed) {
                        // Commit the position of the next record to read, i.e. this record's offset + 1.
                        Map<TopicPartition, OffsetAndMetadata> topicAndOffsetAndMeta = Map.of(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1L, record.leaderEpoch(), ""));
                        try {
                            consumer.commitSync(topicAndOffsetAndMeta);
                            recordCtx.close(skippedException);
                        }
                        catch (WakeupException e) {
                            // The commit was interrupted by a wakeup: retry it once so the processed
                            // record is not lost, then let the wakeup propagate.
                            consumer.commitSync(topicAndOffsetAndMeta);
                            recordCtx.close(skippedException);
                            throw e;
                        }
                    } else {
                        recordCtx.close(skippedException);
                    }
                }
                catch (WakeupException e) {
                    // Rethrown as is, without closing the record context in this clause.
                    throw e;
                }
                catch (Exception e) {
                    recordCtx.close(e);
                    throw e;
                }
            }
            ctx.close(null);
        }
        catch (Exception e) {
            ctx.close(e);
            throw e;
        }
    }
}

