/*
 * Decompiled with CFR 0.152.
 */
package org.coodex.concrete.message;

import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import org.coodex.concrete.common.ConcreteHelper;
import org.coodex.concrete.common.IF;
import org.coodex.concrete.message.AbstractTopic;
import org.coodex.concrete.message.AggregatedCourier;
import org.coodex.concrete.message.Courier;
import org.coodex.concrete.message.CourierPrototype;
import org.coodex.concrete.message.MessageFilter;
import org.coodex.concrete.message.Observer;
import org.coodex.concrete.message.Subscription;
import org.coodex.util.Common;
import org.coodex.util.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base implementation of {@link AbstractTopic} that keeps a registry of
 * {@link Observer}s and fans each incoming message out to them on an executor.
 *
 * <p>The registry is a {@link ConcurrentHashMap}; compound updates (adding or
 * removing an entry together with toggling the courier's consumer flag) are
 * additionally guarded by the monitor of the registry map.
 *
 * @param <M> the message type carried by this topic
 */
public abstract class AbstractTopicPrototype<M extends Serializable>
implements AbstractTopic<M> {
    private static final Logger log = LoggerFactory.getLogger(AbstractTopicPrototype.class);

    /** Shared executor for observer callbacks, obtained from {@code ConcreteHelper.getExecutor("topic")}. */
    private static final Singleton<Executor> POOL_SINGLETON = Singleton.with(() -> ConcreteHelper.getExecutor("topic"));

    private final Courier<?> courier;

    /** Registered observers mapped to the subscription handle that was returned for them. */
    private final Map<Observer<M>, SubscriptionImpl> subscriptions = new ConcurrentHashMap<>();

    /**
     * Creates a topic bound to the given courier and associates the courier with this topic.
     *
     * @param courier the courier backing this topic; checked through {@code IF.isNull}
     *                (NOTE(review): presumably throws when {@code courier} is null — confirm)
     */
    public AbstractTopicPrototype(Courier<M> courier) {
        this.courier = courier;
        IF.isNull(courier, "courier MUST NOT null.").associate(this);
    }

    /**
     * Returns the currently registered observers.
     *
     * @return an unmodifiable view over the key set of the subscription registry
     */
    protected Set<Observer<M>> getObservers() {
        return Collections.unmodifiableSet(this.subscriptions.keySet());
    }

    /**
     * Unregisters an observer and, when this call removed the last remaining
     * subscription, switches the courier's consumer flag off.
     *
     * @param observer the observer whose subscription is cancelled
     */
    private void remove(Observer<M> observer) {
        synchronized (this.subscriptions) {
            // Only react when this call actually removed a mapping; a repeated or
            // concurrent cancel of the same observer must not touch the courier again.
            boolean removed = this.subscriptions.remove(observer) != null;
            if (removed && this.subscriptions.isEmpty()) {
                this.courier.setConsumer(false);
            }
        }
    }

    /**
     * Returns the queue reported by the courier.
     *
     * @return the courier's queue when the courier is a {@link CourierPrototype}, otherwise {@code null}
     */
    protected String getQueue() {
        // NOTE(review): CourierPrototype is kept raw because its generic signature is not visible here.
        return this.courier instanceof CourierPrototype ? ((CourierPrototype)this.courier).getQueue() : null;
    }

    /**
     * @return the courier this topic was constructed with
     */
    protected Courier<?> getCourier() {
        return this.courier;
    }

    /**
     * @return the executor on which observer callbacks are run
     */
    protected Executor getExecutor() {
        return (Executor)POOL_SINGLETON.get();
    }

    /**
     * Registers an observer for messages of this topic.
     *
     * <p>Subscribing an observer that is already registered returns the existing
     * subscription. When a new subscription is added and the courier does not yet
     * report itself as a consumer, its consumer flag is switched on.
     *
     * @param observer the observer to register
     * @return the subscription handle for {@code observer}; never {@code null}
     * @throws RuntimeException if the courier is an {@link AggregatedCourier}
     */
    public Subscription subscribe(Observer<M> observer) {
        if (this.getCourier() instanceof AggregatedCourier) {
            throw new RuntimeException("Aggregated queue could not subscribe message.");
        }
        // Unsynchronized fast path for an observer that is already registered.
        SubscriptionImpl subscription = this.subscriptions.get(observer);
        if (subscription != null) {
            return subscription;
        }
        synchronized (this.subscriptions) {
            // Re-check under the monitor: another thread may have registered it meanwhile.
            subscription = this.subscriptions.get(observer);
            if (subscription == null) {
                subscription = new SubscriptionImpl(observer);
                this.subscriptions.put(observer, subscription);
                if (!this.courier.isConsumer()) {
                    this.courier.setConsumer(true);
                }
            }
        }
        // Return the local reference rather than re-reading the map, so that a
        // concurrent cancel() cannot turn the result into null.
        return subscription;
    }

    /**
     * Dispatches a message to every registered observer.
     *
     * <p>Each observer is invoked in its own task on {@link #getExecutor()}. An
     * observer that is also a {@link MessageFilter} is only updated when its
     * {@code handle} call accepts the message. Anything thrown by the filter or
     * the observer is logged at warn level and does not propagate.
     *
     * @param message the message to deliver
     */
    public void notify(M message) {
        for (Observer<M> observer : this.getObservers()) {
            // NOTE(review): MessageFilter is kept raw because its generic signature is not visible here.
            MessageFilter filter = observer instanceof MessageFilter ? (MessageFilter)Common.cast(observer) : null;
            this.getExecutor().execute(() -> {
                try {
                    if (filter == null || filter.handle(message)) {
                        observer.update(message);
                    }
                }
                catch (Throwable throwable) {
                    // Deliberately contained so one failing observer does not take down the executor thread.
                    log.warn("message update failed.", throwable);
                }
            });
        }
    }

    /**
     * Subscription handle bound to a single observer of the enclosing topic.
     */
    private class SubscriptionImpl
    implements Subscription {
        private final Observer<M> observer;

        private SubscriptionImpl(Observer<M> observer) {
            this.observer = observer;
        }

        /**
         * Cancels the subscription by unregistering the observer from the enclosing topic.
         */
        public void cancel() {
            AbstractTopicPrototype.this.remove(this.observer);
        }
    }
}

