/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.management.domain.topic;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository;
import pl.allegro.tech.hermes.management.domain.topic.OffsetsNotAvailableException;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService;

/**
 * Moves the offsets of every subscription of a topic to a given point in time,
 * after first waiting for offsets to become available on all Kafka topics
 * related to that topic.
 */
@Component
public class TopicContentTypeMigrationService {
    private static final Logger logger = LoggerFactory.getLogger(TopicContentTypeMigrationService.class);

    /** Maximum time {@link #notifySubscriptions} waits for offsets to become available. */
    public static final Duration CHECK_OFFSETS_AVAILABLE_TIMEOUT = Duration.ofSeconds(1L);

    /** Pause between two consecutive offsets-availability checks. */
    public static final Duration INTERVAL_BETWEEN_OFFSETS_AVAILABLE_CHECK_RETRIES = Duration.ofMillis(500L);

    private final SubscriptionRepository subscriptionRepository;
    private final MultiDCAwareService multiDCAwareService;
    private final Clock clock;

    /**
     * @param subscriptionRepository source of the subscription names of a topic
     * @param multiDCAwareService service used to check offsets availability and to move offsets
     * @param clock time source used to compute and check the waiting deadline
     */
    @Autowired
    public TopicContentTypeMigrationService(SubscriptionRepository subscriptionRepository, MultiDCAwareService multiDCAwareService, Clock clock) {
        this.subscriptionRepository = subscriptionRepository;
        this.multiDCAwareService = multiDCAwareService;
        this.clock = clock;
    }

    /**
     * Waits (up to {@link #CHECK_OFFSETS_AVAILABLE_TIMEOUT}) until offsets are available on all
     * Kafka topics of {@code topic}, then moves the offset of each of its subscriptions to
     * {@code beforeMigrationInstant}.
     *
     * @param topic topic whose subscriptions are notified
     * @param beforeMigrationInstant instant the subscription offsets are moved to
     * @throws OffsetsNotAvailableException if offsets are still unavailable after the timeout
     * @throws RuntimeException if the waiting thread is interrupted; the interrupt flag is restored
     */
    public void notifySubscriptions(Topic topic, Instant beforeMigrationInstant) {
        this.waitUntilOffsetsAvailableOnAllKafkaTopics(topic, CHECK_OFFSETS_AVAILABLE_TIMEOUT);
        this.subscriptionRepository.listSubscriptionNames(topic.getName()).forEach(s -> this.notifySingleSubscription(topic, beforeMigrationInstant, (String)s));
    }

    /**
     * Moves the offset of one subscription to the epoch-millisecond value of
     * {@code beforeMigrationInstant}.
     */
    private void notifySingleSubscription(Topic topic, Instant beforeMigrationInstant, String subscriptionName) {
        // NOTE(review): the trailing 'false' is presumably a dry-run flag - confirm against MultiDCAwareService.
        this.multiDCAwareService.moveOffset(topic, subscriptionName, beforeMigrationInstant.toEpochMilli(), false);
    }

    /**
     * Polls offsets availability, sleeping
     * {@link #INTERVAL_BETWEEN_OFFSETS_AVAILABLE_CHECK_RETRIES} between attempts, until the
     * offsets are available or the deadline has passed.
     *
     * @throws OffsetsNotAvailableException if a check fails after the deadline has passed
     */
    private void waitUntilOffsetsAvailableOnAllKafkaTopics(Topic topic, Duration offsetsAvailableTimeout) {
        Instant abortAttemptsInstant = this.clock.instant().plus(offsetsAvailableTimeout);
        while (!this.multiDCAwareService.areOffsetsAvailableOnAllKafkaTopics(topic)) {
            // The deadline is only examined after a failed check, so one more check always
            // follows the last sleep before giving up.
            if (this.clock.instant().isAfter(abortAttemptsInstant)) {
                throw new OffsetsNotAvailableException(topic);
            }
            logger.debug("Not all offsets related to hermes topic {} are available, will retry", topic.getQualifiedName());
            this.sleep(INTERVAL_BETWEEN_OFFSETS_AVAILABLE_CHECK_RETRIES);
        }
    }

    /**
     * Sleeps for the given duration.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is
     *         interrupted while sleeping
     */
    private void sleep(Duration sleepDuration) {
        try {
            Thread.sleep(sleepDuration.toMillis());
        }
        catch (InterruptedException e) {
            // Thread.sleep clears the interrupt flag when it throws; restore it so callers
            // higher up the stack can still observe the interruption request.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}

