/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.shaded.kafka09.org.apache.kafka.clients.producer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.graylog.shaded.kafka09.org.apache.kafka.clients.producer.Callback;
import org.graylog.shaded.kafka09.org.apache.kafka.clients.producer.internals.BufferPool;
import org.graylog.shaded.kafka09.org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
import org.graylog.shaded.kafka09.org.apache.kafka.clients.producer.internals.RecordBatch;
import org.graylog.shaded.kafka09.org.apache.kafka.common.Cluster;
import org.graylog.shaded.kafka09.org.apache.kafka.common.MetricName;
import org.graylog.shaded.kafka09.org.apache.kafka.common.Node;
import org.graylog.shaded.kafka09.org.apache.kafka.common.PartitionInfo;
import org.graylog.shaded.kafka09.org.apache.kafka.common.TopicPartition;
import org.graylog.shaded.kafka09.org.apache.kafka.common.metrics.Measurable;
import org.graylog.shaded.kafka09.org.apache.kafka.common.metrics.MetricConfig;
import org.graylog.shaded.kafka09.org.apache.kafka.common.metrics.Metrics;
import org.graylog.shaded.kafka09.org.apache.kafka.common.metrics.Sensor;
import org.graylog.shaded.kafka09.org.apache.kafka.common.metrics.stats.Rate;
import org.graylog.shaded.kafka09.org.apache.kafka.common.record.CompressionType;
import org.graylog.shaded.kafka09.org.apache.kafka.common.record.MemoryRecords;
import org.graylog.shaded.kafka09.org.apache.kafka.common.record.Record;
import org.graylog.shaded.kafka09.org.apache.kafka.common.utils.CopyOnWriteMap;
import org.graylog.shaded.kafka09.org.apache.kafka.common.utils.Time;
import org.graylog.shaded.kafka09.org.apache.kafka.common.utils.Utils;
import org.graylog.shaded.kafka09.org.slf4j.Logger;
import org.graylog.shaded.kafka09.org.slf4j.LoggerFactory;

/**
 * Accumulates records into per-partition queues of {@link RecordBatch} instances
 * until they are drained for sending.
 *
 * <p>Each {@link TopicPartition} maps to its own {@code Deque<RecordBatch>}. All
 * reads and mutations of a deque are performed while holding that deque's monitor.
 * Buffers backing the batches are obtained from, and returned to, a shared
 * {@link BufferPool}.
 */
