/*
 * Decompiled with CFR 0.152.
 */
package one.tomorrow.transactionaloutbox.reactive.service;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.PreDestroy;
import lombok.Generated;
import one.tomorrow.transactionaloutbox.commons.Longs;
import one.tomorrow.transactionaloutbox.reactive.model.OutboxRecord;
import one.tomorrow.transactionaloutbox.reactive.repository.OutboxRepository;
import one.tomorrow.transactionaloutbox.reactive.service.OutboxLockService;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

public class OutboxProcessor {
    private static final int DEFAULT_BATCH_SIZE = 100;
    private static final Logger logger = LoggerFactory.getLogger(OutboxProcessor.class);
    private final List<Consumer<KafkaProducer<String, byte[]>>> producerClosedListeners = new ArrayList<Consumer<KafkaProducer<String, byte[]>>>();
    private final List<Consumer<KafkaProducer<String, byte[]>>> producerCreatedListeners = new ArrayList<Consumer<KafkaProducer<String, byte[]>>>();
    private final OutboxLockService lockService;
    private final String lockOwnerId;
    private final OutboxRepository repository;
    private final KafkaProducerFactory producerFactory;
    private final Duration processingInterval;
    private final Duration lockTimeout;
    private final ScheduledExecutorService executor;
    private final ScheduledExecutorService cleanupExecutor;
    private final byte[] eventSource;
    private final int batchSize;
    private KafkaProducer<String, byte[]> producer;
    private boolean active;
    private Instant lastLockAckquisitionAttempt;
    private ScheduledFuture<?> schedule;
    private ScheduledFuture<?> cleanupSchedule;

    public OutboxProcessor(OutboxRepository repository, OutboxLockService lockService, KafkaProducerFactory producerFactory, Duration processingInterval, Duration lockTimeout, String lockOwnerId, String eventSource) {
        this(repository, lockService, producerFactory, processingInterval, lockTimeout, lockOwnerId, eventSource, 100, null);
    }

    public OutboxProcessor(OutboxRepository repository, OutboxLockService lockService, KafkaProducerFactory producerFactory, Duration processingInterval, Duration lockTimeout, String lockOwnerId, String eventSource, CleanupSettings cleanupSettings) {
        this(repository, lockService, producerFactory, processingInterval, lockTimeout, lockOwnerId, eventSource, 100, cleanupSettings);
    }

    public OutboxProcessor(OutboxRepository repository, OutboxLockService lockService, KafkaProducerFactory producerFactory, Duration processingInterval, Duration lockTimeout, String lockOwnerId, String eventSource, int batchSize, CleanupSettings cleanupSettings) {
        logger.info("Starting outbox processor with lockOwnerId {}, source {} and processing interval {} ms and producer factory {}", new Object[]{lockOwnerId, eventSource, processingInterval.toMillis(), producerFactory});
        this.repository = repository;
        this.processingInterval = processingInterval;
        this.lockTimeout = lockTimeout;
        this.lockService = lockService;
        this.lockOwnerId = lockOwnerId;
        this.eventSource = eventSource.getBytes();
        this.batchSize = batchSize;
        this.producerFactory = producerFactory;
        this.createProducer(producerFactory);
        this.executor = Executors.newSingleThreadScheduledExecutor();
        this.tryLockAcquisition();
        this.cleanupExecutor = cleanupSettings != null ? this.setupCleanupSchedule(repository, cleanupSettings) : null;
    }

    public OutboxProcessor onBeforeProducerClosed(Consumer<KafkaProducer<String, byte[]>> listener) {
        this.producerClosedListeners.add(listener);
        return this;
    }

    public OutboxProcessor onProducerCreated(Consumer<KafkaProducer<String, byte[]>> listener) {
        this.producerCreatedListeners.add(listener);
        if (this.producer != null) {
            listener.accept(this.producer);
        }
        return this;
    }

    private void createProducer(KafkaProducerFactory producerFactory) {
        this.producer = producerFactory.createKafkaProducer();
        this.producerCreatedListeners.forEach(listener -> listener.accept(this.producer));
    }

    private void closeProducer() {
        this.producerClosedListeners.forEach(listener -> listener.accept(this.producer));
        this.producer.close(Duration.ZERO);
    }

