/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.common.metric.timer.ConsumerLatencyTimer;
import pl.allegro.tech.hermes.consumers.consumer.rate.ConsumerRateLimiter;
import pl.allegro.tech.hermes.consumers.consumer.receiver.Message;
import pl.allegro.tech.hermes.consumers.consumer.result.ErrorHandler;
import pl.allegro.tech.hermes.consumers.consumer.result.SuccessHandler;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSender;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult;
import pl.allegro.tech.hermes.consumers.consumer.sender.timeout.FutureAsyncTimeout;

public class ConsumerMessageSender {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerMessageSender.class);
    private final ExecutorService retrySingleThreadExecutor;
    private final ExecutorService deliveryReportingExecutor;
    private final SuccessHandler successHandler;
    private final ErrorHandler errorHandler;
    private final ConsumerRateLimiter rateLimiter;
    private final MessageSender messageSender;
    private final Semaphore inflightSemaphore;
    private final HermesMetrics hermesMetrics;
    private final FutureAsyncTimeout<MessageSendingResult> async;
    private final int asyncTimeoutMs;
    private Subscription subscription;
    private volatile boolean consumerIsConsuming = true;

    public ConsumerMessageSender(Subscription subscription, MessageSender messageSender, SuccessHandler successHandler, ErrorHandler errorHandler, ConsumerRateLimiter rateLimiter, ExecutorService deliveryReportingExecutor, Semaphore inflightSemaphore, HermesMetrics hermesMetrics, int asyncTimeoutMs, FutureAsyncTimeout<MessageSendingResult> futureAsyncTimeout) {
        this.deliveryReportingExecutor = deliveryReportingExecutor;
        this.successHandler = successHandler;
        this.errorHandler = errorHandler;
        this.rateLimiter = rateLimiter;
        this.messageSender = messageSender;
        this.subscription = subscription;
        this.inflightSemaphore = inflightSemaphore;
        this.retrySingleThreadExecutor = Executors.newSingleThreadExecutor();
        this.async = futureAsyncTimeout;
        this.hermesMetrics = hermesMetrics;
        this.asyncTimeoutMs = asyncTimeoutMs;
    }

    public void shutdown() {
        this.consumerIsConsuming = false;
    }

    public void sendMessage(Message message) {
        while (this.consumerIsConsuming) {
            ConsumerLatencyTimer consumerLatencyTimer = this.hermesMetrics.latencyTimer(this.subscription);
            try {
                this.submitAsyncSendMessageRequest(message, consumerLatencyTimer);
                return;
            }
            catch (RuntimeException e) {
                consumerLatencyTimer.stop();
                this.handleFailedSending(message, MessageSendingResult.failedResult(e));
                if (!this.isTtlExceeded(message)) continue;
                this.handleMessageDiscarding(message, MessageSendingResult.failedResult(e));
                return;
            }
        }
    }

    public void updateSubscription(Subscription newSubscription) {
        this.subscription = newSubscription;
    }

    private void submitAsyncSendMessageRequest(Message message, ConsumerLatencyTimer consumerLatencyTimer) {
        this.rateLimiter.acquire();
        CompletableFuture<MessageSendingResult> response = this.async.within(this.messageSender.send(message), Duration.ofMillis(this.asyncTimeoutMs));
        response.thenAcceptAsync((Consumer)new ResponseHandlingListener(message, consumerLatencyTimer), (Executor)this.deliveryReportingExecutor);
    }

    private boolean isTtlExceeded(Message message) {
        return message.isTtlExceeded(this.subscription.getSubscriptionPolicy().getMessageTtl());
    }

    private void handleFailedSending(Message message, MessageSendingResult result) {
        if (this.shouldReduceSendingRate(result)) {
            this.rateLimiter.registerFailedSending();
        }
        this.errorHandler.handleFailed(message, this.subscription, result);
    }

    private void handleMessageDiscarding(Message message, MessageSendingResult result) {
        this.inflightSemaphore.release();
        this.errorHandler.handleDiscarded(message, this.subscription, result);
    }

    private void handleMessageSendingSuccess(Message message) {
        this.inflightSemaphore.release();
        this.successHandler.handle(message, this.subscription);
    }

    private boolean shouldReduceSendingRate(MessageSendingResult result) {
        return this.shouldRetrySending(result);
    }

    private boolean shouldRetrySending(MessageSendingResult result) {
        return !result.succeeded() && (!result.isClientError() || this.subscription.getSubscriptionPolicy().isRetryClientErrors());
    }

    class ResponseHandlingListener
    implements Consumer<MessageSendingResult> {
        private final Message message;
        private final ConsumerLatencyTimer consumerlatencyTimer;

        public ResponseHandlingListener(Message message, ConsumerLatencyTimer consumerlatencyTimer) {
            this.message = message;
            this.consumerlatencyTimer = consumerlatencyTimer;
        }

        @Override
        public void accept(MessageSendingResult result) {
            this.consumerlatencyTimer.stop();
            if (result.succeeded()) {
                ConsumerMessageSender.this.rateLimiter.registerSuccessfulSending();
                ConsumerMessageSender.this.handleMessageSendingSuccess(this.message);
            } else {
                ConsumerMessageSender.this.handleFailedSending(this.message, result);
                if (!ConsumerMessageSender.this.isTtlExceeded(this.message) && ConsumerMessageSender.this.shouldRetrySending(result)) {
                    ConsumerMessageSender.this.retrySingleThreadExecutor.execute(() -> this.retrySending(result));
                } else {
                    ConsumerMessageSender.this.handleMessageDiscarding(this.message, result);
                }
            }
        }

        private void retrySending(MessageSendingResult result) {
            if (result.isLoggable()) {
                logger.info(String.format("Retrying message send to endpoint %s; messageId %s; offset: %s; partition: %s; sub id: %s; rootCause: %s", ConsumerMessageSender.this.subscription.getEndpoint().getEndpoint(), this.message.getId().orElse("unknown"), this.message.getOffset(), this.message.getPartition(), ConsumerMessageSender.this.subscription.getId(), result.getRootCause()), result.getFailure());
            }
            ConsumerMessageSender.this.sendMessage(this.message);
        }
    }
}

