/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.management.infrastructure.kafka.service.retransmit;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.broker.BrokerStorage;
import pl.allegro.tech.hermes.common.kafka.KafkaConsumerPool;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.KafkaTopic;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.SubscriptionOffsetChangeIndicator;
import pl.allegro.tech.hermes.management.domain.message.RetransmissionService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.retransmit.OffsetNotFoundException;

/**
 * Kafka-backed {@link RetransmissionService}.
 *
 * <p>Resolves per-partition offsets (either the end offset or an offset looked up by timestamp)
 * for every Kafka topic that {@link KafkaNamesMapper} maps a Hermes {@link Topic} to, and
 * delegates storing / checking of requested offset changes to
 * {@link SubscriptionOffsetChangeIndicator}.
 */
public class KafkaRetransmissionService
implements RetransmissionService {
    private final BrokerStorage brokerStorage;
    private final SubscriptionOffsetChangeIndicator subscriptionOffsetChange;
    private final KafkaConsumerPool consumerPool;
    private final KafkaNamesMapper kafkaNamesMapper;

    /**
     * Creates the service from its collaborators.
     *
     * @param brokerStorage used to read the partition ids of a Kafka topic
     * @param subscriptionOffsetChange used to store and check requested subscription offsets
     * @param consumerPool source of Kafka consumers, looked up per topic and partition
     * @param kafkaNamesMapper maps a Hermes topic to its Kafka topics
     */
    public KafkaRetransmissionService(BrokerStorage brokerStorage, SubscriptionOffsetChangeIndicator subscriptionOffsetChange, KafkaConsumerPool consumerPool, KafkaNamesMapper kafkaNamesMapper) {
        this.brokerStorage = brokerStorage;
        this.subscriptionOffsetChange = subscriptionOffsetChange;
        this.consumerPool = consumerPool;
        this.kafkaNamesMapper = kafkaNamesMapper;
    }

    /**
     * Passes each of the given partition offsets, one by one, to the offset change indicator
     * for the given subscription and brokers cluster.
     *
     * @param topic topic whose name identifies the subscription's topic
     * @param subscription subscription name
     * @param brokersClusterName name of the brokers cluster the offsets apply to
     * @param partitionOffsets offsets to set; an empty list results in no calls
     */
    @Override
    public void indicateOffsetChange(Topic topic, String subscription, String brokersClusterName, List<PartitionOffset> partitionOffsets) {
        for (PartitionOffset partitionOffset : partitionOffsets) {
            subscriptionOffsetChange.setSubscriptionOffset(topic.getName(), subscription, brokersClusterName, partitionOffset);
        }
    }

    /**
     * Checks whether the offset change indicator reports offsets as moved for every Kafka topic
     * mapped from the given topic.
     *
     * @param topic topic to check
     * @param subscriptionName subscription name
     * @param brokersClusterName name of the brokers cluster
     * @return {@code true} only if the indicator reports offsets moved for all mapped Kafka topics
     */
    @Override
    public boolean areOffsetsMoved(Topic topic, String subscriptionName, String brokersClusterName) {
        return kafkaNamesMapper.toKafkaTopics(topic).allMatch(kafkaTopic -> {
            List<Integer> partitionIds = brokerStorage.readPartitionsIds(kafkaTopic.name().asString());
            return subscriptionOffsetChange.areOffsetsMoved(topic.getName(), subscriptionName, brokersClusterName, kafkaTopic, partitionIds);
        });
    }

    /** Obtains a consumer for the given topic partition from the consumer pool. */
    private KafkaConsumer<byte[], byte[]> createKafkaConsumer(KafkaTopic kafkaTopic, int partition) {
        return consumerPool.get(kafkaTopic, partition);
    }

    /**
     * Fetches the end offset of every partition of every Kafka topic mapped from the given topic.
     *
     * @param topic topic to inspect
     * @return one {@link PartitionOffset} per partition
     * @throws OffsetNotFoundException if the broker returns no end offset for some partition
     */
    @Override
    public List<PartitionOffset> fetchTopicEndOffsets(Topic topic) {
        // A null timestamp makes the lookup fall straight through to the end offset.
        return fetchTopicOffsetsAt(topic, null);
    }

    /**
     * Fetches, for every partition of every Kafka topic mapped from the given topic, the offset
     * resolved for {@code timestamp}, falling back to the partition's end offset.
     *
     * @param topic topic to inspect
     * @param timestamp timestamp to look offsets up by; {@code null} means "use end offsets"
     * @return one {@link PartitionOffset} per partition, in the order topics and partitions are visited
     * @throws OffsetNotFoundException if the broker returns no end offset for some partition
     */
    @Override
    public List<PartitionOffset> fetchTopicOffsetsAt(Topic topic, Long timestamp) {
        List<PartitionOffset> partitionOffsetList = new ArrayList<>();
        kafkaNamesMapper.toKafkaTopics(topic).forEach(kafkaTopic -> {
            List<Integer> partitionIds = brokerStorage.readPartitionsIds(kafkaTopic.name().asString());
            for (Integer partitionId : partitionIds) {
                KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(kafkaTopic, partitionId);
                long offset = getOffsetForTimestampOrEnd(timestamp, kafkaTopic, partitionId, consumer);
                partitionOffsetList.add(new PartitionOffset(kafkaTopic.name(), offset, partitionId));
            }
        });
        return partitionOffsetList;
    }

    /**
     * Resolves the offset for {@code timestamp} on one partition, or the partition's end offset
     * when {@code timestamp} is {@code null} or the timestamp lookup yields nothing.
     *
     * @throws OffsetNotFoundException if no end offset is returned for the partition
     */
    private long getOffsetForTimestampOrEnd(Long timestamp, KafkaTopic kafkaTopic, Integer partitionId, KafkaConsumer<byte[], byte[]> consumer) {
        // The end offset is resolved up front (even when the timestamp lookup succeeds) so that
        // a missing end offset is always reported via OffsetNotFoundException.
        long endOffset = getEndingOffset(consumer, kafkaTopic, partitionId);
        return Optional.ofNullable(timestamp)
                .flatMap(ts -> findClosestOffsetJustBeforeTimestamp(consumer, kafkaTopic, partitionId, ts))
                .orElse(endOffset);
    }

    /**
     * Asks the consumer for the offset matching {@code timestamp} on a single partition.
     *
     * <p>NOTE(review): despite this method's name, {@code KafkaConsumer.offsetsForTimes} is
     * documented to return the earliest offset whose timestamp is greater than or equal to the
     * requested one - confirm the name still reflects the intended semantics.
     *
     * @return the offset reported by the consumer, or empty when the result map holds no entry
     *         (or a {@code null} entry) for the partition
     */
    private Optional<Long> findClosestOffsetJustBeforeTimestamp(KafkaConsumer<byte[], byte[]> consumer, KafkaTopic kafkaTopic, int partition, long timestamp) {
        TopicPartition topicPartition = new TopicPartition(kafkaTopic.name().asString(), partition);
        Map<TopicPartition, OffsetAndTimestamp> offsetsByPartition =
                consumer.offsetsForTimes(Collections.singletonMap(topicPartition, timestamp));
        return Optional.ofNullable(offsetsByPartition.get(topicPartition)).map(OffsetAndTimestamp::offset);
    }

    /**
     * Reads the end offset of a single partition.
     *
     * @throws OffsetNotFoundException if the consumer returns no end offset for the partition
     */
    private long getEndingOffset(KafkaConsumer<byte[], byte[]> kafkaConsumer, KafkaTopic topicName, int partition) {
        TopicPartition topicPartition = new TopicPartition(topicName.name().asString(), partition);
        Map<TopicPartition, Long> offsets = kafkaConsumer.endOffsets(Collections.singleton(topicPartition));
        return Optional.ofNullable(offsets.get(topicPartition))
                .orElseThrow(() -> new OffsetNotFoundException(String.format("Ending offset for partition %s not found", topicPartition)));
    }
}

