/*
 * Decompiled with CFR 0.152.
 */
package one.tomorrow.transactionaloutbox.reactive.service;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.PreDestroy;
import one.tomorrow.transactionaloutbox.commons.Longs;
import one.tomorrow.transactionaloutbox.reactive.model.OutboxRecord;
import one.tomorrow.transactionaloutbox.reactive.repository.OutboxRepository;
import one.tomorrow.transactionaloutbox.reactive.service.OutboxLockService;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/**
 * Publishes outbox records to Kafka while this instance holds the outbox lock.
 *
 * <p>On construction a Kafka producer is created through the supplied
 * {@link KafkaProducerFactory} and a first lock acquisition attempt is started.
 * When the lock is acquired (or refreshed) the processor asks the repository for
 * unprocessed records, sends each one to Kafka and stores it back with its
 * {@code processed} timestamp set. Processing is then re-scheduled after
 * {@code processingInterval}; when the lock cannot be obtained or is lost, a new
 * acquisition attempt is scheduled after {@code lockTimeout}.
 *
 * <p>NOTE(review): the mutable fields below are written from reactive callbacks and
 * from the scheduler thread and read by {@link #close()}; none of them is
 * {@code volatile} or otherwise synchronized — confirm that this is acceptable.
 */
public class OutboxProcessor {
    /** Batch size used by the constructor that does not take an explicit one. */
    private static final int DEFAULT_BATCH_SIZE = 100;
    /** Name of the Kafka header carrying the outbox record id as bytes. */
    private static final String SEQUENCE_HEADER = "x-sequence";
    /** Name of the Kafka header carrying the configured event source. */
    private static final String SOURCE_HEADER = "x-source";
    private static final Logger logger = LoggerFactory.getLogger(OutboxProcessor.class);

    private final List<Consumer<KafkaProducer<String, byte[]>>> producerClosedListeners = new ArrayList<>();
    private final List<Consumer<KafkaProducer<String, byte[]>>> producerCreatedListeners = new ArrayList<>();
    private final OutboxLockService lockService;
    private final String lockOwnerId;
    private final OutboxRepository repository;
    private final KafkaProducerFactory producerFactory;
    private final Duration processingInterval;
    private final Duration lockTimeout;
    private final ScheduledExecutorService executor;
    /** UTF-8 encoded event source, sent with every record as the {@code x-source} header. */
    private final byte[] eventSource;
    private final int batchSize;
    private KafkaProducer<String, byte[]> producer;
    /** {@code true} while the last acquire/refresh attempt reported that the lock is held. */
    private boolean active;
    /** Time of the last successfully completed acquire/refresh call; drives the refresh check. */
    private Instant lastLockAcquisitionAttempt;
    /** The most recently scheduled task (processing or lock acquisition), cancelled on close. */
    private ScheduledFuture<?> schedule;

    /**
     * Creates a processor with the default batch size ({@value #DEFAULT_BATCH_SIZE}).
     *
     * @see #OutboxProcessor(OutboxRepository, OutboxLockService, KafkaProducerFactory, Duration, Duration, String, String, int)
     */
    public OutboxProcessor(OutboxRepository repository, OutboxLockService lockService, KafkaProducerFactory producerFactory, Duration processingInterval, Duration lockTimeout, String lockOwnerId, String eventSource) {
        this(repository, lockService, producerFactory, processingInterval, lockTimeout, lockOwnerId, eventSource, DEFAULT_BATCH_SIZE);
    }

    /**
     * Creates the processor, creates the Kafka producer and immediately starts the first
     * attempt to acquire the outbox lock.
     *
     * @param repository         source of unprocessed records and sink for processed ones
     * @param lockService        used to acquire, refresh, check and release the outbox lock
     * @param producerFactory    creates the Kafka producer (also used to recreate it after a failure)
     * @param processingInterval delay between two processing runs while the lock is held
     * @param lockTimeout        timeout passed to the lock service; also the delay before a new
     *                           acquisition attempt, and half of it is the age after which the
     *                           lock is refreshed before processing
     * @param lockOwnerId        identifier under which this instance acquires the lock
     * @param eventSource        value of the {@code x-source} header (encoded as UTF-8)
     * @param batchSize          value passed to the repository when requesting unprocessed records
     */
    public OutboxProcessor(OutboxRepository repository, OutboxLockService lockService, KafkaProducerFactory producerFactory, Duration processingInterval, Duration lockTimeout, String lockOwnerId, String eventSource, int batchSize) {
        logger.info("Starting outbox processor with lockOwnerId {}, source {} and processing interval {} ms and producer factory {}", lockOwnerId, eventSource, processingInterval.toMillis(), producerFactory);
        this.repository = repository;
        this.processingInterval = processingInterval;
        this.lockTimeout = lockTimeout;
        this.lockService = lockService;
        this.lockOwnerId = lockOwnerId;
        // Explicit charset: the header bytes must not depend on the JVM's default encoding.
        this.eventSource = eventSource.getBytes(StandardCharsets.UTF_8);
        this.batchSize = batchSize;
        this.producerFactory = producerFactory;
        this.createProducer(producerFactory);
        this.executor = Executors.newSingleThreadScheduledExecutor();
        this.tryLockAcquisition();
    }

