/*
 * Decompiled with CFR 0.152.
 */
package org.evrete.util;

import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import org.evrete.api.events.Events;
import org.evrete.util.CommonUtils;
import org.evrete.util.Hierarchy;

public class BroadcastingPublisher<E extends Events.Event>
implements Events.Publisher<E> {
    private final Hierarchy<SplitSubscriptions<E>> innerSubscriptions;
    private final Executor executor;

    private BroadcastingPublisher(Hierarchy<SplitSubscriptions<E>> innerSubscriptions, Executor executor) {
        this.innerSubscriptions = innerSubscriptions;
        this.executor = executor;
    }

    public BroadcastingPublisher(Executor executor) {
        this(new Hierarchy<SplitSubscriptions<E>>(new SplitSubscriptions()), executor);
    }

    public BroadcastingPublisher(BroadcastingPublisher<E> other) {
        this(new Hierarchy<SplitSubscriptions<E>>(new SplitSubscriptions(), other.innerSubscriptions), other.executor);
    }

    public int totalLocalSubscriptions() {
        return this.innerSubscriptions.getValue().size();
    }

    @Override
    public synchronized Events.Subscription subscribe(boolean async, Consumer<E> listener) {
        InnerSubscription<E> result = new InnerSubscription<E>(this, async, listener);
        this.innerSubscriptions.getValue().add(result);
        return result;
    }

    public void broadcast(E event) {
        this.innerSubscriptions.walkUp(subscriptions -> subscriptions.broadcast(event, this.executor));
    }

    private void removeSubscription(InnerSubscription<E> subscription) {
        this.innerSubscriptions.getValue().removeSubscription(subscription);
    }

    private static class SplitSubscriptions<H extends Events.Event> {
        private final Collection<InnerSubscription<H>> subscriptionsSync = CommonUtils.newIdentityHashSet();
        private final Collection<InnerSubscription<H>> subscriptionsAsync = CommonUtils.newIdentityHashSet();

        private SplitSubscriptions() {
        }

        void add(InnerSubscription<H> subscription) {
            if (subscription.async) {
                this.subscriptionsAsync.add(subscription);
            } else {
                this.subscriptionsSync.add(subscription);
            }
        }

        private void removeSubscription(InnerSubscription<H> subscription) {
            if (subscription.async) {
                this.subscriptionsAsync.remove(subscription);
            } else {
                this.subscriptionsSync.remove(subscription);
            }
        }

        int size() {
            return this.subscriptionsSync.size() + this.subscriptionsAsync.size();
        }

        void broadcast(H event, Executor executor) {
            for (InnerSubscription<H> s : this.subscriptionsAsync) {
                if (s.cancelled) continue;
                executor.execute(() -> s.action.accept(event));
            }
            for (InnerSubscription<H> s : this.subscriptionsSync) {
                if (s.cancelled) continue;
                s.action.accept(event);
            }
        }
    }

    private static class InnerSubscription<P extends Events.Event>
    implements Events.Subscription {
        private final BroadcastingPublisher<P> publisher;
        private final boolean async;
        private final Consumer<P> action;
        private boolean cancelled = false;

        InnerSubscription(BroadcastingPublisher<P> publisher, boolean async, Consumer<P> action) {
            this.publisher = publisher;
            this.async = async;
            this.action = action;
        }

        @Override
        public synchronized void cancel() {
            this.cancelled = true;
            this.publisher.removeSubscription(this);
        }
    }
}

