/*
 * Decompiled with CFR 0.152.
 */
package ru.tinkoff.kora.kafka.common.producer;

import jakarta.annotation.Nonnull;
import java.util.Objects;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.TimeoutException;
import ru.tinkoff.kora.application.graph.Lifecycle;
import ru.tinkoff.kora.kafka.common.producer.GeneratedPublisher;
import ru.tinkoff.kora.kafka.common.producer.KafkaPublisherConfig;
import ru.tinkoff.kora.kafka.common.producer.TransactionImpl;
import ru.tinkoff.kora.kafka.common.producer.TransactionalPublisher;

public final class TransactionalPublisherImpl<P extends GeneratedPublisher>
implements TransactionalPublisher<P>,
Lifecycle {
    private final BlockingDeque<P> pool = new LinkedBlockingDeque<P>();
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private final AtomicInteger size = new AtomicInteger(0);
    private final Supplier<? extends P> factory;
    private final KafkaPublisherConfig.TransactionConfig transactionConfig;

    public TransactionalPublisherImpl(KafkaPublisherConfig.TransactionConfig config, Supplier<? extends P> factory) {
        this.transactionConfig = Objects.requireNonNull(config);
        this.factory = factory;
    }

    @Override
    @Nonnull
    public final TransactionalPublisher.Transaction<P> begin() {
        if (this.isClosed.get()) {
            throw new IllegalStateException("Pool has already closed!");
        }
        GeneratedPublisher pooledWrapper = (GeneratedPublisher)this.pool.pollFirst();
        if (pooledWrapper != null) {
            Producer<byte[], byte[]> producer = pooledWrapper.producer();
            try {
                producer.beginTransaction();
            }
            catch (Throwable e) {
                this.size.decrementAndGet();
                producer.close();
                throw e;
            }
            return new TransactionImpl<GeneratedPublisher>(pooledWrapper, this);
        }
        if (this.size.incrementAndGet() > this.transactionConfig.maxPoolSize()) {
            this.size.decrementAndGet();
            try {
                GeneratedPublisher waitedWrapper = (GeneratedPublisher)this.pool.pollFirst(this.transactionConfig.maxWaitTime().toMillis(), TimeUnit.MILLISECONDS);
                if (waitedWrapper != null) {
                    Producer<byte[], byte[]> producer = waitedWrapper.producer();
                    try {
                        producer.beginTransaction();
                    }
                    catch (Throwable e) {
                        this.size.decrementAndGet();
                        producer.close();
                        throw e;
                    }
                    return new TransactionImpl<GeneratedPublisher>(waitedWrapper, this);
                }
                throw new TimeoutException("Pooled producer was not available after " + String.valueOf(this.transactionConfig.maxWaitTime()));
            }
            catch (InterruptedException e) {
                throw new KafkaException((Throwable)e);
            }
        }
        P p = this.createNewProducer();
        try {
            p.producer().beginTransaction();
        }
        catch (Throwable e) {
            this.size.decrementAndGet();
            throw e;
        }
        return new TransactionImpl<P>(p, this);
    }

    private P createNewProducer() {
        GeneratedPublisher p = (GeneratedPublisher)this.factory.get();
        try {
            p.init();
        }
        catch (Throwable e) {
            try {
                p.release();
            }
            catch (Exception ex) {
                e.addSuppressed(ex);
            }
            if (e instanceof RuntimeException) {
                RuntimeException re = (RuntimeException)e;
                throw re;
            }
            if (e instanceof Error) {
                Error re = (Error)e;
                throw re;
            }
            throw new RuntimeException(e);
        }
        p.producer().initTransactions();
        return (P)p;
    }

    public final void returnToPool(P p) {
        if (this.isClosed.get()) {
            p.producer().close();
        } else {
            this.pool.addFirst(p);
        }
    }

    public final void deleteFromPool(P p) {
        this.size.decrementAndGet();
        p.producer().close();
    }

    public void init() {
    }

    public void release() throws Exception {
        if (this.isClosed.compareAndSet(false, true)) {
            for (GeneratedPublisher p : this.pool) {
                p.release();
            }
        }
    }
}

