package com.github.segmentio.safeclient;

import com.github.segmentio.safeclient.flusher.IFlusher;
import com.github.segmentio.safeclient.flusher.ThreadPoolFlusher;
import com.github.segmentio.safeclient.policy.flush.FlushAfterTimePolicy;
import com.github.segmentio.safeclient.policy.flush.FlushAtSizePolicy;
import com.github.segmentio.safeclient.policy.flush.IFlushPolicy;
import com.github.segmentio.safeclient.policy.queue.DenyAfterCapacityPolicy;
import com.github.segmentio.safeclient.policy.queue.IQueueDenyPolicy;
import com.github.segmentio.safeclient.queue.IBatchQueue;
import com.github.segmentio.safeclient.queue.NonLockingQueue;
import com.github.segmentio.safeclient.utils.RateLimit;
import com.github.segmentio.safeclient.utils.Statistics;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/segmentio/safeclient/BatchedOperation.class */
public abstract class BatchedOperation<M> {
    private static final Logger logger = LoggerFactory.getLogger(BatchedOperation.class);
    private DateTime lastFlush;
    protected Iterable<IFlushPolicy> flushPolicies = createFlushPolicies();
    protected Iterable<IQueueDenyPolicy> denyPolicies = createCapacityPolicies();
    private IFlusher flusher = createFlusher();
    private IBatchQueue<M> queue = createQueue();
    protected RateLimit errorLoggingRateLimit = new RateLimit(1, 1000);
    protected RateLimit statisticsLoggingRateLimit = new RateLimit(1, 5000);
    public Statistics statistics = new Statistics();

    public abstract boolean canFlush();

    public abstract void performFlush(List<M> list);

    public boolean perform(M m) {
        boolean z = true;
        int size = this.queue.size();
        Iterator<IQueueDenyPolicy> it = this.denyPolicies.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            if (!it.next().canQueue(size)) {
                z = false;
                this.statistics.update("Queue over Capacity => Denied Message", 1.0d);
                break;
            }
        }
        if (z) {
            size = this.queue.add(m);
            this.statistics.update("Enqueued Message", 1.0d);
        } else if (this.errorLoggingRateLimit.canPerform()) {
            logger.warn("Operation batch queue is full, and flushing operations are also pending. Choosing to drop this message from the queue.");
        }
        if (canFlush()) {
            Iterator<IFlushPolicy> it2 = this.flushPolicies.iterator();
            while (true) {
                if (!it2.hasNext()) {
                    break;
                }
                if (it2.next().shouldFlush(size, this.lastFlush)) {
                    this.statistics.update("Asking to Flush", 1.0d);
                    flush();
                    break;
                }
            }
        } else {
            if (this.errorLoggingRateLimit.canPerform()) {
                logger.warn("Batched operation can't flush.");
            }
            this.statistics.update("Batched Operation Can't Flush", 1.0d);
        }
        this.statistics.update("Queue Size", this.queue.size());
        if (shouldLogStatistics() && this.statisticsLoggingRateLimit.canPerform()) {
            logger.debug(this.statistics.toString());
        }
        return z;
    }

    public boolean flush() {
        if (!this.flusher.canFlush()) {
            this.statistics.update("Flusher Can't Flush", 1.0d);
            return false;
        }
        List<M> flush = this.queue.flush(getMaxFlushAmount());
        if (flush == null) {
            return false;
        }
        this.flusher.flush(this, flush);
        this.statistics.update("Flushes", 1.0d);
        this.statistics.update("Flush Batch Size", flush.size());
        this.lastFlush = new DateTime(DateTimeZone.UTC);
        return true;
    }

    public int getQueueSize() {
        return this.queue.size();
    }

    public boolean shouldLogStatistics() {
        return true;
    }

    protected int getMaxFlushAmount() {
        return 50;
    }

    protected int getMaxQueueSize() {
        return getMaxFlushAmount() * 20;
    }

    protected Iterable<IFlushPolicy> createFlushPolicies() {
        return Arrays.asList(new FlushAfterTimePolicy(10000L), new FlushAtSizePolicy(getMaxFlushAmount()));
    }

    protected Iterable<IQueueDenyPolicy> createCapacityPolicies() {
        LinkedList linkedList = new LinkedList();
        linkedList.add(new DenyAfterCapacityPolicy(getMaxQueueSize()));
        return linkedList;
    }

    protected IFlusher createFlusher() {
        return new ThreadPoolFlusher(0, 1, 1000);
    }

    protected IBatchQueue<M> createQueue() {
        return new NonLockingQueue();
    }

    public void clear() {
        if (this.queue != null) {
            this.queue.clear();
        }
    }

    public void close() {
        if (this.flusher != null) {
            this.flusher.close();
        }
        if (this.queue != null) {
            this.queue.clear();
        }
    }
}
