package com.addc.commons.queue;

import com.addc.commons.Mutex;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/addc/commons/queue/PersistingQueueReader.class */
/**
 * Reads payloads from a {@link PersistingQueue} on a dedicated thread and hands each one to a
 * {@link PayloadDispatcher}.
 *
 * <p>A payload whose dispatch fails with a recoverable {@link DispatcherException} is kept and
 * retried after a delay; a non-recoverable failure (or any other exception) stops the reader.
 * Registered {@link PersistingQueueReaderListener}s are told about every outcome.
 *
 * <p>Typical life cycle: construct, optionally {@link #addListener add listeners}, call
 * {@link #start()}, and finally {@link #shutdown()}.
 *
 * @param <T> the payload type carried by the queue
 */
public class PersistingQueueReader<T> implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(PersistingQueueReader.class);
    private final PersistingQueue<T> queue;
    private final PayloadDispatcher<T> dispatcher;
    private final String threadName;
    // volatile: start() and shutdown() may be called from different threads.
    private volatile Thread runnerThread;
    private final PersistingQueueReaderState<T> dispatcherState = new PersistingQueueReaderState<>();
    // Monitor used for the interruptible retry delays; see delay() and breakDelay().
    private final Mutex delayMutex = new Mutex();
    private final List<PersistingQueueReaderListener<T>> listeners = new LinkedList<>();
    // NOTE(review): receives 'this' before construction has finished, so it must stay declared
    // after the fields above in case its constructor calls back into this reader - confirm.
    private final ReaderDelayGenerator<T> delayGenerator = new ReaderDelayGenerator<>(this);

    /**
     * Creates a reader; no thread is started until {@link #start()} is called.
     *
     * @param persistingQueue the queue to take payloads from
     * @param str the name given to the reader thread
     * @param payloadDispatcher the dispatcher each payload is handed to
     */
    public PersistingQueueReader(PersistingQueue<T> persistingQueue, String str, PayloadDispatcher<T> payloadDispatcher) {
        this.queue = persistingQueue;
        this.threadName = str;
        this.dispatcher = payloadDispatcher;
    }

    /**
     * Registers a listener that is notified about dispatch results and dispatcher errors.
     *
     * @param persistingQueueReaderListener the listener to add
     */
    public void addListener(PersistingQueueReaderListener<T> persistingQueueReaderListener) {
        synchronized (this.listeners) {
            this.listeners.add(persistingQueueReaderListener);
        }
    }

    /**
     * Reader loop: waits while the connection is flagged as lost, fetches a payload if none is
     * pending, dispatches it, and repeats until the shutdown flag is set.
     */
    @Override // java.lang.Runnable
    public void run() {
        LOGGER.info("Thread starts...");
        do {
            if (this.dispatcherState.connectionLost && !isShutdown()) {
                delay();
            }
            if (this.dispatcherState.currentPayload == null) {
                this.dispatcherState.currentPayload = this.queue.take();
            }
            // take() may hand back null (presumably when interrupted via interruptTake()).
            if (this.dispatcherState.currentPayload != null) {
                processCurrentPayload();
            }
        } while (!isShutdown());
        LOGGER.info("Thread ends...");
    }

    /**
     * Stops the reader thread, makes a final attempt to dispatch whatever is still pending, and
     * shuts the queue down, passing it the payload (if any) that could not be sent.
     *
     * <p>Safe to call even if {@link #start()} was never invoked. If the calling thread is
     * interrupted while waiting for the reader thread, the remaining shutdown work is still
     * carried out and the interrupt status is restored before returning.
     */
    public void shutdown() {
        LOGGER.info("Stop the dispatcher thread...");
        setShutdown(true);
        breakDelay();
        this.queue.interruptTake();
        boolean interrupted = false;
        Thread runner = this.runnerThread;
        if (runner != null) {
            try {
                runner.join();
            } catch (InterruptedException e) {
                LOGGER.debug("Interrupted", e);
                interrupted = true;
            }
        }
        try {
            LOGGER.info("Recover any outstanding batches and send them...");
            if (this.dispatcherState.currentPayload == null) {
                this.dispatcherState.currentPayload = this.queue.poll();
            }
            if (this.dispatcherState.currentPayload != null) {
                sendPendingPayloads();
            }
            LOGGER.info("Dispatcher terminated, returning {}", this.dispatcherState.currentPayload);
            this.queue.shutdown(this.dispatcherState.currentPayload);
        } finally {
            // Restore the interrupt only after the pending payloads were handled, so the final
            // dispatch attempts are not disturbed by a set interrupt flag.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Creates the reader thread under the configured name and starts it. */
    public void start() {
        Thread runner = new Thread(this, this.threadName);
        this.runnerThread = runner;
        runner.start();
    }

    /**
     * Tells whether the reader has been asked to stop.
     *
     * @return {@code true} once the shutdown flag has been set
     */
    public boolean isShutdown() {
        synchronized (this.dispatcherState.shutdownLock) {
            return this.dispatcherState.shutdown;
        }
    }

    /**
     * Waits for the period supplied by the delay generator, or until {@link #breakDelay()} wakes
     * the thread.
     */
    private void delay() {
        synchronized (this.delayMutex) {
            long millis = this.delayGenerator.getDelay();
            if (!shouldWait(millis)) {
                return;
            }
            try {
                this.delayMutex.wait(millis);
            } catch (InterruptedException e) {
                LOGGER.debug("Interrupted", e);
            }
        }
    }

    /**
     * Dispatches the current payload and updates the reader state according to the outcome:
     * success clears the payload and the connection-lost flag; a recoverable failure keeps the
     * payload for a retry; a non-recoverable or unexpected failure requests shutdown.
     */
    private void processCurrentPayload() {
        try {
            this.dispatcher.dispatch(this.dispatcherState.currentPayload);
            // Clear the state before notifying: a listener that throws must not cause the
            // already-dispatched payload to be sent a second time during shutdown().
            this.dispatcherState.currentPayload = null;
            // A successful dispatch proves the connection is back; without this reset every
            // later payload would be delayed and reported as "connection lost".
            this.dispatcherState.connectionLost = false;
            notifyForward(null);
        } catch (DispatcherException e) {
            this.dispatcherState.connectionLost = true;
            if (e.isRecoverable()) {
                notifyForward(e);
                if (e.getRetryDelay() != null) {
                    delay(e.getRetryDelay().longValue());
                }
            } else {
                notifyDispatcherError(e);
                setShutdown(true);
            }
        } catch (Exception e2) {
            LOGGER.warn("Unexpected exception, aborting queue reader", e2);
            setShutdown(true);
        }
    }

    /**
     * Waits for the given time, or until {@link #breakDelay()} wakes the thread.
     *
     * @param j the delay in milliseconds; values of zero or less mean "do not wait"
     */
    private void delay(long j) {
        synchronized (this.delayMutex) {
            if (!shouldWait(j)) {
                return;
            }
            try {
                this.delayMutex.wait(j);
            } catch (InterruptedException e) {
                LOGGER.trace("Interrupted", e);
            }
        }
    }

    /**
     * Decides whether a delay should actually block. Must be called while holding
     * {@code delayMutex}: {@link #shutdown()} sets the flag before it notifies on that monitor,
     * so checking the flag under the monitor guarantees the wake-up cannot be missed.
     *
     * @param millis the requested delay in milliseconds
     * @return {@code false} if shutdown was requested or the delay is not positive
     *     ({@code wait(0)} would block indefinitely and a negative timeout is rejected)
     */
    private boolean shouldWait(long millis) {
        return millis > 0 && !isShutdown();
    }

    /**
     * Sends the current payload and then everything still obtainable from the queue via
     * {@code poll()}, stopping at the first failure. On failure the payload that could not be
     * sent remains the current payload.
     */
    private void sendPendingPayloads() {
        while (this.dispatcherState.currentPayload != null) {
            try {
                this.dispatcher.dispatch(this.dispatcherState.currentPayload);
                this.dispatcherState.currentPayload = null;
            } catch (Exception e) {
                this.dispatcherState.connectionLost = true;
                notifyForward(e);
                return;
            }
            this.dispatcherState.currentPayload = this.queue.poll();
        }
    }

    /**
     * Reports the outcome of a dispatch attempt to all listeners.
     *
     * @param exc the failure, or {@code null} when the dispatch succeeded
     */
    private void notifyForward(Exception exc) {
        boolean connectionLost = this.dispatcherState.connectionLost;
        for (PersistingQueueReaderListener<T> listener : snapshotListeners()) {
            listener.onProcess(connectionLost, exc);
        }
    }

    /**
     * Reports a non-recoverable dispatcher failure to all listeners.
     *
     * @param dispatcherException the failure raised by the dispatcher
     */
    private void notifyDispatcherError(DispatcherException dispatcherException) {
        for (PersistingQueueReaderListener<T> listener : snapshotListeners()) {
            listener.onDispatcherError(dispatcherException);
        }
    }

    /**
     * Copies the listener list under its lock so callbacks run without holding the lock; a
     * listener may then register further listeners without corrupting the iteration.
     *
     * @return a private copy of the currently registered listeners
     */
    private List<PersistingQueueReaderListener<T>> snapshotListeners() {
        synchronized (this.listeners) {
            return new LinkedList<>(this.listeners);
        }
    }

    /**
     * Sets the shutdown flag under its lock.
     *
     * @param z the new value of the flag
     */
    private void setShutdown(boolean z) {
        synchronized (this.dispatcherState.shutdownLock) {
            this.dispatcherState.shutdown = z;
        }
    }

    /** Wakes the reader thread if it is currently waiting in one of the delay methods. */
    private void breakDelay() {
        synchronized (this.delayMutex) {
            this.delayMutex.notifyAll();
        }
    }
}
