/*
 * Decompiled with CFR 0.152.
 */
package org.epics.ca.impl.monitor.disruptor;

import com.lmax.disruptor.DataProvider;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventProcessorFactory;
import java.nio.ByteBuffer;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.jcip.annotations.ThreadSafe;
import org.apache.commons.lang3.Validate;
import org.epics.ca.impl.TypeSupports;
import org.epics.ca.impl.monitor.MonitorNotificationService;
import org.epics.ca.impl.monitor.disruptor.ConnectionInterruptable;
import org.epics.ca.impl.monitor.disruptor.MonitorBatchEventProcessor;
import org.epics.ca.util.Holder;

@ThreadSafe
public class DisruptorOldMonitorNotificationService<T>
implements MonitorNotificationService<T> {
    private static final Logger logger = Logger.getLogger(DisruptorOldMonitorNotificationService.class.getName());
    private static final int NOTIFICATION_VALUE_BUFFER_SIZE = 2;
    private final Disruptor<Holder<T>> disruptor;
    private T overrunValue;
    private Holder<T> lastValue;

    DisruptorOldMonitorNotificationService(Consumer<? super T> consumer) {
        Validate.notNull(consumer);
        MyThreadFactory myThreadFactory = new MyThreadFactory();
        this.disruptor = new Disruptor(Holder::new, 2, (ThreadFactory)myThreadFactory);
        this.disruptor.handleEventsWith(new EventProcessorFactory[]{(ringBuffer, barrierSequences) -> new MonitorBatchEventProcessor<Holder>(new MyAlwaysOnlineConnectionInterruptable(), new Holder(), value -> value.value == null, (DataProvider<Holder>)this.disruptor.getRingBuffer(), ringBuffer.newBarrier(barrierSequences), (EventHandler<Holder>)((EventHandler)(e, s, eob) -> new MySpecialEventHandler(consumer).onEvent(e, s, eob)))});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean publish(ByteBuffer dataBuffer, TypeSupports.TypeSupport<T> typeSupport, int dataCount) {
        Validate.notNull((Object)dataBuffer);
        Validate.notNull(typeSupport);
        RingBuffer ringBuffer = this.disruptor.getRingBuffer();
        if (ringBuffer.hasAvailableCapacity(1)) {
            long next = ringBuffer.next();
            try {
                this.lastValue = (Holder)ringBuffer.get(next);
                this.lastValue.value = typeSupport.deserialize(dataBuffer, this.lastValue.value, dataCount);
            }
            finally {
                ringBuffer.publish(next);
            }
            return true;
        }
        this.overrunValue = typeSupport.deserialize(dataBuffer, this.overrunValue, dataCount);
        Object tmp = this.lastValue.value;
        this.lastValue.value = this.overrunValue;
        this.overrunValue = tmp;
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean publish(T value) {
        RingBuffer ringBuffer = this.disruptor.getRingBuffer();
        if (ringBuffer.hasAvailableCapacity(1)) {
            long nextSequence = ringBuffer.next();
            try {
                Holder nextEventHolder = (Holder)ringBuffer.get(nextSequence);
                nextEventHolder.value = value;
            }
            finally {
                ringBuffer.publish(nextSequence);
            }
            return true;
        }
        long oldestSequence = ringBuffer.getCursor();
        Holder oldestEventHolder = (Holder)ringBuffer.get(oldestSequence);
        oldestEventHolder.value = value;
        return false;
    }

    @Override
    public void init() {
        this.disruptor.start();
    }

    @Override
    public void close() {
        int shutdownDelayInMilliseconds = 2000;
        boolean useHaltImplementation = true;
        this.disruptor.halt();
    }

    static class MyAlwaysOnlineConnectionInterruptable
    implements ConnectionInterruptable {
        MyAlwaysOnlineConnectionInterruptable() {
        }

        @Override
        public int getConnectionLossId() {
            return 0;
        }
    }

    static class MySpecialEventHandler<T>
    implements EventHandler<Holder<? extends T>>,
    LifecycleAware {
        private final Consumer<? super T> consumer;

        MySpecialEventHandler(Consumer<? super T> consumer) {
            this.consumer = consumer;
        }

        public void onEvent(Holder<? extends T> event, long sequence, boolean endOfBatch) {
            logger.log(Level.FINEST, "MySpecialEventHandler is digesting Event " + event.value + " on Thread: " + Thread.currentThread() + "... ");
            this.consumer.accept(event.value);
        }

        public void onStart() {
            logger.log(Level.FINEST, "MySpecialEventHandler started on Thread: " + Thread.currentThread() + "... ");
        }

        public void onShutdown() {
            logger.log(Level.FINEST, "MySpecialEventHandler shutdown on Thread: " + Thread.currentThread() + "... ");
        }
    }

    static class MyThreadFactory
    implements ThreadFactory {
        private static int id = 1;

        MyThreadFactory() {
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "DisruptorMonitorNotificationServiceThread-" + String.valueOf(id++));
        }
    }
}

