package com.aerospike.kafka.connect.sink;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.async.AsyncClient;
import com.aerospike.client.async.AsyncClientPolicy;
import com.aerospike.client.listener.WriteListener;
import com.aerospike.client.policy.RecordExistsAction;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.kafka.connect.data.AerospikeRecord;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aerospike/kafka/connect/sink/AsyncWriter.class */
public class AsyncWriter {
    private static final Logger log = LoggerFactory.getLogger(AsyncWriter.class);
    private final AsyncClient client;
    private final WritePolicy writePolicy;
    private final Counter inFlight;
    private final ResultListener listener;

    /* loaded from: input_file:com/aerospike/kafka/connect/sink/AsyncWriter$Counter.class */
    class Counter {
        private static final long DEFAULT_SLEEP_INTERVAL_MS = 1;
        private AtomicInteger counter;
        private final long sleepMs;

        public Counter(AsyncWriter asyncWriter) {
            this(DEFAULT_SLEEP_INTERVAL_MS);
        }

        public Counter(long j) {
            this.sleepMs = j;
            this.counter = new AtomicInteger(0);
        }

        public void increment() {
            this.counter.incrementAndGet();
        }

        public void decrement() {
            this.counter.decrementAndGet();
        }

        public void waitUntilZero() {
            while (true) {
                try {
                    int i = this.counter.get();
                    if (i <= 0) {
                        return;
                    }
                    AsyncWriter.log.trace("Waiting " + this.sleepMs + "ms for counter to reach zero - current: " + i);
                    Thread.sleep(this.sleepMs);
                } catch (InterruptedException e) {
                    throw new ConnectException("Interrupted while waiting to complete in-flight requests", e);
                }
            }
        }
    }

    /* loaded from: input_file:com/aerospike/kafka/connect/sink/AsyncWriter$ResultListener.class */
    class ResultListener implements WriteListener {
        private final Counter counter;
        private final AtomicBoolean retry = new AtomicBoolean(true);
        private final AtomicInteger exceptions = new AtomicInteger(0);
        private final AtomicReference<Throwable> exception = new AtomicReference<>();

        public ResultListener(Counter counter) {
            this.counter = counter;
        }

        public void raiseErrors() throws ConnectException {
            Throwable th = this.exception.get();
            if (th == null) {
                return;
            }
            String str = "Error writing records: " + this.exceptions.get() + " exception(s) occurred while asynchronously writing records";
            if (!this.retry.get()) {
                throw new ConnectException(str, th);
            }
            throw new RetriableException(str, th);
        }

        public void onFailure(AerospikeException aerospikeException) {
            AsyncWriter.log.error("Error writing record", aerospikeException);
            this.exception.compareAndSet(null, aerospikeException);
            this.retry.compareAndSet(true, retriable(aerospikeException));
            this.exceptions.incrementAndGet();
            this.counter.decrement();
        }

        public void onSuccess(Key key) {
            AsyncWriter.log.trace("Successfully put key {}", key);
            this.counter.decrement();
        }

        private boolean retriable(AerospikeException aerospikeException) {
            return (aerospikeException instanceof AerospikeException.CommandRejected) || (aerospikeException instanceof AerospikeException.Timeout) || (aerospikeException instanceof AerospikeException.Connection);
        }
    }

    public AsyncWriter(ConnectorConfig connectorConfig) {
        try {
            this.client = new AsyncClient(createClientPolicy(connectorConfig), connectorConfig.getHosts());
            this.inFlight = new Counter(this);
            this.listener = new ResultListener(this.inFlight);
            this.writePolicy = createWritePolicy(connectorConfig);
        } catch (AerospikeException e) {
            throw new ConnectException("Error connecting to Aerospike cluster", e);
        }
    }

    public void write(AerospikeRecord aerospikeRecord) {
        this.listener.raiseErrors();
        Key key = aerospikeRecord.key();
        Bin[] bins = aerospikeRecord.bins();
        this.inFlight.increment();
        this.client.put(this.writePolicy, this.listener, key, bins);
    }

    public void flush() {
        this.listener.raiseErrors();
        this.inFlight.waitUntilZero();
    }

    public void close() {
        this.client.close();
    }

    private AsyncClientPolicy createClientPolicy(ConnectorConfig connectorConfig) {
        AsyncClientPolicy asyncClientPolicy = new AsyncClientPolicy();
        asyncClientPolicy.asyncMaxCommands = connectorConfig.getMaxAsyncCommands();
        asyncClientPolicy.asyncMaxCommandAction = connectorConfig.getMaxCommandAction();
        return asyncClientPolicy;
    }

    private WritePolicy createWritePolicy(ConnectorConfig connectorConfig) {
        WritePolicy writePolicy = new WritePolicy();
        RecordExistsAction policyRecordExistsAction = connectorConfig.getPolicyRecordExistsAction();
        if (policyRecordExistsAction != null) {
            writePolicy.recordExistsAction = policyRecordExistsAction;
        }
        log.trace("Write Policy: recordExistsAction={}", writePolicy.recordExistsAction);
        return writePolicy;
    }
}
