/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer;

import com.codahale.metrics.Timer;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.WaitStrategies;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.BatchSubscriptionPolicy;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.message.wrapper.MessageContentWrapper;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.consumer.Consumer;
import pl.allegro.tech.hermes.consumers.consumer.batch.BatchMonitoring;
import pl.allegro.tech.hermes.consumers.consumer.batch.MessageBatch;
import pl.allegro.tech.hermes.consumers.consumer.batch.MessageBatchFactory;
import pl.allegro.tech.hermes.consumers.consumer.batch.MessageBatchReceiver;
import pl.allegro.tech.hermes.consumers.consumer.batch.MessageBatchingResult;
import pl.allegro.tech.hermes.consumers.consumer.converter.MessageConverterResolver;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionOffsetCommitQueues;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageReceiver;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageBatchSender;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult;
import pl.allegro.tech.hermes.tracker.consumers.MessageMetadata;
import pl.allegro.tech.hermes.tracker.consumers.Trackers;

/**
 * {@link Consumer} that receives messages in batches and delivers each batch to the
 * subscription endpoint, retrying according to the subscription's batch policy.
 *
 * <p>{@link #run()} loops until {@link #stopConsuming()} clears the {@code consuming} flag.
 * Nothing inside the loop clears that flag, so stopping (and, presumably, subscription
 * updates) must come from a different thread; both fields are therefore {@code volatile}.
 */
