/*
 * Decompiled with CFR 0.152.
 */
package ru.tinkoff.kora.kafka.common.consumer.containers.handlers.impl;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;
import ru.tinkoff.kora.application.graph.ValueOf;
import ru.tinkoff.kora.kafka.common.consumer.containers.handlers.BaseKafkaRecordsHandler;
import ru.tinkoff.kora.kafka.common.consumer.containers.handlers.KafkaRecordsHandler;
import ru.tinkoff.kora.kafka.common.consumer.telemetry.KafkaConsumerTelemetry;

/**
 * {@link BaseKafkaRecordsHandler} that hands every non-empty batch of records to a
 * {@link KafkaRecordsHandler} delegate, wraps the call in a telemetry context and,
 * when enabled, synchronously commits the consumer afterwards.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class RecordsHandler<K, V> implements BaseKafkaRecordsHandler<K, V> {
    private final KafkaConsumerTelemetry<K, V> telemetry;
    private final ValueOf<KafkaRecordsHandler<K, V>> handler;
    private final boolean shouldCommit;

    /**
     * @param telemetry    source of the per-batch telemetry context
     * @param shouldCommit whether this handler commits after the delegate returns
     *                     (a commit additionally requires {@code commitAllowed} in
     *                     {@link #handle})
     * @param handler      holder of the delegate; it is resolved via {@code get()} on every
     *                     non-empty batch rather than once at construction time
     */
    public RecordsHandler(KafkaConsumerTelemetry<K, V> telemetry, boolean shouldCommit, ValueOf<KafkaRecordsHandler<K, V>> handler) {
        this.telemetry = telemetry;
        this.handler = handler;
        this.shouldCommit = shouldCommit;
    }

    /**
     * Processes one batch of records.
     *
     * <p>An empty batch returns immediately: no telemetry context is created, the delegate is
     * not called and nothing is committed. Otherwise the delegate is invoked and, if both
     * {@code shouldCommit} and {@code commitAllowed} are true, {@code consumer.commitSync()} is
     * called. The telemetry context is closed with {@code null} on success or with the caught
     * exception on failure; the exception is then rethrown unchanged.
     *
     * @param records       batch to process
     * @param consumer      consumer passed to the delegate and used for the commit
     * @param commitAllowed per-call switch that must also be true for a commit to happen
     */
    @Override
    public void handle(ConsumerRecords<K, V> records, Consumer<K, V> consumer, boolean commitAllowed) {
        if (records.isEmpty()) {
            return;
        }
        KafkaConsumerTelemetry.KafkaConsumerRecordsTelemetryContext<K, V> ctx = this.telemetry.get(records);
        try {
            // The field is already parameterized, so no cast and no raw type are needed here.
            KafkaRecordsHandler<K, V> delegate = this.handler.get();
            delegate.handle(consumer, ctx, records);
            if (this.shouldCommit && commitAllowed) {
                try {
                    consumer.commitSync();
                } catch (WakeupException ignored) {
                    // The first commit attempt was aborted by a wakeup; retry once and do not
                    // propagate the WakeupException. A failure of the retry (including a second
                    // WakeupException) falls through to the outer catch below.
                    consumer.commitSync();
                }
            }
            ctx.close(null);
        } catch (Exception e) {
            // Report the failure to telemetry, then let the caller see the original exception.
            ctx.close(e);
            throw e;
        }
    }
}

