/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.net.URI;
import java.time.Clock;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import org.apache.commons.lang3.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.ResilientMessageSender;
import pl.allegro.tech.hermes.consumers.consumer.load.SubscriptionLoadRecorder;
import pl.allegro.tech.hermes.consumers.consumer.offset.PendingOffsets;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;
import pl.allegro.tech.hermes.consumers.consumer.profiling.ConsumerProfiler;
import pl.allegro.tech.hermes.consumers.consumer.profiling.ConsumerRun;
import pl.allegro.tech.hermes.consumers.consumer.profiling.DefaultConsumerProfiler;
import pl.allegro.tech.hermes.consumers.consumer.profiling.NoOpConsumerProfiler;
import pl.allegro.tech.hermes.consumers.consumer.rate.SerialConsumerRateLimiter;
import pl.allegro.tech.hermes.consumers.consumer.result.ErrorHandler;
import pl.allegro.tech.hermes.consumers.consumer.result.SuccessHandler;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSender;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSenderFactory;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResultLogInfo;
import pl.allegro.tech.hermes.consumers.consumer.sender.timeout.FutureAsyncTimeout;
import pl.allegro.tech.hermes.metrics.HermesCounter;
import pl.allegro.tech.hermes.metrics.HermesTimer;
import pl.allegro.tech.hermes.metrics.HermesTimerContext;

