/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.management.infrastructure.kafka;

import java.util.List;
import java.util.function.Consumer;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.admin.AdminTool;
import pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement;
import pl.allegro.tech.hermes.management.infrastructure.kafka.BrokersClusterNotFoundException;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService;

/**
 * Dispatches broker operations to the {@link BrokersClusterService} instances it was
 * constructed with, and uses an {@link AdminTool} to request retransmission.
 */
public class MultiDCAwareService {
    private final List<BrokersClusterService> clusters;
    private final AdminTool adminTool;

    /**
     * Creates the service.
     *
     * @param clusters  cluster services to delegate to; the list reference is stored as given
     *                  (no copy is made)
     * @param adminTool tool used by {@link #moveOffset} to request retransmission
     */
    public MultiDCAwareService(List<BrokersClusterService> clusters, AdminTool adminTool) {
        this.clusters = clusters;
        this.adminTool = adminTool;
    }

    /**
     * Passes the given function to {@code manageTopic} on every cluster, in list order.
     *
     * @param manageFunction action handed to each cluster's {@code manageTopic}
     */
    public void manageTopic(Consumer<BrokerTopicManagement> manageFunction) {
        for (BrokersClusterService kafkaService : clusters) {
            kafkaService.manageTopic(manageFunction);
        }
    }

    /**
     * Reads a message from the first cluster whose name equals {@code clusterName}.
     *
     * @param clusterName name compared against each cluster's {@code getClusterName()};
     *                    the comparison is invoked on this argument, so a {@code null} value
     *                    fails with a {@link NullPointerException} once any cluster is examined
     * @param topicName   topic forwarded to the selected cluster
     * @param partition   partition forwarded to the selected cluster
     * @param offset      offset forwarded to the selected cluster
     * @return whatever the selected cluster's {@code readMessage} returns
     * @throws BrokersClusterNotFoundException if no cluster in the list has the given name
     */
    public String readMessage(String clusterName, TopicName topicName, Integer partition, Long offset) {
        BrokersClusterService matchingCluster = clusters.stream()
                .filter(candidate -> clusterName.equals(candidate.getClusterName()))
                .findFirst()
                .orElseThrow(() -> new BrokersClusterNotFoundException(clusterName));
        return matchingCluster.readMessage(topicName, partition, offset);
    }

    /**
     * Calls {@code indicateOffsetChange} on every cluster, in list order, and then asks the
     * admin tool to retransmit for the subscription identified by the given names.
     *
     * @param topicName        topic the subscription belongs to
     * @param subscriptionName name of the subscription
     * @param timestamp        value forwarded to each cluster's {@code indicateOffsetChange}
     */
    public void moveOffset(TopicName topicName, String subscriptionName, Long timestamp) {
        for (BrokersClusterService cluster : clusters) {
            cluster.indicateOffsetChange(topicName, subscriptionName, timestamp);
        }
        // Retransmission is requested only after every cluster has been notified.
        SubscriptionName subscription = new SubscriptionName(subscriptionName, topicName);
        adminTool.retransmit(subscription);
    }
}

