/*
 * Decompiled with CFR 0.152.
 */
package org.occurrent.subscription.blocking.durable.catchup;

import io.cloudevents.CloudEvent;
import jakarta.annotation.Nullable;
import jakarta.annotation.PreDestroy;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import org.occurrent.condition.Condition;
import org.occurrent.eventstore.api.blocking.EventStoreQueries;
import org.occurrent.filter.Filter;
import org.occurrent.subscription.OccurrentSubscriptionFilter;
import org.occurrent.subscription.StartAt;
import org.occurrent.subscription.StringBasedSubscriptionPosition;
import org.occurrent.subscription.SubscriptionFilter;
import org.occurrent.subscription.SubscriptionPosition;
import org.occurrent.subscription.api.blocking.DelegatingSubscriptionModel;
import org.occurrent.subscription.api.blocking.PositionAwareSubscriptionModel;
import org.occurrent.subscription.api.blocking.Subscription;
import org.occurrent.subscription.api.blocking.SubscriptionModel;
import org.occurrent.subscription.blocking.durable.catchup.CatchupSubscription;
import org.occurrent.subscription.blocking.durable.catchup.CatchupSubscriptionModelConfig;
import org.occurrent.subscription.blocking.durable.catchup.StartAtTime;
import org.occurrent.subscription.blocking.durable.catchup.SubscriptionPositionStorageConfig;
import org.occurrent.subscription.blocking.durable.catchup.TimeBasedSubscriptionPosition;
import org.occurrent.time.internal.RFC3339;

