/*
 * Decompiled with CFR 0.152.
 */
package one.tomorrow.transactionaloutbox.service;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.PreDestroy;
import one.tomorrow.transactionaloutbox.commons.Longs;
import one.tomorrow.transactionaloutbox.model.OutboxRecord;
import one.tomorrow.transactionaloutbox.repository.OutboxLockRepository;
import one.tomorrow.transactionaloutbox.repository.OutboxRepository;
import one.tomorrow.transactionaloutbox.service.OutboxLockService;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;

/**
 * Publishes unprocessed {@link OutboxRecord}s to Kafka while this instance holds the outbox lock.
 *
 * <p>On construction the processor tries to acquire the lock via {@link OutboxLockService}. While
 * the lock is held, batches of unprocessed records are read from the {@link OutboxRepository},
 * sent one by one with the Kafka producer and marked as processed. While the lock is not held,
 * acquisition is retried after the lock timeout.
 *
 * <p>Threading: the constructor performs the first acquisition attempt (and, if successful, the
 * first processing run) on the calling thread; all subsequent work runs on a single-threaded
 * scheduled executor. {@link #close()} may be invoked from another thread, which is why the
 * fields it reads are {@code volatile}.
 */
public class OutboxProcessor {
    /** Maximum number of unprocessed records fetched from the repository per processing run. */
    private static final int BATCH_SIZE = 100;
    /** Header carrying the outbox record id, encoded via {@link Longs#toByteArray}. */
    private static final String HEADER_SEQUENCE = "x-sequence";
    /** Header carrying the configured event source. */
    private static final String HEADER_SOURCE = "x-source";
    private static final Logger logger = LoggerFactory.getLogger(OutboxProcessor.class);

    private final OutboxLockService lockService;
    private final String lockOwnerId;
    private final OutboxRepository repository;
    private final KafkaProducerFactory producerFactory;
    private final Duration processingInterval;
    private final ScheduledExecutorService executor;
    private final byte[] eventSource;

    // Written by the worker thread, read by close() on the caller's thread.
    private volatile KafkaProducer<String, byte[]> producer;
    private volatile boolean active;
    private volatile ScheduledFuture<?> schedule;

    // Only touched by the constructing thread and, afterwards, the executor thread.
    private Instant lastLockAcquisitionAttempt;

    /**
     * Creates the processor and immediately attempts to acquire the outbox lock.
     *
     * @param repository         source of unprocessed outbox records
     * @param producerFactory    creates the Kafka producer (also used to recreate it after failures)
     * @param processingInterval delay between two processing runs while the lock is held
     * @param lockTimeout        lock timeout handed to the {@link OutboxLockService}
     * @param lockOwnerId        identifier under which this instance acquires the lock
     * @param eventSource        value of the {@code x-source} header added to every sent record
     * @param beanFactory        used to look up the {@link OutboxLockRepository} and to apply bean
     *                           post processors to the created {@link OutboxLockService}
     */
    public OutboxProcessor(OutboxRepository repository, KafkaProducerFactory producerFactory, Duration processingInterval, Duration lockTimeout, String lockOwnerId, String eventSource, AutowireCapableBeanFactory beanFactory) {
        logger.info("Starting outbox processor with lockOwnerId {}, source {} and processing interval {} ms and producer factory {}",
                lockOwnerId, eventSource, processingInterval.toMillis(), producerFactory);
        this.repository = repository;
        this.processingInterval = processingInterval;
        OutboxLockRepository lockRepository = beanFactory.getBean(OutboxLockRepository.class);
        OutboxLockService rawLockService = new OutboxLockService(lockRepository, lockTimeout);
        // The lock service is not a container-managed bean; post processors are applied manually.
        this.lockService = (OutboxLockService) beanFactory.applyBeanPostProcessorsAfterInitialization(rawLockService, "OutboxLockService");
        this.lockOwnerId = lockOwnerId;
        // Explicit charset: the header bytes must not depend on the platform default encoding.
        this.eventSource = eventSource.getBytes(StandardCharsets.UTF_8);
        this.producerFactory = producerFactory;
        this.producer = producerFactory.createKafkaProducer();
        this.executor = Executors.newSingleThreadScheduledExecutor();
        this.tryLockAcquisition();
    }