    /**
     * Registers a listener that is invoked with the current producer right before it is closed.
     *
     * @param listener callback receiving the producer that is about to be closed
     * @return this processor, for chaining
     */
    public OutboxProcessor onBeforeProducerClosed(Consumer<KafkaProducer<String, byte[]>> listener) {
        this.producerClosedListeners.add(listener);
        return this;
    }

    /**
     * Registers a listener that is invoked whenever a producer is created. If a producer
     * already exists at registration time, the listener is invoked with it immediately.
     *
     * @param listener callback receiving each newly created producer
     * @return this processor, for chaining
     */
    public OutboxProcessor onProducerCreated(Consumer<KafkaProducer<String, byte[]>> listener) {
        this.producerCreatedListeners.add(listener);
        if (this.producer != null) {
            listener.accept(this.producer);
        }
        return this;
    }

    /** Creates a new producer via the factory and notifies the producer-created listeners. */
    private void createProducer(KafkaProducerFactory producerFactory) {
        this.producer = producerFactory.createKafkaProducer();
        this.producerCreatedListeners.forEach(listener -> listener.accept(this.producer));
    }

    /**
     * Notifies the before-close listeners and closes the producer with a zero timeout.
     * The producer is closed even if one of the listeners throws.
     */
    private void closeProducer() {
        try {
            this.producerClosedListeners.forEach(listener -> listener.accept(this.producer));
        } finally {
            this.producer.close(Duration.ZERO);
        }
    }

