/*
 * Decompiled with CFR 0.152.
 */
package cn.wjybxx.concurrent;

import cn.wjybxx.base.MathCommon;
import cn.wjybxx.base.collection.DefaultIndexedPriorityQueue;
import cn.wjybxx.base.collection.IndexedElement;
import cn.wjybxx.base.collection.IndexedPriorityQueue;
import cn.wjybxx.concurrent.AbstractEventLoop;
import cn.wjybxx.concurrent.AbstractScheduledEventLoop;
import cn.wjybxx.concurrent.DefaultThreadFactory;
import cn.wjybxx.concurrent.EmptyAgent;
import cn.wjybxx.concurrent.EventLoopAgent;
import cn.wjybxx.concurrent.EventLoopBuilder;
import cn.wjybxx.concurrent.EventLoopModule;
import cn.wjybxx.concurrent.EventLoopState;
import cn.wjybxx.concurrent.IAgentEvent;
import cn.wjybxx.concurrent.IFuture;
import cn.wjybxx.concurrent.IPromise;
import cn.wjybxx.concurrent.ITask;
import cn.wjybxx.concurrent.Promise;
import cn.wjybxx.concurrent.RejectedExecutionHandler;
import cn.wjybxx.concurrent.ScheduledPromiseTask;
import cn.wjybxx.concurrent.StartFailedException;
import cn.wjybxx.concurrent.TaskOption;
import cn.wjybxx.disruptor.AlertException;
import cn.wjybxx.disruptor.ConsumerBarrier;
import cn.wjybxx.disruptor.EventSequencer;
import cn.wjybxx.disruptor.EventTranslator;
import cn.wjybxx.disruptor.MpUnboundedEventSequencer;
import cn.wjybxx.disruptor.ProducerBarrier;
import cn.wjybxx.disruptor.Sequence;
import cn.wjybxx.disruptor.SequenceBarrier;
import cn.wjybxx.disruptor.WaitStrategy;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.LockSupport;
import javax.annotation.Nonnull;