public final class RecordAccumulator {
    private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class);

    /** Set by {@link #close()}; once true, {@link #append} rejects new records. */
    private volatile boolean closed = false;
    /** Rotating start position used by {@link #drain} when walking a node's partitions. */
    private int drainIndex = 0;
    /** Number of flushes started via {@link #beginFlush()} and not yet completed. */
    private final AtomicInteger flushesInProgress = new AtomicInteger(0);
    /** Number of threads currently inside {@link #append}. */
    private final AtomicInteger appendsInProgress = new AtomicInteger(0);
    private final int batchSize;
    private final CompressionType compression;
    private final long lingerMs;
    private final long retryBackoffMs;
    private final BufferPool free;
    private final Time time;
    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
    private final IncompleteRecordBatches incomplete;

    /**
     * Creates a new accumulator.
     *
     * @param batchSize      minimum size of the buffer allocated for a new batch; also passed to the buffer pool
     * @param totalSize      total memory handed to the {@link BufferPool}
     * @param compression    compression type used when creating the records of a new batch
     * @param lingerMs       time a non-retried batch waits before {@link #ready} treats it as expired
     * @param retryBackoffMs time a previously attempted batch is held back before it may be sent again
     * @param metrics        registry the accumulator's metrics are added to
     * @param time           clock used to timestamp appends and new batches
     * @param metricTags     tags attached to every metric registered here
     */
    public RecordAccumulator(int batchSize, long totalSize, CompressionType compression, long lingerMs, long retryBackoffMs, Metrics metrics, Time time, Map<String, String> metricTags) {
        this.batchSize = batchSize;
        this.compression = compression;
        this.lingerMs = lingerMs;
        this.retryBackoffMs = retryBackoffMs;
        this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
        String metricGrpName = "producer-metrics";
        this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName, metricTags);
        this.incomplete = new IncompleteRecordBatches();
        this.time = time;
        this.registerMetrics(metrics, metricGrpName, metricTags);
    }

    /**
     * Registers the buffer-related gauges and the buffer-exhausted rate sensor.
     *
     * @param metrics       registry to add the metrics to
     * @param metricGrpName group name used for every metric
     * @param metricTags    tags used for every metric
     */
    private void registerMetrics(Metrics metrics, String metricGrpName, Map<String, String> metricTags) {
        MetricName waitingThreadsName = new MetricName("waiting-threads", metricGrpName, "The number of user threads blocked waiting for buffer memory to enqueue their records", metricTags);
        metrics.addMetric(waitingThreadsName, new Measurable() {
            @Override
            public double measure(MetricConfig config, long now) {
                return RecordAccumulator.this.free.queued();
            }
        });

        MetricName totalBytesName = new MetricName("buffer-total-bytes", metricGrpName, "The maximum amount of buffer memory the client can use (whether or not it is currently used).", metricTags);
        metrics.addMetric(totalBytesName, new Measurable() {
            @Override
            public double measure(MetricConfig config, long now) {
                return RecordAccumulator.this.free.totalMemory();
            }
        });

        MetricName availableBytesName = new MetricName("buffer-available-bytes", metricGrpName, "The total amount of buffer memory that is not being used (either unallocated or in the free list).", metricTags);
        metrics.addMetric(availableBytesName, new Measurable() {
            @Override
            public double measure(MetricConfig config, long now) {
                return RecordAccumulator.this.free.availableMemory();
            }
        });

        Sensor bufferExhaustedRecordSensor = metrics.sensor("buffer-exhausted-records");
        MetricName exhaustedRateName = new MetricName("buffer-exhausted-rate", metricGrpName, "The average per-second number of record sends that are dropped due to buffer exhaustion", metricTags);
        bufferExhaustedRecordSensor.add(exhaustedRateName, new Rate());
    }

    /**
     * Appends a record to the last batch of the partition's deque, creating a new
     * batch when there is none or the last one does not accept the record.
     *
     * <p>The in-progress append counter is incremented on entry and decremented in a
     * {@code finally} block, so it is released on every exit path, including
     * exceptions. {@link #abortIncompleteBatches()} spins on that counter.
     *
     * @param tp             partition the record is destined for
     * @param key            record key
     * @param value2         record value
     * @param callback       callback handed to the batch together with the record
     * @param maxTimeToBlock passed to the buffer pool as the allocation wait limit
     * @return the record's future, whether the partition has a full batch or more than
     *         one batch queued, and whether a new batch was created
     * @throws IllegalStateException if the accumulator has been closed
     * @throws InterruptedException  declared for the buffer allocation call
     */
    public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value2, Callback callback, long maxTimeToBlock) throws InterruptedException {
        this.appendsInProgress.incrementAndGet();
        try {
            if (this.closed) {
                throw new IllegalStateException("Cannot send after the producer is closed.");
            }
            Deque<RecordBatch> dq = this.dequeFor(tp);

            // Fast path: try to fit the record into the current last batch.
            synchronized (dq) {
                RecordBatch last = dq.peekLast();
                if (last != null) {
                    FutureRecordMetadata future = last.tryAppend(key, value2, callback, this.time.milliseconds());
                    if (future != null) {
                        return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                    }
                }
            }

            // Allocate outside the deque lock; the allocation may block.
            // NOTE(review): 12 is presumably the fixed per-record log overhead - confirm against Records.
            int size = Math.max(this.batchSize, 12 + Record.recordSize(key, value2));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", new Object[]{size, tp.topic(), tp.partition()});
            ByteBuffer buffer = this.free.allocate(size, maxTimeToBlock);

            synchronized (dq) {
                // Re-check after re-acquiring the lock: close() may have run while we were allocating.
                if (this.closed) {
                    // Return the unused buffer so it is not lost to the pool.
                    this.free.deallocate(buffer);
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                }
                // Another thread may have added a batch with room in the meantime.
                RecordBatch last = dq.peekLast();
                if (last != null) {
                    FutureRecordMetadata future = last.tryAppend(key, value2, callback, this.time.milliseconds());
                    if (future != null) {
                        this.free.deallocate(buffer);
                        return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                    }
                }
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, this.compression, this.batchSize);
                RecordBatch batch = new RecordBatch(tp, records, this.time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value2, callback, this.time.milliseconds()));
                dq.addLast(batch);
                this.incomplete.add(batch);
                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
            }
        } finally {
            this.appendsInProgress.decrementAndGet();
        }
    }

    /**
     * Removes and deallocates every batch for which
     * {@code RecordBatch.maybeExpire(requestTimeout, now, lingerMs)} returns true.
     *
     * <p>Within each deque the scan stops at the first batch that neither expired nor
     * is marked as in retry.
     *
     * @param requestTimeout timeout passed to {@code maybeExpire}
     * @param cluster        not used by this implementation
     * @param now            current time in milliseconds
     * @return the batches that were expired and removed
     */
    public List<RecordBatch> abortExpiredBatches(int requestTimeout, Cluster cluster, long now) {
        List<RecordBatch> expiredBatches = new ArrayList<RecordBatch>();
        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
            Deque<RecordBatch> dq = entry.getValue();
            synchronized (dq) {
                Iterator<RecordBatch> batchIterator = dq.iterator();
                while (batchIterator.hasNext()) {
                    RecordBatch batch = batchIterator.next();
                    if (batch.maybeExpire(requestTimeout, now, this.lingerMs)) {
                        expiredBatches.add(batch);
                        batchIterator.remove();
                        this.deallocate(batch);
                    } else if (!batch.inRetry()) {
                        break;
                    }
                }
            }
        }
        if (!expiredBatches.isEmpty()) {
            log.trace("Expired {} batches in accumulator", expiredBatches.size());
        }
        return expiredBatches;
    }

    /**
     * Puts a batch back at the head of its partition's deque after bumping its
     * attempt count, resetting its attempt/append timestamps to {@code now} and
     * marking it as a retry.
     *
     * @param batch batch to re-enqueue
     * @param now   current time in milliseconds
     */
    public void reenqueue(RecordBatch batch, long now) {
        batch.attempts++;
        batch.lastAttemptMs = now;
        batch.lastAppendTime = now;
        batch.setRetry();
        Deque<RecordBatch> deque = this.dequeFor(batch.topicPartition);
        synchronized (deque) {
            deque.addFirst(batch);
        }
    }

    /**
     * Determines which partition leaders have a batch that can be sent now.
     *
     * <p>The head batch of a partition is sendable when the partition has a full batch
     * or more than one batch, the batch has waited its linger/backoff time, threads are
     * queued on the buffer pool, the accumulator is closed, or a flush is in progress.
     * A sendable batch still does not make its leader ready while it is backing off.
     *
     * @param cluster cluster metadata used to look up partition leaders
     * @param nowMs   current time in milliseconds
     * @return the ready nodes, the smallest remaining wait among the not-yet-ready head
     *         batches ({@code Long.MAX_VALUE} if there are none), and whether any
     *         partition had no known leader
     */
    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        Set<Node> readyNodes = new HashSet<Node>();
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        boolean unknownLeadersExist = false;
        boolean exhausted = this.free.queued() > 0;

        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
            Node leader = cluster.leaderFor(entry.getKey());
            if (leader == null) {
                unknownLeadersExist = true;
                continue;
            }
            if (readyNodes.contains(leader)) {
                continue;
            }
            Deque<RecordBatch> deque = entry.getValue();
            synchronized (deque) {
                RecordBatch batch = deque.peekFirst();
                if (batch == null) {
                    continue;
                }
                boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + this.retryBackoffMs > nowMs;
                long waitedTimeMs = nowMs - batch.lastAttemptMs;
                long timeToWaitMs = backingOff ? this.retryBackoffMs : this.lingerMs;
                long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0L);
                boolean full = deque.size() > 1 || batch.records.isFull();
                boolean expired = waitedTimeMs >= timeToWaitMs;
                boolean sendable = full || expired || exhausted || this.closed || this.flushInProgress();
                if (sendable && !backingOff) {
                    readyNodes.add(leader);
                } else {
                    nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                }
            }
        }
        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
    }

    /**
     * @return true if any partition deque currently holds at least one batch
     */
    public boolean hasUnsent() {
        for (Deque<RecordBatch> deque : this.batches.values()) {
            synchronized (deque) {
                if (!deque.isEmpty()) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Collects, for each given node, head batches of that node's partitions until
     * adding another batch would exceed {@code maxSize}.
     *
     * <p>At most one batch is taken per partition per call. The walk over a node's
     * partitions starts at {@code drainIndex}, which is advanced as partitions are
     * visited. The first eligible batch for a node is always taken, even if it alone
     * exceeds {@code maxSize}. Batches that are still backing off are skipped.
     *
     * @param cluster cluster metadata used to list each node's partitions
     * @param nodes   nodes to drain for
     * @param maxSize size limit in bytes for the batches collected per node
     * @param now     current time in milliseconds
     * @return map from node id to the batches drained for that node; empty if
     *         {@code nodes} is empty
     */
    public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
        if (nodes.isEmpty()) {
            return Collections.emptyMap();
        }
        Map<Integer, List<RecordBatch>> drained = new HashMap<Integer, List<RecordBatch>>();
        for (Node node : nodes) {
            List<RecordBatch> ready = new ArrayList<RecordBatch>();
            List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
            if (parts == null || parts.isEmpty()) {
                // Nothing to walk; also avoids a modulo-by-zero below.
                drained.put(node.id(), ready);
                continue;
            }
            int size = 0;
            int start = this.drainIndex = this.drainIndex % parts.size();
            do {
                PartitionInfo part = parts.get(this.drainIndex);
                // dequeFor never returns null; it creates the deque on demand.
                Deque<RecordBatch> deque = this.dequeFor(new TopicPartition(part.topic(), part.partition()));
                synchronized (deque) {
                    RecordBatch first = deque.peekFirst();
                    if (first != null) {
                        boolean backoff = first.attempts > 0 && first.lastAttemptMs + this.retryBackoffMs > now;
                        if (!backoff) {
                            if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                // Size limit reached for this node: stop walking its partitions.
                                break;
                            }
                            RecordBatch batch = deque.pollFirst();
                            batch.records.close();
                            size += batch.records.sizeInBytes();
                            ready.add(batch);
                            batch.drainedMs = now;
                        }
                    }
                }
                this.drainIndex = (this.drainIndex + 1) % parts.size();
            } while (start != this.drainIndex);
            drained.put(node.id(), ready);
        }
        return drained;
    }

    /**
     * Returns the deque for a partition, creating and registering an empty one if the
     * partition has none yet.
     *
     * @param tp partition to look up
     * @return the partition's deque, never null
     */
    private Deque<RecordBatch> dequeFor(TopicPartition tp) {
        Deque<RecordBatch> existing = this.batches.get(tp);
        if (existing != null) {
            return existing;
        }
        Deque<RecordBatch> created = new ArrayDeque<RecordBatch>();
        Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, created);
        // If another thread registered a deque first, use theirs.
        return previous == null ? created : previous;
    }

    /**
     * Removes the batch from the incomplete set and returns its buffer to the pool.
     *
     * @param batch batch to release
     * @throws IllegalStateException if the batch was not in the incomplete set
     */
    public void deallocate(RecordBatch batch) {
        this.incomplete.remove(batch);
        this.free.deallocate(batch.records.buffer(), batch.records.initialCapacity());
    }

    /** @return true if at least one flush has begun and not yet completed */
    private boolean flushInProgress() {
        return this.flushesInProgress.get() > 0;
    }

    /** Marks the start of a flush; while any flush is in progress {@link #ready} treats every batch as sendable. */
    public void beginFlush() {
        this.flushesInProgress.getAndIncrement();
    }

    /** @return true if at least one thread is currently inside {@link #append} */
    private boolean appendsInProgress() {
        return this.appendsInProgress.get() > 0;
    }

    /**
     * Waits on the produce future of every batch that was incomplete when this method
     * was called, then marks one flush as finished.
     *
     * <p>The flush counter is decremented in a {@code finally} block so that an
     * interrupted wait does not leave the accumulator permanently in the
     * flush-in-progress state.
     *
     * @throws InterruptedException if waiting on a batch is interrupted
     */
    public void awaitFlushCompletion() throws InterruptedException {
        try {
            for (RecordBatch batch : this.incomplete.all()) {
                batch.produceFuture.await();
            }
        } finally {
            this.flushesInProgress.decrementAndGet();
        }
    }

    /**
     * Aborts all incomplete batches, repeating while appends are still in progress,
     * then aborts once more and clears the partition map.
     */
    public void abortIncompleteBatches() {
        do {
            this.abortBatches();
        } while (this.appendsInProgress());
        // One more pass to catch batches added by appends that finished after the last pass.
        this.abortBatches();
        this.batches.clear();
    }

    /**
     * Closes, fails and deallocates every batch currently in the incomplete set.
     * Each batch is completed with an {@link IllegalStateException}.
     */
    private void abortBatches() {
        for (RecordBatch batch : this.incomplete.all()) {
            Deque<RecordBatch> dq = this.dequeFor(batch.topicPartition);
            // Close under the deque lock, the same lock append() holds while writing to batches.
            synchronized (dq) {
                batch.records.close();
            }
            batch.done(-1L, new IllegalStateException("Producer is closed forcefully."));
            this.deallocate(batch);
        }
    }

    /** Marks the accumulator as closed; later {@link #append} calls throw {@link IllegalStateException}. */
    public void close() {
        this.closed = true;
    }

    /**
     * Set of batches that have been created but not yet deallocated. All access is
     * synchronized on the backing set.
     */
    private static final class IncompleteRecordBatches {
        private final Set<RecordBatch> incomplete = new HashSet<RecordBatch>();

        /** Adds a batch to the set. */
        public void add(RecordBatch batch) {
            synchronized (this.incomplete) {
                this.incomplete.add(batch);
            }
        }

        /**
         * Removes a batch from the set.
         *
         * @throws IllegalStateException if the batch was not present
         */
        public void remove(RecordBatch batch) {
            synchronized (this.incomplete) {
                boolean removed = this.incomplete.remove(batch);
                if (!removed) {
                    throw new IllegalStateException("Remove from the incomplete set failed. This should be impossible.");
                }
            }
        }

        /** @return a snapshot copy of the set, safe to iterate while the set changes */
        public Iterable<RecordBatch> all() {
            synchronized (this.incomplete) {
                return new ArrayList<RecordBatch>(this.incomplete);
            }
        }
    }

    /** Result of {@link RecordAccumulator#ready}. */
    public static final class ReadyCheckResult {
        /** Leaders that have at least one batch ready to send. */
        public final Set<Node> readyNodes;
        /** Smallest remaining wait among head batches that were not ready. */
        public final long nextReadyCheckDelayMs;
        /** True if some partition with a deque had no known leader. */
        public final boolean unknownLeadersExist;

        public ReadyCheckResult(Set<Node> readyNodes, long nextReadyCheckDelayMs, boolean unknownLeadersExist) {
            this.readyNodes = readyNodes;
            this.nextReadyCheckDelayMs = nextReadyCheckDelayMs;
            this.unknownLeadersExist = unknownLeadersExist;
        }
    }

    /** Result of {@link RecordAccumulator#append}. */
    public static final class RecordAppendResult {
        /** Future for the appended record. */
        public final FutureRecordMetadata future;
        /** True if the partition had more than one batch queued or the target batch was full. */
        public final boolean batchIsFull;
        /** True if the append created a new batch. */
        public final boolean newBatchCreated;

        public RecordAppendResult(FutureRecordMetadata future, boolean batchIsFull, boolean newBatchCreated) {
            this.future = future;
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
    }
}

