/*
 * Decompiled with CFR 0.152.
 */
package dk.cloudcreate.essentials.reactive;

import dk.cloudcreate.essentials.reactive.EventHandler;
import dk.cloudcreate.essentials.reactive.OnErrorHandler;
import dk.cloudcreate.essentials.shared.Exceptions;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.LockSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.ParallelFlux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * In-process event bus that delivers published events to two kinds of subscribers:
 * <ul>
 *   <li><b>synchronous</b> subscribers, invoked one after another on the thread that calls
 *       {@link #publish(Object)};</li>
 *   <li><b>asynchronous</b> subscribers, fed from a Reactor multicast sink through a
 *       {@link ParallelFlux} that runs on the scheduler given at construction time.</li>
 * </ul>
 * An exception thrown by a subscriber is routed to the configured {@link OnErrorHandler}; if the
 * error handler itself throws, the failure is logged and delivery continues with the next
 * subscriber.
 * <p>
 * Both subscriber registries are backed by {@link ConcurrentHashMap}.
 *
 * @param <EVENT_TYPE> the type of event published on this bus
 */
public class LocalEventBus<EVENT_TYPE> {
    /** Maximum number of queued tasks for the default bounded-elastic scheduler. */
    private static final int DEFAULT_QUEUED_TASK_CAP = 1000;
    /** Idle time-to-live, in seconds, for threads of the default bounded-elastic scheduler. */
    private static final int DEFAULT_THREAD_TTL_SECONDS = 60;

    private final Logger log;
    private final String busName;
    private final Scheduler listenerScheduler;
    private final ParallelFlux<EVENT_TYPE> eventFlux;
    private final Sinks.Many<EVENT_TYPE> eventSink;
    private final ConcurrentMap<EventHandler<EVENT_TYPE>, Disposable> asyncSubscribers;
    private final Set<EventHandler<EVENT_TYPE>> syncSubscribers;
    private final OnErrorHandler<EVENT_TYPE> onErrorHandler;

    /**
     * Creates a bus whose asynchronous subscribers run on a new bounded-elastic scheduler capped
     * at {@code Runtime.getRuntime().availableProcessors()} threads.
     *
     * @param busName        name of the bus; also used as the scheduler name and in the logger name
     * @param onErrorHandler handler invoked when a subscriber fails to handle an event
     */
    public LocalEventBus(String busName, OnErrorHandler<EVENT_TYPE> onErrorHandler) {
        this(busName, newDefaultScheduler(busName, Runtime.getRuntime().availableProcessors()), onErrorHandler);
    }

    /**
     * Creates a bus whose asynchronous subscribers run on a new bounded-elastic scheduler capped
     * at {@code parallelThreads} threads.
     *
     * @param busName         name of the bus; also used as the scheduler name and in the logger name
     * @param parallelThreads thread cap handed to the bounded-elastic scheduler
     * @param onErrorHandler  handler invoked when a subscriber fails to handle an event
     */
    public LocalEventBus(String busName, int parallelThreads, OnErrorHandler<EVENT_TYPE> onErrorHandler) {
        this(busName, newDefaultScheduler(busName, parallelThreads), onErrorHandler);
    }

    /**
     * Creates a bus whose asynchronous subscribers run on the supplied scheduler.
     *
     * @param busName                   name of the bus; used in the logger name and log messages
     * @param asyncSubscribersScheduler scheduler the asynchronous event flux runs on
     * @param onErrorHandler            handler invoked when a subscriber fails to handle an event
     */
    public LocalEventBus(String busName, Scheduler asyncSubscribersScheduler, OnErrorHandler<EVENT_TYPE> onErrorHandler) {
        this.busName = FailFast.requireNonNull(busName, "busName was null");
        this.listenerScheduler = FailFast.requireNonNull(asyncSubscribersScheduler, "asyncSubscribersScheduler is null");
        this.log = LoggerFactory.getLogger("LocalEventBus - " + busName);
        this.onErrorHandler = FailFast.requireNonNull(onErrorHandler, "onErrorHandler is null");
        // autoCancel=false: with the default (autoCancel=true) the sink shuts down for good when the
        // last async subscriber is removed, so subscribers added afterwards would never receive events.
        // The buffer size is the same one the no-arg onBackpressureBuffer() uses.
        this.eventSink = Sinks.many().multicast().onBackpressureBuffer(reactor.util.concurrent.Queues.SMALL_BUFFER_SIZE, false);
        this.eventFlux = this.eventSink.asFlux().parallel().runOn(this.listenerScheduler);
        this.asyncSubscribers = new ConcurrentHashMap<>();
        this.syncSubscribers = ConcurrentHashMap.newKeySet();
    }

    /**
     * Builds the bounded-elastic daemon scheduler used by the convenience constructors.
     * Static so it can be called from a {@code this(...)} constructor invocation.
     */
    private static Scheduler newDefaultScheduler(String busName, int threadCap) {
        return Schedulers.newBoundedElastic(
                threadCap,
                DEFAULT_QUEUED_TASK_CAP,
                FailFast.requireNonNull(busName, "busName was null"),
                DEFAULT_THREAD_TTL_SECONDS,
                true);
    }

    /**
     * Publishes the event: first emits it to the asynchronous sink (only when at least one
     * asynchronous subscriber is registered), then invokes every synchronous subscriber on the
     * calling thread.
     *
     * @param event the event to publish; must not be {@code null}
     * @return this bus instance
     */
    public LocalEventBus<EVENT_TYPE> publish(EVENT_TYPE event) {
        FailFast.requireNonNull(event, "No event was supplied");
        String eventTypeName = event.getClass().getName();

        this.log.debug("Publishing event of type '{}' to {} async-subscriber(s)", eventTypeName, this.asyncSubscribers.size());
        if (!this.asyncSubscribers.isEmpty()) {
            this.eventSink.emitNext(event, (signalType, emitResult) -> {
                if (Sinks.EmitResult.FAIL_NON_SERIALIZED == emitResult) {
                    // Another thread is emitting concurrently: back off briefly and ask the sink to retry
                    LockSupport.parkNanos(100L);
                    return true;
                }
                if (emitResult.isFailure()) {
                    reportEmitFailure(event, emitResult);
                }
                return false;
            });
        }

        this.log.debug("Publishing event of type '{}' to {} sync-subscriber(s)", eventTypeName, this.syncSubscribers.size());
        this.syncSubscribers.forEach(subscriber -> deliver(subscriber, event));
        return this;
    }

    /**
     * Registers an asynchronous subscriber. If the subscriber is already registered, no
     * additional subscription is created.
     *
     * @param subscriber the subscriber to add; must not be {@code null}
     * @return this bus instance
     */
    public LocalEventBus<EVENT_TYPE> addAsyncSubscriber(EventHandler<EVENT_TYPE> subscriber) {
        FailFast.requireNonNull(subscriber, "You must supply a subscriber instance");
        this.log.info("[{}] Adding asynchronous subscriber {}", this.busName, subscriber);
        this.asyncSubscribers.computeIfAbsent(subscriber,
                                              newSubscriber -> this.eventFlux.subscribe(event -> deliver(newSubscriber, event)));
        return this;
    }

    /**
     * Removes an asynchronous subscriber and disposes its subscription, if it was registered.
     *
     * @param subscriber the subscriber to remove; must not be {@code null}
     * @return this bus instance
     */
    public LocalEventBus<EVENT_TYPE> removeAsyncSubscriber(EventHandler<EVENT_TYPE> subscriber) {
        FailFast.requireNonNull(subscriber, "You must supply a subscriber instance");
        this.log.info("[{}] Removing asynchronous subscriber {}", this.busName, subscriber);
        Disposable subscription = this.asyncSubscribers.remove(subscriber);
        if (subscription != null) {
            subscription.dispose();
        }
        return this;
    }

    /**
     * Registers a synchronous subscriber, which is invoked on the thread calling
     * {@link #publish(Object)}.
     *
     * @param subscriber the subscriber to add; must not be {@code null}
     * @return this bus instance
     */
    public LocalEventBus<EVENT_TYPE> addSyncSubscriber(EventHandler<EVENT_TYPE> subscriber) {
        FailFast.requireNonNull(subscriber, "You must supply a subscriber instance");
        this.log.info("[{}] Adding synchronous subscriber {}", this.busName, subscriber);
        this.syncSubscribers.add(subscriber);
        return this;
    }

    /**
     * Removes a synchronous subscriber, if it was registered.
     *
     * @param subscriber the subscriber to remove; must not be {@code null}
     * @return this bus instance
     */
    public LocalEventBus<EVENT_TYPE> removeSyncSubscriber(EventHandler<EVENT_TYPE> subscriber) {
        FailFast.requireNonNull(subscriber, "You must supply a subscriber instance");
        this.log.info("[{}] Removing synchronous subscriber {}", this.busName, subscriber);
        this.syncSubscribers.remove(subscriber);
        return this;
    }

    /**
     * @param subscriber the subscriber to look up
     * @return {@code true} if the subscriber is registered as a synchronous subscriber
     */
    public boolean hasSyncSubscriber(EventHandler<EVENT_TYPE> subscriber) {
        return this.syncSubscribers.contains(subscriber);
    }

    /**
     * @param subscriber the subscriber to look up
     * @return {@code true} if the subscriber is registered as an asynchronous subscriber
     */
    public boolean hasAsyncSubscriber(EventHandler<EVENT_TYPE> subscriber) {
        return this.asyncSubscribers.containsKey(subscriber);
    }

    @Override
    public String toString() {
        return "LocalEventBus - " + this.busName;
    }

    /**
     * Hands the event to a single subscriber. A subscriber failure is forwarded to the
     * {@link OnErrorHandler}; a failure inside the error handler is logged and swallowed so that
     * one broken subscriber cannot stop delivery to the others.
     */
    private void deliver(EventHandler<EVENT_TYPE> subscriber, EVENT_TYPE event) {
        try {
            subscriber.handle(event);
        } catch (Exception e) {
            try {
                this.onErrorHandler.handle(subscriber, event, e);
            } catch (Exception ex) {
                this.log.error(MessageFormatter.msg("onErrorHandler failed to handle subscriber {} failing to handle exception {}",
                                                    subscriber,
                                                    Exceptions.getStackTrace(e)),
                               ex);
            }
        }
    }

    /**
     * Logs a failed emission to the asynchronous sink and notifies the {@link OnErrorHandler}.
     * No individual subscriber is at fault, so the subscriber argument is {@code null}; the error
     * argument describes the emit result instead of being {@code null}. A failure in the error
     * handler is logged so that synchronous delivery in {@link #publish(Object)} still happens.
     */
    private void reportEmitFailure(EVENT_TYPE event, Sinks.EmitResult emitResult) {
        String eventTypeName = event.getClass().getName();
        this.log.error("Failed to publish event of type '{}' to {} async-subscriber(s): {}",
                       eventTypeName, this.asyncSubscribers.size(), emitResult);
        IllegalStateException emitFailure = new IllegalStateException(
                "[" + this.busName + "] Failed to publish event of type '" + eventTypeName + "' to async-subscriber(s): " + emitResult);
        try {
            this.onErrorHandler.handle(null, event, emitFailure);
        } catch (Exception ex) {
            this.log.error("[" + this.busName + "] onErrorHandler failed to handle emit failure " + emitResult
                                   + " for event of type '" + eventTypeName + "'",
                           ex);
        }
    }
}

