/*
 * Decompiled with CFR 0.152.
 */
package org.epics.ca.impl.monitor;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Stream;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.time.StopWatch;
import org.epics.ca.NotificationConsumer;
import org.epics.ca.ThreadWatcher;
import org.epics.ca.impl.monitor.MonitorNotificationService;
import org.epics.ca.impl.monitor.MonitorNotificationServiceFactory;
import org.epics.ca.impl.monitor.MonitorNotificationServiceFactoryCreator;
import org.epics.ca.util.logging.LibraryLogManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

public class MonitorNotificationServiceTest {
    private static final Logger logger = LibraryLogManager.getLogger(MonitorNotificationServiceTest.class);
    private ThreadWatcher threadWatcher;

    @BeforeEach
    void beforeEach() {
        this.threadWatcher = ThreadWatcher.start();
    }

    @AfterEach
    void afterEach() {
        this.threadWatcher.verify();
    }

    @ParameterizedTest
    @MethodSource(value={"getMonitorNotificationServiceImplementations"})
    void testGetNotifierForConsumer_ThrowsNullPointerExceptionWhenConsumerNull(String serviceImpl) {
        try (MonitorNotificationServiceFactory factory = MonitorNotificationServiceFactoryCreator.create((String)serviceImpl);){
            Assertions.assertThrows(NullPointerException.class, () -> factory.getServiceForConsumer(null));
        }
    }

    @ParameterizedTest
    @MethodSource(value={"getArgumentsForTestNotifyConsumerNullPublicationBehaviour"})
    void testNotifyConsumerNullPublicationBehaviour(String serviceImpl, boolean acceptsNullExpectation) {
        boolean nullAcceptanceWasAdvertised;
        Validate.notNull((Object)serviceImpl);
        logger.info(String.format("Assessing null publication behaviour of service implementation: '%s'", serviceImpl));
        boolean nullWasAccepted = true;
        try (MonitorNotificationServiceFactory factory = MonitorNotificationServiceFactoryCreator.create((String)serviceImpl);){
            nullAcceptanceWasAdvertised = factory.getQosMetricIsNullPublishable();
            NotificationConsumer consumer = NotificationConsumer.getNormalConsumer();
            MonitorNotificationService notifier = factory.getServiceForConsumer(consumer);
            try {
                notifier.publish(null);
            }
            catch (NullPointerException ex) {
                nullWasAccepted = false;
            }
        }
        Assertions.assertEquals((Object)acceptsNullExpectation, (Object)nullWasAccepted);
        Assertions.assertEquals((Object)acceptsNullExpectation, (Object)nullAcceptanceWasAdvertised);
        logger.info(String.format("The service implementation '%s' had the following null publication property: [advertises null acceptance = %b; accepts null=%b].\n", serviceImpl, nullAcceptanceWasAdvertised, nullWasAccepted));
    }

