/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.transaction.support;

import io.micronaut.context.annotation.Parameter;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.async.propagation.ReactorPropagation;
import io.micronaut.core.propagation.PropagatedContextElement;
import io.micronaut.data.connection.ConnectionStatus;
import io.micronaut.data.connection.reactive.ReactiveConnectionStatus;
import io.micronaut.data.connection.reactive.ReactiveConnectionSynchronization;
import io.micronaut.data.connection.reactive.ReactorConnectionOperations;
import io.micronaut.transaction.TransactionDefinition;
import io.micronaut.transaction.exceptions.NoTransactionException;
import io.micronaut.transaction.exceptions.TransactionSystemException;
import io.micronaut.transaction.exceptions.TransactionUsageException;
import io.micronaut.transaction.reactive.ReactiveTransactionOperations;
import io.micronaut.transaction.reactive.ReactiveTransactionStatus;
import io.micronaut.transaction.reactive.ReactorReactiveTransactionOperations;
import io.micronaut.transaction.support.SyncCompleteAndErrorPublisher;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

@Internal
public abstract class AbstractReactorTransactionOperations<C>
implements ReactorReactiveTransactionOperations<C> {
    /** Logger used for the warnings emitted while rolling a transaction back. */
    private static final Logger LOG = LoggerFactory.getLogger(AbstractReactorTransactionOperations.class);
    /** Connection operations used to obtain the connection every newly opened transaction runs on. */
    private final ReactorConnectionOperations<C> connectionOperations;

    /**
     * Creates transaction operations layered on top of the given connection operations.
     *
     * @param connectionOperations the connection operations used to open a connection
     *                             whenever a new transaction has to be started
     */
    protected AbstractReactorTransactionOperations(
            @Parameter ReactorConnectionOperations<C> connectionOperations) {
        this.connectionOperations = connectionOperations;
    }

    /**
     * Begins a transaction on the given connection.
     *
     * <p>Invoked by {@code executeTransactionFlux} and {@code executeTransactionMono} before the
     * user callback runs; the callback is only chained after the returned publisher completes.
     *
     * @param var1 the status of the connection the transaction is started on
     * @param var2 the definition of the transaction to begin
     * @return a publisher that signals when the transaction has been started
     */
    @NonNull
    protected abstract Publisher<Void> beginTransaction(@NonNull ConnectionStatus<C> var1, @NonNull TransactionDefinition var2);

    /**
     * Commits the transaction running on the given connection.
     *
     * <p>Invoked by {@code doCommit} when the transaction status is not marked rollback-only.
     *
     * @param var1 the status of the connection the transaction runs on
     * @param var2 the definition of the transaction to commit
     * @return a publisher that signals when the commit has finished
     */
    @NonNull
    protected abstract Publisher<Void> commitTransaction(@NonNull ConnectionStatus<C> var1, @NonNull TransactionDefinition var2);

    /**
     * Rolls back the transaction running on the given connection.
     *
     * <p>Invoked by {@code doCommit} when the status is marked rollback-only, and by
     * {@code doRollback} when the transaction definition reports that the failure
     * requires a rollback.
     *
     * @param var1 the status of the connection the transaction runs on
     * @param var2 the definition of the transaction to roll back
     * @return a publisher that signals when the rollback has finished
     */
    @NonNull
    protected abstract Publisher<Void> rollbackTransaction(@NonNull ConnectionStatus<C> var1, @NonNull TransactionDefinition var2);

    /**
     * Looks up the transaction status that this instance propagated through the Reactor context.
     *
     * <p>Only context elements registered by this very instance are considered, so transactions
     * opened by other transaction operations in the same context are ignored.
     *
     * @param contextView the Reactor context to inspect
     * @return the first matching transaction status, or an empty optional when there is none
     */
    @Override
    @SuppressWarnings("unchecked")
    public final Optional<ReactiveTransactionStatus<C>> findTransactionStatus(ContextView contextView) {
        return ReactorPropagation.findAllContextElements(contextView, ReactiveTransactionPropagatedContext.class)
            .filter(element -> element.transactionOperations() == this)
            // The class literal is raw, so the status comes back raw as well. Elements are only
            // created by addTxStatus with a status of type C for this instance, which makes the
            // cast safe and lets the stream element type match the declared return type.
            .map(element -> (ReactiveTransactionStatus<C>) element.status())
            .findFirst();
    }

    /**
     * Returns the transaction status propagated by this instance in the given context.
     *
     * @param contextView the Reactor context to inspect
     * @return the current transaction status, or {@code null} when no transaction is present
     */
    @Override
    public final ReactiveTransactionStatus<C> getTransactionStatus(ContextView contextView) {
        Optional<ReactiveTransactionStatus<C>> current = findTransactionStatus(contextView);
        return current.orElse(null);
    }

    /**
     * Returns the definition of the transaction propagated by this instance in the given context.
     *
     * @param contextView the Reactor context to inspect
     * @return the transaction definition, or {@code null} when no transaction is present
     */
    @Override
    public final TransactionDefinition getTransactionDefinition(ContextView contextView) {
        return findTransactionStatus(contextView)
            .map(ReactiveTransactionStatus::getTransactionDefinition)
            .orElse(null);
    }

    /**
     * Executes the handler within a transaction governed by the given definition.
     *
     * <p>The ambient transaction, if any, is resolved from the subscriber context at subscription
     * time and handed to {@link #withTransactionFlux} together with the definition and handler.
     *
     * @param definition the transaction definition, must not be {@code null}
     * @param handler    the callback to run inside the transaction, must not be {@code null}
     * @param <T>        the element type emitted by the callback
     * @return the callback's results
     * @throws NullPointerException if {@code definition} or {@code handler} is {@code null}
     */
    @Override
    @NonNull
    public final <T> Flux<T> withTransaction(@NonNull TransactionDefinition definition, @NonNull ReactiveTransactionOperations.TransactionalCallback<C, T> handler) {
        Objects.requireNonNull(definition, "Transaction definition cannot be null");
        Objects.requireNonNull(handler, "Callback handler cannot be null");
        return Flux.deferContextual(ctx -> withTransactionFlux(getTransactionStatus(ctx), definition, handler));
    }

    /**
     * Applies the propagation behaviour of the definition and runs the handler accordingly.
     *
     * <ul>
     *   <li>No existing transaction: {@code MANDATORY} fails, anything else opens a new
     *       connection and transaction.</li>
     *   <li>Existing transaction: {@code NOT_SUPPORTED} and {@code NEVER} fail,
     *       {@code REQUIRES_NEW} opens a new connection and transaction, anything else joins
     *       the existing transaction.</li>
     * </ul>
     *
     * @param transactionStatus the existing transaction status, or {@code null} when there is none
     * @param definition        the requested transaction definition
     * @param handler           the callback to run
     * @param <T>               the element type emitted by the callback
     * @return the callback's results, or an error flux when the propagation rules are violated
     */
    protected <T> Flux<T> withTransactionFlux(ReactiveTransactionStatus<C> transactionStatus, TransactionDefinition definition, ReactiveTransactionOperations.TransactionalCallback<C, T> handler) {
        TransactionDefinition.Propagation propagation = definition.getPropagationBehavior();

        if (transactionStatus == null) {
            // Nothing to join: either the caller demanded a transaction or we start one.
            if (propagation == TransactionDefinition.Propagation.MANDATORY) {
                return Flux.error(expectedTransaction());
            }
            return openNewConnectionAndTx(definition, handler);
        }

        boolean rejectsExisting = propagation == TransactionDefinition.Propagation.NOT_SUPPORTED
            || propagation == TransactionDefinition.Propagation.NEVER;
        if (rejectsExisting) {
            return Flux.error(propagationNotSupported(propagation));
        }
        if (propagation == TransactionDefinition.Propagation.REQUIRES_NEW) {
            return openNewConnectionAndTx(definition, handler);
        }
        // Join the transaction that is already in progress.
        return executeCallbackFlux(existingTransaction(transactionStatus, definition), handler);
    }

    /**
     * Obtains a connection for the definition and runs the handler in a brand-new transaction on it.
     *
     * @param definition the transaction definition, which also supplies the connection definition
     * @param handler    the callback to run inside the new transaction
     * @param <T>        the element type emitted by the callback
     * @return the callback's results
     */
    private <T> Flux<T> openNewConnectionAndTx(TransactionDefinition definition, ReactiveTransactionOperations.TransactionalCallback<C, T> handler) {
        return this.connectionOperations.withConnectionFlux(definition.getConnectionDefinition(), connectionStatus -> {
            // Parameterized instantiation (no raw type): the status is marked as a new transaction.
            DefaultReactiveTransactionStatus<C> txStatus =
                new DefaultReactiveTransactionStatus<>(connectionStatus, true, definition);
            return executeTransactionFlux(txStatus, handler);
        });
    }

    /**
     * Executes the handler within a transaction governed by the given definition, producing a
     * {@link Mono}.
     *
     * <p>The ambient transaction is resolved from the subscriber context at subscription time and
     * the propagation behaviour is applied as follows:
     * <ul>
     *   <li>No existing transaction: {@code MANDATORY} fails, anything else opens a new
     *       connection and transaction.</li>
     *   <li>Existing transaction: {@code NOT_SUPPORTED} and {@code NEVER} fail,
     *       {@code REQUIRES_NEW} opens a new connection and transaction, anything else joins
     *       the existing transaction.</li>
     * </ul>
     *
     * @param definition the transaction definition, must not be {@code null}
     * @param handler    the callback to run inside the transaction, must not be {@code null}
     * @param <T>        the result type
     * @return the callback's result, or an error when the propagation rules are violated
     * @throws NullPointerException if {@code definition} or {@code handler} is {@code null}
     */
    @Override
    public <T> Mono<T> withTransactionMono(TransactionDefinition definition, Function<ReactiveTransactionStatus<C>, Mono<T>> handler) {
        Objects.requireNonNull(definition, "Transaction definition cannot be null");
        Objects.requireNonNull(handler, "Callback handler cannot be null");
        return Mono.deferContextual(ctx -> {
            ReactiveTransactionStatus<C> current = getTransactionStatus(ctx);
            TransactionDefinition.Propagation propagation = definition.getPropagationBehavior();

            if (current == null) {
                // Nothing to join: either the caller demanded a transaction or we start one.
                if (propagation == TransactionDefinition.Propagation.MANDATORY) {
                    return Mono.error(expectedTransaction());
                }
                return openNewConnectionAndTxMono(definition, handler);
            }

            boolean rejectsExisting = propagation == TransactionDefinition.Propagation.NOT_SUPPORTED
                || propagation == TransactionDefinition.Propagation.NEVER;
            if (rejectsExisting) {
                return Mono.error(propagationNotSupported(propagation));
            }
            if (propagation == TransactionDefinition.Propagation.REQUIRES_NEW) {
                return openNewConnectionAndTxMono(definition, handler);
            }
            // Join the transaction that is already in progress.
            return executeCallbackMono(existingTransaction(current, definition), handler);
        });
    }

    /**
     * Obtains a connection for the definition and runs the handler in a brand-new transaction on it.
     *
     * @param definition the transaction definition, which also supplies the connection definition
     * @param handler    the callback to run inside the new transaction
     * @param <T>        the result type
     * @return the callback's result
     */
    private <T> Mono<T> openNewConnectionAndTxMono(TransactionDefinition definition, Function<ReactiveTransactionStatus<C>, Mono<T>> handler) {
        return this.connectionOperations.withConnectionMono(definition.getConnectionDefinition(), connectionStatus -> {
            // Parameterized instantiation (no raw type): the status is marked as a new transaction.
            DefaultReactiveTransactionStatus<C> txStatus =
                new DefaultReactiveTransactionStatus<>(connectionStatus, true, definition);
            return executeTransactionMono(txStatus, handler);
        });
    }

    /**
     * Begins the transaction described by the status, runs the handler in it and finishes it.
     *
     * <p>A synchronization is registered on the connection status so that a cancellation is
     * routed to {@link #doCancel}. The begin publisher is obtained eagerly; the handler is chained
     * after it. The resulting sequence is wrapped in a {@code SyncCompleteAndErrorPublisher}
     * configured with {@code doCommit} as the completion action and {@code doRollback} as the
     * error action.
     *
     * @param txStatus the status of the new transaction
     * @param handler  the callback to run inside the transaction
     * @param <R>      the element type emitted by the callback
     * @return the callback's results
     */
    @NonNull
    protected <R> Flux<R> executeTransactionFlux(final @NonNull DefaultReactiveTransactionStatus<C> txStatus, @NonNull ReactiveTransactionOperations.TransactionalCallback<C, R> handler) {
        ((ReactiveConnectionStatus) txStatus.getConnectionStatus()).registerReactiveSynchronization(new ReactiveConnectionSynchronization() {

            @Override
            public Publisher<Void> onCancel() {
                return AbstractReactorTransactionOperations.this.doCancel(txStatus);
            }
        });

        Publisher<Void> begin = beginTransaction(txStatus.getConnectionStatus(), txStatus.getTransactionDefinition());
        // Wait for the transaction to start, then hand the status to the callback.
        Flux<R> transactional = Mono.fromDirect(begin)
            .thenMany(Mono.just(txStatus))
            .flatMap(status -> executeCallbackFlux(status, handler));

        return Flux.from(new SyncCompleteAndErrorPublisher(
            transactional,
            () -> doCommit(txStatus),
            throwable -> doRollback(txStatus, (Throwable) throwable),
            false));
    }

    /**
     * Begins the transaction described by the status, runs the handler in it and finishes it,
     * producing a {@link Mono}.
     *
     * <p>A synchronization is registered on the connection status so that a cancellation is
     * routed to {@link #doCancel}. The begin publisher is obtained eagerly; the handler is chained
     * after it. The resulting sequence is wrapped in a {@code SyncCompleteAndErrorPublisher}
     * configured with {@code doCommit} as the completion action and {@code doRollback} as the
     * error action.
     *
     * @param txStatus the status of the new transaction
     * @param handler  the callback to run inside the transaction
     * @param <R>      the result type
     * @return the callback's result
     */
    @NonNull
    protected <R> Mono<R> executeTransactionMono(final @NonNull DefaultReactiveTransactionStatus<C> txStatus, @NonNull Function<ReactiveTransactionStatus<C>, Mono<R>> handler) {
        ((ReactiveConnectionStatus) txStatus.getConnectionStatus()).registerReactiveSynchronization(new ReactiveConnectionSynchronization() {

            @Override
            public Publisher<Void> onCancel() {
                return AbstractReactorTransactionOperations.this.doCancel(txStatus);
            }
        });

        Publisher<Void> begin = beginTransaction(txStatus.getConnectionStatus(), txStatus.getTransactionDefinition());
        // Wait for the transaction to start, then hand the status to the callback.
        Flux<R> transactional = Mono.fromDirect(begin)
            .thenMany(Mono.just(txStatus))
            .flatMap(status -> executeCallbackMono(status, handler));

        return Mono.from(new SyncCompleteAndErrorPublisher(
            transactional,
            () -> doCommit(txStatus),
            throwable -> doRollback(txStatus, (Throwable) throwable),
            true));
    }

    /**
     * Runs the handler with the given transaction status and exposes that status to the
     * handler's reactive chain through the Reactor context.
     *
     * <p>NOTE(review): the handler is invoked by {@code flatMap}, i.e. when the sequence is
     * subscribed, so failures raised by the handler itself surface as error signals of the
     * returned flux; the {@code catch} below only covers failures raised while the pipeline is
     * being assembled.
     *
     * @param status  the transaction status passed to the handler
     * @param handler the callback to run
     * @param <R>     the element type emitted by the callback
     * @return the callback's results
     */
    @NonNull
    protected <R> Flux<R> executeCallbackFlux(@NonNull ReactiveTransactionStatus<C> status, @NonNull ReactiveTransactionOperations.TransactionalCallback<C, R> handler) {
        try {
            Flux<R> callbackResult = Flux.just(status).flatMap(handler::doInTransaction);
            return callbackResult.contextWrite(ctx -> addTxStatus(ctx, status));
        } catch (Exception e) {
            TransactionSystemException failure =
                new TransactionSystemException("Error invoking doInTransaction handler: " + e.getMessage(), e);
            return Flux.error(failure);
        }
    }

    /**
     * Runs the handler with the given transaction status and exposes that status to the
     * handler's reactive chain through the Reactor context.
     *
     * <p>NOTE(review): the handler is invoked by {@code flatMap}, i.e. when the sequence is
     * subscribed, so failures raised by the handler itself surface as error signals of the
     * returned mono; the {@code catch} below only covers failures raised while the pipeline is
     * being assembled.
     *
     * @param status  the transaction status passed to the handler
     * @param handler the callback to run
     * @param <R>     the result type
     * @return the callback's result
     */
    @NonNull
    protected <R> Mono<R> executeCallbackMono(@NonNull ReactiveTransactionStatus<C> status, @NonNull Function<ReactiveTransactionStatus<C>, Mono<R>> handler) {
        try {
            Mono<R> callbackResult = Mono.just(status).flatMap(handler::apply);
            return callbackResult.contextWrite(ctx -> addTxStatus(ctx, status));
        } catch (Exception e) {
            TransactionSystemException failure =
                new TransactionSystemException("Error invoking doInTransaction handler: " + e.getMessage(), e);
            return Mono.error(failure);
        }
    }

    /**
     * Creates a view of an already running transaction for a caller that joins it.
     *
     * <p>The view reports {@code isNewTransaction() == false} and the joining caller's own
     * transaction definition; everything else is delegated to the existing status, so marking
     * the view rollback-only marks the underlying transaction.
     *
     * @param existing              the status of the transaction being joined
     * @param transactionDefinition the definition requested by the joining caller
     * @return the delegating transaction status
     */
    private ReactiveTransactionStatus<C> existingTransaction(final ReactiveTransactionStatus<C> existing, final TransactionDefinition transactionDefinition) {
        return new ReactiveTransactionStatus<>() {

            // --- values specific to the joining caller ---

            @Override
            public boolean isNewTransaction() {
                return false;
            }

            @Override
            public TransactionDefinition getTransactionDefinition() {
                return transactionDefinition;
            }

            // --- everything else is delegated to the joined transaction ---

            @Override
            public ConnectionStatus<C> getConnectionStatus() {
                return existing.getConnectionStatus();
            }

            @Override
            public C getConnection() {
                return existing.getConnection();
            }

            @Override
            public boolean isRollbackOnly() {
                return existing.isRollbackOnly();
            }

            @Override
            public void setRollbackOnly() {
                existing.setRollbackOnly();
            }

            @Override
            public boolean isCompleted() {
                return existing.isCompleted();
            }
        };
    }

    /**
     * Finishes the transaction when the connection signals a cancellation.
     *
     * <p>This implementation finishes the transaction exactly as on normal completion: it
     * commits, or rolls back when the status is marked rollback-only.
     *
     * @param status the status of the transaction being cancelled
     * @return a publisher that completes once the transaction has been finished
     */
    @NonNull
    protected Publisher<Void> doCancel(@NonNull DefaultReactiveTransactionStatus<C> status) {
        Publisher<Void> completion = doCommit(status);
        return completion;
    }

    /**
     * Finishes the transaction after the callback completed normally.
     *
     * <p>Rolls back when the status is marked rollback-only and commits otherwise. An exception
     * thrown synchronously while obtaining the commit/rollback publisher is converted into an
     * error signal. The status is marked completed via {@code doFinish} when the operation succeeds.
     *
     * @param status the status of the transaction to finish
     * @return a publisher that completes once the transaction has been finished
     */
    @NonNull
    private Publisher<Void> doCommit(@NonNull DefaultReactiveTransactionStatus<C> status) {
        // Typed as Flux<Void> so no raw types or unchecked casts are needed downstream.
        Flux<Void> completion;
        try {
            if (status.isRollbackOnly()) {
                completion = Flux.from(rollbackTransaction(status.getConnectionStatus(), status.getTransactionDefinition()));
            } else {
                completion = Flux.from(commitTransaction(status.getConnectionStatus(), status.getTransactionDefinition()));
            }
        } catch (Exception e) {
            completion = Flux.error(e);
        }
        return doFinish(completion, status);
    }

    /**
     * Finishes the transaction after the callback failed.
     *
     * <p>When the transaction definition reports that the failure requires a rollback, the
     * transaction is rolled back; otherwise no rollback is issued and the original failure is
     * re-emitted. If the rollback itself fails (or obtaining it throws), that secondary error is
     * logged and the original failure is emitted instead. The status is marked completed via
     * {@code doFinish} only when the rollback publisher completes without error.
     *
     * @param status    the status of the transaction to finish
     * @param throwable the failure raised by the transactional work
     * @return a publisher that completes after a successful rollback, or fails with
     *         {@code throwable} otherwise
     */
    @NonNull
    private Publisher<Void> doRollback(@NonNull DefaultReactiveTransactionStatus<C> status, @NonNull Throwable throwable) {
        if (LOG.isWarnEnabled()) {
            LOG.warn("Rolling back transaction on error: " + throwable.getMessage(), throwable);
        }
        // Typed as Flux<Void> so no raw types or unchecked casts are needed downstream.
        Flux<Void> abort;
        try {
            TransactionDefinition definition = status.getTransactionDefinition();
            if (definition.rollbackOn(throwable)) {
                abort = Flux.from(rollbackTransaction(status.getConnectionStatus(), definition));
            } else {
                abort = Flux.error(throwable);
            }
        } catch (Exception e) {
            abort = Flux.error(e);
        }
        Flux<Void> reportingOriginalFailure = abort.onErrorResume(rollbackError -> {
            // Only log genuine rollback failures, not the original error passed straight through.
            if (rollbackError != throwable && LOG.isWarnEnabled()) {
                LOG.warn("Error occurred during transaction rollback: " + rollbackError.getMessage(), rollbackError);
            }
            return Mono.error(throwable);
        });
        return doFinish(reportingOriginalFailure, status);
    }

    /**
     * Marks the transaction status as completed once the given sequence has produced its
     * {@code hasElements()} result, and discards that result.
     *
     * <p>An error signal from the sequence passes through without the status being marked.
     *
     * @param flux   the commit or rollback sequence
     * @param status the status to mark as completed
     * @param <T>    the element type of the sequence
     * @return a publisher that only relays completion or error
     */
    private <T> Publisher<Void> doFinish(Flux<T> flux, DefaultReactiveTransactionStatus<C> status) {
        return flux.hasElements()
            .doOnNext(ignored -> status.completed = true)
            .then();
    }

    /**
     * Adds the transaction status to the Reactor context, tagged with this instance so that
     * {@link #findTransactionStatus} can recognise it later.
     *
     * @param context the context to extend
     * @param status  the transaction status to propagate
     * @return the context containing the new propagated element
     */
    @NonNull
    private Context addTxStatus(@NonNull Context context, @NonNull ReactiveTransactionStatus<C> status) {
        ReactiveTransactionPropagatedContext<C> element = new ReactiveTransactionPropagatedContext<C>(this, status);
        return ReactorPropagation.addContextElement(context, element);
    }

    /**
     * Builds the failure reported when a transaction is mandatory but none is in progress.
     *
     * @return the exception to emit
     */
    @NonNull
    private NoTransactionException expectedTransaction() {
        String message = "Expected an existing transaction, but none was found in the Reactive context.";
        return new NoTransactionException(message);
    }

    /**
     * Builds the failure reported when a transaction is in progress but the requested
     * propagation behaviour forbids running inside one.
     *
     * @param propagationBehavior the offending propagation behaviour
     * @return the exception to emit
     */
    @NonNull
    private TransactionUsageException propagationNotSupported(TransactionDefinition.Propagation propagationBehavior) {
        String message = "Found an existing transaction but propagation behaviour doesn't support it: " + propagationBehavior;
        return new TransactionUsageException(message);
    }

    /**
     * Context element carrying a transaction status together with the transaction operations
     * that created it, so each operations instance can pick out its own transaction.
     *
     * @param transactionOperations the operations instance that registered the element
     * @param status                the propagated transaction status
     * @param <C>                   the connection type
     */
    private record ReactiveTransactionPropagatedContext<C>(
        ReactiveTransactionOperations<?> transactionOperations,
        ReactiveTransactionStatus<C> status
    ) implements PropagatedContextElement {
    }

    /**
     * Default {@link ReactiveTransactionStatus} backed by a connection status and a transaction
     * definition, with mutable rollback-only and completed flags.
     *
     * @param <C> the connection type
     */
    protected static final class DefaultReactiveTransactionStatus<C> implements ReactiveTransactionStatus<C> {

        private final ConnectionStatus<C> connectionStatus;
        private final boolean isNew;
        private final TransactionDefinition transactionDefinition;
        /** Set through {@link #setRollbackOnly()}; this class never resets it. */
        private boolean rollbackOnly;
        /** Written directly by the enclosing class once the transaction has been finished. */
        private boolean completed;

        /**
         * @param connectionStatus      the status of the connection the transaction runs on
         * @param isNew                 whether this status represents a newly started transaction
         * @param transactionDefinition the definition of the transaction
         */
        public DefaultReactiveTransactionStatus(ConnectionStatus<C> connectionStatus, boolean isNew, TransactionDefinition transactionDefinition) {
            this.connectionStatus = connectionStatus;
            this.isNew = isNew;
            this.transactionDefinition = transactionDefinition;
        }

        // --- connection ---

        @Override
        public C getConnection() {
            return connectionStatus.getConnection();
        }

        @Override
        public ConnectionStatus<C> getConnectionStatus() {
            return connectionStatus;
        }

        // --- transaction metadata ---

        @Override
        public TransactionDefinition getTransactionDefinition() {
            return transactionDefinition;
        }

        @Override
        public boolean isNewTransaction() {
            return isNew;
        }

        // --- state flags ---

        @Override
        public boolean isRollbackOnly() {
            return rollbackOnly;
        }

        @Override
        public void setRollbackOnly() {
            rollbackOnly = true;
        }

        @Override
        public boolean isCompleted() {
            return completed;
        }
    }
}

