/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.common.metric.timer.ConsumerLatencyTimer;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.rate.InflightsPool;
import pl.allegro.tech.hermes.consumers.consumer.rate.SerialConsumerRateLimiter;
import pl.allegro.tech.hermes.consumers.consumer.result.ErrorHandler;
import pl.allegro.tech.hermes.consumers.consumer.result.SuccessHandler;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSender;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSenderFactory;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResultLogInfo;
import pl.allegro.tech.hermes.consumers.consumer.sender.timeout.FutureAsyncTimeout;

/**
 * Sends messages of a single subscription through a {@link MessageSender} and reacts to the
 * sending results.
 *
 * <p>Initial sends and retries are scheduled on a scheduled executor created in
 * {@link #initialize()}. Results are handled on the {@code deliveryReportingExecutor} passed to the
 * constructor: successes are reported to the {@link SuccessHandler}s, failures are reported to the
 * {@link ErrorHandler}s and either rescheduled or discarded. Every sending result is also
 * registered in the {@link SerialConsumerRateLimiter}.
 *
 * <p>NOTE(review): {@code subscription}, {@code messageSender} and {@code requestTimeoutMs} are
 * replaced by {@link #updateSubscription(Subscription)} and read from executor threads without
 * synchronization - confirm that callers provide the required visibility guarantees.
 */