    @ParameterizedTest
    @MethodSource(value={"getArgumentsForTestThroughputUntilLastValueReceived"})
    public <T> void testThroughputUntilLastValueReceived(String serviceImpl, int notifications, T notifyValue, T lastNotifyValue, NotificationConsumer.ConsumerType consumerType, int consumerProcessingTimeInMicroseconds) {
        Validate.notNull((Object)serviceImpl);
        Validate.notNull(notifyValue);
        Validate.notNull(lastNotifyValue);
        Validate.notNull((Object)((Object)consumerType));
        String notificationValueType = notifyValue.getClass().getName();
        logger.info(String.format("Starting test with service implementation '%s', '%s' notifications, and notification value type: '%s'", serviceImpl, notifications, notificationValueType));
        Assertions.assertTimeout((Duration)Duration.ofSeconds(30L), () -> {
            long elapsedTimeInMicroseconds;
            try (MonitorNotificationServiceFactory factory = MonitorNotificationServiceFactoryCreator.create((String)serviceImpl);){
                NotificationConsumer<Object> notificationConsumer = new NotificationConsumer<Object>(consumerType, consumerProcessingTimeInMicroseconds, TimeUnit.MICROSECONDS);
                MonitorNotificationService notifier = factory.getServiceForConsumer(notificationConsumer);
                StopWatch stopWatch = StopWatch.createStarted();
                for (long notification = 0L; notification < (long)(notifications - 1); ++notification) {
                    if (notifier.publish(notifyValue)) {
                        logger.finest(String.format("Value %s accepted - buffer OK", notifyValue));
                        continue;
                    }
                    logger.finest(String.format("Value %s accepted - buffer OVERRUN", notifyValue));
                }
                notificationConsumer.setExpectedNotificationValue(lastNotifyValue);
                if (notifier.publish(lastNotifyValue)) {
                    logger.finest(String.format("Value %s accepted - buffer OK", lastNotifyValue));
                } else {
                    logger.warning(String.format("Value %s accepted - buffer OVERRUN - oldest value DISCARDED", lastNotifyValue));
                }
                notificationConsumer.awaitExpectedNotificationValue();
                elapsedTimeInMicroseconds = stopWatch.getTime(TimeUnit.MICROSECONDS);
            }
            double averageNotificationTimeInMicroseconds = (double)elapsedTimeInMicroseconds / (double)notifications;
            double elapsedTimeInMilliseconds = (double)elapsedTimeInMicroseconds / 1000.0;
            double throughput = 1000000.0 / averageNotificationTimeInMicroseconds;
            logger.info(String.format("Time to send '%s' notification to a SINGLE Consumer was: %,.3f ms ", notifications, elapsedTimeInMilliseconds));
            logger.info(String.format("Average notification time was: %,.3f us ", averageNotificationTimeInMicroseconds));
            logger.info(String.format("Throughput was: %,.0f notifications per second.\n", throughput));
        });
    }

    @ParameterizedTest
    @MethodSource(value={"getArgumentsForTestThroughputUntilExpectedNotificationCountReceived"})
    public <T> void testThroughputUntilExpectedNotificationCountReceived(String serviceImpl, int notifications, T notifyValue, NotificationConsumer.ConsumerType consumerType, int consumerProcessingTimeInMicroseconds) {
        Validate.notNull((Object)serviceImpl);
        Validate.notNull(notifyValue);
        Validate.notNull((Object)((Object)consumerType));
        String notificationValueType = notifyValue.getClass().getSimpleName();
        logger.info(String.format("Starting test with service implementation '%s', '%s' notifications, and notification value type: '%s'", serviceImpl, notifications, notificationValueType));
        Assertions.assertTimeout((Duration)Duration.ofSeconds(120L), () -> {
            long elapsedTimeInMicroseconds;
            int numberOfNotificationThreadsPerConsumerAsAdvertised;
            try (MonitorNotificationServiceFactory factory = MonitorNotificationServiceFactoryCreator.create((String)serviceImpl);){
                numberOfNotificationThreadsPerConsumerAsAdvertised = factory.getQosMetricNumberOfNotificationThreadsPerConsumer();
                NotificationConsumer.clearCurrentTotalNotificationCount();
                NotificationConsumer.setExpectedTotalNotificationCount(notifications);
                logger.finest(String.format("Sending '%s' notifications with value: '%s' ", notifications, notifyValue));
                StopWatch stopWatch = StopWatch.createStarted();
                for (long notification = 0L; notification < (long)notifications; ++notification) {
                    NotificationConsumer notificationConsumer = new NotificationConsumer(consumerType, consumerProcessingTimeInMicroseconds, TimeUnit.MICROSECONDS);
                    MonitorNotificationService notifier = factory.getServiceForConsumer(notificationConsumer);
                    if (notifier.publish(notifyValue)) continue;
                    logger.info(String.format("Value was dropped: %s", notifyValue));
                }
                NotificationConsumer.awaitExpectedTotalNotificationCount();
                elapsedTimeInMicroseconds = stopWatch.getTime(TimeUnit.MICROSECONDS);
                Assertions.assertEquals((long)notifications, (long)NotificationConsumer.getCurrentTotalNotificationCount(), (String)"unexpected number of notifications received");
            }
            double averageNotificationTimeInMicroseconds = (double)elapsedTimeInMicroseconds / (double)notifications;
            double elapsedTimeInMilliseconds = (double)elapsedTimeInMicroseconds / 1000.0;
            double throughput = 1000000.0 / averageNotificationTimeInMicroseconds;
            double theoreticalThroughputLimit = 1000000.0 / ((double)consumerProcessingTimeInMicroseconds / (double)numberOfNotificationThreadsPerConsumerAsAdvertised);
            int cpuCores = Runtime.getRuntime().availableProcessors();
            logger.finest(String.format("CPU Cores are %d", cpuCores));
            int availableThreads = Math.min(cpuCores, numberOfNotificationThreadsPerConsumerAsAdvertised);
            logger.finest(String.format("Available threads are %d", availableThreads));
            double cpuBoundedThroughputLimitOnThisMachine = 1000000.0 / ((double)consumerProcessingTimeInMicroseconds / (double)availableThreads);
            NotificationConsumer exampleConsumer = NotificationConsumer.getBusyWaitingSlowConsumer(consumerProcessingTimeInMicroseconds, TimeUnit.MICROSECONDS);
            logger.info(String.format("Service Implementation '%s' took '%,.3f' ms to send %,d notifications of value type '%s' to DISTINCT consumers of type '%s'", serviceImpl, elapsedTimeInMilliseconds, notifications, notificationValueType, exampleConsumer));
            logger.info(String.format("Average notification time was: '%,.3f' us ", averageNotificationTimeInMicroseconds));
            logger.info(String.format("Throughput was: '%,.0f' notifications per second.", throughput));
            logger.info(String.format("Theoretical limits were: theoretical / cpuBoundOnThisMachine '%,.0f' / '%,.0f' notifications per second.\n", theoreticalThroughputLimit, cpuBoundedThroughputLimitOnThisMachine));
        });
    }

