/*
 * Decompiled with CFR 0.152.
 */
package org.epics.ca.impl.monitor.disruptor;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.nio.ByteBuffer;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.jcip.annotations.ThreadSafe;
import org.apache.commons.lang3.Validate;
import org.epics.ca.impl.TypeSupports;
import org.epics.ca.impl.monitor.MonitorNotificationService;
import org.epics.ca.util.Holder;

@ThreadSafe
public class DisruptorNewMonitorNotificationService<T>
implements MonitorNotificationService<T> {
    private static final Logger logger = Logger.getLogger(DisruptorNewMonitorNotificationService.class.getName());
    private static final int NOTIFICATION_VALUE_BUFFER_SIZE = 2;
    private final Disruptor<Holder<T>> disruptor;
    private final MySpecialEventProducer<T> producer;
    private T deserializedValue;

    DisruptorNewMonitorNotificationService(Consumer<? super T> consumer) {
        Validate.notNull(consumer);
        MyThreadFactory myThreadFactory = new MyThreadFactory();
        this.disruptor = new Disruptor<Holder>(Holder::new, 2, myThreadFactory);
        MySpecialEventHandler<? super T> eventHandler = new MySpecialEventHandler<T>(consumer);
        this.disruptor.handleEventsWith(eventHandler);
        this.producer = new MySpecialEventProducer<T>(this.disruptor.getRingBuffer());
        this.deserializedValue = null;
    }

    @Override
    public boolean publish(ByteBuffer dataBuffer, TypeSupports.TypeSupport<T> typeSupport, int dataCount) {
        Validate.notNull(dataBuffer);
        Validate.notNull(typeSupport);
        this.deserializedValue = typeSupport.deserialize(dataBuffer, this.deserializedValue, dataCount);
        return this.publish(this.deserializedValue);
    }

    @Override
    public boolean publish(T value) {
        return this.producer.publish(value);
    }

    @Override
    public void init() {
        this.disruptor.start();
    }

    @Override
    public void close() {
        int shutdownDelayInMilliseconds = 2000;
        boolean useHaltImplementation = true;
        this.disruptor.halt();
    }

    static class MySpecialEventHandler<T>
    implements EventHandler<Holder<T>>,
    LifecycleAware {
        private final Consumer<? super T> consumer;

        MySpecialEventHandler(Consumer<? super T> consumer) {
            this.consumer = consumer;
        }

        @Override
        public void onEvent(Holder<T> event, long sequence, boolean endOfBatch) {
            logger.log(Level.FINEST, String.format("MySpecialEventHandler: Consuming Event - START. Sequence Number is: %d, Value is: %s ", sequence, event.value));
            this.consumer.accept(event.value);
            logger.log(Level.FINEST, String.format("MySpecialEventHandler: Consuming Event - FINISH. Sequence Number is: %d, Value was: %s ", sequence, event.value));
        }

        @Override
        public void onStart() {
            logger.log(Level.FINEST, "MySpecialEventHandler started on Thread: " + Thread.currentThread() + "... ");
        }

        @Override
        public void onShutdown() {
            logger.log(Level.FINEST, "MySpecialEventHandler shutdown on Thread: " + Thread.currentThread() + "... ");
        }
    }

    static class MySpecialEventProducer<T> {
        private final RingBuffer<Holder<T>> ringBuffer;

        MySpecialEventProducer(RingBuffer<Holder<T>> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public boolean publish(T value) {
            if (this.ringBuffer.hasAvailableCapacity(1)) {
                long nextSequence = this.ringBuffer.next();
                try {
                    Holder<T> nextEventHolder = this.ringBuffer.get(nextSequence);
                    nextEventHolder.value = value;
                }
                finally {
                    this.ringBuffer.publish(nextSequence);
                }
                return true;
            }
            long oldestSequence = this.ringBuffer.getCursor();
            Holder<T> oldestEventHolder = this.ringBuffer.get(oldestSequence);
            oldestEventHolder.value = value;
            return false;
        }
    }

    static class MyThreadFactory
    implements ThreadFactory {
        private static int id = 1;

        MyThreadFactory() {
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "DisruptorMonitorNotificationServiceThread-" + String.valueOf(id++));
        }
    }
}

