/*
 * Decompiled with CFR 0.152.
 */
package org.occurrent.subscription.blocking.durable.catchup;

import io.cloudevents.CloudEvent;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import javax.annotation.PreDestroy;
import org.occurrent.condition.Condition;
import org.occurrent.eventstore.api.blocking.EventStoreQueries;
import org.occurrent.filter.Filter;
import org.occurrent.functionalsupport.internal.FunctionalSupport;
import org.occurrent.subscription.OccurrentSubscriptionFilter;
import org.occurrent.subscription.StartAt;
import org.occurrent.subscription.StringBasedSubscriptionPosition;
import org.occurrent.subscription.SubscriptionFilter;
import org.occurrent.subscription.SubscriptionPosition;
import org.occurrent.subscription.api.blocking.DelegatingSubscriptionModel;
import org.occurrent.subscription.api.blocking.PositionAwareSubscriptionModel;
import org.occurrent.subscription.api.blocking.Subscription;
import org.occurrent.subscription.api.blocking.SubscriptionModel;
import org.occurrent.subscription.api.blocking.SubscriptionModelCancelSubscription;
import org.occurrent.subscription.blocking.durable.catchup.CatchupSubscriptionModelConfig;
import org.occurrent.subscription.blocking.durable.catchup.SubscriptionPositionStorageConfig;
import org.occurrent.subscription.blocking.durable.catchup.TimeBasedSubscriptionPosition;
import org.occurrent.time.internal.RFC3339;