    @ParameterizedTest
    @MethodSource(value={"getArgumentsForTestSlowConsumerBlockingBehaviour"})
    void testSlowConsumerBlockingBehaviour(String serviceImpl, long slowConsumerDelayTimeInMillis, long slowConsumerMinNotifyTimeInMillis, long slowConsumerMaxNotifyTimeInMillis, long normalConsumerMinNotifyTimeInMillis, long normalConsumerMaxNotifyTimeInMillis) {
        Validate.notNull((Object)serviceImpl);
        logger.info(String.format("Starting test with service implementation '%s', delay='%d', minNotify='%d', maxNotify='%d'", serviceImpl, slowConsumerDelayTimeInMillis, normalConsumerMinNotifyTimeInMillis, normalConsumerMaxNotifyTimeInMillis));
        Assertions.assertTimeout((Duration)Duration.ofSeconds(10L), () -> {
            long slowConsumerNotifyTimeInMilliseconds;
            long normalConsumerNotifyTimeInMilliseconds;
            Long slowConsumerValue = 123L;
            Long normalConsumerValue = 456L;
            try (MonitorNotificationServiceFactory factory = MonitorNotificationServiceFactoryCreator.create((String)serviceImpl);){
                NotificationConsumer<Long> slowConsumer = NotificationConsumer.getThreadSleepingSlowConsumer(slowConsumerDelayTimeInMillis, TimeUnit.MILLISECONDS);
                NotificationConsumer<Long> normalConsumer = NotificationConsumer.getNormalConsumer();
                MonitorNotificationService slowConsumerNotifier = factory.getServiceForConsumer(slowConsumer);
                MonitorNotificationService normalConsumerNotifier = factory.getServiceForConsumer(normalConsumer);
                slowConsumer.setExpectedNotificationCount(1);
                slowConsumer.setExpectedNotificationValue(slowConsumerValue);
                normalConsumer.setExpectedNotificationCount(1);
                normalConsumer.setExpectedNotificationValue(normalConsumerValue);
                StopWatch stopWatch = StopWatch.createStarted();
                if (!slowConsumerNotifier.publish((Object)slowConsumerValue)) {
                    logger.info(String.format("Value was dropped: %s", slowConsumerValue));
                }
                if (!normalConsumerNotifier.publish((Object)normalConsumerValue)) {
                    logger.info(String.format("Value was dropped: %s", normalConsumerValue));
                }
                normalConsumer.awaitExpectedNotificationCount();
                normalConsumer.awaitExpectedNotificationValue();
                normalConsumerNotifyTimeInMilliseconds = stopWatch.getTime(TimeUnit.MILLISECONDS);
                slowConsumer.awaitExpectedNotificationCount();
                slowConsumer.awaitExpectedNotificationValue();
                slowConsumerNotifyTimeInMilliseconds = stopWatch.getTime(TimeUnit.MILLISECONDS);
            }
            logger.info(String.format("Slow consumer was notified in %,d ms.", slowConsumerNotifyTimeInMilliseconds));
            logger.info(String.format("Normal consumer was notified in %,d ms.", normalConsumerNotifyTimeInMilliseconds));
            Assertions.assertTrue((normalConsumerNotifyTimeInMilliseconds >= normalConsumerMinNotifyTimeInMillis ? 1 : 0) != 0, (String)String.format("Normal Consumer: notify time: '%s' greater than '%s'", normalConsumerNotifyTimeInMilliseconds, normalConsumerMinNotifyTimeInMillis));
            Assertions.assertTrue((normalConsumerNotifyTimeInMilliseconds < normalConsumerMaxNotifyTimeInMillis ? 1 : 0) != 0, (String)String.format("Normal Consumer:notify time: '%s' less than '%s'", normalConsumerNotifyTimeInMilliseconds, normalConsumerMaxNotifyTimeInMillis));
            Assertions.assertTrue((slowConsumerNotifyTimeInMilliseconds >= slowConsumerMinNotifyTimeInMillis ? 1 : 0) != 0, (String)String.format("Slow Consumer: notify time: '%s' greater than '%s'", slowConsumerNotifyTimeInMilliseconds, slowConsumerMinNotifyTimeInMillis));
            Assertions.assertTrue((slowConsumerNotifyTimeInMilliseconds < slowConsumerMaxNotifyTimeInMillis ? 1 : 0) != 0, (String)String.format("Slow Consumer:notify time: '%s' less than '%s'", slowConsumerNotifyTimeInMilliseconds, slowConsumerMaxNotifyTimeInMillis));
            boolean serviceIsBlocking = normalConsumerNotifyTimeInMilliseconds >= slowConsumerDelayTimeInMillis;
            logger.info(String.format("The service implementation '%s' had the following notification properties: [blocking=%b].\n", serviceImpl, serviceIsBlocking));
        });
    }