public class DisruptorEventLoop<T extends IAgentEvent>
extends AbstractScheduledEventLoop {
    // Lower/upper bounds for the per-iteration batch size. The constructor clamps the
    // builder value with the literals 64 and 65536 (presumably these constants, inlined by the compiler).
    private static final int MIN_BATCH_SIZE = 64;
    private static final int MAX_BATCH_SIZE = 65536;
    // NOTE(review): the next three constants are not referenced anywhere in this file as shown.
    private static final int BATCH_PUBLISH_THRESHOLD = 1023;
    private static final int HIGHER_PRIORITY_QUEUE_ID = 0;
    private static final int LOWER_PRIORITY_QUEUE_ID = 1;
    // p1..p8 and p9..p16 are never read or written in this file; presumably cache-line
    // padding around the frequently written nanoTime field -- TODO confirm.
    private long p1;
    private long p2;
    private long p3;
    private long p4;
    private long p5;
    private long p6;
    private long p7;
    private long p8;
    // Cached System.nanoTime() value; refreshed by the worker thread and returned by tickTime().
    private volatile long nanoTime;
    private long p9;
    private long p10;
    private long p11;
    private long p12;
    private long p13;
    private long p14;
    private long p15;
    private long p16;
    // Lifecycle state, mapped to EventLoopState via EventLoopState.valueOf(int).
    // As used in this file: 0 = initial (thread not started), 1 = set right before thread.start(),
    // 2 = running, >= 3 = shutting down, >= 4 = shutdown, 5 = terminated.
    private volatile int state = 0;
    // Sequencer that hands out and publishes event slots.
    private final EventSequencer<? extends T> eventSequencer;
    // Pending scheduled tasks; mutated directly only on the event-loop thread (see removeScheduled).
    private final IndexedPriorityQueue<ScheduledPromiseTask<?>> scheduledTaskQueue;
    // Maximum number of events consumed per loop iteration (clamped in the constructor).
    private final int batchSize;
    // Invoked for tasks that cannot be accepted (shutting down, or no slot available on the loop thread).
    private final RejectedExecutionHandler rejectedExecutionHandler;
    // Receives lifecycle callbacks and events whose type is > 0.
    private final EventLoopAgent<? super T> agent;
    private final EventLoopModule mainModule;
    // When true, the worker drains the remaining buffer (cleanBuffer) before terminating.
    private final boolean cleanBufferOnExit;
    // Non-null only when cleanBufferOnExit is true and eventSequencer is an MpUnboundedEventSequencer.
    private final MpUnboundedEventSequencer<?> mpUnboundedEventSequencer;
    // The single thread that runs the worker.
    private final Thread thread;
    private final Worker worker;
    // Completed once the loop has fully terminated.
    private final IPromise<Void> terminationFuture = new Promise<Void>(this);
    // Completed once the loop has started successfully, or failed if start-up fails / shutdown wins.
    private final IPromise<Void> runningFuture = new Promise<Void>(this);
    // VarHandle used for atomic transitions of the state field.
    private static final VarHandle STATE;

    /**
     * Builds the event loop from the given builder: captures the sequencer, handlers and agent,
     * creates the worker and its thread (the thread is NOT started here), registers the worker's
     * barrier as a gating barrier on the sequencer, and finally injects this loop into the agent.
     *
     * @param builder source of all configuration
     * @throws NullPointerException if the thread factory, the event sequencer, the rejected-execution
     *                              handler, or the thread created by the factory is null
     */
    public DisruptorEventLoop(EventLoopBuilder.DisruptorBuilder<T> builder) {
        super(builder.getParent());
        MpUnboundedEventSequencer unboundedBuffer;
        EventSequencer<? extends T> eventSequencer;
        ThreadFactory threadFactory = Objects.requireNonNull(builder.getThreadFactory(), "threadFactory");
        this.nanoTime = System.nanoTime();
        this.eventSequencer = Objects.requireNonNull(builder.getEventSequencer());
        this.scheduledTaskQueue = new DefaultIndexedPriorityQueue(ScheduledPromiseTask::compareToExplicitly, 64);
        // Keep the batch size within [64, 65536].
        this.batchSize = MathCommon.clamp((int)builder.getBatchSize(), (int)64, (int)65536);
        this.rejectedExecutionHandler = Objects.requireNonNull(builder.getRejectedExecutionHandler());
        // A missing agent is replaced by the shared EmptyAgent instance.
        this.agent = Objects.requireNonNullElse(builder.getAgent(), EmptyAgent.getInstance());
        this.mainModule = builder.getMainModule();
        this.cleanBufferOnExit = builder.isCleanBufferOnExit();
        // Decompiler-shaped expression: keeps a typed reference to the sequencer only when
        // cleanBufferOnExit is true AND the sequencer is an MpUnboundedEventSequencer; otherwise null.
        this.mpUnboundedEventSequencer = this.cleanBufferOnExit && (eventSequencer = this.eventSequencer) instanceof MpUnboundedEventSequencer ? (unboundedBuffer = (MpUnboundedEventSequencer)eventSequencer) : null;
        WaitStrategy waitStrategy = builder.getWaitStrategy();
        // Without an explicit wait strategy, use the sequencer's overload that takes none.
        this.worker = waitStrategy == null ? new Worker(this.eventSequencer.newSingleConsumerBarrier(new SequenceBarrier[0])) : new Worker(this.eventSequencer.newSingleConsumerBarrier(waitStrategy, new SequenceBarrier[0]));
        this.thread = Objects.requireNonNull(threadFactory.newThread(this.worker), "newThread");
        DefaultThreadFactory.checkUncaughtExceptionHandler(this.thread);
        // Register the consumer barrier as a gating barrier on the sequencer.
        this.eventSequencer.addGatingBarriers(new SequenceBarrier[]{this.worker.barrier});
        // Last step: hand this (now fully initialised) loop to the agent.
        this.agent.inject(this);
    }

    /**
     * Returns the current lifecycle state as an {@link EventLoopState}.
     *
     * @return the enum value that {@code EventLoopState.valueOf} maps the raw state number to
     */
    @Override
    public final EventLoopState state() {
        final int rawState = state;
        return EventLoopState.valueOf(rawState);
    }

    /**
     * @return {@code true} only while the raw state is exactly 2 (the value the worker
     *         advances to after {@code agent.onStart()} returns)
     */
    @Override
    public final boolean isRunning() {
        return 2 == state;
    }

    /**
     * @return {@code true} once the raw state has reached 3 or any later value
     */
    @Override
    public final boolean isShuttingDown() {
        return !(state < 3);
    }

    /**
     * @return {@code true} once the raw state has reached 4 or any later value
     */
    @Override
    public final boolean isShutdown() {
        return !(state < 4);
    }

    /**
     * @return {@code true} when the raw state is exactly 5, the final value written by
     *         the worker on exit or by a shutdown of a never-started loop
     */
    @Override
    public final boolean isTerminated() {
        return 5 == state;
    }

    /**
     * @return the read-only view ({@code asReadonly()}) of the promise that is completed
     *         when the loop terminates
     */
    @Override
    public final IFuture<?> terminationFuture() {
        final IPromise<Void> promise = terminationFuture;
        return promise.asReadonly();
    }

    /**
     * Waits on the termination promise for at most the given time.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return whatever {@code terminationFuture.await(timeout, unit)} reports
     * @throws InterruptedException propagated from the underlying await
     */
    @Override
    public final boolean awaitTermination(long timeout, @Nonnull TimeUnit unit) throws InterruptedException {
        final boolean awaitResult = terminationFuture.await(timeout, unit);
        return awaitResult;
    }

    /**
     * @return the read-only view ({@code asReadonly()}) of the promise that tracks start-up
     */
    @Override
    public final IFuture<?> runningFuture() {
        final IPromise<Void> promise = runningFuture;
        return promise.asReadonly();
    }

    /**
     * @return {@code true} if the calling thread is this loop's own thread
     */
    @Override
    public final boolean inEventLoop() {
        final Thread caller = Thread.currentThread();
        return thread == caller;
    }

    /**
     * @param thread the thread to compare against
     * @return {@code true} if {@code thread} is this loop's own thread (identity comparison)
     */
    @Override
    public final boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }

    /**
     * Wakes the loop thread from another thread: interrupts it and then calls
     * {@code agent.wakeup()}. Does nothing when called on the loop thread itself or when
     * the loop thread is not alive.
     */
    @Override
    public final void wakeup() {
        if (inEventLoop() || !thread.isAlive()) {
            return;
        }
        thread.interrupt();
        agent.wakeup();
    }

    /**
     * Estimates the backlog as the distance between the producer sequence and the
     * worker's consumed sequence.
     *
     * <p>The value is a snapshot of two independently read sequences, so it is only an estimate.
     * It is capped at the sequencer capacity when the sequencer reports one (capacity != -1),
     * and is always within {@code [0, Integer.MAX_VALUE]}.
     *
     * @return the estimated number of slots claimed by producers but not yet consumed
     */
    public int taskCount() {
        long count = this.eventSequencer.producerBarrier().sequence() - this.worker.sequence.getVolatile();
        int capacity = this.eventSequencer.capacity();
        if (capacity != -1 && count >= (long) capacity) {
            return capacity;
        }
        // Clamp while still in long arithmetic: narrowing first would let a backlog larger than
        // Integer.MAX_VALUE (possible when capacity is -1) wrap around to a bogus value.
        return (int) Math.max(0L, Math.min(count, (long) Integer.MAX_VALUE));
    }

    /**
     * @return the consumer barrier owned by this loop's worker
     */
    public ConsumerBarrier getBarrier() {
        final Worker owner = worker;
        return owner.barrier;
    }

    /**
     * @return the event sequencer supplied by the builder at construction time
     */
    public EventSequencer<? extends T> getEventSequencer() {
        return eventSequencer;
    }

    /**
     * @return the agent bound to this loop (the builder's agent, or the shared
     *         {@code EmptyAgent} instance when the builder supplied none)
     */
    public EventLoopAgent<? super T> getAgent() {
        return agent;
    }

    /**
     * @return the main module taken from the builder; may be whatever the builder returned
     */
    @Override
    public EventLoopModule mainModule() {
        return mainModule;
    }

    /**
     * Submits a task, taking the options from the task itself when it is an {@link ITask}
     * and using 0 otherwise, then delegates to {@link #execute(Runnable, int)}.
     *
     * @param command the task to run on the loop thread
     */
    @Override
    public void execute(Runnable command) {
        final int options = command instanceof ITask ? ((ITask) command).getOptions() : 0;
        execute(command, options);
    }

    /**
     * Submits a task with explicit options.
     *
     * <p>If the loop is already shutting down the task goes straight to the rejection handler.
     * On the loop thread a slot is claimed with the non-blocking {@code tryNext(1)} and the task is
     * rejected when that yields -1; from any other thread the slot is claimed with {@code next(1)}.
     *
     * @param command the task to run; must not be null
     * @param options option bits stored on the event
     * @throws NullPointerException if {@code command} is null
     */
    @Override
    public void execute(Runnable command, int options) {
        Objects.requireNonNull(command, "command");
        if (isShuttingDown()) {
            rejectedExecutionHandler.rejected(command, this);
            return;
        }
        if (!inEventLoop()) {
            tryPublish(command, eventSequencer.next(1), options);
            return;
        }
        final long claimed = eventSequencer.tryNext(1);
        if (claimed == -1L) {
            rejectedExecutionHandler.rejected(command, this);
            return;
        }
        tryPublish(command, claimed, options);
    }

    /**
     * Fills the already-claimed slot with the task and publishes it.
     *
     * <p>If shutdown began after the slot was claimed, the slot is published untouched and the task
     * is handed to the rejection handler. Otherwise an {@code EventTranslator} task writes the event
     * itself (a failure there is logged and the slot is still published), while any other task is
     * stored as a type-0 event. After publishing from a foreign thread, sequence 0 triggers the
     * thread start and the option bit 16384 triggers a wakeup.
     *
     * @param task     the task to publish
     * @param sequence the claimed slot
     * @param options  option bits for the event
     */
    private void tryPublish(@Nonnull Runnable task, long sequence, int options) {
        if (isShuttingDown()) {
            // The claimed slot must still be published before rejecting the task.
            eventSequencer.publish(sequence);
            rejectedExecutionHandler.rejected(task, this);
            return;
        }
        IAgentEvent event = (IAgentEvent) eventSequencer.producerGet(sequence);
        if (!(task instanceof EventTranslator)) {
            event.setType(0);
            event.setObj0(task);
            event.setOptions(options);
            if (task instanceof ScheduledPromiseTask) {
                ScheduledPromiseTask scheduled = (ScheduledPromiseTask) task;
                scheduled.setId(sequence);
                scheduled.registerCancellation();
            }
        } else {
            try {
                EventTranslator writer = (EventTranslator) task;
                writer.translateTo((Object) event, sequence);
            } catch (Throwable ex) {
                logger.warn("translateTo caught exception", ex);
            }
        }
        eventSequencer.publish(sequence);
        if (inEventLoop()) {
            return;
        }
        if (sequence == 0L) {
            ensureThreadStarted();
        } else if (TaskOption.isEnabled(options, 16384)) {
            wakeup();
        }
    }

    /**
     * Returns the producer-side event object for a claimed sequence.
     *
     * @param sequence a non-negative sequence
     * @return the event stored at that sequence
     * @throws IllegalArgumentException if {@code sequence} is negative
     */
    public final T getEvent(long sequence) {
        checkSequence(sequence);
        IAgentEvent slot = (IAgentEvent) eventSequencer.producerGet(sequence);
        return (T) slot;
    }

    /**
     * Rejects negative sequences.
     *
     * @param sequence the sequence to validate
     * @throws IllegalArgumentException if {@code sequence} is below zero
     */
    private static void checkSequence(long sequence) {
        if (sequence >= 0L) {
            return;
        }
        throw new IllegalArgumentException("invalid sequence " + sequence);
    }

    /**
     * Claims a single slot; shorthand for {@code nextSequence(1)}.
     *
     * @return the claimed sequence, or -1 when no slot was handed out
     */
    public final long nextSequence() {
        final int singleSlot = 1;
        return nextSequence(singleSlot);
    }

    /**
     * Publishes a single claimed sequence. When sequence 0 is published from a thread other
     * than the loop thread, the loop thread is started if it has not been started yet.
     *
     * @param sequence the sequence to publish; must be non-negative
     * @throws IllegalArgumentException if {@code sequence} is negative
     */
    public final void publish(long sequence) {
        checkSequence(sequence);
        eventSequencer.publish(sequence);
        if (sequence != 0L) {
            return;
        }
        if (!inEventLoop()) {
            ensureThreadStarted();
        }
    }

    /**
     * Claims {@code size} consecutive slots.
     *
     * <p>On the loop thread the non-blocking {@code tryNext(size)} is used and -1 is returned when it
     * fails; other threads use {@code next(size)}. If shutdown is observed after the claim, the
     * whole claimed range is published immediately (unfilled) and -1 is returned.
     *
     * @param size number of slots to claim
     * @return the highest claimed sequence, or -1 if the loop is shutting down or no slots were available
     */
    public final long nextSequence(int size) {
        if (isShuttingDown()) {
            return -1L;
        }
        final boolean onLoopThread = inEventLoop();
        final long hi = onLoopThread ? eventSequencer.tryNext(size) : eventSequencer.next(size);
        if (onLoopThread && hi == -1L) {
            return -1L;
        }
        if (!isShuttingDown()) {
            return hi;
        }
        // Shutdown raced with the claim: release the range so the sequence has no unpublished gap.
        final long lo = hi - (long) (size - 1);
        eventSequencer.publish(lo, hi);
        return -1L;
    }

    /**
     * Publishes the claimed range {@code [lo, hi]} through the producer barrier. When the range
     * starts at sequence 0 and the caller is not the loop thread, the loop thread is started if
     * it has not been started yet.
     *
     * @param lo first sequence of the range; must be non-negative
     * @param hi last sequence of the range
     * @throws IllegalArgumentException if {@code lo} is negative
     */
    public final void publish(long lo, long hi) {
        checkSequence(lo);
        eventSequencer.producerBarrier().publish(lo, hi);
        if (lo != 0L) {
            return;
        }
        if (!inEventLoop()) {
            ensureThreadStarted();
        }
    }

    /**
     * Puts a periodic task back into the scheduled queue, or cancels it (without queue removal)
     * when the loop is shutting down. Asserts that it runs on the loop thread.
     *
     * @param futureTask the task to re-queue
     * @param triggered  not used by this implementation
     */
    @Override
    final void reschedulePeriodic(ScheduledPromiseTask<?> futureTask, boolean triggered) {
        assert inEventLoop();
        if (!isShuttingDown()) {
            scheduledTaskQueue.add(futureTask);
            return;
        }
        futureTask.cancelWithoutRemove();
    }

    /**
     * Removes a task from the scheduled queue.
     *
     * <p>On the loop thread the removal happens immediately. From another thread a removal task is
     * submitted to the loop only when {@code mpUnboundedEventSequencer} is non-null; otherwise
     * nothing is done here (the worker's scheduled-queue pass polls and skips entries whose
     * future is already done).
     *
     * @param futureTask the task to remove
     */
    @Override
    final void removeScheduled(ScheduledPromiseTask<?> futureTask) {
        if (inEventLoop()) {
            scheduledTaskQueue.removeTyped(futureTask);
            return;
        }
        if (mpUnboundedEventSequencer == null) {
            return;
        }
        execute(() -> this.scheduledTaskQueue.removeTyped((IndexedElement) futureTask));
    }

    /**
     * @return the cached {@code System.nanoTime()} reading last stored by the constructor or the worker
     */
    @Override
    protected final long tickTime() {
        final long cachedNanos = nanoTime;
        return cachedNanos;
    }

    /**
     * Starts the loop thread if the loop is still in its initial state.
     *
     * @return the read-only running future, which reflects the outcome of start-up
     */
    @Override
    public IFuture<?> start() {
        ensureThreadStarted();
        final IPromise<Void> promise = runningFuture;
        return promise.asReadonly();
    }

    /**
     * Requests shutdown.
     *
     * <p>If the running future is not done yet it is failed with a {@code StartFailedException}.
     * The state is then moved to 3 with a CAS retry loop; the caller whose CAS succeeds also
     * makes sure the worker thread can terminate. Calls made once the state is already 3 or
     * higher have no further effect.
     */
    @Override
    public void shutdown() {
        if (!runningFuture.isDone()) {
            runningFuture.trySetException(new StartFailedException("Shutdown"));
        }
        for (int expected = state; expected < 3; ) {
            final int witness = compareAndExchangeState(expected, 3);
            if (witness == expected) {
                // This call performed the transition, so it is responsible for the follow-up.
                ensureThreadTerminable(expected);
                return;
            }
            expected = witness;
        }
    }

    /**
     * Performs {@link #shutdown()} and then advances the state to at least 4.
     *
     * @return always an empty list; pending tasks are not returned by this implementation
     */
    @Override
    @Nonnull
    public List<Runnable> shutdownNow() {
        shutdown();
        advanceRunState(4);
        final List<Runnable> none = Collections.emptyList();
        return none;
    }

    /**
     * Starts the worker thread at most once: only the caller that wins the 0 -> 1 state CAS
     * calls {@code thread.start()}.
     */
    private void ensureThreadStarted() {
        if (state != 0) {
            return;
        }
        if (STATE.compareAndSet(this, 0, 1)) {
            thread.start();
        }
    }

    /**
     * Follow-up for the caller that moved the loop into the shutting-down state.
     *
     * <p>If the previous state was 0 (the thread was never started) the loop is terminated
     * right here: state becomes 5, the worker's barrier is removed from the gating barriers
     * and both futures are completed. Otherwise the worker is alerted and woken so it can
     * observe the shutdown.
     *
     * @param oldState the state that was replaced by the shutdown transition
     */
    private void ensureThreadTerminable(int oldState) {
        if (oldState != 0) {
            worker.barrier.alert();
            wakeup();
            return;
        }
        state = 5;
        worker.removeFromGatingBarriers();
        runningFuture.trySetException(new StartFailedException("Stillborn"));
        terminationFuture.trySetResult(null);
    }

    /**
     * Moves the state forward to {@code targetState} unless it is already at or past it.
     * The state is never moved backwards.
     *
     * @param targetState the minimum state the loop should be in when this method returns
     */
    private void advanceRunState(int targetState) {
        int expectedState = this.state;
        while (expectedState < targetState) {
            int realState = this.compareAndExchangeState(expectedState, targetState);
            // compareAndExchange returns the witness (previous) value: equal to expectedState means
            // our CAS succeeded; >= targetState means another thread already advanced far enough.
            // Checking only ">= targetState" would force one extra, guaranteed-to-fail CAS after success.
            if (realState == expectedState || realState >= targetState) {
                return;
            }
            // Lost the race to a smaller state; retry from the value actually observed.
            expectedState = realState;
        }
    }

    /**
     * Atomically sets the state to {@code targetState} if it currently equals {@code expectedState}.
     *
     * @param expectedState the state the caller believes is current
     * @param targetState   the state to install
     * @return the witness value: the state found at the time of the operation, which equals
     *         {@code expectedState} exactly when the exchange succeeded
     */
    private int compareAndExchangeState(int expectedState, int targetState) {
        // VarHandle.compareAndExchange is signature-polymorphic with a declared return type of Object;
        // the explicit (int) cast fixes the call-site return type so the result can be returned as int.
        return (int) STATE.compareAndExchange(this, expectedState, targetState);
    }

    // Resolves the VarHandle for the "state" field; any reflective failure aborts class initialisation.
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            STATE = l.findVarHandle(DisruptorEventLoop.class, "state", Integer.TYPE);
        }
        catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
        // Unused local holding a class literal; presumably forces LockSupport to be loaded
        // early (a common java.util.concurrent idiom) -- TODO confirm intent.
        Class<LockSupport> clazz = LockSupport.class;
    }

    private class Worker
    implements Runnable {
        // Barrier the worker waits on; also registered as a gating barrier by the outer constructor.
        private final ConsumerBarrier barrier;
        // The barrier's group sequence; the worker writes its consumption progress to it.
        private final Sequence sequence;

        /**
         * @param barrier the consumer barrier to wait on; its group sequence becomes this worker's progress sequence
         */
        private Worker(ConsumerBarrier barrier) {
            this.barrier = barrier;
            this.sequence = barrier.groupSequence();
        }

        /*
         * Thread entry point: start-up, main loop, and ordered tear-down.
         *
         * Start-up: mark runningFuture as computing (bail out if that fails), refresh nanoTime,
         * call agent.onStart(), advance the state to 2, complete runningFuture and enter loop().
         * Any Throwable is logged; if runningFuture is still pending it is failed with
         * StartFailedException.
         *
         * Tear-down (always executed): advance to state 3 when start-up succeeded, else 4;
         * optionally drain the buffer; clear the scheduled queue; remove the gating barrier;
         * advance to state 4; call agent.onShutdown(); finally set state 5 and complete
         * terminationFuture.
         *
         * Decompiler note (CFR): "Removed try catching itself - possible behaviour change";
         * aggressive block sorting, exception pruning and exception aggregation were enabled,
         * so the control-flow shape may differ from the original source.
         */
        @Override
        public void run() {
            try {
                // Fails if the running future was already completed (e.g. by shutdown()); tear-down still runs.
                if (!DisruptorEventLoop.this.runningFuture.trySetComputing()) {
                    return;
                }
                DisruptorEventLoop.this.nanoTime = System.nanoTime();
                DisruptorEventLoop.this.agent.onStart();
                DisruptorEventLoop.this.advanceRunState(2);
                // Only enter the main loop if this thread is the one that completed the running future.
                if (!DisruptorEventLoop.this.runningFuture.trySetResult(null)) return;
                this.loop();
                return;
            }
            catch (Throwable e) {
                AbstractEventLoop.logger.error("thread exit due to exception!", e);
                // A failure before the running future completed is reported as a start failure.
                if (DisruptorEventLoop.this.runningFuture.isDone()) return;
                DisruptorEventLoop.this.runningFuture.trySetException(new StartFailedException("StartFailed", e));
                return;
            }
            finally {
                // Successful start: go through the shutting-down state (3); otherwise jump to shutdown (4).
                if (DisruptorEventLoop.this.runningFuture.isSucceeded()) {
                    DisruptorEventLoop.this.advanceRunState(3);
                } else {
                    DisruptorEventLoop.this.advanceRunState(4);
                }
                try {
                    if (DisruptorEventLoop.this.cleanBufferOnExit) {
                        this.cleanBuffer();
                    }
                    DisruptorEventLoop.this.scheduledTaskQueue.clearIgnoringIndexes();
                }
                finally {
                    // Runs even if draining threw: detach from the sequencer, then notify the agent.
                    this.removeFromGatingBarriers();
                    DisruptorEventLoop.this.advanceRunState(4);
                    try {
                        DisruptorEventLoop.this.agent.onShutdown();
                    }
                    catch (Throwable e) {
                        AbstractEventLoop.logger.error("thread exit caught exception!", e);
                    }
                    finally {
                        // Terminal state is always reached, whatever onShutdown did.
                        DisruptorEventLoop.this.state = 5;
                        DisruptorEventLoop.this.terminationFuture.trySetResult(null);
                    }
                }
            }
        }

        /**
         * Main consume loop; runs until the outer loop is shutting down.
         *
         * <p>Each iteration refreshes the cached time, runs due scheduled tasks, waits on the barrier
         * when no published sequence is known to be available, consumes at most {@code batchSize}
         * events, publishes the consumed position, and finally calls the agent's update hook.
         */
        private void loop() {
            ConsumerBarrier barrier = this.barrier;
            int taskBatchSize = DisruptorEventLoop.this.batchSize;
            MpUnboundedEventSequencer<?> mpUnboundedEventSequencer = DisruptorEventLoop.this.mpUnboundedEventSequencer;
            Sequence sequence = this.sequence;
            // Next sequence to consume = last consumed + 1.
            long nextSequence = sequence.getVolatile() + 1L;
            // Highest sequence known to be available; -1 forces a wait on the first iteration.
            long availableSequence = -1L;
            while (!DisruptorEventLoop.this.isShuttingDown()) {
                try {
                    long batchEndSequence;
                    DisruptorEventLoop.this.nanoTime = System.nanoTime();
                    this.processScheduledQueue(DisruptorEventLoop.this.nanoTime, false);
                    // Only wait when the previously observed availability has been used up.
                    if (availableSequence < nextSequence) {
                        availableSequence = barrier.waitFor(nextSequence);
                    }
                    // Cap the batch at taskBatchSize events.
                    if (nextSequence <= (batchEndSequence = Math.min(availableSequence, nextSequence + (long)taskBatchSize - 1L))) {
                        long curSequence = this.runTaskBatch(nextSequence, batchEndSequence);
                        // Publish consumption progress.
                        sequence.setRelease(curSequence);
                        if (mpUnboundedEventSequencer != null) {
                            mpUnboundedEventSequencer.tryMoveHeadToNext(curSequence);
                        }
                        // runTaskBatch stops before batchEndSequence only when it saw the loop shutting down.
                        if ((nextSequence = curSequence + 1L) <= batchEndSequence) {
                            assert (DisruptorEventLoop.this.isShuttingDown());
                            break;
                        }
                    }
                    this.invokeAgentUpdate();
                }
                catch (TimeoutException e) {
                    // Wait timed out: still tick the scheduled queue and the agent unless shutting down.
                    if (DisruptorEventLoop.this.isShuttingDown()) break;
                    DisruptorEventLoop.this.nanoTime = System.nanoTime();
                    this.processScheduledQueue(DisruptorEventLoop.this.nanoTime, false);
                    this.invokeAgentUpdate();
                }
                catch (AlertException | InterruptedException e) {
                    // Alert/interrupt are the shutdown and wakeup signals used by the outer class;
                    // outside of shutdown they are only logged and the loop continues.
                    if (DisruptorEventLoop.this.isShuttingDown()) break;
                    AbstractEventLoop.logger.warn("receive a confusing signal", e);
                }
                catch (Throwable e) {
                    AbstractEventLoop.logger.error("bad waitStrategy impl", e);
                }
            }
        }

        /**
         * Calls {@code agent.update()} and shields the loop from anything it throws.
         *
         * <p>A {@link VirtualMachineError} is logged at error level, every other throwable at warn
         * level. Each failure is logged exactly once.
         */
        private void invokeAgentUpdate() {
            try {
                DisruptorEventLoop.this.agent.update();
            }
            catch (Throwable t) {
                if (t instanceof VirtualMachineError) {
                    AbstractEventLoop.logger.error("agent.update caught exception", t);
                } else {
                    // Without this else branch a VirtualMachineError was logged twice (error, then warn).
                    AbstractEventLoop.logger.warn("agent.update caught exception", t);
                }
            }
        }

        /**
         * Runs every scheduled task at the head of the queue that is due at {@code tickTime}.
         *
         * @param tickTime         the time to compare against each task's next trigger time
         * @param shuttingDownMode when true, due tasks are triggered at most once more (only if not
         *                         yet triggered) and then cancelled instead of being re-queued
         */
        private void processScheduledQueue(long tickTime, boolean shuttingDownMode) {
            ScheduledPromiseTask queueTask;
            DisruptorEventLoop eventLoop = DisruptorEventLoop.this;
            IndexedPriorityQueue<ScheduledPromiseTask<?>> taskQueue = eventLoop.scheduledTaskQueue;
            while ((queueTask = (ScheduledPromiseTask)taskQueue.peek()) != null) {
                // Drop tasks whose future is already done.
                if (queueTask.future().isDone()) {
                    taskQueue.poll();
                    continue;
                }
                // Head is not due yet; stop scanning.
                if (tickTime < queueTask.getNextTriggerTime()) {
                    return;
                }
                // Once the loop is shut down no further tasks are run.
                if (DisruptorEventLoop.this.isShutdown()) {
                    return;
                }
                taskQueue.poll();
                if (shuttingDownMode) {
                    // trigger() is skipped for already-triggered tasks; a task is cancelled here
                    // unless it was untriggered and trigger() returned false.
                    if (!queueTask.isTriggered() && !queueTask.trigger(tickTime)) continue;
                    queueTask.cancelWithoutRemove();
                    continue;
                }
                // A true result from trigger() puts the task back into the queue.
                if (!queueTask.trigger(tickTime)) continue;
                taskQueue.offer((Object)queueTask);
            }
        }

        /**
         * Consumes the events in {@code [batchBeginSequence, batchEndSequence]} in order.
         *
         * <p>Events with type &gt; 0 go to the agent, type 0 events are run as {@code Runnable}s, and
         * events with a negative type are logged as invalid unless the loop is shutting down.
         * Exceptions are logged; every visited event is cleaned afterwards.
         *
         * @param batchBeginSequence first sequence to consume
         * @param batchEndSequence   last sequence to consume
         * @return the last sequence visited: {@code batchEndSequence} normally, or the sequence at
         *         which processing stopped early because the loop was found to be shutting down
         */
        private long runTaskBatch(long batchBeginSequence, long batchEndSequence) {
            EventSequencer sequencer = DisruptorEventLoop.this.eventSequencer;
            EventLoopAgent<IAgentEvent> handler = DisruptorEventLoop.this.agent;
            long cursor = batchBeginSequence;
            while (cursor <= batchEndSequence) {
                IAgentEvent event = (IAgentEvent) sequencer.consumerGet(cursor);
                try {
                    if (event.getType() > 0) {
                        handler.onEvent(event);
                    } else if (event.getType() == 0) {
                        Runnable task = (Runnable) event.getObj0();
                        task.run();
                    } else if (DisruptorEventLoop.this.isShuttingDown()) {
                        // Negative type during shutdown: stop here and report the position.
                        return cursor;
                    } else {
                        AbstractEventLoop.logger.warn("user published invalid event: " + String.valueOf(event));
                    }
                } catch (Throwable t) {
                    AbstractEventLoop.logCause(t);
                    if (DisruptorEventLoop.this.isShuttingDown()) {
                        return cursor;
                    }
                } finally {
                    event.clean();
                }
                cursor++;
            }
            return batchEndSequence;
        }

        /**
         * Unregisters this worker's barrier from the sequencer's gating barriers.
         */
        private void removeFromGatingBarriers() {
            final SequenceBarrier own = (SequenceBarrier) this.barrier;
            DisruptorEventLoop.this.eventSequencer.removeGatingBarrier(own);
        }

        /*
         * Drains everything still in the buffer when the worker exits.
         *
         * First runs the scheduled queue once in shutting-down mode and clears it, then walks
         * every sequence up to the producer's current sequence: negative-type events are only
         * counted, the rest are executed unless the loop has reached the shutdown state, in
         * which case they are discarded. Each slot is cleaned and the consumer sequence is
         * advanced after every event. A summary is logged at the end.
         *
         * Decompiler note (CFR): "Removed try catching itself - possible behaviour change".
         */
        private void cleanBuffer() {
            long nextSequence;
            long startTimeMillis = System.currentTimeMillis();
            EventSequencer eventSequencer = DisruptorEventLoop.this.eventSequencer;
            EventLoopAgent<IAgentEvent> agent = DisruptorEventLoop.this.agent;
            DisruptorEventLoop.this.nanoTime = System.nanoTime();
            this.processScheduledQueue(DisruptorEventLoop.this.nanoTime, true);
            DisruptorEventLoop.this.scheduledTaskQueue.clearIgnoringIndexes();
            // Counters for the summary log line only.
            long nullCount = 0L;
            long taskCount = 0L;
            long discardCount = 0L;
            ProducerBarrier producerBarrier = eventSequencer.producerBarrier();
            Sequence sequence = this.sequence;
            // The producer sequence is re-read every iteration, so slots claimed meanwhile are drained too.
            while ((nextSequence = sequence.getVolatile() + 1L) <= producerBarrier.sequence()) {
                // A claimed slot may not be published yet; spin until it is.
                while (!producerBarrier.isPublished(nextSequence)) {
                    Thread.onSpinWait();
                }
                IAgentEvent event = (IAgentEvent)eventSequencer.consumerGet(nextSequence);
                try {
                    // Negative type: counted as "null" and skipped.
                    if (event.getType() < 0) {
                        ++nullCount;
                        continue;
                    }
                    ++taskCount;
                    if (DisruptorEventLoop.this.isShutdown()) {
                        ++discardCount;
                        // NOTE(review): cleanAll() is invoked again by the finally block below.
                        event.cleanAll();
                        continue;
                    }
                    if (event.getType() > 0) {
                        agent.onEvent(event);
                        continue;
                    }
                    Runnable runnable = (Runnable)event.getObj0();
                    runnable.run();
                }
                catch (Throwable t) {
                    AbstractEventLoop.logCause(t);
                }
                finally {
                    // Runs for every path above (including continue): clean the slot and mark it consumed.
                    event.cleanAll();
                    sequence.setRelease(nextSequence);
                }
            }
            if (DisruptorEventLoop.this.mpUnboundedEventSequencer != null) {
                DisruptorEventLoop.this.mpUnboundedEventSequencer.tryMoveHeadToNext(sequence.getVolatile());
            }
            AbstractEventLoop.logger.info("cleanBuffer success!  nullCount = {}, taskCount = {}, discardCount {}, cost timeMillis = {}", new Object[]{nullCount, taskCount, discardCount, System.currentTimeMillis() - startTimeMillis});
        }
    }
}