public class ConsumerMessageSender {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerMessageSender.class);
    private final ExecutorService deliveryReportingExecutor;
    private final List<SuccessHandler> successHandlers;
    private final List<ErrorHandler> errorHandlers;
    private final SerialConsumerRateLimiter rateLimiter;
    private final MessageSenderFactory messageSenderFactory;
    private final InflightsPool inflight;
    private final FutureAsyncTimeout<MessageSendingResult> async;
    private final int asyncTimeoutMs;
    private final ConsumerLatencyTimer consumerLatencyTimer;
    private int requestTimeoutMs;
    private MessageSender messageSender;
    private Subscription subscription;
    private ScheduledExecutorService retrySingleThreadExecutor;
    private volatile boolean running = true;

    /**
     * Creates a sender for the given subscription.
     *
     * <p>A {@link MessageSender} is created immediately from {@code messageSenderFactory}, the
     * request timeout is read from the subscription's serial policy and the latency timer is
     * obtained from {@code hermesMetrics}. {@link #initialize()} must be called before any message
     * is sent, because it creates the executor used for scheduling.
     *
     * @param subscription subscription whose messages are sent
     * @param messageSenderFactory factory used to create (and later recreate) the message sender
     * @param successHandlers handlers notified about successfully sent messages
     * @param errorHandlers handlers notified about failed and discarded messages
     * @param rateLimiter rate limiter acquired before each send and fed with sending results
     * @param deliveryReportingExecutor executor on which sending results are handled
     * @param inflight pool released when a message is delivered or discarded
     * @param hermesMetrics source of the consumer latency timer
     * @param asyncTimeoutMs additional time (ms) added to the request timeout before the send
     *     future is timed out
     * @param futureAsyncTimeout helper that bounds the send future with a timeout
     */
    public ConsumerMessageSender(Subscription subscription, MessageSenderFactory messageSenderFactory, List<SuccessHandler> successHandlers, List<ErrorHandler> errorHandlers, SerialConsumerRateLimiter rateLimiter, ExecutorService deliveryReportingExecutor, InflightsPool inflight, HermesMetrics hermesMetrics, int asyncTimeoutMs, FutureAsyncTimeout<MessageSendingResult> futureAsyncTimeout) {
        this.deliveryReportingExecutor = deliveryReportingExecutor;
        this.successHandlers = successHandlers;
        this.errorHandlers = errorHandlers;
        this.rateLimiter = rateLimiter;
        this.messageSenderFactory = messageSenderFactory;
        this.messageSender = messageSenderFactory.create(subscription);
        this.subscription = subscription;
        this.inflight = inflight;
        this.async = futureAsyncTimeout;
        this.requestTimeoutMs = subscription.getSerialSubscriptionPolicy().getRequestTimeout();
        this.asyncTimeoutMs = asyncTimeoutMs;
        this.consumerLatencyTimer = hermesMetrics.latencyTimer(subscription);
    }

    /**
     * Marks the sender as running and creates the single-threaded scheduled executor used for
     * sending and retrying messages. Its threads are named after the subscription.
     */
    public void initialize() {
        this.running = true;
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat(this.subscription.getQualifiedName() + "-retry-executor-%d")
                .build();
        this.retrySingleThreadExecutor = Executors.newScheduledThreadPool(1, threadFactory);
    }

    /**
     * Marks the sender as not running and stops the retry executor, waiting up to one minute for
     * it to terminate. Results arriving afterwards are only logged (see
     * {@link ResponseHandlingListener}).
     *
     * <p>If the waiting thread is interrupted, the interrupt flag is restored so that callers can
     * observe it.
     */
    public void shutdown() {
        this.running = false;
        ScheduledExecutorService executor = this.retrySingleThreadExecutor;
        if (executor == null) {
            // initialize() was never called, so there is nothing to stop.
            return;
        }
        executor.shutdownNow();
        try {
            if (!executor.awaitTermination(1L, TimeUnit.MINUTES)) {
                logger.warn("Failed to stop retry executor within one minute");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status; swallowing it would hide the interruption from callers.
            Thread.currentThread().interrupt();
            logger.warn("Failed to stop retry executor within one minute with following exception", e);
        }
    }

    /**
     * Schedules the message to be sent after the sending delay configured in the subscription's
     * serial policy.
     *
     * @param message message to send
     */
    public void sendAsync(Message message) {
        this.sendAsync(message, this.delayForSubscription());
    }

    /** Schedules sending of the message on the retry executor after {@code delayMillis}. */
    private void sendAsync(Message message, int delayMillis) {
        this.retrySingleThreadExecutor.schedule(() -> this.sendMessage(message), delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Returns the sending delay from the subscription's serial policy. */
    private int delayForSubscription() {
        return this.subscription.getSerialSubscriptionPolicy().getSendingDelay();
    }

    /**
     * Acquires the rate limiter, sends the message and registers a {@link ResponseHandlingListener}
     * that handles the result on the delivery reporting executor. The send future is bounded by
     * {@code asyncTimeoutMs + requestTimeoutMs}. Exceptions thrown while handling the result are
     * logged.
     */
    private void sendMessage(Message message) {
        this.rateLimiter.acquire();
        ConsumerLatencyTimer.Context timer = this.consumerLatencyTimer.time();
        CompletableFuture<MessageSendingResult> response = this.async.within(
                this.messageSender.send(message),
                Duration.ofMillis(this.asyncTimeoutMs + this.requestTimeoutMs));
        response.thenAcceptAsync(new ResponseHandlingListener(message, timer), this.deliveryReportingExecutor)
                .exceptionally(e -> {
                    logger.error("An error occurred while handling message sending response of subscription {} [partition={}, offset={}, id={}]",
                            this.subscription.getQualifiedName(), message.getPartition(), message.getOffset(), message.getId(), e);
                    return null;
                });
    }

    /**
     * Replaces the current subscription and request timeout. The message sender is recreated when
     * the endpoint, serial subscription policy, endpoint address resolver metadata, OAuth policy
     * or HTTP/2 setting differ between the previous and the new subscription.
     *
     * @param newSubscription updated subscription definition
     */
    public void updateSubscription(Subscription newSubscription) {
        // All comparisons must be made against the previous subscription, i.e. before the field
        // is overwritten; otherwise the new subscription would be compared with itself.
        Subscription previous = this.subscription;
        boolean endpointUpdated = !Objects.equals(previous.getEndpoint(), newSubscription.getEndpoint());
        boolean subscriptionPolicyUpdated = !Objects.equals(previous.getSerialSubscriptionPolicy(), newSubscription.getSerialSubscriptionPolicy());
        boolean endpointAddressResolverMetadataChanged = !Objects.equals(previous.getEndpointAddressResolverMetadata(), newSubscription.getEndpointAddressResolverMetadata());
        boolean oAuthPolicyChanged = !Objects.equals(previous.getOAuthPolicy(), newSubscription.getOAuthPolicy());
        boolean httpClientChanged = previous.isHttp2Enabled() != newSubscription.isHttp2Enabled();

        this.subscription = newSubscription;
        this.requestTimeoutMs = newSubscription.getSerialSubscriptionPolicy().getRequestTimeout();
        if (endpointUpdated || subscriptionPolicyUpdated || endpointAddressResolverMetadataChanged || oAuthPolicyChanged || httpClientChanged) {
            this.messageSender = this.messageSenderFactory.create(newSubscription);
        }
    }

    /**
     * Checks whether the message TTL would be exceeded after waiting {@code delay} milliseconds.
     * The policy TTL is converted from seconds to milliseconds and reduced by the delay (never
     * below zero) before being passed to {@link Message#isTtlExceeded}.
     */
    private boolean willExceedTtl(Message message, long delay) {
        long ttl = TimeUnit.SECONDS.toMillis(this.subscription.getSerialSubscriptionPolicy().getMessageTtl().intValue());
        long remainingTtl = Math.max(ttl - delay, 0L);
        return message.isTtlExceeded(remainingTtl);
    }

    /**
     * Handles an unsuccessful result: registers it in the rate limiter, retries or discards the
     * message and then notifies the error handlers about the failure.
     */
    private void handleFailedSending(Message message, MessageSendingResult result) {
        this.registerResultInRateLimiter(result);
        this.retrySending(message, result);
        this.errorHandlers.forEach(h -> h.handleFailed(message, this.subscription, result));
    }

    /**
     * Registers the result in the rate limiter as successful when the result asks to be ignored in
     * rate calculation, and as failed otherwise.
     */
    private void registerResultInRateLimiter(MessageSendingResult result) {
        if (result.ignoreInRateCalculation(this.subscription.getSerialSubscriptionPolicy().isRetryClientErrors(), this.subscription.hasOAuthPolicy())) {
            this.rateLimiter.registerSuccessfulSending();
        } else {
            this.rateLimiter.registerFailedSending();
        }
    }

    /**
     * Increments the message retry counter and either schedules a resend after the retry delay or
     * discards the message when it should not (or can no longer) be resent.
     */
    private void retrySending(Message message, MessageSendingResult result) {
        List<URI> succeededUris = result.getSucceededUris(this::messageSentSucceeded);
        message.incrementRetryCounter(succeededUris);
        long retryDelay = this.extractRetryDelay(result);
        if (this.shouldAttemptResending(message, result, retryDelay)) {
            this.retrySingleThreadExecutor.schedule(() -> this.resend(message, result), retryDelay, TimeUnit.MILLISECONDS);
        } else {
            this.handleMessageDiscarding(message, result);
        }
    }

    /** A resend is attempted only if the TTL will not be exceeded and the result is retryable. */
    private boolean shouldAttemptResending(Message message, MessageSendingResult result, long retryDelay) {
        return !this.willExceedTtl(message, retryDelay) && this.shouldResendMessage(result);
    }

    /**
     * Returns the retry delay in milliseconds: the retry-after value carried by the result capped
     * at the message TTL, or the policy's message backoff when the result carries none.
     */
    private long extractRetryDelay(MessageSendingResult result) {
        long defaultBackoff = this.subscription.getSerialSubscriptionPolicy().getMessageBackoff().intValue();
        long ttl = TimeUnit.SECONDS.toMillis(this.subscription.getSerialSubscriptionPolicy().getMessageTtl().intValue());
        return result.getRetryAfterMillis().map(delay -> Math.min(delay, ttl)).orElse(defaultBackoff);
    }

    /** Logs the previous result (when it is loggable) and sends the message again. */
    private void resend(Message message, MessageSendingResult result) {
        if (result.isLoggable()) {
            result.getLogInfo().forEach(logInfo -> this.logResultInfo(message, logInfo));
        }
        this.sendMessage(message);
    }

    /** Logs, at debug level, details of the failed attempt that is about to be retried. */
    private void logResultInfo(Message message, MessageSendingResultLogInfo logInfo) {
        // Skip the String.format cost entirely when debug logging is disabled.
        if (!logger.isDebugEnabled()) {
            return;
        }
        logger.debug(String.format("Retrying message send to endpoint %s; messageId %s; offset: %s; partition: %s; sub id: %s; rootCause: %s", logInfo.getUrlString(), message.getId(), message.getOffset(), message.getPartition(), this.subscription.getQualifiedName(), logInfo.getRootCause()), logInfo.getFailure());
    }

    /** Releases the inflight slot and notifies the error handlers that the message was discarded. */
    private void handleMessageDiscarding(Message message, MessageSendingResult result) {
        this.inflight.release();
        this.errorHandlers.forEach(h -> h.handleDiscarded(message, this.subscription, result));
    }

    /**
     * Registers the success in the rate limiter, releases the inflight slot and notifies the
     * success handlers.
     */
    private void handleMessageSendingSuccess(Message message, MessageSendingResult result) {
        this.rateLimiter.registerSuccessfulSending();
        this.inflight.release();
        this.successHandlers.forEach(h -> h.handleSuccess(message, this.subscription, result));
    }

    /**
     * A single result counts as "sent" when it succeeded, or when it is a client error and client
     * errors are not retried for this subscription.
     */
    private boolean messageSentSucceeded(MessageSendingResult result) {
        return result.succeeded() || (result.isClientError() && !this.shouldRetryOnClientError());
    }

    /**
     * A failed result is retryable when it is not a client error, when client errors are retried
     * for this subscription, or when it is a 401 for a subscription with an OAuth policy.
     */
    private boolean shouldResendMessage(MessageSendingResult result) {
        return !result.succeeded() && (!result.isClientError() || this.shouldRetryOnClientError() || this.isUnauthorizedForOAuthSecuredSubscription(result));
    }

    /** Returns the retry-client-errors flag from the subscription's serial policy. */
    private boolean shouldRetryOnClientError() {
        return this.subscription.getSerialSubscriptionPolicy().isRetryClientErrors();
    }

    /** Returns true for a 401 status code when the subscription has an OAuth policy. */
    private boolean isUnauthorizedForOAuthSecuredSubscription(MessageSendingResult result) {
        return this.subscription.hasOAuthPolicy() && result.getStatusCode() == 401;
    }

    /**
     * Callback invoked with the result of sending a single message. It stops the latency timer and,
     * while the sender is running, dispatches to success or failure handling; once the sender has
     * been shut down the result is only logged.
     */
    class ResponseHandlingListener
    implements Consumer<MessageSendingResult> {
        private final Message message;
        private final ConsumerLatencyTimer.Context timer;

        /**
         * @param message message whose sending result will be handled
         * @param timer latency timer context started just before the send
         */
        public ResponseHandlingListener(Message message, ConsumerLatencyTimer.Context timer) {
            this.message = message;
            this.timer = timer;
        }

        @Override
        public void accept(MessageSendingResult result) {
            this.timer.stop();
            if (!ConsumerMessageSender.this.running) {
                logger.warn("Process of subscription {} is not running. Ignoring sending message result [successful={}, partition={}, offset={}, id={}]",
                        ConsumerMessageSender.this.subscription.getQualifiedName(), result.succeeded(), this.message.getPartition(), this.message.getOffset(), this.message.getId());
                return;
            }
            if (result.succeeded()) {
                ConsumerMessageSender.this.handleMessageSendingSuccess(this.message, result);
            } else {
                ConsumerMessageSender.this.handleFailedSending(this.message, result);
            }
        }
    }
}

