/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.consumers.consumer.rate.ConsumerRateLimiter;
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult;
import pl.allegro.tech.hermes.consumers.consumer.sender.timeout.FutureAsyncTimeout;

public class ResilientMessageSender {
    private final ConsumerRateLimiter rateLimiter;
    private final List<Predicate<MessageSendingResult>> ignore;
    private final FutureAsyncTimeout async;
    private final int requestTimeoutMs;
    private final int asyncTimeoutMs;

    public ResilientMessageSender(ConsumerRateLimiter rateLimiter, Subscription subscription, FutureAsyncTimeout async, int requestTimeoutMs, int asyncTimeoutMs) {
        this.rateLimiter = rateLimiter;
        this.ignore = ResilientMessageSender.ignorableErrors(subscription);
        this.async = async;
        this.requestTimeoutMs = requestTimeoutMs;
        this.asyncTimeoutMs = asyncTimeoutMs;
    }

    private static List<Predicate<MessageSendingResult>> ignorableErrors(Subscription subscription) {
        Predicate<MessageSendingResult> ignore = result -> result.ignoreInRateCalculation(subscription.getSerialSubscriptionPolicy().isRetryClientErrors(), subscription.hasOAuthPolicy());
        return Collections.singletonList(ignore);
    }

    public <T extends MessageSendingResult> CompletableFuture<T> send(Consumer<CompletableFuture<T>> resultFutureConsumer, Function<Throwable, T> exceptionMapper) {
        try {
            this.rateLimiter.acquire();
            CompletableFuture resultFuture = new CompletableFuture();
            resultFutureConsumer.accept(resultFuture);
            CompletableFuture timeoutGuardedResultFuture = this.async.within(resultFuture, Duration.ofMillis(this.asyncTimeoutMs + this.requestTimeoutMs), exceptionMapper);
            return this.withCompletionHandle(timeoutGuardedResultFuture, exceptionMapper);
        }
        catch (Exception e) {
            this.rateLimiter.registerFailedSending();
            return CompletableFuture.completedFuture((MessageSendingResult)exceptionMapper.apply(e));
        }
    }

    private <T extends MessageSendingResult> CompletableFuture<T> withCompletionHandle(CompletableFuture<T> future, Function<Throwable, T> exceptionMapper) {
        return future.handle((result, throwable) -> {
            if (throwable != null) {
                this.rateLimiter.registerFailedSending();
                return (MessageSendingResult)exceptionMapper.apply((Throwable)throwable);
            }
            if (result.succeeded()) {
                this.rateLimiter.registerSuccessfulSending();
            } else {
                this.registerResultInRateLimiter((MessageSendingResult)result);
            }
            return result;
        });
    }

    private void registerResultInRateLimiter(MessageSendingResult result) {
        if (this.ignore.stream().anyMatch(p -> p.test(result))) {
            this.rateLimiter.registerSuccessfulSending();
        } else {
            this.rateLimiter.registerFailedSending();
        }
    }
}

