/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer;

import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.WaitStrategies;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.BatchSubscriptionPolicy;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.message.wrapper.CompositeMessageContentWrapper;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.consumers.consumer.BatchConsumerMetrics;
import pl.allegro.tech.hermes.consumers.consumer.Consumer;
import pl.allegro.tech.hermes.consumers.consumer.batch.MessageBatch;
import pl.allegro.tech.hermes.consumers.consumer.batch.MessageBatchFactory;
import pl.allegro.tech.hermes.consumers.consumer.batch.MessageBatchReceiver;
import pl.allegro.tech.hermes.consumers.consumer.batch.MessageBatchingResult;
import pl.allegro.tech.hermes.consumers.consumer.converter.MessageConverterResolver;
import pl.allegro.tech.hermes.consumers.consumer.load.SubscriptionLoadRecorder;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartition;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;
import pl.allegro.tech.hermes.consumers.consumer.rate.BatchConsumerRateLimiter;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageReceiver;
import pl.allegro.tech.hermes.consumers.consumer.receiver.ReceiverFactory;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageBatchSender;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult;
import pl.allegro.tech.hermes.metrics.HermesTimerContext;
import pl.allegro.tech.hermes.tracker.consumers.Trackers;

public class BatchConsumer
implements Consumer {
    private static final Logger logger = LoggerFactory.getLogger(BatchConsumer.class);
    private final ReceiverFactory messageReceiverFactory;
    private final MessageBatchSender sender;
    private final MessageBatchFactory batchFactory;
    private final boolean useTopicMessageSize;
    private final MessageConverterResolver messageConverterResolver;
    private final CompositeMessageContentWrapper compositeMessageContentWrapper;
    private final Trackers trackers;
    private final SubscriptionLoadRecorder loadRecorder;
    private final Duration commitPeriod;
    private Topic topic;
    private Subscription subscription;
    private volatile boolean consuming = true;
    private final MetricsFacade metricsFacade;
    private final BatchConsumerMetrics metrics;
    private MessageBatchReceiver receiver;
    private final Map<SubscriptionPartition, Long> maxPendingOffsets = new HashMap<SubscriptionPartition, Long>();
    private Instant lastCommitTime;

    public BatchConsumer(ReceiverFactory messageReceiverFactory, MessageBatchSender sender, MessageBatchFactory batchFactory, MessageConverterResolver messageConverterResolver, CompositeMessageContentWrapper compositeMessageContentWrapper, MetricsFacade metricsFacade, Trackers trackers, Subscription subscription, Topic topic, boolean useTopicMessageSize, SubscriptionLoadRecorder loadRecorder, Duration commitPeriod) {
        this.messageReceiverFactory = messageReceiverFactory;
        this.sender = sender;
        this.batchFactory = batchFactory;
        this.subscription = subscription;
        this.useTopicMessageSize = useTopicMessageSize;
        this.loadRecorder = loadRecorder;
        this.metricsFacade = metricsFacade;
        this.metrics = new BatchConsumerMetrics(metricsFacade, subscription.getQualifiedName());
        this.messageConverterResolver = messageConverterResolver;
        this.compositeMessageContentWrapper = compositeMessageContentWrapper;
        this.topic = topic;
        this.trackers = trackers;
        this.commitPeriod = commitPeriod;
        this.lastCommitTime = Instant.now();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void consume(Runnable signalsInterrupt) {
        Optional<Object> inflight = Optional.empty();
        try {
            logger.debug("Trying to create new batch [subscription={}].", (Object)this.subscription.getQualifiedName());
            signalsInterrupt.run();
            this.commitIfReady();
            MessageBatchingResult result = this.receiver.next(this.subscription, signalsInterrupt);
            inflight = Optional.of(result.getBatch());
            inflight.ifPresent(batch -> {
                logger.debug("Delivering batch [subscription={}].", (Object)this.subscription.getQualifiedName());
                this.deliver(signalsInterrupt, (MessageBatch)batch, this.createRetryer((MessageBatch)batch, this.subscription.getBatchSubscriptionPolicy()));
                this.offerProcessedOffsets((MessageBatch)batch);
                logger.debug("Finished delivering batch [subscription={}]", (Object)this.subscription.getQualifiedName());
            });
            result.getDiscarded().forEach(m -> {
                this.metrics.markDiscarded();
                this.trackers.get(this.subscription).logDiscarded(m, "too large");
            });
        }
        finally {
            logger.debug("Cleaning batch [subscription={}]", (Object)this.subscription.getQualifiedName());
            inflight.ifPresent(this::clean);
        }
    }

    private void commitIfReady() {
        if (this.isReadyToCommit()) {
            HashSet<SubscriptionPartitionOffset> offsetsToCommit = new HashSet<SubscriptionPartitionOffset>();
            for (Map.Entry<SubscriptionPartition, Long> entry : this.maxPendingOffsets.entrySet()) {
                offsetsToCommit.add(new SubscriptionPartitionOffset(entry.getKey(), entry.getValue()));
            }
            if (!offsetsToCommit.isEmpty()) {
                this.commit(offsetsToCommit);
            }
            this.lastCommitTime = Instant.now();
        }
    }

    private boolean isReadyToCommit() {
        return Duration.between(this.lastCommitTime, Instant.now()).toMillis() > this.commitPeriod.toMillis();
    }

    private void offerProcessedOffsets(MessageBatch batch) {
        for (SubscriptionPartitionOffset offset : batch.getPartitionOffsets()) {
            this.putOffset(offset);
        }
    }

    private void putOffset(SubscriptionPartitionOffset offset) {
        this.maxPendingOffsets.compute(offset.getSubscriptionPartition(), (subscriptionPartition, maxOffset) -> maxOffset == null ? offset.getOffset() : Math.max(maxOffset, offset.getOffset()));
    }

    @Override
    public void initialize() {
        this.loadRecorder.initialize();
        logger.debug("Consumer: preparing receiver for subscription {}", (Object)this.subscription.getQualifiedName());
        MessageReceiver receiver = this.messageReceiverFactory.createMessageReceiver(this.topic, this.subscription, new BatchConsumerRateLimiter(), this.loadRecorder, this.metricsFacade, this::putOffset);
        logger.debug("Consumer: preparing batch receiver for subscription {}", (Object)this.subscription.getQualifiedName());
        this.receiver = new MessageBatchReceiver(receiver, this.batchFactory, this.messageConverterResolver, this.compositeMessageContentWrapper, this.topic, this.trackers, this.loadRecorder);
        this.metrics.initialize();
    }

    @Override
    public void tearDown() {
        this.consuming = false;
        if (this.receiver != null) {
            this.receiver.stop();
        } else {
            logger.info("No batch receiver to stop [subscription={}].", (Object)this.subscription.getQualifiedName());
        }
        this.loadRecorder.shutdown();
        this.metrics.shutdown();
    }

    @Override
    public void updateSubscription(Subscription subscription) {
        this.subscription = subscription;
        this.receiver.updateSubscription(subscription);
    }

    @Override
    public void updateTopic(Topic newTopic) {
        if (this.topic.getContentType() != newTopic.getContentType() || this.messageSizeChanged(newTopic)) {
            logger.info("Reinitializing message receiver, contentType or messageSize changed.");
            this.topic = newTopic;
            this.tearDown();
            this.initialize();
        }
    }

    private boolean messageSizeChanged(Topic newTopic) {
        return this.topic.getMaxMessageSize() != newTopic.getMaxMessageSize() && this.useTopicMessageSize;
    }

    @Override
    public void commit(Set<SubscriptionPartitionOffset> offsetsToCommit) {
        if (this.receiver != null) {
            this.receiver.commit(offsetsToCommit);
        }
    }

    @Override
    public boolean moveOffset(PartitionOffset partitionOffset) {
        if (this.receiver != null) {
            return this.receiver.moveOffset(partitionOffset);
        }
        return false;
    }

    @Override
    public Subscription getSubscription() {
        return this.subscription;
    }

    private Retryer<MessageSendingResult> createRetryer(MessageBatch batch, BatchSubscriptionPolicy policy) {
        return this.createRetryer(batch, policy.getMessageBackoff(), TimeUnit.SECONDS.toMillis(policy.getMessageTtl().intValue()), policy.isRetryClientErrors());
    }

    private Retryer<MessageSendingResult> createRetryer(MessageBatch batch, int messageBackoff, long messageTtlMillis, boolean retryClientErrors) {
        return RetryerBuilder.newBuilder().retryIfExceptionOfType(IOException.class).retryIfRuntimeException().retryIfResult(result -> this.consuming && !result.succeeded() && this.shouldRetryOnClientError(retryClientErrors, (MessageSendingResult)result)).withWaitStrategy(WaitStrategies.fixedWait((long)messageBackoff, (TimeUnit)TimeUnit.MILLISECONDS)).withStopStrategy(attempt -> attempt.getDelaySinceFirstAttempt() > messageTtlMillis || Thread.currentThread().isInterrupted()).withRetryListener(this.getRetryListener(result -> {
            batch.incrementRetryCounter();
            this.markSendingResult(batch, (MessageSendingResult)result);
        })).build();
    }

    private void markSendingResult(MessageBatch batch, MessageSendingResult result) {
        if (result.succeeded()) {
            this.metrics.recordAttemptAsFinished(batch.getMessageCount());
            this.metrics.markSuccess(batch, result);
            batch.getMessagesMetadata().forEach(m -> this.trackers.get(this.subscription).logSent(m, result.getHostname()));
        } else {
            this.metrics.markFailure(batch, result);
            batch.getMessagesMetadata().forEach(m -> this.trackers.get(this.subscription).logFailed(m, result.getRootCause(), result.getHostname()));
        }
    }

    private boolean shouldRetryOnClientError(boolean retryClientErrors, MessageSendingResult result) {
        return !result.isClientError() || retryClientErrors;
    }

    private void deliver(Runnable signalsInterrupt, MessageBatch batch, Retryer<MessageSendingResult> retryer) {
        this.metrics.recordAttempt(batch.getMessageCount());
        try (HermesTimerContext ignored = this.metrics.latencyTimer().time();){
            retryer.call(() -> {
                this.loadRecorder.recordSingleOperation();
                signalsInterrupt.run();
                return this.sender.send(batch, this.subscription.getEndpoint(), this.subscription.getEndpointAddressResolverMetadata(), this.subscription.getBatchSubscriptionPolicy().getRequestTimeout());
            });
        }
        catch (Exception e) {
            logger.error("Batch was rejected [batch_id={}, subscription={}].", new Object[]{batch.getId(), this.subscription.getQualifiedName(), e});
            this.metrics.recordAttemptAsFinished(batch.getMessageCount());
            this.metrics.markDiscarded(batch);
            batch.getMessagesMetadata().forEach(m -> this.trackers.get(this.subscription).logDiscarded(m, e.getMessage()));
        }
    }

    private void clean(MessageBatch batch) {
        this.batchFactory.destroyBatch(batch);
    }

    private RetryListener getRetryListener(final java.util.function.Consumer<MessageSendingResult> consumer) {
        return new RetryListener(){

            public <V> void onRetry(Attempt<V> attempt) {
                if (attempt.hasException()) {
                    consumer.accept(MessageSendingResult.failedResult(attempt.getExceptionCause()));
                } else {
                    consumer.accept((MessageSendingResult)attempt.getResult());
                }
            }
        };
    }
}

