/*
 * Decompiled with CFR 0.152.
 */
package one.tomorrow.transactionaloutbox.reactive.repository;

import io.r2dbc.spi.Row;
import java.time.Duration;
import java.time.Instant;
import lombok.Generated;
import one.tomorrow.transactionaloutbox.reactive.model.OutboxLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.reactive.TransactionalOperator;
import reactor.core.publisher.Mono;

/**
 * Reactive repository for the single row in table {@code outbox_kafka_lock} that records
 * which owner currently holds the outbox lock and until when that lock is valid.
 *
 * <p>All operations are lazy: nothing is sent to the database until the returned
 * {@link Mono} is subscribed to.
 */
@Repository
public class OutboxLockRepository {
    private static final Logger logger = LoggerFactory.getLogger(OutboxLockRepository.class);

    /** Value of the {@code id} column of the one lock row this repository reads and writes. */
    private static final String LOCK_ID = "outboxLock";

    /**
     * Text looked for in the exception's string form to recognise that the lock row could not
     * be locked. Presumably this is the database's error message for a failed
     * {@code FOR UPDATE NOWAIT} - confirm against the target database.
     */
    private static final String ROW_LOCKED_MARKER = "could not obtain lock";

    private final DatabaseClient db;
    private final TransactionalOperator rxtx;

    /**
     * Tries to acquire the outbox lock for {@code ownerId}, or to extend it if that owner
     * already holds it. The select/insert/update sequence is executed through the
     * {@link TransactionalOperator}.
     *
     * @param ownerId     identifier of the instance requesting the lock
     * @param timeout     how long from now the lock should stay valid
     * @param refreshLock if {@code true} the initial select already uses
     *                    {@code FOR UPDATE NOWAIT}; otherwise the row is first read without a
     *                    row lock and only re-selected for update when an update is needed
     * @return a {@code Mono} emitting {@code true} if the lock row was inserted or updated for
     *         {@code ownerId}; {@code false} if another owner holds a still-valid lock, if a
     *         {@link DataIntegrityViolationException} was raised (e.g. a concurrent insert),
     *         or if the row could not be locked
     */
    public Mono<Boolean> acquireOrRefreshLock(String ownerId, Duration timeout, boolean refreshLock) {
        Mono<Boolean> lockAttempt = selectOutboxLock(refreshLock)
                .flatMap(lock -> handleExistingLock(lock, refreshLock, ownerId, timeout))
                // insertOutboxLock is deferred, so the insert only runs if no row was found
                .switchIfEmpty(insertOutboxLock(ownerId, timeout));

        // Error handlers sit outside the transactional boundary, i.e. they see the failure
        // after the transactional operator has processed it.
        return rxtx.transactional(lockAttempt)
                .onErrorResume(DataIntegrityViolationException.class, e -> handleDuplicateKey(e, ownerId))
                .onErrorResume(OutboxLockRepository::isRowLockedFailure, e -> handleRowIsLocked(e, ownerId));
    }

    /** Returns whether {@code e} is a resource failure whose text contains the row-locked marker. */
    private static boolean isRowLockedFailure(Throwable e) {
        return e instanceof DataAccessResourceFailureException && e.toString().contains(ROW_LOCKED_MARKER);
    }

    /**
     * Fallback for a {@link DataIntegrityViolationException}: logs it and reports the lock as
     * not acquired.
     */
    private Mono<Boolean> handleDuplicateKey(DataIntegrityViolationException e, String ownerId) {
        // Log the exception's string form (as handleRowIsLocked does): a trailing Throwable
        // argument may be treated by the logging backend as the attached exception instead of
        // filling the last placeholder.
        logger.info("Outbox lock for owner {} could not be created, another one has been created concurrently: {}", ownerId, e.toString());
        return Mono.just(false);
    }

    /** Fallback when the lock row could not be locked: logs it and reports the lock as not acquired. */
    private Mono<Boolean> handleRowIsLocked(Throwable e, String ownerId) {
        logger.info("Could not grab lock for owner {} - database row is locked: {}", ownerId, e.toString());
        return Mono.just(false);
    }

    /**
     * Reads the lock row.
     *
     * @param forUpdate whether to append {@code FOR UPDATE NOWAIT} to the select
     * @return the lock, or an empty {@code Mono} if the row does not exist
     */
    private Mono<OutboxLock> selectOutboxLock(boolean forUpdate) {
        String sql = "select * from outbox_kafka_lock where id = :id" + (forUpdate ? " FOR UPDATE NOWAIT" : "");
        return db.sql(sql)
                .bind("id", LOCK_ID)
                .map(this::toOutboxLock)
                .one();
    }

    /**
     * Inserts the lock row for {@code ownerId}, valid until now plus {@code timeout}.
     * Wrapped in {@link Mono#defer} so that logging, the timestamp and the statement are only
     * produced on subscription.
     *
     * @return a {@code Mono} emitting {@code true} if at least one row was inserted
     */
    private Mono<Boolean> insertOutboxLock(String ownerId, Duration timeout) {
        return Mono.defer(() -> {
            logger.debug("No outbox lock found. Creating one for {}", ownerId);
            return db.sql("insert into outbox_kafka_lock (id, owner_id, valid_until) values (:id, :ownerId, :validUntil)")
                    .bind("id", LOCK_ID)
                    .bind("ownerId", ownerId)
                    .bind("validUntil", Instant.now().plus(timeout))
                    .fetch()
                    .rowsUpdated()
                    .map(rowsUpdated -> rowsUpdated > 0);
        });
    }

