/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.subscription;

import com.google.common.collect.Maps;
import de.otto.synapse.channel.selector.Selector;
import de.otto.synapse.endpoint.BestMatchingSelectableComparator;
import de.otto.synapse.endpoint.MessageInterceptorRegistration;
import de.otto.synapse.endpoint.MessageInterceptorRegistry;
import de.otto.synapse.endpoint.sender.MessageSenderEndpoint;
import de.otto.synapse.endpoint.sender.MessageSenderEndpointFactory;
import de.otto.synapse.message.Message;
import de.otto.synapse.subscription.SnapshotProvider;
import de.otto.synapse.subscription.Subscription;
import de.otto.synapse.subscription.SubscriptionInterceptor;
import de.otto.synapse.subscription.Subscriptions;
import de.otto.synapse.subscription.events.SubscriptionCreated;
import de.otto.synapse.subscription.events.SubscriptionUpdated;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Service;

/**
 * Keeps track of subscriptions to channels and forwards snapshots of subscribed
 * entities to the response channel of a subscription.
 *
 * <p>For every created subscription a {@link SubscriptionInterceptor} is registered at the
 * {@link MessageInterceptorRegistry}, and a {@link MessageSenderEndpoint} for the response
 * channel is created lazily and cached per response-channel name.
 */
@Service
@ConditionalOnBean(value={MessageSenderEndpoint.class, SnapshotProvider.class})
public class SubscriptionService {
    private static final Logger LOG = LoggerFactory.getLogger(SubscriptionService.class);
    private final MessageInterceptorRegistry registry;
    private final List<MessageSenderEndpointFactory> senderEndpointFactories;
    /** Snapshot providers indexed by the name of the channel they provide snapshots for. */
    private final Map<String, SnapshotProvider> snapshotProviders;
    /** Lazily created sender endpoints, keyed by response (target) channel name. */
    private final ConcurrentMap<String, MessageSenderEndpoint> targetSenders = new ConcurrentHashMap<String, MessageSenderEndpoint>();
    private final Subscriptions subscriptions = new Subscriptions();

    /**
     * Creates the service.
     *
     * @param registry registry at which the subscription interceptors are registered
     * @param senderEndpointFactories candidate factories used to create senders for response channels
     * @param snapshotProviders providers that are indexed by {@link SnapshotProvider#channelName()};
     *                          {@code Maps.uniqueIndex} is used for indexing, so channel names are
     *                          expected to be unique
     */
    public SubscriptionService(MessageInterceptorRegistry registry, List<MessageSenderEndpointFactory> senderEndpointFactories, List<SnapshotProvider> snapshotProviders) {
        LOG.info("Initializing SubscriptionService for {}", snapshotProviders.stream().map(SnapshotProvider::channelName).collect(Collectors.toList()));
        this.registry = registry;
        this.senderEndpointFactories = senderEndpointFactories;
        this.snapshotProviders = Maps.uniqueIndex(snapshotProviders, SnapshotProvider::channelName);
    }

    /**
     * Registers a new subscription and an interceptor that serves it.
     *
     * <p>The sender for the response channel is resolved <em>before</em> the subscription is
     * stored, so that a failing creation does not leave a subscription behind that has no
     * sender endpoint.
     *
     * <p>NOTE(review): every call registers a new interceptor, also if a subscription with the
     * same id already exists - confirm that repeated creation events are not expected.
     *
     * @param subscriptionCreated the creation event
     * @param sourceChannelSelector selector of the subscribed channel; the same selector is used
     *                              to pick the sender factory for the response channel
     * @throws IllegalArgumentException if no snapshot provider is configured for the subscribed
     *                                  channel, or no sender factory matches the selector
     */
    public void onSubscriptionCreated(SubscriptionCreated subscriptionCreated, Class<? extends Selector> sourceChannelSelector) {
        try {
            if (this.snapshotProviders.get(subscriptionCreated.getSubscribedChannel()) == null) {
                throw new IllegalArgumentException("No SnapshopProvider configured for channel " + subscriptionCreated.getSubscribedChannel());
            }
            Subscription subscription = new Subscription(subscriptionCreated.getId(), subscriptionCreated.getSubscribedChannel(), subscriptionCreated.getResponseChannel());
            // May throw; must happen before any state of this service is modified.
            MessageSenderEndpoint targetSender = this.resolveTargetSender(subscriptionCreated, sourceChannelSelector);
            this.subscriptions.addIfMissing(subscription);
            SubscriptionInterceptor subscriptionInterceptor = new SubscriptionInterceptor(subscription, targetSender);
            this.registry.register(MessageInterceptorRegistration.matchingSenderChannelsWith(subscription.getChannelName(), subscriptionInterceptor));
        }
        catch (IllegalArgumentException e) {
            // The trailing throwable argument makes SLF4J log the stack trace as well.
            LOG.error("unable to add a subscription to channel {}: {}", subscriptionCreated.getSubscribedChannel(), e.getMessage(), e);
            throw e;
        }
    }

