/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.receiver.kafka;

import com.codahale.metrics.Timer;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.Map;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.exception.InternalProcessingException;
import pl.allegro.tech.hermes.common.message.wrapper.MessageContentWrapper;
import pl.allegro.tech.hermes.common.message.wrapper.UnwrappedMessageContent;
import pl.allegro.tech.hermes.common.time.Clock;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageReceiver;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageReceivingTimeoutException;

/**
 * {@link MessageReceiver} that reads messages of a single topic from a Kafka
 * {@link ConsumerConnector} through a {@link ConsumerIterator}.
 *
 * <p>The constructor asks the connector to create message streams for the topic and
 * keeps the iterator of the first stream only; {@link #next()} pulls one message at a
 * time from that iterator and {@link #stop()} shuts the connector down.
 */
public class KafkaMessageReceiver
implements MessageReceiver {
    private final ConsumerIterator<byte[], byte[]> iterator;
    private final ConsumerConnector consumerConnector;
    private final MessageContentWrapper contentWrapper;
    private final Timer readingTimer;
    private final Clock clock;

    /**
     * Creates the message streams for {@code topicName} and binds this receiver to the
     * first of them.
     *
     * @param topicName         topic to create streams for; also the key used to look the
     *                          streams up in the connector's result
     * @param consumerConnector connector used to create the streams; shut down by {@link #stop()}
     * @param configFactory     source of the {@code Configs.KAFKA_STREAM_COUNT} value
     * @param contentWrapper    used to unwrap the raw payload of every received message
     * @param readingTimer      timer that measures each {@link #next()} call
     * @param clock             supplies the time passed as the last argument of each {@link Message}
     * @throws IllegalStateException if the connector returned no stream for {@code topicName}
     */
    public KafkaMessageReceiver(String topicName, ConsumerConnector consumerConnector, ConfigFactory configFactory, MessageContentWrapper contentWrapper, Timer readingTimer, Clock clock) {
        this.consumerConnector = consumerConnector;
        this.contentWrapper = contentWrapper;
        this.readingTimer = readingTimer;
        this.clock = clock;

        Map<String, Integer> topicCountMap =
                ImmutableMap.of(topicName, configFactory.getIntProperty(Configs.KAFKA_STREAM_COUNT));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumerConnector.createMessageStreams(topicCountMap);

        // NOTE(review): the configured stream count may exceed 1, yet only the first
        // stream is ever read here - confirm that larger values are not expected.
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topicName);
        if (streams == null || streams.isEmpty()) {
            // Fail with the topic name instead of a bare NPE / IndexOutOfBoundsException.
            throw new IllegalStateException("No Kafka stream created for topic " + topicName);
        }
        this.iterator = streams.get(0).iterator();
    }

    /**
     * Takes the next message from the stream iterator, unwraps its content and returns it
     * as a {@link Message}. The whole operation is measured with the reading timer.
     *
     * @return message built from the unwrapped metadata (id, timestamp), the Kafka
     *         offset, partition and topic, the unwrapped content and the current clock time
     * @throws MessageReceivingTimeoutException if a {@link ConsumerTimeoutException} is raised
     *         while reading or unwrapping
     * @throws InternalProcessingException if any other exception is raised while reading
     *         or unwrapping
     */
    @Override
    public Message next() {
        // The timer context is closed by try-with-resources before either catch block runs.
        try (Timer.Context readingTimerContext = this.readingTimer.time()) {
            MessageAndMetadata<byte[], byte[]> message = this.iterator.next();
            UnwrappedMessageContent unwrappedContent = this.contentWrapper.unwrapContent(message.message());
            return new Message(
                    unwrappedContent.getMessageMetadata().getId(),
                    message.offset(),
                    message.partition(),
                    message.topic(),
                    unwrappedContent.getContent(),
                    unwrappedContent.getMessageMetadata().getTimestamp(),
                    this.clock.getTime());
        }
        catch (ConsumerTimeoutException consumerTimeoutException) {
            throw new MessageReceivingTimeoutException("No messages received", consumerTimeoutException);
        }
        catch (Exception e) {
            // Cast kept from the original so the same constructor overload is selected.
            throw new InternalProcessingException("Message received failed", (Throwable)e);
        }
    }

    /**
     * Shuts down the underlying consumer connector.
     */
    @Override
    public void stop() {
        this.consumerConnector.shutdown();
    }
}