public class ConsumerMessageSender {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerMessageSender.class);
    private final ExecutorService deliveryReportingExecutor;
    private final List<SuccessHandler> successHandlers;
    private final List<ErrorHandler> errorHandlers;
    private final MessageSenderFactory messageSenderFactory;
    private final Clock clock;
    private final PendingOffsets pendingOffsets;
    private final SubscriptionLoadRecorder loadRecorder;
    private final HermesTimer consumerLatencyTimer;
    private final HermesCounter retries;
    private final SerialConsumerRateLimiter rateLimiter;
    private final HermesTimer rateLimiterAcquireTimer;
    private final FutureAsyncTimeout async;
    private final int asyncTimeoutMs;
    private final LongAdder inflightCount = new LongAdder();
    private MessageSender messageSender;
    private Subscription subscription;
    private ScheduledExecutorService retrySingleThreadExecutor;
    private volatile boolean running = true;

    public ConsumerMessageSender(Subscription subscription, MessageSenderFactory messageSenderFactory, List<SuccessHandler> successHandlers, List<ErrorHandler> errorHandlers, SerialConsumerRateLimiter rateLimiter, ExecutorService deliveryReportingExecutor, PendingOffsets pendingOffsets, MetricsFacade metrics, int asyncTimeoutMs, FutureAsyncTimeout futureAsyncTimeout, Clock clock, SubscriptionLoadRecorder loadRecorder) {
        this.deliveryReportingExecutor = deliveryReportingExecutor;
        this.successHandlers = successHandlers;
        this.errorHandlers = errorHandlers;
        this.messageSenderFactory = messageSenderFactory;
        this.clock = clock;
        this.loadRecorder = loadRecorder;
        this.async = futureAsyncTimeout;
        this.rateLimiter = rateLimiter;
        this.asyncTimeoutMs = asyncTimeoutMs;
        this.messageSender = this.messageSender(subscription);
        this.subscription = subscription;
        this.pendingOffsets = pendingOffsets;
        this.consumerLatencyTimer = metrics.subscriptions().latency(subscription.getQualifiedName());
        metrics.subscriptions().registerInflightGauge(subscription.getQualifiedName(), (Object)this, sender -> sender.inflightCount.doubleValue());
        this.retries = metrics.subscriptions().retries(subscription.getQualifiedName());
        this.rateLimiterAcquireTimer = metrics.subscriptions().rateLimiterAcquire(subscription.getQualifiedName());
    }

    public void initialize() {
        this.running = true;
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(String.valueOf(this.subscription.getQualifiedName()) + "-retry-executor-%d").build();
        this.retrySingleThreadExecutor = Executors.newScheduledThreadPool(1, threadFactory);
    }

    public void shutdown() {
        this.running = false;
        this.messageSender.stop();
        this.retrySingleThreadExecutor.shutdownNow();
        try {
            this.retrySingleThreadExecutor.awaitTermination(1L, TimeUnit.MINUTES);
        }
        catch (InterruptedException e) {
            logger.warn("Failed to stop retry executor within one minute with following exception", (Throwable)e);
        }
    }

    public void sendAsync(Message message, ConsumerProfiler profiler) {
        this.inflightCount.increment();
        this.sendAsync(message, this.calculateMessageDelay(message.getPublishingTimestamp()), profiler);
    }

    private void sendAsync(Message message, int delayMillis, ConsumerProfiler profiler) {
        profiler.measure("retrySingleThreadExecutor.schedule");
        this.retrySingleThreadExecutor.schedule(() -> this.sendMessage(message, profiler), (long)delayMillis, TimeUnit.MILLISECONDS);
    }

    private int calculateMessageDelay(long publishingMessageTimestamp) {
        Integer delay = this.subscription.getSerialSubscriptionPolicy().getSendingDelay();
        if (NumberUtils.INTEGER_ZERO.equals(delay)) {
            return delay;
        }
        long messageAgeAtThisPoint = this.clock.millis() - publishingMessageTimestamp;
        delay = delay - (int)messageAgeAtThisPoint;
        return Math.max(delay, NumberUtils.INTEGER_ZERO);
    }

    private void sendMessage(Message message, ConsumerProfiler profiler) {
        this.loadRecorder.recordSingleOperation();
        profiler.measure("acquireRateLimiter");
        this.acquireRateLimiterWithTimer();
        HermesTimerContext timer = this.consumerLatencyTimer.time();
        profiler.measure("messageSender.send");
        CompletableFuture<MessageSendingResult> response = this.messageSender.send(message);
        ((CompletableFuture)response.thenAcceptAsync((Consumer)new ResponseHandlingListener(message, timer, profiler), (Executor)this.deliveryReportingExecutor)).exceptionally(e -> {
            logger.error("An error occurred while handling message sending response of subscription {} [partition={}, offset={}, id={}]", new Object[]{this.subscription.getQualifiedName(), message.getPartition(), message.getOffset(), message.getId(), e});
            return null;
        });
    }

    private void acquireRateLimiterWithTimer() {
        HermesTimerContext acquireTimer = this.rateLimiterAcquireTimer.time();
        this.rateLimiter.acquire();
        acquireTimer.close();
    }

    private MessageSender messageSender(Subscription subscription) {
        Integer requestTimeoutMs = subscription.getSerialSubscriptionPolicy().getRequestTimeout();
        ResilientMessageSender resilientMessageSender = new ResilientMessageSender(this.rateLimiter, subscription, this.async, requestTimeoutMs, this.asyncTimeoutMs);
        return this.messageSenderFactory.create(subscription, resilientMessageSender);
    }

    public void updateSubscription(Subscription newSubscription) {
        boolean httpClientChanged;
        boolean endpointUpdated = !this.subscription.getEndpoint().equals((Object)newSubscription.getEndpoint());
        boolean subscriptionPolicyUpdated = !Objects.equals(this.subscription.getSerialSubscriptionPolicy(), newSubscription.getSerialSubscriptionPolicy());
        boolean endpointAddressResolverMetadataChanged = !Objects.equals(this.subscription.getEndpointAddressResolverMetadata(), newSubscription.getEndpointAddressResolverMetadata());
        boolean oAuthPolicyChanged = !Objects.equals(this.subscription.getOAuthPolicy(), newSubscription.getOAuthPolicy());
        this.subscription = newSubscription;
        boolean bl = httpClientChanged = this.subscription.isHttp2Enabled() != newSubscription.isHttp2Enabled();
        if (endpointUpdated || subscriptionPolicyUpdated || endpointAddressResolverMetadataChanged || oAuthPolicyChanged || httpClientChanged) {
            this.messageSender.stop();
            this.messageSender = this.messageSender(newSubscription);
        }
    }

    private boolean willExceedTtl(Message message, long delay) {
        long ttl = TimeUnit.SECONDS.toMillis(this.subscription.getSerialSubscriptionPolicy().getMessageTtl().intValue());
        long remainingTtl = Math.max(ttl - delay, 0L);
        return message.isTtlExceeded(remainingTtl);
    }

    private void handleFailedSending(Message message, MessageSendingResult result, ConsumerProfiler profiler) {
        this.errorHandlers.forEach(h -> h.handleFailed(message, this.subscription, result));
        this.retrySendingOrDiscard(message, result, profiler);
    }

    private void retrySendingOrDiscard(Message message, MessageSendingResult result, ConsumerProfiler profiler) {
        List<URI> succeededUris = result.getSucceededUris(this::messageSentSucceeded);
        message.incrementRetryCounter(succeededUris);
        long retryDelay = this.extractRetryDelay(message, result);
        if (this.shouldAttemptResending(message, result, retryDelay)) {
            this.retries.increment();
            profiler.flushMeasurements(ConsumerRun.RETRIED);
            ConsumerProfiler resendProfiler = this.subscription.isProfilingEnabled() ? new DefaultConsumerProfiler(this.subscription.getQualifiedName(), this.subscription.getProfilingThresholdMs()) : new NoOpConsumerProfiler();
            resendProfiler.startMeasurements("schedule.resend");
            resendProfiler.saveRetryDelay(retryDelay);
            this.retrySingleThreadExecutor.schedule(() -> this.resend(message, result, resendProfiler), retryDelay, TimeUnit.MILLISECONDS);
        } else {
            this.handleMessageDiscarding(message, result, profiler);
        }
    }

    private boolean shouldAttemptResending(Message message, MessageSendingResult result, long retryDelay) {
        return !this.willExceedTtl(message, retryDelay) && this.shouldResendMessage(result);
    }

    private long extractRetryDelay(Message message, MessageSendingResult result) {
        long defaultBackoff = message.updateAndGetCurrentMessageBackoff(this.subscription.getSerialSubscriptionPolicy());
        long ttl = TimeUnit.SECONDS.toMillis(this.subscription.getSerialSubscriptionPolicy().getMessageTtl().intValue());
        return result.getRetryAfterMillis().map(delay -> Math.min(delay, ttl)).orElse(defaultBackoff);
    }

    private void resend(Message message, MessageSendingResult result, ConsumerProfiler profiler) {
        if (result.isLoggable()) {
            result.getLogInfo().forEach(logInfo -> this.logResultInfo(message, (MessageSendingResultLogInfo)logInfo));
        }
        this.sendMessage(message, profiler);
    }

    private void logResultInfo(Message message, MessageSendingResultLogInfo logInfo) {
        logger.debug(String.format("Retrying message send to endpoint %s; messageId %s; offset: %s; partition: %s; sub id: %s; rootCause: %s", logInfo.getUrlString(), message.getId(), message.getOffset(), message.getPartition(), this.subscription.getQualifiedName(), logInfo.getRootCause()), logInfo.getFailure());
    }

    private void handleMessageDiscarding(Message message, MessageSendingResult result, ConsumerProfiler profiler) {
        this.pendingOffsets.markAsProcessed(SubscriptionPartitionOffset.subscriptionPartitionOffset(this.subscription.getQualifiedName(), message.getPartitionOffset(), message.getPartitionAssignmentTerm()));
        this.inflightCount.decrement();
        this.errorHandlers.forEach(h -> h.handleDiscarded(message, this.subscription, result));
        profiler.flushMeasurements(ConsumerRun.DISCARDED);
    }

    private void handleMessageSendingSuccess(Message message, MessageSendingResult result, ConsumerProfiler profiler) {
        this.pendingOffsets.markAsProcessed(SubscriptionPartitionOffset.subscriptionPartitionOffset(this.subscription.getQualifiedName(), message.getPartitionOffset(), message.getPartitionAssignmentTerm()));
        this.inflightCount.decrement();
        this.successHandlers.forEach(h -> h.handleSuccess(message, this.subscription, result));
        profiler.flushMeasurements(ConsumerRun.DELIVERED);
    }

    private boolean messageSentSucceeded(MessageSendingResult result) {
        return result.succeeded() || result.isClientError() && !this.shouldRetryOnClientError();
    }

    private boolean shouldResendMessage(MessageSendingResult result) {
        return !result.succeeded() && (!result.isClientError() || this.shouldRetryOnClientError() || this.isUnauthorizedForOAuthSecuredSubscription(result));
    }

    private boolean shouldRetryOnClientError() {
        return this.subscription.getSerialSubscriptionPolicy().isRetryClientErrors();
    }

    private boolean isUnauthorizedForOAuthSecuredSubscription(MessageSendingResult result) {
        return this.subscription.hasOAuthPolicy() && result.getStatusCode() == 401;
    }

    class ResponseHandlingListener
    implements Consumer<MessageSendingResult> {
        private final Message message;
        private final HermesTimerContext timer;
        private final ConsumerProfiler profiler;

        public ResponseHandlingListener(Message message, HermesTimerContext timer, ConsumerProfiler profiler) {
            this.message = message;
            this.timer = timer;
            this.profiler = profiler;
        }

        @Override
        public void accept(MessageSendingResult result) {
            this.timer.close();
            ConsumerMessageSender.this.loadRecorder.recordSingleOperation();
            this.profiler.measure("handlers");
            if (ConsumerMessageSender.this.running) {
                if (result.succeeded()) {
                    ConsumerMessageSender.this.handleMessageSendingSuccess(this.message, result, this.profiler);
                } else {
                    ConsumerMessageSender.this.handleFailedSending(this.message, result, this.profiler);
                }
            } else {
                logger.warn("Process of subscription {} is not running. Ignoring sending message result [successful={}, partition={}, offset={}, id={}]", new Object[]{ConsumerMessageSender.this.subscription.getQualifiedName(), result.succeeded(), this.message.getPartition(), this.message.getOffset(), this.message.getId()});
            }
        }
    }
}