    /**
     * Decides what to do with an existing lock row.
     *
     * <ul>
     *   <li>Another owner's lock that is still valid: emit {@code false}.</li>
     *   <li>Row was read without {@code FOR UPDATE}: re-select it for update and re-evaluate.</li>
     *   <li>Lock belongs to {@code ownerId}: extend {@code valid_until}.</li>
     *   <li>Otherwise (another owner's expired lock): take it over.</li>
     * </ul>
     *
     * @param lock              the lock row as read
     * @param selectedForUpdate whether {@code lock} was read with {@code FOR UPDATE NOWAIT}
     * @param ownerId           owner requesting the lock
     * @param timeout           validity duration to apply from now
     * @return a {@code Mono} emitting whether the lock is now held by {@code ownerId}; empty if
     *         the row is no longer found when re-selecting for update
     */
    private Mono<Boolean> handleExistingLock(OutboxLock lock, boolean selectedForUpdate, String ownerId, Duration timeout) {
        if (isForeignValidLock(lock, ownerId)) {
            logger.debug("Found outbox lock with owner {}, valid until {} (now: {})", lock.getOwnerId(), lock.getValidUntil(), Instant.now());
            return Mono.just(false);
        }
        if (!selectedForUpdate) {
            // The row was read without a row lock; read it again for update before modifying it.
            return selectOutboxLock(true)
                    .flatMap(lockForUpdate -> handleExistingLock(lockForUpdate, true, ownerId, timeout));
        }
        if (ownerId.equals(lock.getOwnerId())) {
            logger.debug("Found outbox lock with requested owner {}, valid until {} - updating lock", lock.getOwnerId(), lock.getValidUntil());
            return db.sql("update outbox_kafka_lock set valid_until = :validUntil where id = :id and owner_id = :ownerId")
                    .bind("validUntil", Instant.now().plus(timeout))
                    .bind("id", LOCK_ID)
                    .bind("ownerId", ownerId)
                    .fetch()
                    .rowsUpdated()
                    .map(rowsUpdated -> rowsUpdated > 0);
        }
        logger.info("Found expired outbox lock with owner {}, which was valid until {} - grabbing lock for {}", lock.getOwnerId(), lock.getValidUntil(), ownerId);
        return db.sql("update outbox_kafka_lock set owner_id = :ownerId, valid_until = :validUntil where id = :id")
                .bind("ownerId", ownerId)
                .bind("validUntil", Instant.now().plus(timeout))
                .bind("id", LOCK_ID)
                .fetch()
                .rowsUpdated()
                .map(rowsUpdated -> rowsUpdated > 0);
    }

    /** Returns whether {@code lock} belongs to a different owner and its validity lies after now. */
    private boolean isForeignValidLock(OutboxLock lock, String ownerId) {
        return !ownerId.equals(lock.getOwnerId()) && lock.getValidUntil().isAfter(Instant.now());
    }

    /** Maps a result row to an {@link OutboxLock} using columns {@code owner_id} and {@code valid_until}. */
    private OutboxLock toOutboxLock(Row row) {
        return new OutboxLock(row.get("owner_id", String.class), row.get("valid_until", Instant.class));
    }

    /**
     * Selects the lock row owned by {@code ownerId} with {@code for update}.
     * NOTE(review): the row lock presumably only has an effect when the caller subscribes
     * inside a surrounding transaction - this method does not start one itself.
     *
     * @param ownerId owner expected to hold the lock
     * @return a {@code Mono} emitting {@code true} if a row with that owner was found,
     *         {@code false} otherwise
     */
    public Mono<Boolean> preventLockStealing(String ownerId) {
        return lockOutboxLock(ownerId)
                .map(existingLock -> true)
                .defaultIfEmpty(false);
    }

    /** Reads the lock row with the given owner using {@code for update}; empty if there is none. */
    private Mono<OutboxLock> lockOutboxLock(String ownerId) {
        return db.sql("select * from outbox_kafka_lock where owner_id = :ownerId for update")
                .bind("ownerId", ownerId)
                .map(this::toOutboxLock)
                .one();
    }

    /**
     * Deletes the lock row owned by {@code ownerId}, if any, and logs whether a row was removed.
     *
     * @param ownerId owner whose lock should be released
     * @return a {@code Mono} that completes when the delete has finished
     */
    public Mono<Void> releaseLock(String ownerId) {
        return db.sql("delete from outbox_kafka_lock where owner_id = :ownerId")
                .bind("ownerId", ownerId)
                .fetch()
                .rowsUpdated()
                .doOnNext(rowsUpdated -> {
                    if (rowsUpdated > 0) {
                        logger.info("Released outbox lock for owner {}", ownerId);
                    } else {
                        logger.info("Outbox lock for owner {} not found, nothing released.", ownerId);
                    }
                })
                .then();
    }

    /**
     * Creates the repository.
     *
     * @param db   client used to execute SQL statements
     * @param rxtx operator used to run lock acquisition transactionally
     */
    @Generated
    public OutboxLockRepository(DatabaseClient db, TransactionalOperator rxtx) {
        this.db = db;
        this.rxtx = rxtx;
    }
}

