/*
 * Decompiled with CFR 0.152.
 */
package dk.cloudcreate.essentials.reactive;

import dk.cloudcreate.essentials.reactive.OnErrorHandler;
import dk.cloudcreate.essentials.reactive.PublishException;
import dk.cloudcreate.essentials.shared.Exceptions;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class LocalEventBus<EVENT_TYPE> {
    private final Logger log;
    private final String busName;
    private final Scheduler listenerScheduler;
    private final Flux<EVENT_TYPE> eventFlux;
    private final Sinks.Many<EVENT_TYPE> eventSink;
    private final ConcurrentMap<Consumer<EVENT_TYPE>, Disposable> asyncSubscribers;
    private final Set<Consumer<EVENT_TYPE>> syncSubscribers;
    private final OnErrorHandler<EVENT_TYPE> onErrorHandler;

    public LocalEventBus(String busName, int parallelThreads, OnErrorHandler<EVENT_TYPE> onErrorHandler) {
        this(busName, Schedulers.newBoundedElastic((int)parallelThreads, (int)1000, (String)((String)FailFast.requireNonNull((Object)busName, (String)"busName was null")), (int)60, (boolean)true), onErrorHandler);
    }

    public LocalEventBus(String busName, Scheduler asyncSubscribersScheduler, OnErrorHandler<EVENT_TYPE> onErrorHandler) {
        this.busName = (String)FailFast.requireNonNull((Object)busName, (String)"busName was null");
        this.listenerScheduler = (Scheduler)FailFast.requireNonNull((Object)asyncSubscribersScheduler, (String)"asyncSubscribersScheduler is null");
        this.log = LoggerFactory.getLogger((String)("LocalEventBus - " + busName));
        this.onErrorHandler = (OnErrorHandler)FailFast.requireNonNull(onErrorHandler, (String)"onErrorHandler is null");
        this.eventSink = Sinks.many().multicast().onBackpressureBuffer();
        this.eventFlux = this.eventSink.asFlux().publishOn(this.listenerScheduler);
        this.asyncSubscribers = new ConcurrentHashMap<Consumer<EVENT_TYPE>, Disposable>();
        this.syncSubscribers = ConcurrentHashMap.newKeySet();
    }

    public LocalEventBus<EVENT_TYPE> publish(EVENT_TYPE event) {
        FailFast.requireNonNull(event, (String)"No event was supplied");
        this.log.trace("Publishing {} to {} async-subscribers", (Object)event.getClass().getName(), (Object)this.asyncSubscribers.size());
        Sinks.EmitResult emitResult = this.eventSink.tryEmitNext(event);
        if (emitResult.isFailure()) {
            throw new PublishException(MessageFormatter.msg((String)"Failed to publish event {} to async-subscribers: {}", (Object[])new Object[]{event, emitResult}), event);
        }
        this.log.trace("Publishing {} to {} sync-subscribers", (Object)event.getClass().getName(), (Object)this.syncSubscribers.size());
        this.syncSubscribers.forEach(subscriber -> {
            try {
                subscriber.accept(event);
            }
            catch (Exception e) {
                try {
                    this.onErrorHandler.handle((Consumer<Object>)subscriber, event, e);
                }
                catch (Exception ex) {
                    this.log.error(MessageFormatter.msg((String)"onErrorHandler failed to handle subscriber {} failing to handle exception {}", (Object[])new Object[]{subscriber, Exceptions.getStackTrace((Throwable)e)}), (Throwable)ex);
                }
            }
        });
        return this;
    }

    public LocalEventBus<EVENT_TYPE> addAsyncSubscriber(Consumer<EVENT_TYPE> subscriber) {
        FailFast.requireNonNull(subscriber, (String)"You must supply a subscriber instance");
        this.asyncSubscribers.computeIfAbsent(subscriber, busEventSubscriber -> this.eventFlux.subscribe(event -> {
            try {
                subscriber.accept(event);
            }
            catch (Exception e) {
                try {
                    this.onErrorHandler.handle(subscriber, event, e);
                }
                catch (Exception ex) {
                    this.log.error(MessageFormatter.msg((String)"onErrorHandler failed to handle subscriber {} failing to handle exception {}", (Object[])new Object[]{subscriber, Exceptions.getStackTrace((Throwable)e)}), (Throwable)ex);
                }
            }
        }));
        return this;
    }

    public LocalEventBus<EVENT_TYPE> removeAsyncSubscriber(Consumer<EVENT_TYPE> subscriber) {
        FailFast.requireNonNull(subscriber, (String)"You must supply a subscriber instance");
        Disposable processorSubscription = (Disposable)this.asyncSubscribers.remove(subscriber);
        if (processorSubscription != null) {
            processorSubscription.dispose();
        }
        return this;
    }

    public LocalEventBus<EVENT_TYPE> addSyncSubscriber(Consumer<EVENT_TYPE> subscriber) {
        FailFast.requireNonNull(subscriber, (String)"You must supply a subscriber instance");
        this.syncSubscribers.add(subscriber);
        return this;
    }

    public LocalEventBus<EVENT_TYPE> removeSyncSubscriber(Consumer<EVENT_TYPE> subscriber) {
        FailFast.requireNonNull(subscriber, (String)"You must supply a subscriber instance");
        this.syncSubscribers.remove(subscriber);
        return this;
    }

    public String toString() {
        return "LocalEventBus - " + this.busName;
    }
}