    /** Schedules the next processing run after the processing interval, unless the executor is shut down. */
    private void scheduleProcessing() {
        if (this.executor.isShutdown()) {
            logger.info("Not scheduling processing for lockOwnerId {} (executor is shutdown)", this.lockOwnerId);
        } else {
            this.schedule = this.executor.schedule(this::processOutboxWithLock, this.processingInterval.toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    /** Schedules the next lock acquisition attempt after the lock timeout, unless the executor is shut down. */
    private void scheduleTryLockAcquisition() {
        if (this.executor.isShutdown()) {
            logger.info("Not scheduling acquisition of outbox lock for lockOwnerId {} (executor is shutdown)", this.lockOwnerId);
        } else {
            this.schedule = this.executor.schedule(this::tryLockAcquisition, this.lockService.getLockTimeout().toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Stops processing: cancels the pending task (without interrupting a running one), shuts the
     * executor down, closes the producer and releases the lock if this instance is active.
     */
    @PreDestroy
    public void close() {
        logger.info("Stopping OutboxProcessor.");
        ScheduledFuture<?> pending = this.schedule;
        if (pending != null) {
            pending.cancel(false);
        }
        this.executor.shutdown();
        try {
            this.producer.close();
        } finally {
            // Release the lock even if closing the producer fails.
            if (this.active) {
                this.lockService.releaseLock(this.lockOwnerId);
            }
        }
    }

    /**
     * Tries to acquire or refresh the lock. On success the outbox is processed right away,
     * otherwise (or on any exception) another attempt is scheduled after the lock timeout.
     */
    private void tryLockAcquisition() {
        try {
            boolean originalActive = this.active;
            logger.debug("{} trying to acquire outbox lock", this.lockOwnerId);
            this.active = this.lockService.acquireOrRefreshLock(this.lockOwnerId);
            this.lastLockAcquisitionAttempt = Instant.now();
            if (this.active) {
                // A mere refresh is logged on debug level, a fresh acquisition on info level.
                if (originalActive) {
                    logger.debug("{} acquired outbox lock, starting to process outbox", this.lockOwnerId);
                } else {
                    logger.info("{} acquired outbox lock, starting to process outbox", this.lockOwnerId);
                }
                this.processOutboxWithLock();
            } else {
                this.scheduleTryLockAcquisition();
            }
        } catch (Exception e) {
            logger.warn("Failed trying lock acquisition or processing the outbox, trying again in {}", this.lockService.getLockTimeout(), e);
            this.scheduleTryLockAcquisition();
        }
    }

    /**
     * Runs one processing pass under the lock and schedules the follow-up task.
     *
     * <p>If more than half of the lock timeout has elapsed since the last acquisition attempt, the
     * lock is refreshed first (which in turn triggers processing). If the lock service reports that
     * the work could not be run with the lock, the processor becomes inactive and schedules a new
     * acquisition attempt.
     *
     * @throws IllegalStateException if called while not active
     */
    private void processOutboxWithLock() {
        if (!this.active) {
            throw new IllegalStateException("processOutbox must only be run when in active state");
        }
        Instant refreshDue = this.lastLockAcquisitionAttempt.plus(this.lockService.getLockTimeout().dividedBy(2L));
        if (Instant.now().isAfter(refreshDue)) {
            this.tryLockAcquisition();
            return;
        }
        boolean couldRunWithLock = this.lockService.runWithLock(this.lockOwnerId, () -> {
            try {
                this.processOutbox();
            } catch (Throwable e) {
                logger.warn("Recreating producer, due to failure while processing outbox.", e);
                try {
                    this.producer.close();
                    this.producer = this.producerFactory.createKafkaProducer();
                } finally {
                    // Restore the interrupt flag only after the producer was replaced, so that
                    // closing the old producer is not affected by it.
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        });
        if (couldRunWithLock) {
            this.scheduleProcessing();
        } else {
            logger.info("Lock was lost, changing to inactive, now trying to acquire lock in {} ms", this.lockService.getLockTimeout().toMillis());
            this.active = false;
            this.scheduleTryLockAcquisition();
        }
    }

    /**
     * Fetches up to {@link #BATCH_SIZE} unprocessed records and, for each of them in order, sends
     * it to Kafka, waits for the send to complete and then marks the record as processed.
     *
     * @throws ExecutionException   if sending a record failed
     * @throws InterruptedException if interrupted while waiting for a send to complete
     */
    private void processOutbox() throws ExecutionException, InterruptedException {
        List<OutboxRecord> records = this.repository.getUnprocessedRecords(BATCH_SIZE);
        for (OutboxRecord outboxRecord : records) {
            ProducerRecord<String, byte[]> producerRecord = this.toProducerRecord(outboxRecord);
            // Block until the send completed before marking the record as processed.
            this.producer.send(producerRecord).get();
            logger.info("Sent record to kafka: {}", outboxRecord);
            outboxRecord.setProcessed(Instant.now());
            this.repository.update(outboxRecord);
        }
    }

    /**
     * Converts an outbox record into a Kafka producer record. The record's own headers (if any)
     * are copied, then the {@code x-sequence} (record id) and {@code x-source} (event source)
     * headers are added.
     */
    private ProducerRecord<String, byte[]> toProducerRecord(OutboxRecord outboxRecord) {
        ProducerRecord<String, byte[]> producerRecord =
                new ProducerRecord<>(outboxRecord.getTopic(), outboxRecord.getKey(), outboxRecord.getValue());
        if (outboxRecord.getHeaders() != null) {
            outboxRecord.getHeaders().forEach((k, v) -> producerRecord.headers().add(k, v.getBytes(StandardCharsets.UTF_8)));
        }
        producerRecord.headers().add(HEADER_SEQUENCE, Longs.toByteArray((long) outboxRecord.getId()));
        producerRecord.headers().add(HEADER_SOURCE, this.eventSource);
        return producerRecord;
    }

    /** Factory for the Kafka producer used by the processor; invoked again whenever the producer is recreated. */
    @FunctionalInterface
    public interface KafkaProducerFactory {
        /** @return a new Kafka producer for {@code String} keys and {@code byte[]} values */
        KafkaProducer<String, byte[]> createKafkaProducer();
    }
}