    private ScheduledExecutorService setupCleanupSchedule(OutboxRepository repository, CleanupSettings cleanupSettings) {
        ScheduledExecutorService es = Executors.newSingleThreadScheduledExecutor();
        this.cleanupSchedule = es.scheduleAtFixedRate(() -> {
            if (this.active) {
                Instant processedBefore = Instant.now().minus(cleanupSettings.getRetention());
                logger.info("Cleaning up outbox records processed before {}", (Object)processedBefore);
                repository.deleteOutboxRecordByProcessedNotNullAndProcessedIsBefore(processedBefore).block();
            }
        }, 0L, cleanupSettings.getInterval().toMillis(), TimeUnit.MILLISECONDS);
        return es;
    }

    private void scheduleProcessing() {
        if (this.executor.isShutdown()) {
            logger.info("Not scheduling processing for lockOwnerId {} (executor is shutdown)", (Object)this.lockOwnerId);
        } else {
            this.schedule = this.executor.schedule(this::processOutboxWithLock, this.processingInterval.toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    private void scheduleTryLockAcquisition() {
        if (this.executor.isShutdown()) {
            logger.info("Not scheduling acquisition of outbox lock for lockOwnerId {} (executor is shutdown)", (Object)this.lockOwnerId);
        } else {
            this.schedule = this.executor.schedule(this::tryLockAcquisition, this.lockTimeout.toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    @PreDestroy
    public void close() {
        logger.info("Stopping OutboxProcessor.");
        if (this.schedule != null) {
            this.schedule.cancel(false);
        }
        this.executor.shutdown();
        if (this.cleanupSchedule != null) {
            this.cleanupSchedule.cancel(false);
        }
        if (this.cleanupExecutor != null) {
            this.cleanupExecutor.shutdown();
        }
        this.closeProducer();
        this.lockService.releaseLock(this.lockOwnerId).subscribe();
    }

    private void tryLockAcquisition() {
        boolean originalActive = this.active;
        logger.debug("{} trying to acquire outbox lock", (Object)this.lockOwnerId);
        this.lockService.acquireOrRefreshLock(this.lockOwnerId, this.lockTimeout, this.active).doOnNext(acquiredOrRefreshedLock -> {
            this.active = acquiredOrRefreshedLock;
            this.lastLockAckquisitionAttempt = Instant.now();
            if (acquiredOrRefreshedLock.booleanValue()) {
                if (originalActive) {
                    logger.debug("{} acquired outbox lock, starting to process outbox", (Object)this.lockOwnerId);
                } else {
                    logger.info("{} acquired outbox lock, starting to process outbox", (Object)this.lockOwnerId);
                }
                this.processOutboxWithLock();
            } else {
                this.scheduleTryLockAcquisition();
            }
        }).doOnError(e -> {
            logger.warn("Failed trying to acquire outbox lock, trying again in {}", (Object)this.lockTimeout, e);
            this.scheduleTryLockAcquisition();
        }).subscribe();
    }

    private void processOutboxWithLock() {
        if (!this.active) {
            logger.warn("processOutbox must only be run when in active state");
            this.scheduleTryLockAcquisition();
            return;
        }
        if (Instant.now().isAfter(this.lastLockAckquisitionAttempt.plus(this.lockTimeout.dividedBy(2L)))) {
            this.tryLockAcquisition();
            return;
        }
        this.lockService.runWithLock(this.lockOwnerId, (Mono<Void>)Mono.defer(() -> this.processOutbox().onErrorResume(e -> {
            logger.warn("Recreating producer, due to failure while processing outbox.", e);
            this.closeProducer();
            this.createProducer(this.producerFactory);
            return Mono.empty();
        }).then())).onErrorResume(e -> {
            logger.warn("Failed to run with lock, trying to acquire lock in {} ms", (Object)this.lockTimeout.toMillis(), e);
            this.active = false;
            this.scheduleTryLockAcquisition();
            return Mono.empty();
        }).doOnNext(couldRunWithLock -> {
            if (couldRunWithLock.booleanValue()) {
                this.scheduleProcessing();
            } else {
                logger.info("Lock was lost, changing to inactive, now trying to acquire lock in {} ms", (Object)this.lockTimeout.toMillis());
                this.active = false;
                this.scheduleTryLockAcquisition();
            }
        }).subscribe();
    }

    private Mono<List<OutboxRecord>> processOutbox() {
        return this.repository.getUnprocessedRecords(this.batchSize).flatMap(this::publish).concatMap(outboxRecord -> this.repository.saveInNewTransaction(outboxRecord.toBuilder().processed(Instant.now()).build())).collectList();
    }

    private Mono<OutboxRecord> publish(OutboxRecord outboxRecord) {
        return Mono.create(monoSink -> {
            ProducerRecord<String, byte[]> producerRecord = this.toProducerRecord(outboxRecord);
            this.producer.send(producerRecord, (metadata, exception) -> {
                if (exception != null) {
                    monoSink.error((Throwable)exception);
                } else {
                    logger.info("Sent record to kafka: {} (got metadata: {})", (Object)outboxRecord, (Object)metadata);
                    monoSink.success((Object)outboxRecord);
                }
            });
        });
    }

    private ProducerRecord<String, byte[]> toProducerRecord(OutboxRecord outboxRecord) {
        ProducerRecord producerRecord = new ProducerRecord(outboxRecord.getTopic(), (Object)outboxRecord.getKey(), (Object)outboxRecord.getValue());
        Map<String, String> headers = outboxRecord.getHeadersAsMap();
        if (headers != null) {
            headers.forEach((k, v) -> producerRecord.headers().add(k, v.getBytes()));
        }
        producerRecord.headers().add("x-sequence", Longs.toByteArray((long)outboxRecord.getId()));
        producerRecord.headers().add("x-source", this.eventSource);
        return producerRecord;
    }

    @Generated
    public boolean isActive() {
        return this.active;
    }

    @FunctionalInterface
    public static interface KafkaProducerFactory {
        public KafkaProducer<String, byte[]> createKafkaProducer();
    }

    public static final class CleanupSettings {
        private final Duration interval;
        private final Duration retention;

        @Generated
        CleanupSettings(Duration interval, Duration retention) {
            this.interval = interval;
            this.retention = retention;
        }

        @Generated
        public static CleanupSettingsBuilder builder() {
            return new CleanupSettingsBuilder();
        }

        @Generated
        public Duration getInterval() {
            return this.interval;
        }

        @Generated
        public Duration getRetention() {
            return this.retention;
        }

        @Generated
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof CleanupSettings)) {
                return false;
            }
            CleanupSettings other = (CleanupSettings)o;
            Duration this$interval = this.getInterval();
            Duration other$interval = other.getInterval();
            if (this$interval == null ? other$interval != null : !((Object)this$interval).equals(other$interval)) {
                return false;
            }
            Duration this$retention = this.getRetention();
            Duration other$retention = other.getRetention();
            return !(this$retention == null ? other$retention != null : !((Object)this$retention).equals(other$retention));
        }

        @Generated
        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            Duration $interval = this.getInterval();
            result = result * 59 + ($interval == null ? 43 : ((Object)$interval).hashCode());
            Duration $retention = this.getRetention();
            result = result * 59 + ($retention == null ? 43 : ((Object)$retention).hashCode());
            return result;
        }

        @Generated
        public String toString() {
            return "OutboxProcessor.CleanupSettings(interval=" + this.getInterval() + ", retention=" + this.getRetention() + ")";
        }

        @Generated
        public static class CleanupSettingsBuilder {
            @Generated
            private Duration interval;
            @Generated
            private Duration retention;

            @Generated
            CleanupSettingsBuilder() {
            }

            @Generated
            public CleanupSettingsBuilder interval(Duration interval) {
                this.interval = interval;
                return this;
            }

            @Generated
            public CleanupSettingsBuilder retention(Duration retention) {
                this.retention = retention;
                return this;
            }

            @Generated
            public CleanupSettings build() {
                return new CleanupSettings(this.interval, this.retention);
            }

            @Generated
            public String toString() {
                return "OutboxProcessor.CleanupSettings.CleanupSettingsBuilder(interval=" + this.interval + ", retention=" + this.retention + ")";
            }
        }
    }
}

