/*
 * Decompiled with CFR 0.152.
 */
package no.nav.common.kafka.consumer.feilhandtering;

import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.core.SimpleLock;
import no.nav.common.kafka.consumer.ConsumeStatus;
import no.nav.common.kafka.consumer.TopicConsumer;
import no.nav.common.kafka.consumer.feilhandtering.KafkaConsumerRecordProcessorConfig;
import no.nav.common.kafka.consumer.feilhandtering.KafkaConsumerRepository;
import no.nav.common.kafka.consumer.feilhandtering.StoredConsumerRecord;
import no.nav.common.kafka.consumer.util.ConsumerUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Background processor that periodically re-consumes Kafka records which were stored in a
 * {@link KafkaConsumerRepository}.
 *
 * <p>A single worker thread repeatedly asks the repository for the topic partitions that have
 * stored records for the configured topics. For each partition it tries to take a named lock
 * through the {@link LockProvider}, fetches a batch of stored records, and hands each record to
 * the {@link TopicConsumer} registered for the record's topic. Records consumed with
 * {@link ConsumeStatus#OK} are deleted from the repository; all other records get their retry
 * counter incremented.
 *
 * <p>{@link #start()} and {@link #close()} may be called from any thread. Once closed, the
 * processor cannot be started again.
 */
public class KafkaConsumerRecordProcessor {
    /** Upper bound passed to the lock provider for how long a partition lock may be held. */
    private static final Duration LOCK_AT_MOST_FOR = Duration.ofMinutes(5L);
    /** Lower bound passed to the lock provider for how long a partition lock is held. */
    private static final Duration LOCK_AT_LEAST_FOR = Duration.ofSeconds(5L);

    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    private final LockProvider lockProvider;
    private final KafkaConsumerRepository kafkaConsumerRepository;
    private final Map<String, TopicConsumer<byte[], byte[]>> topicConsumers;
    private final KafkaConsumerRecordProcessorConfig config;
    private volatile boolean isRunning;
    private volatile boolean isClosed;

    /**
     * Creates a processor and registers a JVM shutdown hook that calls {@link #close()}.
     *
     * @param lockProvider provider used to take one lock per topic partition while it is processed
     * @param kafkaRepository repository holding the stored consumer records
     * @param topicConsumers consumers keyed by topic name; the key set decides which topics are processed
     * @param config poll/error timeouts, batch size and backoff strategy
     */
    public KafkaConsumerRecordProcessor(LockProvider lockProvider, KafkaConsumerRepository kafkaRepository, Map<String, TopicConsumer<byte[], byte[]>> topicConsumers, KafkaConsumerRecordProcessorConfig config) {
        this.lockProvider = lockProvider;
        this.kafkaConsumerRepository = kafkaRepository;
        this.config = config;
        this.topicConsumers = topicConsumers;
        Runtime.getRuntime().addShutdownHook(new Thread(this::close));
    }

    /**
     * Starts the background processing loop. Calling this while the loop is already running is a no-op.
     *
     * @throws IllegalStateException if the processor has been closed
     */
    public synchronized void start() {
        if (this.isClosed) {
            throw new IllegalStateException("Cannot start closed consumer record forwarder");
        }
        if (this.isRunning) {
            return;
        }
        // Flip the flag before submitting so that a second start() cannot queue another loop and
        // a close() that arrives before the task begins executing is not overwritten by the task.
        this.isRunning = true;
        this.executorService.submit(this::recordHandlerLoop);
    }

    /**
     * Signals the processing loop to stop and releases the worker thread once the loop has exited.
     * The loop notices the signal between partitions and after its current sleep. Safe to call
     * more than once.
     */
    public synchronized void close() {
        this.isRunning = false;
        this.isClosed = true;
        // shutdown() does not interrupt the running task; it lets the loop finish its current
        // iteration and then allows the worker thread to terminate.
        this.executorService.shutdown();
    }

    /**
     * Body of the worker thread: processes all partitions with stored records, sleeps for the poll
     * timeout (or the error timeout after a failure), and repeats until the processor is stopped.
     */
    private void recordHandlerLoop() {
        List<String> topics = new ArrayList<>(this.topicConsumers.keySet());
        try {
            while (this.isRunning) {
                long sleepMillis;
                try {
                    List<TopicPartition> uniquePartitions = this.kafkaConsumerRepository.getTopicPartitions(topics);
                    this.consumeFromTopicPartitions(uniquePartitions);
                    sleepMillis = this.config.pollTimeout.toMillis();
                } catch (Exception e) {
                    this.log.error("Failed to consume stored kafka records", e);
                    sleepMillis = this.config.errorTimeout.toMillis();
                }
                // Sleep outside the inner try so an interrupt ends the loop instead of being
                // logged as a consume failure and swallowed.
                Thread.sleep(sleepMillis);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.log.warn("Stored consumer record handler loop was interrupted");
        } catch (Exception e) {
            this.log.error("Unexpected exception caught in stored consumer record handler loop", e);
        } finally {
            // Keep the flag truthful so start() can submit a new loop if this one died unexpectedly.
            this.isRunning = false;
            this.log.info("Closing kafka consumer record processor...");
        }
    }

    /** Processes each partition in turn, stopping early once the processor is no longer running. */
    private void consumeFromTopicPartitions(List<TopicPartition> uniquePartitions) {
        for (TopicPartition topicPartition : uniquePartitions) {
            if (!this.isRunning) {
                return;
            }
            this.consumeFromTopicPartition(topicPartition);
        }
    }

    /**
     * Processes one batch of stored records for a single partition while holding that partition's
     * lock. If the lock cannot be acquired the partition is skipped. Exceptions are logged and do
     * not propagate, except for a failure while releasing the lock.
     */
    private void consumeFromTopicPartition(TopicPartition topicPartition) {
        Optional<SimpleLock> lock = Optional.empty();
        try {
            lock = this.acquireLock(topicPartition);
            if (lock.isEmpty()) {
                return;
            }
            List<StoredConsumerRecord> records = this.kafkaConsumerRepository.getRecords(topicPartition.topic(), topicPartition.partition(), this.config.recordBatchSize);
            TopicConsumer<byte[], byte[]> recordConsumer = this.topicConsumers.get(topicPartition.topic());
            List<Long> recordsToDelete = this.processRecords(records, recordConsumer);
            if (!recordsToDelete.isEmpty()) {
                this.kafkaConsumerRepository.deleteRecords(recordsToDelete);
                this.log.info("Stored consumer records deleted {}", recordsToDelete);
            }
        } catch (Exception e) {
            this.log.error("Unexpected exception caught while processing stored consumer records", e);
        } finally {
            lock.ifPresent(SimpleLock::unlock);
        }
    }

    /**
     * Feeds the records to the consumer in list order and returns the ids of those that were
     * consumed successfully.
     *
     * <p>Once a record with a non-null key fails or is still backing off, every later record in
     * the batch with the same key is skipped, so it is not processed ahead of the earlier one.
     */
    private List<Long> processRecords(List<StoredConsumerRecord> records, TopicConsumer<byte[], byte[]> recordConsumer) {
        List<Long> processedIds = new ArrayList<>();
        Set<Bytes> failedOrBackedOffKeys = new HashSet<>();
        for (StoredConsumerRecord storedRecord : records) {
            // Bytes gives byte[] keys value-based equals/hashCode; wrap(null) yields null.
            Bytes keyBytes = Bytes.wrap(storedRecord.getKey());
            if (keyBytes != null && failedOrBackedOffKeys.contains(keyBytes)) {
                continue;
            }
            if (!this.isBackedOff(storedRecord) && this.tryConsume(storedRecord, recordConsumer)) {
                processedIds.add(storedRecord.getId());
            } else if (keyBytes != null) {
                failedOrBackedOffKeys.add(keyBytes);
            }
        }
        return processedIds;
    }

    /** Returns true when the record was retried before and its backoff period has not yet elapsed. */
    private boolean isBackedOff(StoredConsumerRecord storedRecord) {
        Duration backoffDuration = this.config.backoffStrategy.getBackoffDuration(storedRecord);
        LocalDateTime now = LocalDateTime.now();
        return storedRecord.getLastRetry() != null
                && storedRecord.getLastRetry().toLocalDateTime().plus(backoffDuration).isAfter(now);
    }

    /**
     * Hands the record to the consumer. Returns true if it was consumed with
     * {@link ConsumeStatus#OK}; otherwise logs the failure, increments the record's retry counter
     * and returns false. An exception thrown by the consumer is treated as a failure.
     */
    private boolean tryConsume(StoredConsumerRecord storedRecord, TopicConsumer<byte[], byte[]> recordConsumer) {
        ConsumeStatus status;
        Exception exception = null;
        try {
            status = recordConsumer.consume(ConsumerUtils.mapFromStoredRecord(storedRecord));
        } catch (Exception e) {
            exception = e;
            status = ConsumeStatus.FAILED;
        }
        if (status == ConsumeStatus.OK) {
            this.log.info("Successfully processed stored record topic={} partition={} offset={} dbId={}", storedRecord.getTopic(), storedRecord.getPartition(), storedRecord.getOffset(), storedRecord.getId());
            return true;
        }
        String message = String.format("Failed to process stored consumer record topic=%s partition=%d offset=%d dbId=%d", storedRecord.getTopic(), storedRecord.getPartition(), storedRecord.getOffset(), storedRecord.getId());
        // exception is null when the consumer returned a non-OK status without throwing.
        this.log.error(message, exception);
        this.kafkaConsumerRepository.incrementRetries(storedRecord.getId());
        return false;
    }

    /**
     * Tries to take the lock named {@code kcrp-<topic>-<partition>}.
     *
     * @return the acquired lock, or empty if the lock provider did not grant it
     */
    private Optional<SimpleLock> acquireLock(TopicPartition topicPartition) {
        String name = "kcrp-" + topicPartition.topic() + "-" + topicPartition.partition();
        LockConfiguration configuration = new LockConfiguration(Instant.now(), name, LOCK_AT_MOST_FOR, LOCK_AT_LEAST_FOR);
        return this.lockProvider.lock(configuration);
    }
}

