/*
 * Decompiled with CFR 0.152.
 */
package one.tomorrow.transactionaloutbox.service;

import jakarta.annotation.PreDestroy;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import one.tomorrow.transactionaloutbox.commons.Longs;
import one.tomorrow.transactionaloutbox.model.OutboxRecord;
import one.tomorrow.transactionaloutbox.repository.OutboxLockRepository;
import one.tomorrow.transactionaloutbox.repository.OutboxRepository;
import one.tomorrow.transactionaloutbox.service.OutboxLockService;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;

public class OutboxProcessor {
    private static final int BATCH_SIZE = 100;
    private static final Logger logger = LoggerFactory.getLogger(OutboxProcessor.class);
    private final OutboxLockService lockService;
    private final String lockOwnerId;
    private final OutboxRepository repository;
    private final KafkaProducerFactory producerFactory;
    private final Duration processingInterval;
    private final ScheduledExecutorService executor;
    private final ScheduledExecutorService cleanupExecutor;
    private final byte[] eventSource;
    private KafkaProducer<String, byte[]> producer;
    private boolean active;
    private Instant lastLockAckquisitionAttempt;
    private ScheduledFuture<?> schedule;
    private ScheduledFuture<?> cleanupSchedule;

    public OutboxProcessor(OutboxRepository repository, KafkaProducerFactory producerFactory, Duration processingInterval, Duration lockTimeout, String lockOwnerId, String eventSource, AutowireCapableBeanFactory beanFactory) {
        this(repository, producerFactory, processingInterval, lockTimeout, lockOwnerId, eventSource, null, beanFactory);
    }

    public OutboxProcessor(OutboxRepository repository, KafkaProducerFactory producerFactory, Duration processingInterval, Duration lockTimeout, String lockOwnerId, String eventSource, CleanupSettings cleanupSettings, AutowireCapableBeanFactory beanFactory) {
        logger.info("Starting outbox processor with lockOwnerId {}, source {} and processing interval {} ms and producer factory {}", new Object[]{lockOwnerId, eventSource, processingInterval.toMillis(), producerFactory});
        this.repository = repository;
        this.processingInterval = processingInterval;
        OutboxLockRepository lockRepository = (OutboxLockRepository)beanFactory.getBean(OutboxLockRepository.class);
        OutboxLockService rawLockService = new OutboxLockService(lockRepository, lockTimeout);
        this.lockService = (OutboxLockService)beanFactory.initializeBean((Object)rawLockService, "OutboxLockService");
        this.lockOwnerId = lockOwnerId;
        this.eventSource = eventSource.getBytes();
        this.producerFactory = producerFactory;
        this.producer = producerFactory.createKafkaProducer();
        this.executor = Executors.newSingleThreadScheduledExecutor();
        this.tryLockAcquisition();
        this.cleanupExecutor = cleanupSettings != null ? this.setupCleanupSchedule(repository, cleanupSettings) : null;
    }

    private ScheduledExecutorService setupCleanupSchedule(OutboxRepository repository, CleanupSettings cleanupSettings) {
        ScheduledExecutorService es = Executors.newSingleThreadScheduledExecutor();
        this.cleanupSchedule = es.scheduleAtFixedRate(() -> {
            if (this.active) {
                Instant processedBefore = Instant.now().minus(cleanupSettings.getRetention());
                logger.info("Cleaning up outbox records processed before {}", (Object)processedBefore);
                repository.deleteOutboxRecordByProcessedNotNullAndProcessedIsBefore(processedBefore);
            }
        }, 0L, cleanupSettings.getInterval().toMillis(), TimeUnit.MILLISECONDS);
        return es;
    }

