/*
 * Decompiled with CFR 0.152.
 */
package org.slingerxv.limitart.taskqueue;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.ThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slingerxv.limitart.funcs.Proc1;
import org.slingerxv.limitart.funcs.Proc3;
import org.slingerxv.limitart.funcs.Procs;
import org.slingerxv.limitart.funcs.Test1;
import org.slingerxv.limitart.funcs.Tests;
import org.slingerxv.limitart.taskqueue.ITaskQueue;
import org.slingerxv.limitart.taskqueue.NamedThreadFactory;
import org.slingerxv.limitart.taskqueue.TaskQueueException;

/**
 * {@link ITaskQueue} implementation backed by an LMAX {@link Disruptor} ring buffer.
 *
 * <p>Commands are published with {@link #addCommand(Object)} (the ring buffer is created with
 * {@link ProducerType#MULTI}) and consumed by the single event handler registered in the
 * constructor. For every event the handler first consults the optional {@code intercept}
 * predicate; when it reports {@code true} the command is dropped, otherwise the command is
 * passed to the {@code handle} callback.
 *
 * <p>Lifecycle: construct, configure the callbacks, {@link #startServer()}, then
 * {@link #stopServer()}. Once stopped the queue cannot be restarted and rejects new commands.
 *
 * @param <T> type of the commands carried by the queue
 */
public class DisruptorTaskQueue<T> implements ITaskQueue<T> {
    private static final Logger log = LoggerFactory.getLogger(DisruptorTaskQueue.class);

    /** Ring buffer size used by {@link #DisruptorTaskQueue(String)}. */
    private static final int DEFAULT_BUFFER_SIZE = 8192;

    /**
     * The underlying disruptor; set to {@code null} by {@link #stopServer()}.
     * Volatile because producers on other threads read it in {@link #addCommand(Object)}.
     */
    private volatile Disruptor<DisruptorTaskQueueEvent> disruptor;

    /** Supplies the consumer thread and its name; kept after stop so the name stays available. */
    private final NamedThreadFactory threadFactory;

    /** Optional filter; a {@code true} result drops the command before it reaches the handler. */
    private Test1<T> intercept;

    /** Callback that processes each non-intercepted command. */
    private Proc1<T> handle;

    /** Optional callback notified when the disruptor reports an event-processing exception. */
    private Proc3<DisruptorTaskQueueEvent, Throwable, Long> exception;

    /**
     * Creates a queue with the default ring buffer size of 8192 slots.
     *
     * @param threadName name reported by the thread factory for the consumer thread
     */
    public DisruptorTaskQueue(String threadName) {
        this(threadName, DEFAULT_BUFFER_SIZE);
    }

    /**
     * Creates a queue and wires the event handler and default exception handler.
     * The disruptor is not started until {@link #startServer()} is called.
     *
     * @param threadName name reported by the thread factory for the consumer thread
     * @param bufferSize ring buffer size handed to the {@link Disruptor} constructor
     */
    // Generic varargs array creation in handleEventsWith(...) may be reported as unchecked
    // on Disruptor versions whose method is not @SafeVarargs; the array is never exposed.
    @SuppressWarnings("unchecked")
    public DisruptorTaskQueue(final String threadName, int bufferSize) {
        this.threadFactory = new NamedThreadFactory() {

            @Override
            public String getThreadName() {
                return threadName;
            }
        };
        // The explicit ThreadFactory cast pins the constructor overload, as in the original code.
        this.disruptor = new Disruptor<DisruptorTaskQueueEvent>(
                () -> new DisruptorTaskQueueEvent(),
                bufferSize,
                (ThreadFactory) this.threadFactory,
                ProducerType.MULTI,
                new BlockingWaitStrategy());
        this.disruptor.handleEventsWith(this::onEvent);
        this.disruptor.setDefaultExceptionHandler(new ExceptionHandler<DisruptorTaskQueueEvent>() {

            @Override
            public void handleEventException(Throwable ex, long sequence, DisruptorTaskQueueEvent event) {
                log.error("sequence {} error!", sequence, ex);
                Procs.invoke(DisruptorTaskQueue.this.exception, event, ex, sequence);
            }

            @Override
            public void handleOnStartException(Throwable ex) {
                log.error("Exception during onStart()", ex);
            }

            @Override
            public void handleOnShutdownException(Throwable ex) {
                log.error("Exception during onShutdown()", ex);
            }
        });
    }