public class CatchupSubscriptionModel
implements SubscriptionModel,
DelegatingSubscriptionModel {
    private static final int DEFAULT_CACHE_SIZE = 100;
    private final PositionAwareSubscriptionModel subscriptionModel;
    private final EventStoreQueries eventStoreQueries;
    private final CatchupSubscriptionModelConfig config;
    private final ConcurrentMap<String, Boolean> runningCatchupSubscriptions = new ConcurrentHashMap<String, Boolean>();
    private volatile boolean shuttingDown = false;

    public CatchupSubscriptionModel(PositionAwareSubscriptionModel subscriptionModel, EventStoreQueries eventStoreQueries) {
        this(subscriptionModel, eventStoreQueries, new CatchupSubscriptionModelConfig(100));
    }

    public CatchupSubscriptionModel(PositionAwareSubscriptionModel subscriptionModel, EventStoreQueries eventStoreQueries, CatchupSubscriptionModelConfig config) {
        this.subscriptionModel = subscriptionModel;
        this.eventStoreQueries = eventStoreQueries;
        this.config = config;
    }

    public Subscription subscribeFromBeginningOfTime(String subscriptionId, SubscriptionFilter filter, Consumer<CloudEvent> action) {
        return this.subscribe(subscriptionId, filter, StartAtTime.beginningOfTime(), action);
    }

    public Subscription subscribeFromBeginningOfTime(String subscriptionId, Consumer<CloudEvent> action) {
        return this.subscribe(subscriptionId, StartAtTime.beginningOfTime(), action);
    }

    public Subscription subscribe(String subscriptionId, SubscriptionFilter filter, StartAt startAt, Consumer<CloudEvent> action) {
        StartAt firstStartAt;
        Objects.requireNonNull(startAt, "Start at supplier cannot be null");
        if (filter != null && !(filter instanceof OccurrentSubscriptionFilter)) {
            throw new IllegalArgumentException("Unsupported!");
        }
        if (startAt.isDefault()) {
            SubscriptionPosition subscriptionPosition = this.returnIfSubscriptionPositionStorageConfigIs(SubscriptionPositionStorageConfig.UseSubscriptionPositionInStorage.class, cfg -> cfg.storage().read(subscriptionId)).orElse(null);
            if (subscriptionPosition == null) {
                return this.getDelegatedSubscriptionModel().subscribe(subscriptionId, filter, startAt, action);
            }
            firstStartAt = StartAt.subscriptionPosition((SubscriptionPosition)subscriptionPosition);
        } else if (startAt.isDynamic()) {
            StartAt startAtGeneratedByDynamic = startAt.get(CatchupSubscriptionModel.generateSubscriptionModelContext());
            if (startAtGeneratedByDynamic == null) {
                return this.getDelegatedSubscriptionModel().subscribe(subscriptionId, filter, startAt, action);
            }
            firstStartAt = startAtGeneratedByDynamic;
        } else {
            firstStartAt = startAt;
        }
        if (!CatchupSubscriptionModel.isTimeBasedSubscriptionPosition(firstStartAt)) {
            return this.subscriptionModel.subscribe(subscriptionId, filter, firstStartAt, action);
        }
        CompletableFuture<Subscription> subscriptionCompletableFuture = CompletableFuture.supplyAsync(() -> this.startCatchupSubscription(subscriptionId, filter, startAt, action, firstStartAt));
        return new CatchupSubscription(subscriptionId, subscriptionCompletableFuture);
    }

    private Subscription startCatchupSubscription(String subscriptionId, SubscriptionFilter filter, StartAt startAt, Consumer<CloudEvent> action, StartAt firstStartAt) {
        boolean subscriptionsWasCancelledOrShutdown;
        this.runningCatchupSubscriptions.put(subscriptionId, true);
        SubscriptionPosition subscriptionPosition = ((StartAt.StartAtSubscriptionPosition)firstStartAt.get((StartAt.SubscriptionModelContext)CatchupSubscriptionModel.generateSubscriptionModelContext())).subscriptionPosition;
        Filter catchupFilter = CatchupSubscriptionModel.deriveFilterToUseDuringCatchupPhase(filter, subscriptionPosition);
        long numberOfEventsBeforeStartingCatchupSubscription = this.eventStoreQueries.count(catchupFilter);
        this.runCatchupForStream(this.eventStoreQueries.query(catchupFilter, this.config.catchupPhaseSortBy), subscriptionId, action, null);
        Class delegatedSubscriptionModelType = this.getDelegatedSubscriptionModel().getClass();
        StartAt delegatedStartAt = startAt.get(new StartAt.SubscriptionModelContext(delegatedSubscriptionModelType));
        SubscriptionPosition globalSubscriptionPosition = delegatedStartAt == null ? null : this.subscriptionModel.globalSubscriptionPosition();
        long numberOfEventsAfterCatchupSubscriptionCompleted = this.eventStoreQueries.count(catchupFilter);
        long numberOfEventsNotConsumed = numberOfEventsAfterCatchupSubscriptionCompleted - numberOfEventsBeforeStartingCatchupSubscription;
        FixedSizeCache catchupPhaseCache = new FixedSizeCache(this.config.cacheSize);
        if (numberOfEventsNotConsumed > 0L) {
            Stream cloudEvents = this.eventStoreQueries.query(catchupFilter, Math.toIntExact(numberOfEventsBeforeStartingCatchupSubscription), Math.toIntExact(numberOfEventsNotConsumed), this.config.catchupPhaseSortBy);
            this.runCatchupForStream(cloudEvents, subscriptionId, action, catchupPhaseCache);
        }
        if (delegatedStartAt == null) {
            this.returnIfSubscriptionPositionStorageConfigIs(SubscriptionPositionStorageConfig.UseSubscriptionPositionInStorage.class, cfg -> {
                cfg.storage().delete(subscriptionId);
                return null;
            });
        }
        if (!this.shuttingDown && this.runningCatchupSubscriptions.containsKey(subscriptionId)) {
            subscriptionsWasCancelledOrShutdown = false;
            this.runningCatchupSubscriptions.remove(subscriptionId);
        } else {
            subscriptionsWasCancelledOrShutdown = true;
        }
        StartAt startAtToUse = StartAt.dynamic(this.returnIfSubscriptionPositionStorageConfigIs(SubscriptionPositionStorageConfig.UseSubscriptionPositionInStorage.class, cfg -> () -> {
            SubscriptionPosition position = cfg.storage().read(subscriptionId);
            if ((position == null || CatchupSubscriptionModel.isTimeBasedSubscriptionPosition(position)) && globalSubscriptionPosition != null) {
                position = cfg.storage().save(subscriptionId, globalSubscriptionPosition);
            } else if (position == null) {
                return delegatedStartAt == null ? startAt : StartAt.subscriptionModelDefault();
            }
            return StartAt.subscriptionPosition((SubscriptionPosition)position);
        }).orElse(() -> {
            if (globalSubscriptionPosition == null) {
                return delegatedStartAt == null ? startAt : StartAt.subscriptionModelDefault();
            }
            return StartAt.subscriptionPosition((SubscriptionPosition)globalSubscriptionPosition);
        }));
        return this.startDelegatedSubscription(subscriptionId, filter, action, subscriptionsWasCancelledOrShutdown, startAtToUse, catchupPhaseCache);
    }

    private Subscription startDelegatedSubscription(String subscriptionId, SubscriptionFilter filter, Consumer<CloudEvent> action, boolean subscriptionsWasCancelledOrShutdown, StartAt startAtToUse, FixedSizeCache catchupPhaseEventCache) {
        Subscription subscription;
        if (subscriptionsWasCancelledOrShutdown) {
            this.doIfSubscriptionPositionStorageConfigIs(SubscriptionPositionStorageConfig.UseSubscriptionPositionInStorage.class, cfg -> {
                if (!cfg.storage().exists(subscriptionId)) {
                    startAtToUse.get(CatchupSubscriptionModel.generateSubscriptionModelContext());
                }
            });
            subscription = new CancelledSubscription(subscriptionId);
        } else {
            subscription = this.getDelegatedSubscriptionModel().subscribe(subscriptionId, filter, startAtToUse, cloudEvent -> {
                if (!catchupPhaseEventCache.isCached(cloudEvent.getId())) {
                    action.accept((CloudEvent)cloudEvent);
                }
            });
        }
        return subscription;
    }

    private static Filter deriveFilterToUseDuringCatchupPhase(SubscriptionFilter filter, SubscriptionPosition subscriptionPosition) {
        Filter catchupFilter;
        Filter timeFilter;
        if (CatchupSubscriptionModel.isBeginningOfTime(subscriptionPosition)) {
            timeFilter = Filter.all();
        } else {
            OffsetDateTime offsetDateTime = OffsetDateTime.parse(subscriptionPosition.asString(), RFC3339.RFC_3339_DATE_TIME_FORMATTER);
            timeFilter = Filter.time((Condition)Condition.gt((Object)offsetDateTime));
        }
        if (filter == null) {
            catchupFilter = timeFilter;
        } else {
            Filter userSuppliedFilter = ((OccurrentSubscriptionFilter)filter).filter();
            catchupFilter = timeFilter.and(userSuppliedFilter, new Filter[0]);
        }
        return catchupFilter;
    }

    private void runCatchupForStream(Stream<CloudEvent> cloudEvents, String subscriptionId, Consumer<CloudEvent> action, @Nullable FixedSizeCache cache) {
        Stream<CloudEvent> takeWhile = cloudEvents.takeWhile(__ -> !this.shuttingDown && this.runningCatchupSubscriptions.containsKey(subscriptionId));
        if (cache != null) {
            takeWhile = takeWhile.peek(e -> cache.put(e.getId()));
        }
        takeWhile.peek(action).filter(this.returnIfSubscriptionPositionStorageConfigIs(SubscriptionPositionStorageConfig.PersistSubscriptionPositionDuringCatchupPhase.class, SubscriptionPositionStorageConfig.PersistSubscriptionPositionDuringCatchupPhase::persistCloudEventPositionPredicate).orElse(__ -> false)).forEach(e -> this.doIfSubscriptionPositionStorageConfigIs(SubscriptionPositionStorageConfig.PersistSubscriptionPositionDuringCatchupPhase.class, cfg -> cfg.storage().save(subscriptionId, (SubscriptionPosition)TimeBasedSubscriptionPosition.from(e.getTime()))));
    }

    private static StartAt.SubscriptionModelContext generateSubscriptionModelContext() {
        return new StartAt.SubscriptionModelContext(CatchupSubscriptionModel.class);
    }

    public void stop() {
        this.getDelegatedSubscriptionModel().stop();
    }

    public void start(boolean resumeSubscriptionsAutomatically) {
        this.getDelegatedSubscriptionModel().start(resumeSubscriptionsAutomatically);
    }

    public boolean isRunning() {
        return this.getDelegatedSubscriptionModel().isRunning();
    }

    public boolean isRunning(String subscriptionId) {
        return this.getDelegatedSubscriptionModel().isRunning(subscriptionId);
    }

    public boolean isPaused(String subscriptionId) {
        return this.getDelegatedSubscriptionModel().isPaused(subscriptionId);
    }

    public Subscription resumeSubscription(String subscriptionId) {
        return this.getDelegatedSubscriptionModel().resumeSubscription(subscriptionId);
    }

    public void pauseSubscription(String subscriptionId) {
        this.getDelegatedSubscriptionModel().pauseSubscription(subscriptionId);
    }

    public void cancelSubscription(String subscriptionId) {
        this.runningCatchupSubscriptions.remove(subscriptionId);
        this.subscriptionModel.cancelSubscription(subscriptionId);
        this.doIfSubscriptionPositionStorageConfigIs(SubscriptionPositionStorageConfig.UseSubscriptionPositionInStorage.class, cfg -> cfg.storage().delete(subscriptionId));
    }

    @PreDestroy
    public void shutdown() {
        this.shuttingDown = true;
        this.runningCatchupSubscriptions.clear();
        this.subscriptionModel.shutdown();
    }

    public static boolean isTimeBasedSubscriptionPosition(StartAt startAt) {
        StartAt start = startAt.get(CatchupSubscriptionModel.generateSubscriptionModelContext());
        if (!(start instanceof StartAt.StartAtSubscriptionPosition)) {
            return false;
        }
        SubscriptionPosition subscriptionPosition = ((StartAt.StartAtSubscriptionPosition)start).subscriptionPosition;
        return CatchupSubscriptionModel.isTimeBasedSubscriptionPosition(subscriptionPosition);
    }

    public static boolean isTimeBasedSubscriptionPosition(SubscriptionPosition subscriptionPosition) {
        return subscriptionPosition instanceof TimeBasedSubscriptionPosition || subscriptionPosition instanceof StringBasedSubscriptionPosition && CatchupSubscriptionModel.isRfc3339Timestamp(subscriptionPosition.asString());
    }

    private static boolean isRfc3339Timestamp(String string) {
        try {
            OffsetDateTime.parse(string, RFC3339.RFC_3339_DATE_TIME_FORMATTER);
            return true;
        }
        catch (Exception exception) {
            return false;
        }
    }

    private static boolean isBeginningOfTime(SubscriptionPosition subscriptionPosition) {
        return subscriptionPosition instanceof TimeBasedSubscriptionPosition && ((TimeBasedSubscriptionPosition)subscriptionPosition).isBeginningOfTime();
    }

    public SubscriptionModel getDelegatedSubscriptionModel() {
        return this.subscriptionModel;
    }

    private <T, C extends SubscriptionPositionStorageConfig> Optional<T> returnIfSubscriptionPositionStorageConfigIs(Class<C> cls, Function<C, T> fn) {
        if (cls.isInstance(this.config.subscriptionStorageConfig)) {
            return Optional.ofNullable(fn.apply((SubscriptionPositionStorageConfig)cls.cast(this.config.subscriptionStorageConfig)));
        }
        return Optional.empty();
    }

    private <C extends SubscriptionPositionStorageConfig> void doIfSubscriptionPositionStorageConfigIs(Class<C> cls, Consumer<C> consumer) {
        if (cls.isInstance(this.config.subscriptionStorageConfig)) {
            consumer.accept((SubscriptionPositionStorageConfig)cls.cast(this.config.subscriptionStorageConfig));
        }
    }

    public String toString() {
        return new StringJoiner(", ", CatchupSubscriptionModel.class.getSimpleName() + "[", "]").add("subscriptionModel=" + String.valueOf(this.subscriptionModel)).add("eventStoreQueries=" + String.valueOf(this.eventStoreQueries)).add("config=" + String.valueOf(this.config)).add("runningCatchupSubscriptions=" + String.valueOf(this.runningCatchupSubscriptions)).add("shuttingDown=" + this.shuttingDown).toString();
    }

    private static class FixedSizeCache {
        private final LinkedHashMap<String, String> cacheContent;

        FixedSizeCache(final int size) {
            this.cacheContent = new LinkedHashMap<String, String>(){

                @Override
                protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                    return this.size() > size;
                }
            };
        }

        private void put(String value) {
            this.cacheContent.put(value, null);
        }

        public boolean isCached(String key) {
            return this.cacheContent.containsKey(key);
        }
    }

    private record CancelledSubscription(String subscriptionId) implements Subscription
    {
        public String id() {
            return this.subscriptionId;
        }

        public boolean waitUntilStarted(Duration timeout) {
            return true;
        }
    }
}