public class CatchupSubscriptionModel
implements SubscriptionModel,
DelegatingSubscriptionModel,
SubscriptionModelCancelSubscription {
    private static final int DEFAULT_CACHE_SIZE = 100;
    private final PositionAwareSubscriptionModel subscriptionModel;
    private final EventStoreQueries eventStoreQueries;
    private final CatchupSubscriptionModelConfig config;
    private final ConcurrentMap<String, Boolean> runningCatchupSubscriptions = new ConcurrentHashMap<String, Boolean>();
    private volatile boolean shuttingDown = false;

    public CatchupSubscriptionModel(PositionAwareSubscriptionModel subscriptionModel, EventStoreQueries eventStoreQueries) {
        this(subscriptionModel, eventStoreQueries, new CatchupSubscriptionModelConfig(100));
    }

    public CatchupSubscriptionModel(PositionAwareSubscriptionModel subscriptionModel, EventStoreQueries eventStoreQueries, CatchupSubscriptionModelConfig config) {
        this.subscriptionModel = subscriptionModel;
        this.eventStoreQueries = eventStoreQueries;
        this.config = config;
    }

    public Subscription subscribe(String subscriptionId, SubscriptionFilter filter, Supplier<StartAt> startAtSupplier, Consumer<CloudEvent> action) {
        Subscription subscription;
        boolean subscriptionsWasCancelledOrShutdown;
        Stream stream;
        Filter timeFilter;
        if (filter != null && !(filter instanceof OccurrentSubscriptionFilter)) {
            throw new IllegalArgumentException("Unsupported!");
        }
        this.runningCatchupSubscriptions.put(subscriptionId, true);
        StartAt startAt = startAtSupplier == null || startAtSupplier.get() == null ? StartAt.subscriptionPosition((SubscriptionPosition)TimeBasedSubscriptionPosition.beginningOfTime()) : startAtSupplier.get();
        if (!CatchupSubscriptionModel.isTimeBasedSubscriptionPosition(startAt)) {
            return this.subscriptionModel.subscribe(subscriptionId, filter, startAtSupplier, action);
        }
        SubscriptionPosition subscriptionPosition = ((StartAt.StartAtSubscriptionPosition)startAt).subscriptionPosition;
        if (CatchupSubscriptionModel.isBeginningOfTime(subscriptionPosition)) {
            timeFilter = Filter.all();
        } else {
            OffsetDateTime offsetDateTime = OffsetDateTime.parse(subscriptionPosition.asString(), RFC3339.RFC_3339_DATE_TIME_FORMATTER);
            timeFilter = Filter.time((Condition)Condition.gt((Object)offsetDateTime));
        }
        SubscriptionPosition globalSubscriptionPosition = this.subscriptionModel.globalSubscriptionPosition();
        FixedSizeCache cache = new FixedSizeCache(this.config.cacheSize);
        if (filter == null) {
            stream = this.eventStoreQueries.query(timeFilter, EventStoreQueries.SortBy.TIME_ASC);
        } else {
            Filter userSuppliedFilter = ((OccurrentSubscriptionFilter)filter).filter;
            stream = this.eventStoreQueries.query(timeFilter.and(userSuppliedFilter, new Filter[0]), EventStoreQueries.SortBy.TIME_ASC);
        }
        FunctionalSupport.takeWhile((Stream)stream, __ -> !this.shuttingDown && this.runningCatchupSubscriptions.containsKey(subscriptionId)).peek(action).peek(e -> cache.put(e.getId())).filter(this.returnIfSubscriptionPositionStorageConfigIs(SubscriptionPositionStorageConfig.PersistSubscriptionPositionDuringCatchupPhase.class, cfg -> cfg.persistCloudEventPositionPredicate).orElse(__ -> false)).forEach(e -> this.doIfSubscriptionPositionStorageConfigIs(SubscriptionPositionStorageConfig.PersistSubscriptionPositionDuringCatchupPhase.class, cfg -> cfg.storage.save(subscriptionId, (SubscriptionPosition)TimeBasedSubscriptionPosition.from(e.getTime()))));
        if (!this.shuttingDown && this.runningCatchupSubscriptions.containsKey(subscriptionId)) {
            subscriptionsWasCancelledOrShutdown = false;
            this.runningCatchupSubscriptions.remove(subscriptionId);
        } else {
            subscriptionsWasCancelledOrShutdown = true;
        }
        Supplier<StartAt> startAtSupplierToUse = this.returnIfSubscriptionPositionStorageConfigIs(SubscriptionPositionStorageConfig.UseSubscriptionPositionInStorage.class, cfg -> () -> {
            SubscriptionPosition position = cfg.storage.read(subscriptionId);
            if (position == null || CatchupSubscriptionModel.isTimeBasedSubscriptionPosition(position)) {
                position = cfg.storage.save(subscriptionId, globalSubscriptionPosition);
            }
            return StartAt.subscriptionPosition((SubscriptionPosition)position);
        }).orElse(() -> StartAt.subscriptionPosition((SubscriptionPosition)globalSubscriptionPosition));
        if (subscriptionsWasCancelledOrShutdown) {
            this.doIfSubscriptionPositionStorageConfigIs(SubscriptionPositionStorageConfig.UseSubscriptionPositionInStorage.class, cfg -> {
                if (!cfg.storage.exists(subscriptionId)) {
                    startAtSupplierToUse.get();
                }
            });
            subscription = new CancelledSubscription(subscriptionId);
        } else {
            subscription = this.subscriptionModel.subscribe(subscriptionId, filter, startAtSupplierToUse, cloudEvent -> {
                if (!cache.isCached(cloudEvent.getId())) {
                    action.accept((CloudEvent)cloudEvent);
                }
            });
        }
        return subscription;
    }

    public void cancelSubscription(String subscriptionId) {
        this.runningCatchupSubscriptions.remove(subscriptionId);
        if (this.subscriptionModel instanceof SubscriptionModelCancelSubscription) {
            ((SubscriptionModelCancelSubscription)this.subscriptionModel).cancelSubscription(subscriptionId);
        }
        this.doIfSubscriptionPositionStorageConfigIs(SubscriptionPositionStorageConfig.UseSubscriptionPositionInStorage.class, cfg -> cfg.storage.delete(subscriptionId));
    }

    public Subscription subscribe(String subscriptionId, Consumer<CloudEvent> action) {
        return this.subscribe(subscriptionId, null, action);
    }

    public Subscription subscribe(String subscriptionId, SubscriptionFilter filter, Consumer<CloudEvent> action) {
        return this.subscribe(subscriptionId, filter, () -> {
            SubscriptionPosition subscriptionPosition = this.returnIfSubscriptionPositionStorageConfigIs(SubscriptionPositionStorageConfig.UseSubscriptionPositionInStorage.class, cfg -> cfg.storage.read(subscriptionId)).orElse(null);
            return subscriptionPosition == null ? null : StartAt.subscriptionPosition((SubscriptionPosition)subscriptionPosition);
        }, action);
    }

    @PreDestroy
    public void shutdown() {
        this.shuttingDown = true;
        this.runningCatchupSubscriptions.clear();
        this.subscriptionModel.shutdown();
    }

    public static boolean isTimeBasedSubscriptionPosition(StartAt startAt) {
        if (!(startAt instanceof StartAt.StartAtSubscriptionPosition)) {
            return false;
        }
        SubscriptionPosition subscriptionPosition = ((StartAt.StartAtSubscriptionPosition)startAt).subscriptionPosition;
        return CatchupSubscriptionModel.isTimeBasedSubscriptionPosition(subscriptionPosition);
    }

    public static boolean isTimeBasedSubscriptionPosition(SubscriptionPosition subscriptionPosition) {
        return subscriptionPosition instanceof TimeBasedSubscriptionPosition || subscriptionPosition instanceof StringBasedSubscriptionPosition && CatchupSubscriptionModel.isRfc3339Timestamp(subscriptionPosition.asString());
    }

    private static boolean isRfc3339Timestamp(String string) {
        try {
            OffsetDateTime.parse(string, RFC3339.RFC_3339_DATE_TIME_FORMATTER);
            return true;
        }
        catch (Exception exception) {
            return false;
        }
    }

    private static boolean isBeginningOfTime(SubscriptionPosition subscriptionPosition) {
        return subscriptionPosition instanceof TimeBasedSubscriptionPosition && ((TimeBasedSubscriptionPosition)subscriptionPosition).isBeginningOfTime();
    }

    public SubscriptionModel getDelegatedSubscriptionModel() {
        return this.subscriptionModel;
    }

    private <T, C extends SubscriptionPositionStorageConfig> Optional<T> returnIfSubscriptionPositionStorageConfigIs(Class<C> cls, Function<C, T> fn) {
        if (cls.isInstance(this.config.subscriptionStorageConfig)) {
            return Optional.ofNullable(fn.apply(cls.cast(this.config.subscriptionStorageConfig)));
        }
        return Optional.empty();
    }

    private <T, C extends SubscriptionPositionStorageConfig> void doIfSubscriptionPositionStorageConfigIs(Class<C> cls, Consumer<C> consumer) {
        if (cls.isInstance(this.config.subscriptionStorageConfig)) {
            consumer.accept(cls.cast(this.config.subscriptionStorageConfig));
        }
    }

    private static class CancelledSubscription
    implements Subscription {
        private final String subscriptionId;

        public CancelledSubscription(String subscriptionId) {
            this.subscriptionId = subscriptionId;
        }

        public String id() {
            return this.subscriptionId;
        }

        public void waitUntilStarted() {
        }

        public boolean waitUntilStarted(Duration timeout) {
            return true;
        }
    }

    private static class FixedSizeCache {
        private final LinkedHashMap<String, String> cacheContent;

        FixedSizeCache(final int size) {
            this.cacheContent = new LinkedHashMap<String, String>(){

                @Override
                protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                    return this.size() > size;
                }
            };
        }

        private void put(String value) {
            this.cacheContent.put(value, null);
        }

        public boolean isCached(String key) {
            return this.cacheContent.containsKey(key);
        }
    }
}