    /**
     * Applies the subscribed and unsubscribed entities of the event to the subscription and
     * afterwards sends a snapshot of all currently subscribed entities to the target channel
     * of the subscription.
     *
     * @param event the update event
     * @throws IllegalArgumentException if the subscription does not exist
     * @throws IllegalStateException if entities are subscribed but no snapshot provider or
     *                               sender endpoint is available for the subscription
     */
    public void onSubscriptionUpdated(SubscriptionUpdated event) {
        Subscription subscription = this.subscriptions.get(event.getId()).orElseThrow(() -> new IllegalArgumentException("Subscription " + event.getId() + " does not exist"));
        subscription.subscribe(event.getSubscribedEntities());
        subscription.unsubscribe(event.getUnsubscribedEntities());
        SnapshotProvider snapshotProvider = this.snapshotProviders.get(subscription.getChannelName());
        this.sendSnapshot(subscription.getSubscribedEntities(), subscription.getTargetChannelName(), snapshotProvider);
    }

    /**
     * Removes the subscription with the given id.
     *
     * <p>NOTE(review): only the entry in {@link Subscriptions} is removed; the registered
     * interceptor and the cached sender endpoint are not touched here.
     *
     * @param subscriptionId id of the subscription to remove
     */
    public void onSubscriptionDeleted(String subscriptionId) {
        this.subscriptions.remove(subscriptionId);
    }

    /**
     * @return the live {@link Subscriptions} instance managed by this service (not a copy)
     */
    public Subscriptions getSubscriptions() {
        return this.subscriptions;
    }

    /**
     * Returns the cached sender for the response channel of the event, creating it with the
     * best matching factory if none is cached yet.
     *
     * @throws IllegalArgumentException if no factory matches the selector; nothing is cached then
     */
    private MessageSenderEndpoint resolveTargetSender(SubscriptionCreated subscriptionCreated, Class<? extends Selector> targetChannelSelector) {
        return this.targetSenders.computeIfAbsent(subscriptionCreated.getResponseChannel(), channelName -> {
            MessageSenderEndpointFactory senderEndpointFactory = this.senderEndpointFactories.stream()
                    .filter(candidate -> candidate.matches(targetChannelSelector))
                    .min(new BestMatchingSelectableComparator(targetChannelSelector))
                    .orElseThrow(() -> new IllegalArgumentException("Unable to subscribe to " + subscriptionCreated.getSubscribedChannel() + " because no matching sender factory was found."));
            return senderEndpointFactory.create(channelName);
        });
    }

    /**
     * Sends one snapshot batch per entity id to the target channel, waiting for each batch
     * (via {@code join()}) before the next one is requested.
     *
     * @throws IllegalStateException if there is something to send but the snapshot provider
     *                               or the sender endpoint for the target channel is missing
     */
    private void sendSnapshot(Set<String> entityIds, String targetChannel, SnapshotProvider snapshotProvider) {
        if (entityIds.isEmpty()) {
            // Nothing to send; stay a no-op even if no sender or provider is available.
            return;
        }
        if (snapshotProvider == null) {
            throw new IllegalStateException("No SnapshotProvider available for sending a snapshot to " + targetChannel);
        }
        MessageSenderEndpoint messageSenderEndpoint = this.targetSenders.get(targetChannel);
        if (messageSenderEndpoint == null) {
            throw new IllegalStateException("No sender endpoint available for target channel " + targetChannel);
        }
        for (String id : entityIds) {
            // Close the stream once the send has been awaited, in case it is backed by a resource.
            try (Stream<? extends Message<?>> snapshot = snapshotProvider.snapshot(id)) {
                messageSenderEndpoint.sendBatch(snapshot).join();
            }
        }
    }
}

