/*
 * Decompiled with CFR 0.152.
 */
package no.nav.common.kafka.consumer.feilhandtering;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.core.SimpleLock;
import no.nav.common.kafka.consumer.ConsumeStatus;
import no.nav.common.kafka.consumer.feilhandtering.KafkaConsumerRecordProcessorConfig;
import no.nav.common.kafka.consumer.feilhandtering.KafkaConsumerRepository;
import no.nav.common.kafka.consumer.feilhandtering.StoredConsumerRecord;
import no.nav.common.kafka.consumer.feilhandtering.StoredRecordConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Background processor that re-consumes Kafka records previously stored through a
 * {@link KafkaConsumerRepository}.
 *
 * <p>A single worker thread repeatedly asks the repository for the topic-partitions that have
 * stored records for the configured topics, takes a lock per partition through the supplied
 * {@link LockProvider}, and feeds the stored records to the {@link StoredRecordConsumer}
 * registered for the topic. Records consumed with {@link ConsumeStatus#OK} are deleted; for any
 * other outcome the retry counter is incremented and later records in the same batch that share
 * the key are skipped.
 *
 * <p>{@link #start()} and {@link #close()} may be called from any thread. A closed processor
 * cannot be started again.
 */
public class KafkaConsumerRecordProcessor {
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    private final LockProvider lockProvider;
    private final KafkaConsumerRepository kafkaConsumerRepository;
    private final Map<String, StoredRecordConsumer> recordConsumers;
    private final KafkaConsumerRecordProcessorConfig config;
    private volatile boolean isRunning;
    private volatile boolean isClosed;

    /**
     * Creates the processor and registers a JVM shutdown hook that calls {@link #close()}.
     *
     * @param lockProvider provider used to take one lock per topic-partition while it is processed
     * @param kafkaRepository repository holding the stored records
     * @param recordConsumers consumers keyed by topic name; the key set decides which topics are polled
     * @param config poll/error timeouts and record batch size
     */
    public KafkaConsumerRecordProcessor(LockProvider lockProvider, KafkaConsumerRepository kafkaRepository, Map<String, StoredRecordConsumer> recordConsumers, KafkaConsumerRecordProcessorConfig config) {
        this.lockProvider = lockProvider;
        this.kafkaConsumerRepository = kafkaRepository;
        this.recordConsumers = recordConsumers;
        this.config = config;
        Runtime.getRuntime().addShutdownHook(new Thread(this::close));
    }

    /**
     * Starts the background handler loop if it is not already running.
     *
     * @throws IllegalStateException if the processor has been closed
     */
    public synchronized void start() {
        if (this.isClosed) {
            throw new IllegalStateException("Cannot start closed consumer record forwarder");
        }
        if (!this.isRunning) {
            // Flip the flag here rather than inside the worker: otherwise a second start() issued
            // before the worker gets scheduled would queue another loop, which would then revive
            // itself after close().
            this.isRunning = true;
            this.executorService.submit(this::recordHandlerLoop);
        }
    }

    /**
     * Stops the handler loop and releases the worker thread. Safe to call more than once.
     *
     * <p>The loop is asked to stop via a flag and the executor is shut down in an orderly fashion,
     * so work that is already in progress is not interrupted.
     */
    public synchronized void close() {
        this.isRunning = false;
        this.isClosed = true;
        // Without this the executor's worker thread would stay parked forever after the loop exits.
        this.executorService.shutdown();
    }

    /**
     * Worker loop: polls the repository for partitions with stored records and processes them
     * until {@link #close()} is called or the thread is interrupted.
     */
    private void recordHandlerLoop() {
        List<String> topics = new ArrayList<>(this.recordConsumers.keySet());
        try {
            while (this.isRunning) {
                try {
                    List<TopicPartition> uniquePartitions = this.kafkaConsumerRepository.getTopicPartitions(topics);
                    if (uniquePartitions.isEmpty()) {
                        Thread.sleep(this.config.pollTimeout.toMillis());
                    } else {
                        this.consumeFromTopicPartitions(uniquePartitions);
                    }
                } catch (InterruptedException e) {
                    // An interrupt is a request to stop, not a consume failure; let the outer handler deal with it.
                    throw e;
                } catch (Exception e) {
                    this.log.error("Failed to consume stored kafka records", e);
                    Thread.sleep(this.config.errorTimeout.toMillis());
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so the owning executor can observe it.
            Thread.currentThread().interrupt();
            this.log.warn("Stored consumer record handler loop was interrupted");
        } catch (Exception e) {
            this.log.error("Unexpected exception caught in stored consumer record handler loop", e);
        } finally {
            // Reset the flag so that start() can launch a new loop if this one died unexpectedly.
            this.isRunning = false;
            this.log.info("Closing kafka consumer record processor...");
        }
    }

    /**
     * Processes each partition in turn, stopping early once the processor is no longer running.
     *
     * @param uniquePartitions partitions reported by the repository as having stored records
     */
    private void consumeFromTopicPartitions(List<TopicPartition> uniquePartitions) {
        for (TopicPartition topicPartition : uniquePartitions) {
            if (!this.isRunning) {
                return;
            }
            this.consumeFromTopicPartition(topicPartition);
        }
    }

    /**
     * Processes one batch of stored records for a single partition while holding its lock.
     *
     * <p>If the lock cannot be acquired the partition is skipped. Any exception is logged and
     * contained so that the remaining partitions are still processed; the lock is always released.
     */
    private void consumeFromTopicPartition(TopicPartition topicPartition) {
        Optional<SimpleLock> lock = Optional.empty();
        try {
            lock = this.acquireLock(topicPartition);
            if (lock.isEmpty()) {
                return;
            }
            List<StoredConsumerRecord> records = this.kafkaConsumerRepository.getRecords(topicPartition.topic(), topicPartition.partition(), this.config.recordBatchSize);
            StoredRecordConsumer recordConsumer = this.recordConsumers.get(topicPartition.topic());
            List<Long> recordsToDelete = new ArrayList<>();
            // Keys that have failed in this batch; later records with the same key are skipped
            // so they are not consumed ahead of the failed one.
            Set<Bytes> failedKeys = new HashSet<>();

            for (StoredConsumerRecord storedRecord : records) {
                Bytes key = Bytes.wrap(storedRecord.getKey());
                if (failedKeys.contains(key)) {
                    continue;
                }

                ConsumeStatus status;
                Exception failure = null;
                try {
                    status = recordConsumer.consume(storedRecord);
                } catch (Exception e) {
                    failure = e;
                    status = ConsumeStatus.FAILED;
                }

                if (status == ConsumeStatus.OK) {
                    recordsToDelete.add(storedRecord.getId());
                    continue;
                }

                if (failure != null) {
                    // Trailing throwable is logged with its stack trace by SLF4J.
                    this.log.error("Failed to process consumer record topic={} partition={} offset={} dbId={}", storedRecord.getTopic(), storedRecord.getPartition(), storedRecord.getOffset(), storedRecord.getId(), failure);
                } else {
                    this.log.error("Failed to process consumer record topic={} partition={} offset={} dbId={}", storedRecord.getTopic(), storedRecord.getPartition(), storedRecord.getOffset(), storedRecord.getId());
                }
                this.kafkaConsumerRepository.incrementRetries(storedRecord.getId());
                failedKeys.add(key);
            }

            if (!recordsToDelete.isEmpty()) {
                this.kafkaConsumerRepository.deleteRecords(recordsToDelete);
                this.log.info("Consumed records deleted {}", recordsToDelete);
            }
        } catch (Exception e) {
            this.log.error("Unexpected exception caught while processing consumer records", e);
        } finally {
            lock.ifPresent(SimpleLock::unlock);
        }
    }

    /**
     * Tries to take the lock named {@code kcrp-<topic>-<partition>} for the given partition.
     *
     * @return the acquired lock, or an empty {@link Optional} if the provider did not grant it
     */
    private Optional<SimpleLock> acquireLock(TopicPartition topicPartition) {
        String name = "kcrp-" + topicPartition.topic() + "-" + topicPartition.partition();
        // NOTE(review): per the ShedLock API the two durations are lockAtMostFor (5 min) and
        // lockAtLeastFor (5 s) - confirm against the ShedLock version in use.
        LockConfiguration configuration = new LockConfiguration(Instant.now(), name, Duration.ofMinutes(5L), Duration.ofSeconds(5L));
        return this.lockProvider.lock(configuration);
    }
}