    /**
     * Consumer-side processing of one ring buffer slot.
     *
     * <p>The slot's message is always cleared once the command has been intercepted or handled
     * so the ring buffer does not keep a reference to it. If the intercept callback itself
     * throws, the slot is left untouched and the exception propagates to the disruptor's
     * exception handler.
     *
     * @param event      the ring buffer slot carrying the command
     * @param sequence   sequence number of the slot (unused)
     * @param endOfBatch whether this is the last event of the current batch (unused)
     */
    private void onEvent(DisruptorTaskQueueEvent event, long sequence, boolean endOfBatch) {
        T msg = event.getMsg();
        // NOTE(review): Tests.invoke / Procs.invoke are presumably null-tolerant for
        // unset callbacks - confirm against org.slingerxv.limitart.funcs.
        if (Tests.invoke(this.intercept, msg)) {
            // Dropped command: release the reference instead of leaving it in the slot.
            event.setMsg(null);
            return;
        }
        try {
            Procs.invoke(this.handle, msg);
        } catch (Exception e) {
            // A failing handler must not kill the consumer thread; log and continue.
            log.error("invoke handler error", e);
        } finally {
            event.setMsg(null);
        }
    }

    /**
     * Sets the filter consulted before each command is handled.
     *
     * @param intercept predicate; a {@code true} result drops the command
     * @return this queue, for chaining
     */
    public ITaskQueue<T> intercept(Test1<T> intercept) {
        this.intercept = intercept;
        return this;
    }

    /**
     * Sets the callback that processes commands.
     *
     * @param handle command handler
     * @return this queue, for chaining
     */
    public ITaskQueue<T> handle(Proc1<T> handle) {
        this.handle = handle;
        return this;
    }

    /**
     * Sets the callback invoked (after logging) when the disruptor reports an exception
     * while processing an event.
     *
     * @param exception receives the event, the throwable and the sequence number
     * @return this queue, for chaining
     */
    public ITaskQueue<T> exception(Proc3<DisruptorTaskQueueEvent, Throwable, Long> exception) {
        this.exception = exception;
        return this;
    }

    /**
     * Starts the disruptor and logs the start.
     *
     * @throws IllegalStateException if the queue has already been stopped
     */
    @Override
    public void startServer() {
        Disruptor<DisruptorTaskQueueEvent> current = this.disruptor;
        if (current == null) {
            // Previously this surfaced as a bare NullPointerException.
            throw new IllegalStateException(this.getThreadName() + " has already been stopped!");
        }
        current.start();
        log.info("thread {} start!", this.getThreadName());
    }

    /**
     * Shuts the disruptor down and releases it. Calling this more than once is a no-op.
     * The thread factory is retained so {@link #getThreadName()} keeps working afterwards.
     */
    @Override
    public void stopServer() {
        Disruptor<DisruptorTaskQueueEvent> current = this.disruptor;
        if (current != null) {
            current.shutdown();
            log.info("thread {} stop!", this.getThreadName());
            this.disruptor = null;
        }
    }

    /**
     * Publishes a command to the ring buffer.
     *
     * @param command the command to enqueue
     * @throws TaskQueueException if the queue has been stopped
     */
    @Override
    public void addCommand(T command) throws TaskQueueException {
        // Snapshot the field so a concurrent stopServer() cannot null it between check and use.
        Disruptor<DisruptorTaskQueueEvent> current = this.disruptor;
        if (current == null) {
            throw new TaskQueueException(this.getThreadName() + " has not start yet!");
        }
        current.getRingBuffer().publishEvent((event, sequence) -> event.setMsg(command));
    }

    /**
     * Returns the name reported by the thread factory; remains valid after
     * {@link #stopServer()}.
     *
     * @return the consumer thread name given at construction
     */
    @Override
    public String getThreadName() {
        return this.threadFactory.getThreadName();
    }

    /**
     * Mutable ring buffer slot holding a single command.
     * Kept as an inner (non-static) class because it uses the enclosing type parameter.
     */
    private class DisruptorTaskQueueEvent {
        private T msg;

        private DisruptorTaskQueueEvent() {
        }

        public T getMsg() {
            return this.msg;
        }

        public void setMsg(T msg) {
            this.msg = msg;
        }
    }
}