    private void scheduleProcessing() {
        if (this.executor.isShutdown()) {
            logger.info("Not scheduling processing for lockOwnerId {} (executor is shutdown)", (Object)this.lockOwnerId);
        } else {
            this.schedule = this.executor.schedule(this::processOutboxWithLock, this.processingInterval.toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    private void scheduleTryLockAcquisition() {
        if (this.executor.isShutdown()) {
            logger.info("Not scheduling acquisition of outbox lock for lockOwnerId {} (executor is shutdown)", (Object)this.lockOwnerId);
        } else {
            this.schedule = this.executor.schedule(this::tryLockAcquisition, this.lockService.getLockTimeout().toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    @PreDestroy
    public void close() {
        logger.info("Stopping OutboxProcessor with lockOwnerId {}.", (Object)this.lockOwnerId);
        if (this.schedule != null) {
            this.schedule.cancel(false);
        }
        this.executor.shutdown();
        if (this.cleanupSchedule != null) {
            this.cleanupSchedule.cancel(false);
        }
        if (this.cleanupExecutor != null) {
            this.cleanupExecutor.shutdown();
        }
        this.producer.close();
        if (this.active) {
            this.lockService.releaseLock(this.lockOwnerId);
        }
    }

    private void tryLockAcquisition() {
        try {
            boolean originalActive = this.active;
            logger.debug("{} trying to acquire outbox lock", (Object)this.lockOwnerId);
            this.active = this.lockService.acquireOrRefreshLock(this.lockOwnerId);
            this.lastLockAckquisitionAttempt = Instant.now();
            if (this.active) {
                if (originalActive) {
                    logger.debug("{} acquired outbox lock, starting to process outbox", (Object)this.lockOwnerId);
                } else {
                    logger.info("{} acquired outbox lock, starting to process outbox", (Object)this.lockOwnerId);
                }
                this.processOutboxWithLock();
            } else {
                this.scheduleTryLockAcquisition();
            }
        }
        catch (Exception e) {
            logger.warn("Failed trying lock acquisition or processing the outbox, trying again in {}", (Object)this.lockService.getLockTimeout(), (Object)e);
            this.scheduleTryLockAcquisition();
        }
    }

    private void processOutboxWithLock() {
        if (!this.active) {
            throw new IllegalStateException("processOutbox must only be run when in active state");
        }
        if (Instant.now().isAfter(this.lastLockAckquisitionAttempt.plus(this.lockService.getLockTimeout().dividedBy(2L)))) {
            this.tryLockAcquisition();
            return;
        }
        boolean couldRunWithLock = this.tryProcessOutbox();
        if (couldRunWithLock) {
            this.scheduleProcessing();
        } else {
            logger.info("Lock was lost, changing to inactive, now trying to acquire lock in {} ms", (Object)this.lockService.getLockTimeout().toMillis());
            this.active = false;
            this.scheduleTryLockAcquisition();
        }
    }

    private boolean tryProcessOutbox() {
        boolean couldRunWithLock = false;
        try {
            couldRunWithLock = this.lockService.runWithLock(this.lockOwnerId, () -> {
                try {
                    this.processOutbox();
                }
                catch (Throwable e) {
                    logger.warn("Recreating producer, due to failure while processing outbox.", e);
                    this.producer.close();
                    this.producer = this.producerFactory.createKafkaProducer();
                }
            });
        }
        catch (Exception e) {
            logger.warn("Caught exception when trying to run with lock.", (Throwable)e);
        }
        return couldRunWithLock;
    }

    void processOutbox() {
        this.repository.getUnprocessedRecords(100).stream().map(outboxRecord -> this.producer.send(this.toProducerRecord((OutboxRecord)outboxRecord), (metadata, exception) -> {
            if (exception != null) {
                logger.warn("Failed to publish {}", outboxRecord, (Object)exception);
            } else {
                logger.info("Sent record to kafka: {}", outboxRecord);
                this.repository.updateProcessed(outboxRecord.getId(), Instant.now());
            }
        })).toList().forEach(OutboxProcessor::await);
    }

    private static void await(Future<?> future) {
        try {
            future.get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    private ProducerRecord<String, byte[]> toProducerRecord(OutboxRecord outboxRecord) {
        ProducerRecord producerRecord = new ProducerRecord(outboxRecord.getTopic(), (Object)outboxRecord.getKey(), (Object)outboxRecord.getValue());
        if (outboxRecord.getHeaders() != null) {
            outboxRecord.getHeaders().forEach((k, v) -> producerRecord.headers().add(k, v.getBytes()));
        }
        producerRecord.headers().add("x-sequence", Longs.toByteArray((long)outboxRecord.getId()));
        producerRecord.headers().add("x-source", this.eventSource);
        return producerRecord;
    }

    @Generated
    public boolean isActive() {
        return this.active;
    }

    @FunctionalInterface
    public static interface KafkaProducerFactory {
        public KafkaProducer<String, byte[]> createKafkaProducer();
    }

    public static final class CleanupSettings {
        private final Duration interval;
        private final Duration retention;

        @Generated
        CleanupSettings(Duration interval, Duration retention) {
            this.interval = interval;
            this.retention = retention;
        }

        @Generated
        public static CleanupSettingsBuilder builder() {
            return new CleanupSettingsBuilder();
        }

        @Generated
        public Duration getInterval() {
            return this.interval;
        }

        @Generated
        public Duration getRetention() {
            return this.retention;
        }

        @Generated
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof CleanupSettings)) {
                return false;
            }
            CleanupSettings other = (CleanupSettings)o;
            Duration this$interval = this.getInterval();
            Duration other$interval = other.getInterval();
            if (this$interval == null ? other$interval != null : !((Object)this$interval).equals(other$interval)) {
                return false;
            }
            Duration this$retention = this.getRetention();
            Duration other$retention = other.getRetention();
            return !(this$retention == null ? other$retention != null : !((Object)this$retention).equals(other$retention));
        }

        @Generated
        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            Duration $interval = this.getInterval();
            result = result * 59 + ($interval == null ? 43 : ((Object)$interval).hashCode());
            Duration $retention = this.getRetention();
            result = result * 59 + ($retention == null ? 43 : ((Object)$retention).hashCode());
            return result;
        }

        @Generated
        public String toString() {
            return "OutboxProcessor.CleanupSettings(interval=" + this.getInterval() + ", retention=" + this.getRetention() + ")";
        }

        @Generated
        public static class CleanupSettingsBuilder {
            @Generated
            private Duration interval;
            @Generated
            private Duration retention;

            @Generated
            CleanupSettingsBuilder() {
            }

            @Generated
            public CleanupSettingsBuilder interval(Duration interval) {
                this.interval = interval;
                return this;
            }

            @Generated
            public CleanupSettingsBuilder retention(Duration retention) {
                this.retention = retention;
                return this;
            }

            @Generated
            public CleanupSettings build() {
                return new CleanupSettings(this.interval, this.retention);
            }

            @Generated
            public String toString() {
                return "OutboxProcessor.CleanupSettings.CleanupSettingsBuilder(interval=" + this.interval + ", retention=" + this.retention + ")";
            }
        }
    }
}