    @ParameterizedTest
    @MethodSource(value={"getTestConsumerDeliverySequenceArgs"})
    void testConsumerDeliverySequence(String serviceImpl, long lastNotificationValue, boolean expectedMonotonic, boolean expectedConsecutive) {
        Validate.notNull((Object)serviceImpl);
        logger.info(String.format("Starting test with service implementation '%s', lastNotificationValue '%,d'", serviceImpl, lastNotificationValue));
        Assertions.assertTimeout((Duration)Duration.ofSeconds(30L), () -> {
            NotificationConsumer<Long> notificationConsumer = NotificationConsumer.getNormalConsumer();
            try (MonitorNotificationServiceFactory factory = MonitorNotificationServiceFactoryCreator.create((String)serviceImpl);){
                MonitorNotificationService consumerNotifier = factory.getServiceForConsumer(notificationConsumer);
                notificationConsumer.setExpectedNotificationValue(lastNotificationValue);
                notificationConsumer.setNotificationSequenceWasMonotonic();
                notificationConsumer.setNotificationSequenceWasConsecutive();
                for (long i = 1L; i <= lastNotificationValue; ++i) {
                    logger.finest("Publishing: " + i);
                    if (consumerNotifier.publish((Object)i)) continue;
                    logger.finest("Value was dropped: " + i);
                }
                notificationConsumer.awaitExpectedNotificationValue();
            }
            boolean serviceIsMonotonic = notificationConsumer.getNotificationSequenceWasMonotonic();
            boolean serviceIsConsecutive = notificationConsumer.getNotificationSequenceWasConsecutive();
            if (expectedMonotonic) {
                Assertions.assertTrue((boolean)serviceIsMonotonic, (String)"monotonic expectation not met");
            }
            if (expectedConsecutive) {
                Assertions.assertTrue((boolean)serviceIsConsecutive, (String)"consecutive expectation not met");
            }
            logger.info(String.format("The service implementation '%s' had the following notification properties: [monotonic=%b, consecutive=%b].", serviceImpl, serviceIsMonotonic, serviceIsConsecutive));
        });
    }

