/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.endpoint.receiver.kafka;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.ChannelResponse;
import de.otto.synapse.channel.ShardPosition;
import de.otto.synapse.channel.ShardResponse;
import de.otto.synapse.consumer.MessageDispatcher;
import de.otto.synapse.endpoint.EndpointType;
import de.otto.synapse.endpoint.MessageInterceptorRegistry;
import de.otto.synapse.endpoint.receiver.kafka.ChannelDurationBehindHandler;
import de.otto.synapse.endpoint.receiver.kafka.KafkaDecoder;
import de.otto.synapse.message.TextMessage;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KafkaRecordsConsumer
implements Function<ConsumerRecords<String, String>, ChannelResponse> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordsConsumer.class);
    public static final Duration UNKNOWN_DURATION_BEHIND = Duration.ofMillis(Long.MAX_VALUE);
    private final String channelName;
    private final MessageInterceptorRegistry interceptorRegistry;
    private final MessageDispatcher messageDispatcher;
    private final Supplier<Set<String>> currentShardsSupplier;
    private final ChannelDurationBehindHandler durationBehindHandler;
    private final KafkaDecoder decoder;
    private ChannelPosition currentChannelPosition;

    KafkaRecordsConsumer(String channelName, ChannelPosition startFrom, MessageInterceptorRegistry interceptorRegistry, MessageDispatcher messageDispatcher, ChannelDurationBehindHandler durationBehindHandler, Supplier<Set<String>> currentShardsSupplier, KafkaDecoder decoder) {
        this.channelName = channelName;
        this.currentChannelPosition = startFrom;
        this.interceptorRegistry = interceptorRegistry;
        this.messageDispatcher = messageDispatcher;
        this.currentShardsSupplier = currentShardsSupplier;
        this.durationBehindHandler = durationBehindHandler;
        this.decoder = decoder;
    }

    @Override
    public ChannelResponse apply(ConsumerRecords<String, String> records) {
        HashMap receivedMessagesPerShard = Maps.newHashMap();
        HashMap shardPositionsFromRecords = Maps.newHashMap();
        records.forEach(record -> {
            try {
                String shardName = "" + record.partition();
                TextMessage message = this.decoder.apply((ConsumerRecord<String, String>)record);
                LOG.debug("Processing message " + message.getKey());
                TextMessage interceptedMessage = this.interceptorRegistry.getInterceptorChain(this.channelName, EndpointType.RECEIVER).intercept(message);
                shardPositionsFromRecords.put(shardName, this.toShardPosition((ConsumerRecord<String, String>)record));
                if (interceptedMessage != null) {
                    this.messageDispatcher.accept(interceptedMessage);
                    receivedMessagesPerShard.compute(shardName, (key, messages) -> {
                        if (messages != null) {
                            return messages.add((Object)interceptedMessage);
                        }
                        return ImmutableList.builder().add((Object)interceptedMessage);
                    });
                } else {
                    LOG.debug("Message {} dropped by interceptor", (Object)message.getKey());
                }
            }
            catch (Exception e) {
                LOG.error("Error processing message: " + e.getMessage(), (Throwable)e);
            }
        });
        ImmutableMap<String, Duration> channelDurationBehind = this.updateAndGetDurationBehind(records);
        this.updateCurrentChannelPosition(shardPositionsFromRecords.values());
        return ChannelResponse.channelResponse((String)this.channelName, (ImmutableList)((ImmutableList)this.currentShardPositions().stream().map(shardPosition -> {
            String shardName = shardPosition.shardName();
            ImmutableList messages = receivedMessagesPerShard.getOrDefault(shardName, ImmutableList.builder()).build();
            Duration durationBehind = (Duration)channelDurationBehind.getOrDefault((Object)shardName, (Object)UNKNOWN_DURATION_BEHIND);
            return ShardResponse.shardResponse((ShardPosition)shardPosition, (Duration)durationBehind, (ImmutableList)messages);
        }).collect(ImmutableList.toImmutableList())));
    }

    private ShardPosition toShardPosition(ConsumerRecord<String, String> record) {
        return ShardPosition.fromPosition((String)("" + record.partition()), (String)this.followingShardPosition("" + record.offset()));
    }

    private ImmutableList<ShardPosition> currentShardPositions() {
        return (ImmutableList)this.currentShardsSupplier.get().stream().map(shardName -> this.currentChannelPosition.shard(shardName)).collect(ImmutableList.toImmutableList());
    }

    private void updateCurrentChannelPosition(Collection<ShardPosition> shardPositionsFromRecords) {
        shardPositionsFromRecords.forEach(shardPos -> {
            this.currentChannelPosition = ChannelPosition.merge((ChannelPosition)this.currentChannelPosition, (ShardPosition)shardPos);
        });
    }

    private ImmutableMap<String, Duration> updateAndGetDurationBehind(ConsumerRecords<String, String> records) {
        for (TopicPartition topicPartition : records.partitions()) {
            ConsumerRecord lastRecord = (ConsumerRecord)Iterables.getLast((Iterable)records.records(topicPartition));
            Instant lastTimestampRead = Instant.ofEpochMilli(lastRecord.timestamp());
            this.durationBehindHandler.update(topicPartition, lastRecord.offset(), lastTimestampRead);
        }
        if (records.isEmpty()) {
            this.durationBehindHandler.noRecordsReceived();
        }
        return this.durationBehindHandler.getChannelDurationBehind().getShardDurationsBehind();
    }

    private String followingShardPosition(String currentPosition) {
        return "" + Integer.parseInt(currentPosition);
    }
}

