/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.BufferOverflowException;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.consumers.consumer.Consumer;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.batch.MessageBatch;
import pl.allegro.tech.hermes.consumers.consumer.batch.MessageBatchFactory;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionOffsetCommitQueues;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageReceiver;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageReceivingTimeoutException;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageBatchSender;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult;

public class BatchConsumer
implements Consumer {
    private static final Logger logger = LoggerFactory.getLogger(BatchConsumer.class);
    private final MessageReceiver receiver;
    private final MessageBatchSender sender;
    private final MessageBatchFactory batchFactory;
    private final Clock clock;
    private final SubscriptionOffsetCommitQueues offsets;
    private Subscription subscription;
    private final CountDownLatch stoppedLatch = new CountDownLatch(1);
    boolean consuming = true;
    ObjectMapper mapper = new ObjectMapper();

    public BatchConsumer(MessageReceiver receiver, MessageBatchSender sender, MessageBatchFactory batchFactory, SubscriptionOffsetCommitQueues offsets, Subscription subscription, Clock clock) {
        this.receiver = receiver;
        this.sender = sender;
        this.batchFactory = batchFactory;
        this.offsets = offsets;
        this.subscription = subscription;
        this.clock = clock;
    }

    @Override
    public void run() {
        this.setThreadName();
        Optional<Message> inflight = Optional.empty();
        do {
            MessageBatch batch = this.batchFactory.createBatch(this.subscription);
            inflight = this.fillBatch(batch, inflight);
            batch.close();
            this.deliver(batch, this.clock.millis());
            this.offsets.putAll(batch.getPartitionOffsets());
            this.batchFactory.destroyBatch(batch);
        } while (this.isConsuming());
        logger.info("Stopped consumer for subscription {}", (Object)this.subscription.getId());
        this.unsetThreadName();
        this.stoppedLatch.countDown();
    }

    private Optional<Message> fillBatch(MessageBatch batch, Optional<Message> inflight) {
        while (this.isConsuming() && !batch.isReadyForDelivery()) {
            Message message = inflight.isPresent() ? inflight.get() : this.receiver.next();
            try {
                batch.append(this.getWrappedMessage(message), new PartitionOffset(message.getKafkaTopic(), message.getOffset(), message.getPartition()));
            }
            catch (BufferOverflowException ex) {
                return Optional.of(message);
            }
            catch (MessageReceivingTimeoutException ex) {
            }
        }
        return Optional.empty();
    }

    private byte[] getWrappedMessage(Message message) {
        try {
            HashMap<String, Object> map = new HashMap<String, Object>();
            map.put("message_id", message.getId());
            if (!message.getExternalMetadata().isEmpty()) {
                map.put("metadata", message.getExternalMetadata());
            }
            map.put("content", new String(message.getData()));
            return this.mapper.writeValueAsBytes(map);
        }
        catch (JsonProcessingException e) {
            return message.getData();
        }
    }

    private void deliver(MessageBatch batch, long deliveryStartTime) {
        MessageSendingResult result;
        boolean isRetryRequired;
        while ((isRetryRequired = this.isRetryRequired(result = this.sender.send(batch, this.subscription.getEndpoint()))) && !batch.isTtlExceeded(deliveryStartTime)) {
        }
    }

    private boolean isRetryRequired(MessageSendingResult result) {
        return this.isConsuming() && !result.succeeded() && (!result.isClientError() || this.subscription.getSubscriptionPolicy().isRetryClientErrors());
    }

    @Override
    public Subscription getSubscription() {
        return this.subscription;
    }

    @Override
    public void updateSubscription(Subscription modifiedSubscription) {
        this.subscription = modifiedSubscription;
    }

    @Override
    public void stopConsuming() {
        logger.info("Stopping consumer for subscription {}", (Object)this.subscription.getId());
        this.consuming = false;
    }

    @Override
    public void waitUntilStopped() throws InterruptedException {
        this.stoppedLatch.await();
    }

    @Override
    public List<PartitionOffset> getOffsetsToCommit() {
        return this.offsets.getOffsetsToCommit();
    }

    @Override
    public boolean isConsuming() {
        return this.consuming;
    }
}