    @ParameterizedTest
    @ValueSource(strings={"BlockingQueueSingleWorkerMonitorNotificationServiceImpl,1,2", "BlockingQueueMultipleWorkerMonitorNotificationServiceImpl,4,2", "StripedExecutorServiceMonitorNotificationServiceImpl"})
    void testBufferOverrunConsumerLastValueAlwaysGetsSent(String serviceImpl) {
        logger.info(String.format("Starting test with service implementation '%s'", serviceImpl));
        Assertions.assertTimeout((Duration)Duration.ofSeconds(30L), () -> {
            NotificationConsumer<Long> notificationConsumer = NotificationConsumer.getBusyWaitingSlowConsumer(1L, TimeUnit.SECONDS);
            try (MonitorNotificationServiceFactory factory = MonitorNotificationServiceFactoryCreator.create((String)serviceImpl);){
                MonitorNotificationService consumerNotifier = factory.getServiceForConsumer(notificationConsumer);
                for (long i = 1L; i <= 1000L; ++i) {
                    notificationConsumer.setExpectedNotificationValue(i);
                    logger.info(String.format("Publishing: %d", i));
                    if (!consumerNotifier.publish((Object)i)) {
                        logger.info(String.format(" Value %s accepted - buffer OVERRUN", i));
                        notificationConsumer.awaitExpectedNotificationValue();
                        break;
                    }
                    logger.finest(String.format(" Value %s accepted - buffer OK", i));
                }
            }
        });
    }

    private static Stream<Arguments> getMonitorNotificationServiceImplementations() {
        List allConfigurations = MonitorNotificationServiceFactoryCreator.getAllServiceImplementations();
        return allConfigurations.stream().map(xva$0 -> Arguments.of((Object[])new Object[]{xva$0}));
    }

    private static Stream<Arguments> getArgumentsForTestThroughputUntilLastValueReceived() {
        String aStr = "This is really quite a long string that goes on and on for several tens of characters";
        Integer[] arry = new Integer[10];
        List<String> allServiceImpls = Arrays.asList("BlockingQueueSingleWorkerMonitorNotificationServiceImpl", "BlockingQueueMultipleWorkerMonitorNotificationServiceImpl", "StripedExecutorServiceMonitorNotificationServiceImpl");
        return Stream.of(allServiceImpls.stream().map(s -> Arguments.of((Object[])new Object[]{s, 10000, 1234, 9999, NotificationConsumer.ConsumerType.SLOW_WITH_BUSY_WAIT, 100})), allServiceImpls.stream().map(s -> Arguments.of((Object[])new Object[]{s, 10000, 567L, 9999, NotificationConsumer.ConsumerType.SLOW_WITH_BUSY_WAIT, 100})), allServiceImpls.stream().map(s -> Arguments.of((Object[])new Object[]{s, 10000, Float.valueOf(8.9f), 9999, NotificationConsumer.ConsumerType.SLOW_WITH_BUSY_WAIT, 100})), allServiceImpls.stream().map(s -> Arguments.of((Object[])new Object[]{s, 10000, 1.2, 9999, NotificationConsumer.ConsumerType.SLOW_WITH_BUSY_WAIT, 100})), allServiceImpls.stream().map(s -> Arguments.of((Object[])new Object[]{s, 10000, "This is really quite a long string that goes on and on for several tens of characters", 9999, NotificationConsumer.ConsumerType.SLOW_WITH_BUSY_WAIT, 100})), allServiceImpls.stream().map(s -> Arguments.of((Object[])new Object[]{s, 10000, arry, 9999, NotificationConsumer.ConsumerType.SLOW_WITH_BUSY_WAIT, 100}))).flatMap(s -> s);
    }

