/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.batch;

import com.google.common.base.Preconditions;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Optional;
import java.util.Queue;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.message.wrapper.MessageContentWrapper;
import pl.allegro.tech.hermes.common.message.wrapper.UnsupportedContentTypeException;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.batch.MessageBatch;
import pl.allegro.tech.hermes.consumers.consumer.batch.MessageBatchFactory;
import pl.allegro.tech.hermes.consumers.consumer.batch.MessageBatchingResult;
import pl.allegro.tech.hermes.consumers.consumer.converter.MessageConverterResolver;
import pl.allegro.tech.hermes.consumers.consumer.message.MessageConverter;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageReceiver;
import pl.allegro.tech.hermes.tracker.consumers.MessageMetadata;
import pl.allegro.tech.hermes.tracker.consumers.Trackers;

@NotThreadSafe
public class MessageBatchReceiver {
    private static final Logger logger = LoggerFactory.getLogger(MessageBatchReceiver.class);
    private final MessageReceiver receiver;
    private final MessageBatchFactory batchFactory;
    private final MessageConverterResolver messageConverterResolver;
    private final MessageContentWrapper messageContentWrapper;
    private final HermesMetrics hermesMetrics;
    private final Trackers trackers;
    private final Queue<Message> inflight;
    private final Topic topic;
    private boolean receiving = true;

    public MessageBatchReceiver(MessageReceiver receiver, MessageBatchFactory batchFactory, HermesMetrics hermesMetrics, MessageConverterResolver messageConverterResolver, MessageContentWrapper messageContentWrapper, Topic topic, Trackers trackers) {
        this.receiver = receiver;
        this.batchFactory = batchFactory;
        this.hermesMetrics = hermesMetrics;
        this.messageConverterResolver = messageConverterResolver;
        this.messageContentWrapper = messageContentWrapper;
        this.topic = topic;
        this.trackers = trackers;
        this.inflight = new ArrayDeque<Message>(1);
    }

    public MessageBatchingResult next(Subscription subscription, Runnable signalsInterrupt) {
        if (logger.isDebugEnabled()) {
            logger.debug("Trying to allocate memory for new batch [subscription={}]", (Object)subscription.getQualifiedName());
        }
        MessageBatch batch = this.batchFactory.createBatch(subscription);
        if (logger.isDebugEnabled()) {
            logger.debug("New batch allocated [subscription={}]", (Object)subscription.getQualifiedName());
        }
        ArrayList<MessageMetadata> discarded = new ArrayList<MessageMetadata>();
        while (this.isReceiving() && !batch.isReadyForDelivery()) {
            signalsInterrupt.run();
            Optional<Message> maybeMessage = this.inflight.isEmpty() ? this.readAndTransform(subscription, batch.getId()) : Optional.ofNullable(this.inflight.poll());
            if (!maybeMessage.isPresent()) continue;
            Message message = maybeMessage.get();
            if (batch.canFit(message.getData())) {
                batch.append(message.getData(), this.messageMetadata(subscription, batch.getId(), message));
                continue;
            }
            if (batch.isBiggerThanTotalCapacity(message.getData())) {
                logger.error("Message size exceeds buffer total capacity [size={}, capacity={}, subscription={}]", new Object[]{message.getData().length, batch.getCapacity(), subscription.getQualifiedName()});
                discarded.add(MessageConverter.toMessageMetadata(message, subscription));
                continue;
            }
            logger.debug("Message too large for current batch [message_size={}, subscription={}]", (Object)message.getData().length, (Object)subscription.getQualifiedName());
            Preconditions.checkArgument((boolean)this.inflight.offer(message));
            break;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Batch is ready for delivery [subscription={}]", (Object)subscription.getQualifiedName());
        }
        return new MessageBatchingResult(batch.close(), discarded);
    }

    private Optional<Message> readAndTransform(Subscription subscription, String batchId) {
        Optional<Message> maybeMessage = this.receiver.next();
        if (maybeMessage.isPresent()) {
            Message message = maybeMessage.get();
            Message transformed = this.messageConverterResolver.converterFor(message, subscription).convert(message, this.topic);
            transformed = Message.message().fromMessage(transformed).withData(this.wrap(subscription, transformed)).build();
            this.hermesMetrics.incrementInflightCounter(subscription);
            this.trackers.get(subscription).logInflight(this.messageMetadata(subscription, batchId, transformed));
            return Optional.of(transformed);
        }
        return Optional.empty();
    }

    private byte[] wrap(Subscription subscription, Message next) {
        switch (subscription.getContentType()) {
            case AVRO: {
                return this.messageContentWrapper.wrapAvro(next.getData(), next.getId(), next.getPublishingTimestamp(), this.topic, next.getSchema().get(), next.getExternalMetadata());
            }
            case JSON: {
                return this.messageContentWrapper.wrapJson(next.getData(), next.getId(), next.getPublishingTimestamp(), next.getExternalMetadata());
            }
        }
        throw new UnsupportedContentTypeException(subscription);
    }

    private MessageMetadata messageMetadata(Subscription subscription, String batchId, Message message) {
        return new MessageMetadata(message.getId(), batchId, message.getOffset(), message.getPartition(), subscription.getQualifiedTopicName(), subscription.getName(), message.getKafkaTopic().asString(), message.getPublishingTimestamp(), message.getReadingTimestamp());
    }

    private boolean isReceiving() {
        return this.receiving;
    }

    public void stop() {
        this.receiving = false;
        this.receiver.stop();
    }

    public void updateSubscription(Subscription modifiedSubscription) {
        this.receiver.update(modifiedSubscription);
    }
}