public class BatchConsumer
implements Consumer {
    private static final Logger logger = LoggerFactory.getLogger(BatchConsumer.class);
    private final MessageBatchSender sender;
    private final MessageBatchFactory batchFactory;
    private final SubscriptionOffsetCommitQueues offsets;
    private final CountDownLatch stoppedLatch = new CountDownLatch(1);
    private final MessageBatchReceiver receiver;
    private final HermesMetrics hermesMetrics;
    private final BatchMonitoring monitoring;
    // Replaced by updateSubscription() while the consume loop keeps reading it.
    private volatile Subscription subscription;
    // Loop-termination flag: written by stopConsuming(), polled by consume() and the retry predicate.
    volatile boolean consuming = true;

    /**
     * Creates a batch consumer for the given subscription.
     *
     * <p>The supplied {@code receiver} is wrapped in a {@link MessageBatchReceiver} built from
     * {@code batchFactory}, {@code hermesMetrics}, {@code messageConverterResolver},
     * {@code messageContentWrapper}, {@code topic} and {@code trackers}.
     *
     * @param receiver source of single messages, wrapped into a batch receiver
     * @param sender sends complete batches to the subscription endpoint
     * @param batchFactory creates batches and destroys them once processed
     * @param offsets queue that receives partition offsets of delivered batches
     * @param messageConverterResolver passed to the batch receiver
     * @param messageContentWrapper passed to the batch receiver
     * @param hermesMetrics metrics registry used for the latency timer and monitoring
     * @param trackers message trackers passed to the batch receiver and monitoring
     * @param subscription subscription this consumer initially serves
     * @param topic topic passed to the batch receiver
     */
    public BatchConsumer(MessageReceiver receiver, MessageBatchSender sender, MessageBatchFactory batchFactory, SubscriptionOffsetCommitQueues offsets, MessageConverterResolver messageConverterResolver, MessageContentWrapper messageContentWrapper, HermesMetrics hermesMetrics, Trackers trackers, Subscription subscription, Topic topic) {
        this.receiver = new MessageBatchReceiver(receiver, batchFactory, hermesMetrics, messageConverterResolver, messageContentWrapper, topic, trackers);
        this.sender = sender;
        this.batchFactory = batchFactory;
        this.offsets = offsets;
        this.subscription = subscription;
        this.hermesMetrics = hermesMetrics;
        this.monitoring = new BatchMonitoring(hermesMetrics, trackers);
    }

    /**
     * Runs the consume loop until the consumer is stopped or the loop fails, then releases
     * anyone blocked in {@link #waitUntilStopped()}.
     */
    @Override
    public void run() {
        setThreadName();
        try {
            consume();
        }
        finally {
            // Always signal termination, even when consume() exits with an exception.
            logger.info("Stopped consumer for subscription {}", subscription.getId());
            unsetThreadName();
            stoppedLatch.countDown();
        }
    }

    /**
     * Repeatedly obtains the next batch, delivers it, records its offsets as delivered and
     * reports messages the receiver discarded. The in-flight batch is always cleaned up,
     * whether or not delivery completed normally.
     */
    private void consume() {
        while (isConsuming()) {
            Optional<MessageBatch> inflight = Optional.empty();
            try {
                logger.debug("Trying to create new batch [subscription={}].", subscription.getId());
                MessageBatchingResult result = receiver.next(subscription);
                inflight = Optional.of(result.getBatch());
                inflight.ifPresent(batch -> {
                    logger.debug("Delivering batch [subscription={}].", subscription.getId());
                    deliver(batch, createRetryer(batch, subscription.getBatchSubscriptionPolicy()));
                    logger.debug("Finished delivering batch [subscription={}]", subscription.getId());
                    offsets.putAllDelivered(batch.getPartitionOffsets());
                });
                result.getDiscarded().forEach(m -> monitoring.markDiscarded(m, subscription, "too large"));
            }
            finally {
                logger.debug("Cleaning batch [subscription={}]", subscription.getId());
                inflight.ifPresent(this::clean);
            }
        }
    }

    /**
     * Builds a retryer for {@code batch} from the backoff, TTL and client-error settings of
     * {@code policy}.
     */
    private Retryer<MessageSendingResult> createRetryer(MessageBatch batch, BatchSubscriptionPolicy policy) {
        return createRetryer(batch, policy.getMessageBackoff(), policy.getMessageTtl(), policy.isRetryClientErrors());
    }

    /**
     * Builds a retryer that retries on {@link IOException}, on runtime exceptions and on
     * unsuccessful results while this consumer is still consuming.
     *
     * @param batch batch whose retry counter and monitoring are updated after each attempt
     * @param messageBackoff fixed wait between attempts, in milliseconds
     * @param messageTtl retrying stops once the delay since the first attempt exceeds this value
     * @param retryClientErrors whether results flagged as client errors are retried
     */
    private Retryer<MessageSendingResult> createRetryer(MessageBatch batch, int messageBackoff, int messageTtl, boolean retryClientErrors) {
        return RetryerBuilder.<MessageSendingResult>newBuilder()
                .retryIfExceptionOfType(IOException.class)
                .retryIfRuntimeException()
                .retryIfResult(result -> isConsuming() && !result.succeeded() && shouldRetryOnClientError(retryClientErrors, result))
                .withWaitStrategy(WaitStrategies.fixedWait(messageBackoff, TimeUnit.MILLISECONDS))
                .withStopStrategy(attempt -> attempt.getDelaySinceFirstAttempt() > messageTtl)
                .withRetryListener(getRetryListener(batch))
                .build();
    }

    /**
     * Returns {@code true} unless {@code result} is a client error and client errors are
     * configured not to be retried.
     */
    private boolean shouldRetryOnClientError(boolean retryClientErrors, MessageSendingResult result) {
        return !result.isClientError() || retryClientErrors;
    }

    /**
     * Sends {@code batch} through {@code retryer} while timing the call with the subscription
     * latency timer. Any exception from the retryer is logged and the batch is marked as
     * discarded instead of propagating to the consume loop.
     */
    private void deliver(MessageBatch batch, Retryer<MessageSendingResult> retryer) {
        try (Timer.Context timer = hermesMetrics.subscriptionLatencyTimer(subscription).time()) {
            MessageSendingResult result = retryer.call(() -> sender.send(batch, subscription.getEndpoint(), subscription.getBatchSubscriptionPolicy().getRequestTimeout()));
            monitoring.markSendingResult(batch, subscription, result);
        }
        catch (Exception e) {
            logger.error("Batch was rejected [batch_id={}, subscription={}].", batch.getId(), subscription.toSubscriptionName(), e);
            monitoring.markDiscarded(batch, subscription, e.getMessage());
        }
    }

    /** Destroys {@code batch} via the factory and closes its in-flight metrics. */
    private void clean(MessageBatch batch) {
        batchFactory.destroyBatch(batch);
        monitoring.closeInflightMetrics(batch, subscription);
    }

    /** Returns the subscription currently served by this consumer. */
    @Override
    public Subscription getSubscription() {
        return subscription;
    }

    /** Replaces the subscription used by subsequent reads in the consume loop. */
    @Override
    public void updateSubscription(Subscription modifiedSubscription) {
        this.subscription = modifiedSubscription;
    }

    /** Stops the underlying receiver and clears the flag polled by the consume loop. */
    @Override
    public void stopConsuming() {
        logger.info("Stopping consumer [subscription={}].", subscription.getId());
        receiver.stop();
        consuming = false;
    }

    /**
     * Blocks until {@link #run()} has finished.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Override
    public void waitUntilStopped() throws InterruptedException {
        stoppedLatch.await();
    }

    /** Returns the offsets reported by the underlying offset commit queues. */
    @Override
    public List<PartitionOffset> getOffsetsToCommit() {
        return offsets.getOffsetsToCommit();
    }

    /** Returns {@code true} until {@link #stopConsuming()} has been called. */
    @Override
    public boolean isConsuming() {
        return consuming;
    }

    /**
     * Creates the listener that bumps the batch retry counter after every attempt and
     * reports attempts that produced a result to monitoring.
     *
     * <p>The retryer notifies listeners after each attempt, including attempts that ended
     * with an exception. {@code Attempt.getResult()} throws {@link IllegalStateException}
     * for those, which would escape the retryer and abort the retry loop, so the result is
     * only read when {@code hasResult()} is true.
     */
    private RetryListener getRetryListener(final MessageBatch batch) {
        return new RetryListener(){

            @Override
            public <V> void onRetry(Attempt<V> attempt) {
                batch.incrementRetryCounter();
                if (attempt.hasResult()) {
                    // NOTE(review): like the previous code, this also fires for successful results — confirm markFailed tolerates that.
                    monitoring.markFailed(batch, subscription, (MessageSendingResult)attempt.getResult());
                } else {
                    logger.warn("Batch delivery attempt ended with exception [batch_id={}, subscription={}, attempt={}]: {}",
                            batch.getId(), subscription.toSubscriptionName(), attempt.getAttemptNumber(), String.valueOf(attempt.getExceptionCause()));
                }
            }
        };
    }
}