    private static Stream<Arguments> getArgumentsForTestThroughputUntilExpectedNotificationCountReceived() {
        List<String> allServiceImpls = Arrays.asList("BlockingQueueSingleWorkerMonitorNotificationServiceImpl", "BlockingQueueMultipleWorkerMonitorNotificationServiceImpl", "StripedExecutorServiceMonitorNotificationServiceImpl");
        String aStr = "This is really quite a long string that goes on and on for several tens of characters";
        Integer[] arry = new Integer[1000];
        for (int i = 0; i < 1000; ++i) {
            arry[i] = i;
        }
        return Stream.of(allServiceImpls.stream().map(s -> Arguments.of((Object[])new Object[]{s, 1000, 1234, NotificationConsumer.ConsumerType.SLOW_WITH_BUSY_WAIT, 100})), allServiceImpls.stream().map(s -> Arguments.of((Object[])new Object[]{s, 1000, 567L, NotificationConsumer.ConsumerType.SLOW_WITH_BUSY_WAIT, 100})), allServiceImpls.stream().map(s -> Arguments.of((Object[])new Object[]{s, 1000, Float.valueOf(8.9f), NotificationConsumer.ConsumerType.SLOW_WITH_BUSY_WAIT, 100})), allServiceImpls.stream().map(s -> Arguments.of((Object[])new Object[]{s, 1000, 1.2, NotificationConsumer.ConsumerType.SLOW_WITH_BUSY_WAIT, 100})), allServiceImpls.stream().map(s -> Arguments.of((Object[])new Object[]{s, 1000, "This is really quite a long string that goes on and on for several tens of characters", NotificationConsumer.ConsumerType.SLOW_WITH_BUSY_WAIT, 100})), allServiceImpls.stream().map(s -> Arguments.of((Object[])new Object[]{s, 1000, arry, NotificationConsumer.ConsumerType.SLOW_WITH_BUSY_WAIT, 100}))).flatMap(s -> s);
    }

    private static Stream<Arguments> getArgumentsForTestNotifyConsumerNullPublicationBehaviour() {
        return Stream.of(Arguments.of((Object[])new Object[]{"BlockingQueueSingleWorkerMonitorNotificationServiceImpl", false}), Arguments.of((Object[])new Object[]{"BlockingQueueMultipleWorkerMonitorNotificationServiceImpl", false}), Arguments.of((Object[])new Object[]{"StripedExecutorServiceMonitorNotificationServiceImpl", false}));
    }

    private static Stream<Arguments> getTestConsumerDeliverySequenceArgs() {
        return Stream.of(Arguments.of((Object[])new Object[]{"BlockingQueueSingleWorkerMonitorNotificationServiceImpl", 100000L, true, true}), Arguments.of((Object[])new Object[]{"BlockingQueueMultipleWorkerMonitorNotificationServiceImpl", 100000L, true, true}), Arguments.of((Object[])new Object[]{"StripedExecutorServiceMonitorNotificationServiceImpl", 100000L, true, true}));
    }

    private static Stream<Arguments> getArgumentsForTestSlowConsumerBlockingBehaviour() {
        return Stream.of(Arguments.of((Object[])new Object[]{"BlockingQueueSingleWorkerMonitorNotificationServiceImpl", 500, 400, 800, 400, 800}), Arguments.of((Object[])new Object[]{"BlockingQueueMultipleWorkerMonitorNotificationServiceImpl", 500, 400, 800, 0, 200}), Arguments.of((Object[])new Object[]{"StripedExecutorServiceMonitorNotificationServiceImpl", 500, 400, 800, 0, 200}));
    }
}

