/*
 * Decompiled with CFR 0.152.
 */
package no.nav.common.kafka.producer.feilhandtering;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import no.nav.common.job.leader_election.LeaderElectionClient;
import no.nav.common.kafka.producer.KafkaProducerClient;
import no.nav.common.kafka.producer.feilhandtering.KafkaProducerRepository;
import no.nav.common.kafka.producer.feilhandtering.StoredProducerRecord;
import no.nav.common.kafka.producer.util.ProducerUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaProducerRecordProcessor {
    private static final long ERROR_TIMEOUT_MS = 5000L;
    private static final long POLL_TIMEOUT_MS = 3000L;
    private static final long WAITING_FOR_LEADER_TIMEOUT_MS = 10000L;
    private static final int RECORDS_BATCH_SIZE = 100;
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    private final KafkaProducerRepository producerRepository;
    private final KafkaProducerClient<byte[], byte[]> producerClient;
    private final LeaderElectionClient leaderElectionClient;
    private volatile boolean isRunning;
    private volatile boolean isClosed;

    public KafkaProducerRecordProcessor(KafkaProducerRepository producerRepository, KafkaProducerClient<byte[], byte[]> producerClient, LeaderElectionClient leaderElectionClient) {
        this.producerRepository = producerRepository;
        this.producerClient = producerClient;
        this.leaderElectionClient = leaderElectionClient;
        Runtime.getRuntime().addShutdownHook(new Thread(this::close));
    }

    public void start() {
        if (this.isClosed) {
            throw new IllegalStateException("Cannot start closed producer record processor");
        }
        if (!this.isRunning) {
            this.executorService.submit(this::recordHandlerLoop);
        }
    }

    public void close() {
        this.log.info("Closing kafka producer record processor...");
        this.isRunning = false;
        this.isClosed = true;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private void recordHandlerLoop() {
        this.isRunning = true;
        try {
            while (this.isRunning) {
                try {
                    if (!this.leaderElectionClient.isLeader()) {
                        Thread.sleep(10000L);
                        continue;
                    }
                    List<StoredProducerRecord> records = this.producerRepository.getRecords(100);
                    if (!records.isEmpty()) {
                        this.publishStoredRecordsBatch(records);
                    }
                    if (records.size() >= 100) continue;
                    Thread.sleep(3000L);
                }
                catch (Exception e) {
                    this.log.error("Failed to process kafka producer records", (Throwable)e);
                    Thread.sleep(5000L);
                }
            }
            return;
        }
        catch (Exception e) {
            this.log.error("Unexpected exception caught in producer record handler loop", (Throwable)e);
            return;
        }
        finally {
            this.producerClient.close();
        }
    }

    private void publishStoredRecordsBatch(List<StoredProducerRecord> records) throws InterruptedException {
        ConcurrentLinkedQueue idsToDelete = new ConcurrentLinkedQueue();
        CountDownLatch latch = new CountDownLatch(records.size());
        records.forEach(record -> this.producerClient.send(ProducerUtils.mapFromStoredRecord(record), (metadata, exception) -> {
            latch.countDown();
            if (exception != null) {
                this.log.warn(String.format("Failed to resend failed record to topic %s", record.getTopic()), (Throwable)exception);
            } else {
                idsToDelete.add(record.getId());
            }
        }));
        this.producerClient.getProducer().flush();
        latch.await();
        this.producerRepository.deleteRecords(new ArrayList<Long>(idsToDelete));
    }
}

