/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.management.domain.topic;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import pl.allegro.tech.hermes.api.ContentType;
import pl.allegro.tech.hermes.api.MessageTextPreview;
import pl.allegro.tech.hermes.api.OwnerId;
import pl.allegro.tech.hermes.api.PatchData;
import pl.allegro.tech.hermes.api.Query;
import pl.allegro.tech.hermes.api.RawSchema;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicMetrics;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.api.TopicNameWithMetrics;
import pl.allegro.tech.hermes.api.TopicWithSchema;
import pl.allegro.tech.hermes.api.helpers.Patch;
import pl.allegro.tech.hermes.domain.topic.TopicAlreadyExistsException;
import pl.allegro.tech.hermes.domain.topic.TopicRepository;
import pl.allegro.tech.hermes.domain.topic.preview.MessagePreview;
import pl.allegro.tech.hermes.domain.topic.preview.MessagePreviewRepository;
import pl.allegro.tech.hermes.management.config.TopicProperties;
import pl.allegro.tech.hermes.management.domain.Auditor;
import pl.allegro.tech.hermes.management.domain.blacklist.TopicBlacklistService;
import pl.allegro.tech.hermes.management.domain.dc.DatacenterBoundRepositoryHolder;
import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor;
import pl.allegro.tech.hermes.management.domain.dc.RepositoryManager;
import pl.allegro.tech.hermes.management.domain.group.GroupService;
import pl.allegro.tech.hermes.management.domain.topic.CreatorRights;
import pl.allegro.tech.hermes.management.domain.topic.TopicContentTypeMigrationService;
import pl.allegro.tech.hermes.management.domain.topic.TopicManipulatorUser;
import pl.allegro.tech.hermes.management.domain.topic.TopicMetricsRepository;
import pl.allegro.tech.hermes.management.domain.topic.TopicOwnerCache;
import pl.allegro.tech.hermes.management.domain.topic.TopicRemovalDisabledException;
import pl.allegro.tech.hermes.management.domain.topic.TopicSchemaExistsException;
import pl.allegro.tech.hermes.management.domain.topic.commands.CreateTopicRepositoryCommand;
import pl.allegro.tech.hermes.management.domain.topic.commands.RemoveTopicRepositoryCommand;
import pl.allegro.tech.hermes.management.domain.topic.commands.TouchTopicRepositoryCommand;
import pl.allegro.tech.hermes.management.domain.topic.commands.UpdateTopicRepositoryCommand;
import pl.allegro.tech.hermes.management.domain.topic.schema.SchemaService;
import pl.allegro.tech.hermes.management.domain.topic.validator.TopicValidator;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService;

