/*
 * Decompiled with CFR 0.152.
 */
package dk.cloudcreate.essentials.reactive;

import dk.cloudcreate.essentials.reactive.EventBus;
import dk.cloudcreate.essentials.reactive.EventHandler;
import dk.cloudcreate.essentials.reactive.OnErrorHandler;
import dk.cloudcreate.essentials.shared.Exceptions;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.LockSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * In-memory {@link EventBus} that supports both synchronous and asynchronous subscribers.
 * <p>
 * Synchronous subscribers are invoked directly on the thread calling {@link #publish(Object)}.
 * Asynchronous subscribers each hold their own subscription on a shared Reactor multicast sink
 * (with a backpressure buffer) and handle events on a dedicated bounded-elastic {@link Scheduler}.
 * <p>
 * When the sink rejects an event because of overflow or non-serialized access, the emission is
 * retried on the publishing thread with an exponential backoff (100 ms doubling up to 1000 ms).
 * Overflow retries are limited by {@code overflowMaxRetries}; after that the {@link OnErrorHandler}
 * is notified with an {@link EventPublishOverflowException}.
 */
public final class LocalEventBus implements EventBus {
    public static final int DEFAULT_BACKPRESSURE_BUFFER_SIZE = 1024;
    public static final int DEFAULT_OVERFLOW_MAX_RETRIES = 20;
    public static final double QUEUED_TASK_CAP_FACTOR = 1.5;

    /** Prefix shared by the logger name and {@link #toString()}. */
    private static final String LOGGER_NAME_PREFIX = "LocalEventBus - ";
    /** Time-to-live, in seconds, passed to the bounded-elastic scheduler. */
    private static final int SCHEDULER_TTL_SECONDS = 60;
    /** Delay before the first emission retry. */
    private static final long INITIAL_RETRY_DELAY_MS = 100L;
    /** Upper bound for the delay between emission retries. */
    private static final long MAX_RETRY_DELAY_MS = 1000L;
    /** Largest shift that can still matter: 100 ms << 4 = 1600 ms, which already exceeds the cap. */
    private static final int MAX_RETRY_DELAY_SHIFT = 4;
    private static final long NANOS_PER_MILLI = 1_000_000L;

    private final Logger log;
    private final String busName;
    private final Scheduler listenerScheduler;
    private final Flux<Object> eventFlux;
    private final ConcurrentMap<EventHandler, Disposable> asyncSubscribers;
    private final Set<EventHandler> syncSubscribers;
    private final OnErrorHandler onErrorHandler;
    private final int overflowMaxRetries;
    private final Sinks.Many<Object> eventSink;
    private boolean started;

    /**
     * @return a new {@link Builder} pre-populated with the default settings
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Creates and starts an event bus.
     *
     * @param busName                the name of the bus (used for the logger and scheduler names); must not be null
     * @param parallelThreads        thread cap of the scheduler that runs asynchronous subscribers
     * @param backpressureBufferSize size of the sink's backpressure buffer
     * @param onErrorHandler         notified when an asynchronous subscriber fails or an event cannot be emitted; must not be null
     * @param overflowMaxRetries     number of retries performed when the backpressure buffer overflows
     * @param queuedTaskCapFactor    multiplied with {@code backpressureBufferSize} to get the scheduler's queued task cap
     */
    public LocalEventBus(String busName, int parallelThreads, int backpressureBufferSize, OnErrorHandler onErrorHandler, int overflowMaxRetries, double queuedTaskCapFactor) {
        // Validate everything before allocating the scheduler, so that a rejected argument
        // doesn't leave an undisposed scheduler behind.
        FailFast.requireNonNull(busName, "busName was null");
        FailFast.requireNonNull(onErrorHandler, "onErrorHandler is null");
        this.busName = busName;
        this.onErrorHandler = onErrorHandler;
        this.overflowMaxRetries = overflowMaxRetries;
        this.log = LoggerFactory.getLogger(LOGGER_NAME_PREFIX + busName);

        int queuedTaskCap = (int) ((double) backpressureBufferSize * queuedTaskCapFactor);
        this.listenerScheduler = Schedulers.newBoundedElastic(parallelThreads, queuedTaskCap, busName, SCHEDULER_TTL_SECONDS, true);
        this.eventSink = Sinks.many().multicast().onBackpressureBuffer(backpressureBufferSize, false);
        this.eventFlux = this.eventSink.asFlux().doOnError(throwable -> this.log.error("Error in event stream", throwable));
        this.asyncSubscribers = new ConcurrentHashMap<>();
        this.syncSubscribers = new CopyOnWriteArraySet<>();
        this.start();
    }

    /**
     * Creates and starts an event bus using the default settings and an error handler that only logs.
     *
     * @param busName the name of the bus; must not be null
     */
    public LocalEventBus(String busName) {
        this(busName, Runtime.getRuntime().availableProcessors(), DEFAULT_BACKPRESSURE_BUFFER_SIZE, defaultOnErrorHandler(busName), DEFAULT_OVERFLOW_MAX_RETRIES, QUEUED_TASK_CAP_FACTOR);
    }

    /**
     * Creates and starts an event bus using the default settings and the supplied error handler.
     *
     * @param busName        the name of the bus; must not be null
     * @param onErrorHandler the error handler; must not be null
     */
    public LocalEventBus(String busName, OnErrorHandler onErrorHandler) {
        this(busName, Runtime.getRuntime().availableProcessors(), DEFAULT_BACKPRESSURE_BUFFER_SIZE, onErrorHandler, DEFAULT_OVERFLOW_MAX_RETRIES, QUEUED_TASK_CAP_FACTOR);
    }

    /**
     * Creates and starts an event bus with a custom thread cap and error handler.
     *
     * @param busName         the name of the bus; must not be null
     * @param parallelThreads thread cap of the scheduler that runs asynchronous subscribers
     * @param onErrorHandler  the error handler; must not be null
     */
    public LocalEventBus(String busName, int parallelThreads, OnErrorHandler onErrorHandler) {
        this(busName, parallelThreads, DEFAULT_BACKPRESSURE_BUFFER_SIZE, onErrorHandler, DEFAULT_OVERFLOW_MAX_RETRIES, QUEUED_TASK_CAP_FACTOR);
    }

    /**
     * Creates and starts an event bus with a custom thread cap and buffer size, and an error
     * handler that only logs.
     *
     * @param busName                the name of the bus; must not be null
     * @param parallelThreads        thread cap of the scheduler that runs asynchronous subscribers
     * @param backpressureBufferSize size of the sink's backpressure buffer
     */
    public LocalEventBus(String busName, int parallelThreads, int backpressureBufferSize) {
        this(busName, parallelThreads, backpressureBufferSize, defaultOnErrorHandler(busName), DEFAULT_OVERFLOW_MAX_RETRIES, QUEUED_TASK_CAP_FACTOR);
    }

    /**
     * Builds the error handler used when the caller doesn't supply one: it logs the failing
     * subscriber, the event and the exception on a logger named after the bus.
     */
    private static OnErrorHandler defaultOnErrorHandler(String busName) {
        return (failingSubscriber, event, exception) -> {
            Logger errorLog = LoggerFactory.getLogger(LOGGER_NAME_PREFIX + busName);
            errorLog.error(MessageFormatter.msg("Error for '{}' handling {}", failingSubscriber, event), exception);
        };
    }

    /**
     * @return the name this bus was created with
     */
    public String getName() {
        return this.busName;
    }

    /**
     * Publishes the event to all synchronous subscribers on the calling thread and then, if any
     * asynchronous subscribers are registered, emits it to them.
     * <p>
     * An exception thrown by a synchronous subscriber is logged and rethrown to the caller, which
     * means remaining synchronous subscribers and the asynchronous emission are skipped.
     *
     * @param event the event to publish; must not be null
     * @return this bus
     */
    @Override
    public EventBus publish(Object event) {
        FailFast.requireNonNull(event, "No event was supplied");
        String eventTypeName = event.getClass().getName();

        this.log.trace("Publishing event of type '{}' to {} sync-subscriber(s)", eventTypeName, this.syncSubscribers.size());
        this.syncSubscribers.forEach(subscriber -> {
            try {
                subscriber.handle(event);
            } catch (Exception e) {
                this.log.error(MessageFormatter.msg("Subscriber '{}' failed with exception {}", subscriber, Exceptions.getStackTrace(e)), e);
                throw e;
            }
        });

        this.log.trace("Publishing event of type '{}' to {} async-subscriber(s)", eventTypeName, this.asyncSubscribers.size());
        if (!this.asyncSubscribers.isEmpty()) {
            this.emit(event);
        }
        return this;
    }

    /**
     * Emits the event to the sink, delegating to {@link #handleEmitFailure} if the sink rejects it.
     * Nothing is emitted when the sink currently has no subscribers.
     */
    private void emit(Object event) {
        if (this.eventSink.currentSubscriberCount() == 0) {
            this.log.debug("No subscribers are active. Skipping event emission.");
            return;
        }
        Sinks.EmitResult emitResult = this.eventSink.tryEmitNext(event);
        if (emitResult.isFailure()) {
            this.handleEmitFailure(emitResult, event, 1);
        }
    }

    /**
     * Reacts to a failed emission: overflow and non-serialized access are retried with backoff,
     * zero-subscriber/terminated/cancelled results discard the event, and anything else is
     * reported to the {@link OnErrorHandler}.
     *
     * @param emitResult the failure reported by the sink
     * @param event      the event that could not be emitted
     * @param attempt    1-based number of the retry attempt about to be made
     */
    private void handleEmitFailure(Sinks.EmitResult emitResult, Object event, int attempt) {
        switch (emitResult) {
            case FAIL_NON_SERIALIZED:
                this.log.debug("Non-serialized access detected when emitting event '{}'. Retrying emission (attempt {}/{}).", event, attempt, this.overflowMaxRetries);
                this.retryEmitWithBackoff(event, attempt, emitResult);
                break;
            case FAIL_OVERFLOW:
                this.log.debug("Buffer overflow when emitting event '{}'. Retrying emission (attempt {}/{}).", event, attempt, this.overflowMaxRetries);
                this.retryEmitWithBackoff(event, attempt, emitResult);
                break;
            case FAIL_ZERO_SUBSCRIBER:
                this.log.debug("No subscribers are available to receive event '{}'. Discarding the event.", event);
                break;
            case FAIL_TERMINATED:
                this.log.debug("Cannot emit event '{}' because the sink is terminated. Discarding the event.", event);
                break;
            case FAIL_CANCELLED:
                this.log.debug("Cannot emit event '{}' because the sink is cancelled. Discarding the event.", event);
                break;
            default:
                this.log.error("Failed to emit event '{}' due to '{}'. Notifying error handler.", event, emitResult);
                this.onErrorHandler.handle(null, event, new RuntimeException("Failed to emit event due to " + emitResult));
        }
    }

    /**
     * Parks the calling thread for the backoff delay and then tries to emit the event again.
     * <p>
     * The retry limit is only enforced for failures other than {@code FAIL_NON_SERIALIZED};
     * non-serialized access is retried until the sink accepts the event or reports another result.
     *
     * @param event      the event to re-emit
     * @param attempt    1-based number of this retry attempt
     * @param emitResult the failure that triggered this retry
     */
    private void retryEmitWithBackoff(Object event, int attempt, Sinks.EmitResult emitResult) {
        boolean retriesExhausted = emitResult != Sinks.EmitResult.FAIL_NON_SERIALIZED && attempt > this.overflowMaxRetries;
        if (retriesExhausted) {
            this.log.error("Failed to emit event '{}' after {} attempts. Notifying error handler.", event, this.overflowMaxRetries);
            this.onErrorHandler.handle(null, event, this.convertToException(event, attempt, emitResult));
            return;
        }

        long delay = backoffDelayMillis(attempt);
        this.log.debug("Retrying emission of event '{}' in {} ms (attempt {}/{})", event, delay, attempt, this.overflowMaxRetries);
        LockSupport.parkNanos(delay * NANOS_PER_MILLI);

        Sinks.EmitResult retryResult = this.eventSink.tryEmitNext(event);
        if (retryResult.isSuccess()) {
            this.log.debug("Successfully re-emitted event '{}' on attempt {}/{}", event, attempt, this.overflowMaxRetries);
        } else {
            this.handleEmitFailure(retryResult, event, attempt + 1);
        }
    }

    /**
     * Computes the exponential backoff delay for a retry attempt: 100, 200, 400, 800 and then
     * 1000 ms for every later attempt.
     * <p>
     * The exponent is clamped before shifting. An unclamped {@code 1L << (attempt - 1)} would wrap
     * around for large attempt numbers (a long shift only uses the low 6 bits of the distance) and
     * the multiplication would overflow, producing zero or negative delays.
     */
    private static long backoffDelayMillis(int attempt) {
        int shift = Math.max(0, Math.min(attempt - 1, MAX_RETRY_DELAY_SHIFT));
        return Math.min(INITIAL_RETRY_DELAY_MS << shift, MAX_RETRY_DELAY_MS);
    }

    /**
     * Creates the exception handed to the {@link OnErrorHandler} once retries are exhausted:
     * an {@link EventPublishOverflowException} for overflow, otherwise a plain {@link RuntimeException}.
     */
    private RuntimeException convertToException(Object event, int attempt, Sinks.EmitResult emitResult) {
        if (emitResult == Sinks.EmitResult.FAIL_OVERFLOW) {
            return new EventPublishOverflowException(MessageFormatter.msg("Overflow: Failed to emit event after {} attempts", this.overflowMaxRetries));
        }
        return new RuntimeException(MessageFormatter.msg("Failed to emit event after {} attempts", this.overflowMaxRetries));
    }

    /**
     * Registers an asynchronous subscriber. Registering a subscriber that is already registered
     * is a no-op. Events are handled on the bus' scheduler, one at a time per subscriber.
     *
     * @param subscriber the subscriber to add; must not be null
     * @return this bus
     */
    @Override
    public EventBus addAsyncSubscriber(EventHandler subscriber) {
        FailFast.requireNonNull(subscriber, "You must supply a subscriber instance");
        this.log.info("[{}] Adding asynchronous subscriber {}", this.busName, subscriber);
        this.asyncSubscribers.computeIfAbsent(subscriber,
                                              key -> this.eventFlux.publishOn(this.listenerScheduler)
                                                                   .flatMap(event -> this.handleAsynchronously(subscriber, event), 1)
                                                                   .subscribe());
        return this;
    }

    /**
     * Wraps a single subscriber invocation in a {@link Mono} that never signals an error:
     * failures are routed to the {@link OnErrorHandler} so one bad event doesn't terminate the
     * subscriber's subscription.
     */
    private Mono<Object> handleAsynchronously(EventHandler subscriber, Object event) {
        return Mono.<Object>fromRunnable(() -> subscriber.handle(event))
                   .onErrorResume(throwable -> {
                       try {
                           // The handler accepts Exceptions only; wrap other Throwables (e.g. AssertionError)
                           // instead of failing with a ClassCastException that would hide the real problem.
                           Exception failure = throwable instanceof Exception ? (Exception) throwable : new RuntimeException(throwable);
                           this.onErrorHandler.handle(subscriber, event, failure);
                       } catch (Exception ex) {
                           this.log.error(MessageFormatter.msg("onErrorHandler failed to handle subscriber {} failing to handle exception {}", subscriber, Exceptions.getStackTrace(throwable)), ex);
                       }
                       return Mono.empty();
                   });
    }

    /**
     * Removes an asynchronous subscriber and disposes its subscription, if it was registered.
     *
     * @param subscriber the subscriber to remove; must not be null
     * @return this bus
     */
    @Override
    public EventBus removeAsyncSubscriber(EventHandler subscriber) {
        FailFast.requireNonNull(subscriber, "You must supply a subscriber instance");
        this.log.info("[{}] Removing asynchronous subscriber {}", this.busName, subscriber);
        Disposable subscription = this.asyncSubscribers.remove(subscriber);
        if (subscription != null) {
            subscription.dispose();
        }
        return this;
    }

    /**
     * Registers a synchronous subscriber, which is invoked on the thread calling {@link #publish(Object)}.
     *
     * @param subscriber the subscriber to add; must not be null
     * @return this bus
     */
    @Override
    public EventBus addSyncSubscriber(EventHandler subscriber) {
        FailFast.requireNonNull(subscriber, "You must supply a subscriber instance");
        this.log.info("[{}] Adding synchronous subscriber {}", this.busName, subscriber);
        this.syncSubscribers.add(subscriber);
        return this;
    }

    /**
     * Removes a synchronous subscriber.
     *
     * @param subscriber the subscriber to remove; must not be null
     * @return this bus
     */
    @Override
    public EventBus removeSyncSubscriber(EventHandler subscriber) {
        FailFast.requireNonNull(subscriber, "You must supply a subscriber instance");
        this.log.info("[{}] Removing synchronous subscriber {}", this.busName, subscriber);
        this.syncSubscribers.remove(subscriber);
        return this;
    }

    /**
     * @param subscriber the subscriber to look for
     * @return true if the subscriber is registered as a synchronous subscriber
     */
    @Override
    public boolean hasSyncSubscriber(EventHandler subscriber) {
        return this.syncSubscribers.contains(subscriber);
    }

    /**
     * @param subscriber the subscriber to look for
     * @return true if the subscriber is registered as an asynchronous subscriber
     */
    @Override
    public boolean hasAsyncSubscriber(EventHandler subscriber) {
        return this.asyncSubscribers.containsKey(subscriber);
    }

    @Override
    public String toString() {
        return LOGGER_NAME_PREFIX + this.busName;
    }

    /**
     * Marks the bus as started (called by the constructor). This only flips the flag and logs;
     * it does not re-create the sink or the scheduler that {@link #stop()} completes and disposes.
     * The flag is a plain field that is read and written without synchronization.
     */
    public void start() {
        if (!this.started) {
            this.started = true;
            this.log.info("Started event bus");
        }
    }

    /**
     * Stops the bus if it is started: completes the sink (a failure to complete is logged and not
     * retried) and disposes the scheduler that runs asynchronous subscribers.
     */
    public void stop() {
        if (!this.started) {
            return;
        }
        this.log.info("Stopping event bus");
        this.eventSink.emitComplete((signalType, emitResult) -> {
            this.log.error(MessageFormatter.msg("Failed to complete eventSink: {}", emitResult));
            // false = do not retry the completion signal
            return false;
        });
        this.listenerScheduler.dispose();
        this.started = false;
        this.log.info("Stopped event bus");
    }

    /**
     * @return true if the bus has been started and not yet stopped
     */
    public boolean isStarted() {
        return this.started;
    }

    /**
     * Fluent builder for {@link LocalEventBus}. Every setting has a default, so
     * {@code LocalEventBus.builder().build()} yields a bus named "default".
     */
    public static class Builder {
        private String busName = "default";
        private int parallelThreads = Runtime.getRuntime().availableProcessors();
        private int backpressureBufferSize = DEFAULT_BACKPRESSURE_BUFFER_SIZE;
        private int overflowMaxRetries = DEFAULT_OVERFLOW_MAX_RETRIES;
        private double queuedTaskCapFactor = QUEUED_TASK_CAP_FACTOR;
        // Reads this.busName when an error is handled, so it reflects a name set after construction.
        private OnErrorHandler onErrorHandler = (failingSubscriber, event, exception) -> {
            Logger errorLog = LoggerFactory.getLogger(LOGGER_NAME_PREFIX + this.busName);
            errorLog.error(MessageFormatter.msg("Error for '{}' handling {}", failingSubscriber, event), exception);
        };

        /**
         * @param busName the name of the bus
         * @return this builder
         */
        public Builder busName(String busName) {
            this.busName = busName;
            return this;
        }

        /**
         * @param parallelThreads thread cap of the scheduler that runs asynchronous subscribers
         * @return this builder
         */
        public Builder parallelThreads(int parallelThreads) {
            this.parallelThreads = parallelThreads;
            return this;
        }

        /**
         * @param backpressureBufferSize size of the sink's backpressure buffer
         * @return this builder
         */
        public Builder backpressureBufferSize(int backpressureBufferSize) {
            this.backpressureBufferSize = backpressureBufferSize;
            return this;
        }

        /**
         * @param maxRetries number of retries performed when the backpressure buffer overflows
         * @return this builder
         */
        public Builder overflowMaxRetries(int maxRetries) {
            this.overflowMaxRetries = maxRetries;
            return this;
        }

        /**
         * @param queuedTaskCapFactor multiplied with the buffer size to get the scheduler's queued task cap
         * @return this builder
         */
        public Builder queuedTaskCapFactor(double queuedTaskCapFactor) {
            this.queuedTaskCapFactor = queuedTaskCapFactor;
            return this;
        }

        /**
         * @param onErrorHandler the error handler to use instead of the logging default
         * @return this builder
         */
        public Builder onErrorHandler(OnErrorHandler onErrorHandler) {
            this.onErrorHandler = onErrorHandler;
            return this;
        }

        /**
         * @return a new, started {@link LocalEventBus} configured from this builder
         */
        public LocalEventBus build() {
            return new LocalEventBus(this.busName, this.parallelThreads, this.backpressureBufferSize, this.onErrorHandler, this.overflowMaxRetries, this.queuedTaskCapFactor);
        }
    }

    /**
     * Passed to the {@link OnErrorHandler} when an event could not be emitted because the
     * backpressure buffer stayed full for all retry attempts.
     */
    public static class EventPublishOverflowException extends RuntimeException {
        public EventPublishOverflowException(String msg) {
            super(msg);
        }
    }
}