    /** Schedules the next processing run after {@code processingInterval}, unless the executor is shut down. */
    private void scheduleProcessing() {
        if (this.executor.isShutdown()) {
            logger.info("Not scheduling processing for lockOwnerId {} (executor is shutdown)", this.lockOwnerId);
        } else {
            this.schedule = this.executor.schedule(this::processOutboxWithLock, this.processingInterval.toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    /** Schedules the next lock acquisition attempt after {@code lockTimeout}, unless the executor is shut down. */
    private void scheduleTryLockAcquisition() {
        if (this.executor.isShutdown()) {
            logger.info("Not scheduling acquisition of outbox lock for lockOwnerId {} (executor is shutdown)", this.lockOwnerId);
        } else {
            this.schedule = this.executor.schedule(this::tryLockAcquisition, this.lockTimeout.toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Stops the processor: cancels the pending scheduled task (without interrupting a running one),
     * shuts down the executor, closes the producer and asks the lock service to release the lock.
     * The lock release is triggered even if closing the producer fails; a failing release is logged.
     */
    @PreDestroy
    public void close() {
        logger.info("Stopping OutboxProcessor.");
        if (this.schedule != null) {
            this.schedule.cancel(false);
        }
        this.executor.shutdown();
        try {
            this.closeProducer();
        } finally {
            // Fire-and-forget; handle the error in the pipeline so it is not reported as dropped.
            this.lockService.releaseLock(this.lockOwnerId)
                    .onErrorResume(e -> {
                        logger.warn("Failed to release outbox lock for lockOwnerId {}", this.lockOwnerId, e);
                        return Mono.empty();
                    })
                    .subscribe();
        }
    }

    /**
     * Asks the lock service to acquire (or, when already active, refresh) the lock.
     * On success processing starts right away; otherwise, or when the call fails,
     * another attempt is scheduled after {@code lockTimeout}.
     */
    private void tryLockAcquisition() {
        boolean originalActive = this.active;
        logger.debug("{} trying to acquire outbox lock", this.lockOwnerId);
        this.lockService.acquireOrRefreshLock(this.lockOwnerId, this.lockTimeout, this.active)
                .doOnNext(acquiredOrRefreshedLock -> {
                    this.active = acquiredOrRefreshedLock;
                    this.lastLockAcquisitionAttempt = Instant.now();
                    if (acquiredOrRefreshedLock) {
                        // A refresh of an already held lock is routine, a fresh acquisition is noteworthy.
                        if (originalActive) {
                            logger.debug("{} acquired outbox lock, starting to process outbox", this.lockOwnerId);
                        } else {
                            logger.info("{} acquired outbox lock, starting to process outbox", this.lockOwnerId);
                        }
                        this.processOutboxWithLock();
                    } else {
                        this.scheduleTryLockAcquisition();
                    }
                })
                // onErrorResume (rather than doOnError) consumes the error, so the bare subscribe()
                // below does not receive an unhandled error signal.
                .onErrorResume(e -> {
                    logger.warn("Failed trying to acquire outbox lock, trying again in {}", this.lockTimeout, e);
                    this.scheduleTryLockAcquisition();
                    return Mono.empty();
                })
                .subscribe();
    }

    /**
     * Runs one processing cycle through {@code lockService.runWithLock}.
     *
     * <p>If the processor is not active, a lock acquisition is scheduled instead. If more than half
     * of {@code lockTimeout} has passed since the last acquire/refresh, the lock is refreshed first
     * (which in turn triggers processing). A failure while processing the outbox leads to the
     * producer being recreated; the cycle still counts as completed. Depending on the value emitted
     * by {@code runWithLock}, either the next processing run or a new lock acquisition is scheduled.
     */
    private void processOutboxWithLock() {
        if (!this.active) {
            logger.warn("processOutbox must only be run when in active state");
            this.scheduleTryLockAcquisition();
            return;
        }
        Instant refreshDue = this.lastLockAcquisitionAttempt.plus(this.lockTimeout.dividedBy(2L));
        if (Instant.now().isAfter(refreshDue)) {
            this.tryLockAcquisition();
            return;
        }
        Mono<Void> processing = Mono.defer(() -> this.processOutbox()
                .onErrorResume(e -> {
                    logger.warn("Recreating producer, due to failure while processing outbox.", e);
                    this.closeProducer();
                    this.createProducer(this.producerFactory);
                    return Mono.empty();
                })
                .then());
        this.lockService.runWithLock(this.lockOwnerId, processing)
                .onErrorResume(e -> {
                    logger.warn("Failed to run with lock, trying to acquire lock in {} ms", this.lockTimeout.toMillis(), e);
                    this.active = false;
                    this.scheduleTryLockAcquisition();
                    return Mono.empty();
                })
                .doOnNext(couldRunWithLock -> {
                    if (couldRunWithLock) {
                        this.scheduleProcessing();
                    } else {
                        logger.info("Lock was lost, changing to inactive, now trying to acquire lock in {} ms", this.lockTimeout.toMillis());
                        this.active = false;
                        this.scheduleTryLockAcquisition();
                    }
                })
                .subscribe();
    }

    /**
     * Requests unprocessed records from the repository (passing {@code batchSize}), publishes each
     * to Kafka and then, one after another, saves a copy with {@code processed} set to the current time.
     *
     * @return a Mono emitting the list of records as returned by the save operation
     */
    private Mono<List<OutboxRecord>> processOutbox() {
        return this.repository.getUnprocessedRecords(this.batchSize)
                .flatMap(this::publish)
                .concatMap(outboxRecord -> this.repository.saveInNewTransaction(outboxRecord.toBuilder().processed(Instant.now()).build()))
                .collectList();
    }

    /**
     * Sends the record with the current producer.
     *
     * @return a Mono that emits the record once the send callback reports success,
     *         or fails with the exception passed to the callback
     */
    private Mono<OutboxRecord> publish(OutboxRecord outboxRecord) {
        return Mono.create(monoSink -> {
            ProducerRecord<String, byte[]> producerRecord = this.toProducerRecord(outboxRecord);
            this.producer.send(producerRecord, (metadata, exception) -> {
                if (exception != null) {
                    monoSink.error(exception);
                } else {
                    logger.info("Sent record to kafka: {} (got metadata: {})", outboxRecord, metadata);
                    monoSink.success(outboxRecord);
                }
            });
        });
    }

    /**
     * Builds the Kafka record from topic, key and value of the outbox record. The record's own
     * headers (if any) are copied with UTF-8 encoded values, then the {@code x-sequence} header
     * (record id as bytes) and the {@code x-source} header (event source) are added.
     */
    private ProducerRecord<String, byte[]> toProducerRecord(OutboxRecord outboxRecord) {
        ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(outboxRecord.getTopic(), outboxRecord.getKey(), outboxRecord.getValue());
        Map<String, String> headers = outboxRecord.getHeadersAsMap();
        if (headers != null) {
            headers.forEach((k, v) -> producerRecord.headers().add(k, v.getBytes(StandardCharsets.UTF_8)));
        }
        producerRecord.headers().add(SEQUENCE_HEADER, Longs.toByteArray(outboxRecord.getId()));
        producerRecord.headers().add(SOURCE_HEADER, this.eventSource);
        return producerRecord;
    }

    /** Factory for the Kafka producer used by the processor; called again whenever the producer is recreated. */
    @FunctionalInterface
    public interface KafkaProducerFactory {
        /**
         * @return a new Kafka producer with {@code String} keys and {@code byte[]} values
         */
        KafkaProducer<String, byte[]> createKafkaProducer();
    }
}