@Component
public class TopicService {
    private static final Logger logger = LoggerFactory.getLogger(TopicService.class);
    private final TopicRepository topicRepository;
    private final GroupService groupService;
    private final TopicProperties topicProperties;
    private final SchemaService schemaService;
    private final TopicMetricsRepository metricRepository;
    private final MultiDCAwareService multiDCAwareService;
    private final TopicBlacklistService topicBlacklistService;
    private final TopicValidator topicValidator;
    private final TopicContentTypeMigrationService topicContentTypeMigrationService;
    private final Clock clock;
    private final Auditor auditor;
    private final MultiDatacenterRepositoryCommandExecutor multiDcExecutor;
    private final RepositoryManager repositoryManager;
    private final TopicOwnerCache topicOwnerCache;
    private final ScheduledExecutorService scheduledTopicExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("scheduled-topic-executor-%d").build());

    @Autowired
    public TopicService(MultiDCAwareService multiDCAwareService, TopicRepository topicRepository, GroupService groupService, TopicProperties topicProperties, SchemaService schemaService, TopicMetricsRepository metricRepository, TopicBlacklistService topicBlacklistService, TopicValidator topicValidator, TopicContentTypeMigrationService topicContentTypeMigrationService, Clock clock, Auditor auditor, MultiDatacenterRepositoryCommandExecutor multiDcExecutor, RepositoryManager repositoryManager, TopicOwnerCache topicOwnerCache) {
        this.multiDCAwareService = multiDCAwareService;
        this.topicRepository = topicRepository;
        this.groupService = groupService;
        this.topicProperties = topicProperties;
        this.schemaService = schemaService;
        this.metricRepository = metricRepository;
        this.topicBlacklistService = topicBlacklistService;
        this.topicValidator = topicValidator;
        this.topicContentTypeMigrationService = topicContentTypeMigrationService;
        this.clock = clock;
        this.auditor = auditor;
        this.multiDcExecutor = multiDcExecutor;
        this.repositoryManager = repositoryManager;
        this.topicOwnerCache = topicOwnerCache;
    }

    public void createTopicWithSchema(TopicWithSchema topicWithSchema, TopicManipulatorUser createdBy, CreatorRights isAllowedToManage) {
        Topic topic = topicWithSchema.getTopic();
        this.auditor.beforeObjectCreation(createdBy.getUsername(), topic);
        this.topicValidator.ensureCreatedTopicIsValid(topic, createdBy, isAllowedToManage);
        this.ensureTopicDoesNotExist(topic);
        boolean validateAndRegisterSchema = ContentType.AVRO.equals((Object)topic.getContentType()) || topic.isJsonToAvroDryRunEnabled() && topicWithSchema.getSchema() != null;
        this.validateSchema(validateAndRegisterSchema, topicWithSchema, topic);
        this.registerAvroSchema(validateAndRegisterSchema, topicWithSchema, createdBy.getUsername());
        this.createTopic(topic, createdBy, isAllowedToManage);
    }

    private void ensureTopicDoesNotExist(Topic topic) {
        if (this.topicRepository.topicExists(topic.getName())) {
            throw new TopicAlreadyExistsException(topic.getName());
        }
    }

    private void validateSchema(boolean shouldValidate, TopicWithSchema topicWithSchema, Topic topic) {
        if (shouldValidate) {
            this.schemaService.validateSchema(topic, topicWithSchema.getSchema());
            boolean schemaAlreadyRegistered = this.schemaService.getSchema(topic.getQualifiedName()).isPresent();
            if (schemaAlreadyRegistered) {
                throw new TopicSchemaExistsException(topic.getQualifiedName());
            }
        }
    }

    private void registerAvroSchema(boolean shouldRegister, TopicWithSchema topicWithSchema, String createdBy) {
        if (shouldRegister) {
            try {
                this.schemaService.registerSchema(topicWithSchema.getTopic(), topicWithSchema.getSchema());
            }
            catch (Exception e) {
                logger.error("Rolling back topic {} creation due to schema registration error", (Object)topicWithSchema.getQualifiedName(), (Object)e);
                this.removeTopic(topicWithSchema.getTopic(), createdBy);
                throw e;
            }
        }
    }

    private void createTopic(Topic topic, TopicManipulatorUser createdBy, CreatorRights creatorRights) {
        this.topicValidator.ensureCreatedTopicIsValid(topic, createdBy, creatorRights);
        if (!this.multiDCAwareService.topicExists(topic)) {
            this.createTopicInBrokers(topic);
            this.auditor.objectCreated(createdBy.getUsername(), topic);
            this.topicOwnerCache.onCreatedTopic(topic);
        } else {
            logger.info("Skipping creation of topic {} on brokers, topic already exists", (Object)topic.getQualifiedName());
        }
        this.multiDcExecutor.execute(new CreateTopicRepositoryCommand(topic));
    }

    private void createTopicInBrokers(Topic topic) {
        try {
            this.multiDCAwareService.manageTopic(brokerTopicManagement -> brokerTopicManagement.createTopic(topic));
        }
        catch (Exception exception) {
            logger.error(String.format("Could not create topic %s, rollback topic creation.", topic.getQualifiedName()), (Throwable)exception);
            this.multiDcExecutor.execute(new RemoveTopicRepositoryCommand(topic.getName()));
        }
    }

    public void removeTopicWithSchema(Topic topic, TopicManipulatorUser removedBy) {
        this.auditor.beforeObjectRemoval(removedBy.getUsername(), Topic.class.getSimpleName(), topic.getQualifiedName());
        this.topicRepository.ensureTopicHasNoSubscriptions(topic.getName());
        this.removeSchema(topic);
        if (!this.topicProperties.isAllowRemoval()) {
            throw new TopicRemovalDisabledException(topic);
        }
        if (this.topicBlacklistService.isBlacklisted(topic.getQualifiedName())) {
            this.topicBlacklistService.unblacklist(topic.getQualifiedName());
        }
        this.removeTopic(topic, removedBy.getUsername());
    }

    private void removeSchema(Topic topic) {
        if (ContentType.AVRO.equals((Object)topic.getContentType()) && this.topicProperties.isRemoveSchema()) {
            this.schemaService.getSchema(topic.getQualifiedName()).ifPresent(s -> this.schemaService.deleteAllSchemaVersions(topic.getQualifiedName()));
        }
    }

    private void removeTopic(Topic topic, String removedBy) {
        this.multiDcExecutor.execute(new RemoveTopicRepositoryCommand(topic.getName()));
        this.multiDCAwareService.manageTopic(brokerTopicManagement -> brokerTopicManagement.removeTopic(topic));
        this.auditor.objectRemoved(removedBy, Topic.class.getSimpleName(), topic.getQualifiedName());
        this.topicOwnerCache.onRemovedTopic(topic);
    }

    public void updateTopicWithSchema(TopicName topicName, PatchData patch, TopicManipulatorUser modifiedBy) {
        Topic topic = this.getTopicDetails(topicName);
        this.extractSchema(patch).ifPresent(schema -> {
            this.schemaService.registerSchema(topic, (String)schema);
            this.scheduleTouchTopic(topicName);
        });
        this.updateTopic(topicName, patch, modifiedBy);
    }

    private Optional<String> extractSchema(PatchData patch) {
        return Optional.ofNullable(patch.getPatch().get("schema")).map(o -> (String)o);
    }

    public void updateTopic(TopicName topicName, PatchData patch, TopicManipulatorUser modifiedBy) {
        this.auditor.beforeObjectUpdate(modifiedBy.getUsername(), Topic.class.getSimpleName(), topicName, patch);
        this.groupService.checkGroupExists(topicName.getGroupName());
        Topic retrieved = this.getTopicDetails(topicName);
        Topic modified = (Topic)Patch.apply((Object)retrieved, (PatchData)patch);
        this.topicValidator.ensureUpdatedTopicIsValid(modified, retrieved, modifiedBy);
        if (!retrieved.equals((Object)modified)) {
            Instant beforeMigrationInstant = this.clock.instant();
            if (retrieved.getRetentionTime() != modified.getRetentionTime()) {
                this.multiDCAwareService.manageTopic(brokerTopicManagement -> brokerTopicManagement.updateTopic(modified));
            }
            this.multiDcExecutor.execute(new UpdateTopicRepositoryCommand(modified));
            if (!retrieved.wasMigratedFromJsonType() && modified.wasMigratedFromJsonType()) {
                logger.info("Waiting until all subscriptions have consumers assigned during topic {} content type migration...", (Object)topicName.qualifiedName());
                this.topicContentTypeMigrationService.waitUntilAllSubscriptionsHasConsumersAssigned(modified, Duration.ofSeconds(this.topicProperties.getSubscriptionsAssignmentsCompletedTimeoutSeconds()));
                logger.info("Notifying subscriptions' consumers about changes in topic {} content type...", (Object)topicName.qualifiedName());
                this.topicContentTypeMigrationService.notifySubscriptions(modified, beforeMigrationInstant);
            }
            this.auditor.objectUpdated(modifiedBy.getUsername(), retrieved, modified);
            this.topicOwnerCache.onUpdatedTopic(retrieved, modified);
        }
    }

    public void touchTopic(TopicName topicName) {
        logger.info("Touching topic {}", (Object)topicName.qualifiedName());
        this.multiDcExecutor.execute(new TouchTopicRepositoryCommand(topicName));
    }

    public void scheduleTouchTopic(TopicName topicName) {
        if (this.topicProperties.isTouchSchedulerEnabled()) {
            logger.info("Scheduling touch of topic {}", (Object)topicName.qualifiedName());
            this.scheduledTopicExecutor.schedule(() -> this.touchTopic(topicName), (long)this.topicProperties.getTouchDelayInSeconds(), TimeUnit.SECONDS);
        } else {
            this.touchTopic(topicName);
        }
    }

    public List<String> listQualifiedTopicNames(String groupName) {
        return this.topicRepository.listTopicNames(groupName).stream().map(topicName -> new TopicName(groupName, topicName).qualifiedName()).collect(Collectors.toList());
    }

    public List<Topic> listTopics(String groupName) {
        return this.topicRepository.listTopics(groupName);
    }

    public List<String> listQualifiedTopicNames() {
        return this.groupService.listGroupNames().stream().map(this::listQualifiedTopicNames).flatMap(Collection::stream).sorted().collect(Collectors.toList());
    }

    public Topic getTopicDetails(TopicName topicName) {
        return this.topicRepository.getTopicDetails(topicName);
    }

    public TopicWithSchema getTopicWithSchema(TopicName topicName) {
        Topic topic = this.getTopicDetails(topicName);
        Optional<RawSchema> schema = Optional.empty();
        if (ContentType.AVRO.equals((Object)topic.getContentType())) {
            schema = this.schemaService.getSchema(topicName.qualifiedName());
        }
        return schema.map(s -> TopicWithSchema.topicWithSchema((Topic)topic, (String)s.value())).orElseGet(() -> TopicWithSchema.topicWithSchema((Topic)topic));
    }

    public TopicMetrics getTopicMetrics(TopicName topicName) {
        return this.topicRepository.topicExists(topicName) ? this.metricRepository.loadMetrics(topicName) : TopicMetrics.unavailable();
    }

    public String fetchSingleMessageFromPrimary(String brokersClusterName, TopicName topicName, Integer partition, Long offset) {
        return this.multiDCAwareService.readMessageFromPrimary(brokersClusterName, this.getTopicDetails(topicName), partition, offset);
    }

    public List<String> listTrackedTopicNames() {
        return this.groupService.listGroupNames().stream().map(arg_0 -> ((TopicRepository)this.topicRepository).listTopics(arg_0)).flatMap(Collection::stream).filter(Topic::isTrackingEnabled).map(Topic::getQualifiedName).collect(Collectors.toList());
    }

    public List<String> listTrackedTopicNames(String groupName) {
        return this.listTopics(groupName).stream().filter(Topic::isTrackingEnabled).map(Topic::getQualifiedName).collect(Collectors.toList());
    }

    public List<String> listFilteredTopicNames(Query<Topic> query) {
        return this.queryTopic(query).stream().map(Topic::getQualifiedName).collect(Collectors.toList());
    }

    public List<String> listFilteredTopicNames(String groupName, Query<Topic> query) {
        return query.filter(this.listTopics(groupName)).map(Topic::getQualifiedName).collect(Collectors.toList());
    }

    public List<Topic> queryTopic(Query<Topic> query) {
        return query.filter(this.getAllTopics()).collect(Collectors.toList());
    }

    public List<Topic> getAllTopics() {
        return this.groupService.listGroupNames().stream().map(arg_0 -> ((TopicRepository)this.topicRepository).listTopics(arg_0)).flatMap(Collection::stream).collect(Collectors.toList());
    }

    public Optional<byte[]> preview(TopicName topicName, int idx) {
        List result = this.loadMessagePreviewsFromAllDc(topicName).stream().map(MessagePreview::getContent).collect(Collectors.toList());
        if (idx >= 0 && idx < result.size()) {
            return Optional.of(result.get(idx));
        }
        return Optional.empty();
    }

    public List<MessageTextPreview> previewText(TopicName topicName) {
        return this.loadMessagePreviewsFromAllDc(topicName).stream().map(p -> new MessageTextPreview(new String(p.getContent(), StandardCharsets.UTF_8), p.isTruncated())).collect(Collectors.toList());
    }

    private List<MessagePreview> loadMessagePreviewsFromAllDc(TopicName topicName) {
        List<DatacenterBoundRepositoryHolder<MessagePreviewRepository>> repositories = this.repositoryManager.getRepositories(MessagePreviewRepository.class);
        ArrayList<MessagePreview> previews = new ArrayList<MessagePreview>();
        for (DatacenterBoundRepositoryHolder<MessagePreviewRepository> holder : repositories) {
            try {
                previews.addAll(holder.getRepository().loadPreview(topicName));
            }
            catch (Exception e) {
                logger.warn("Could not load message preview for DC: {}", (Object)holder.getDatacenterName());
            }
        }
        return previews;
    }

    public List<TopicNameWithMetrics> queryTopicsMetrics(Query<TopicNameWithMetrics> query) {
        List<Topic> filteredNames = query.filterNames(this.getAllTopics()).collect(Collectors.toList());
        return query.filter(this.getTopicsMetrics(filteredNames)).collect(Collectors.toList());
    }

    private List<TopicNameWithMetrics> getTopicsMetrics(List<Topic> topics) {
        return topics.stream().map(t -> {
            TopicMetrics metrics = this.metricRepository.loadMetrics(t.getName());
            return TopicNameWithMetrics.from((TopicMetrics)metrics, (String)t.getQualifiedName());
        }).collect(Collectors.toList());
    }

    public List<Topic> listForOwnerId(OwnerId ownerId) {
        Collection<TopicName> topicNames = this.topicOwnerCache.get(ownerId);
        return this.topicRepository.getTopicsDetails(topicNames);
    }
}

