/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.client;

import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.client.HermesClientShutdownException;
import pl.allegro.tech.hermes.client.HermesClientTermination;
import pl.allegro.tech.hermes.client.HermesMessage;
import pl.allegro.tech.hermes.client.HermesResponse;
import pl.allegro.tech.hermes.client.HermesResponseBuilder;
import pl.allegro.tech.hermes.client.MessageDeliveryListener;
import pl.allegro.tech.hermes.client.ReactiveHermesSender;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.util.retry.Retry;

public class ReactiveHermesClient {
    /** Logger used by this client (see handleMaxRetriesExceeded). */
    private static final Logger logger = LoggerFactory.getLogger(ReactiveHermesClient.class);
    /** Subscriber-context key under which retry() stores the HermesRetryContext that getNextAttempt() reads. */
    private static final String RETRY_CONTEXT_KEY = "hermes-retry-context-key";
    /** Transport that sendOnce() delegates every single send attempt to. */
    private final ReactiveHermesSender sender;
    /** Base address produced by createUri(); sendOnce() appends the topic name to it. */
    private final String uri;
    /** Unmodifiable copy of the headers handed to HermesMessage.appendDefaults() in publish(). */
    private final Map<String, String> defaultHeaders;
    /** Counter incremented in publishWithRetries() and decremented when its Mono finishes; closeAsync() waits for it to reach zero. */
    private final AtomicInteger currentlySending = new AtomicInteger(0);
    /** Retry limit passed to Retry.max / Retry.backoff in prepareRetrySpec(). */
    private final int maxRetries;
    /** Predicate evaluated against every response; a match turns the response into a retryable failure. */
    private final Predicate<HermesResponse> retryCondition;
    /** Minimum backoff between retries; when zero, prepareRetrySpec() retries without backoff. */
    private final Duration retrySleep;
    /** Upper bound passed to maxBackoff() when backoff is in use. */
    private final Duration maxRetrySleep;
    /** Jitter factor passed to Retry.backoff(...).jitter(). */
    private final Double jitterFactor;
    /** Scheduler every send is subscribed on; disposed once closeAsync() observes termination. */
    private final Scheduler scheduler;
    /** Set to true by closeAsync(); publish() then returns an error Mono instead of sending. */
    private volatile boolean shutdown = false;
    /**
     * Listeners notified about sends, retries and failures.
     * NOTE(review): backed by a plain ArrayList (see constructor); registering listeners while
     * messages are in flight on other threads does not look safe - confirm intended usage.
     */
    private final List<MessageDeliveryListener> messageDeliveryListeners;

    ReactiveHermesClient(ReactiveHermesSender sender, URI uri, Map<String, String> defaultHeaders, int maxRetries, Predicate<HermesResponse> retryCondition, long retrySleepInMillis, long maxRetrySleepInMillis, double jitterFactor, Scheduler scheduler) {
        this.sender = sender;
        this.uri = this.createUri(uri);
        this.defaultHeaders = Collections.unmodifiableMap(new HashMap<String, String>(defaultHeaders));
        this.maxRetries = maxRetries;
        this.retryCondition = retryCondition;
        this.retrySleep = Duration.ofMillis(retrySleepInMillis);
        this.maxRetrySleep = Duration.ofMillis(maxRetrySleepInMillis);
        this.jitterFactor = jitterFactor;
        this.scheduler = scheduler;
        this.messageDeliveryListeners = new ArrayList<MessageDeliveryListener>();
    }

    /**
     * Builds the base address for topic endpoints.
     *
     * @param uri address of the Hermes frontend
     * @return the textual form of {@code uri}, terminated with exactly one added "/" when it
     *         does not already end with one, followed by "topics/"
     */
    private String createUri(URI uri) {
        // Read the text first: the local must be assigned before it is used in the concatenation.
        String base = uri.toString();
        String separator = base.endsWith("/") ? "" : "/";
        return base + separator + "topics/";
    }

    /**
     * Publishes a binary payload built with the message builder's {@code json()} option.
     *
     * @param topic   topic name
     * @param message payload bytes
     * @return Mono produced by {@link #publish(HermesMessage)}
     */
    public Mono<HermesResponse> publishJSON(String topic, byte[] message) {
        HermesMessage jsonMessage = HermesMessage.hermesMessage(topic, message).json().build();
        return this.publish(jsonMessage);
    }

    /**
     * Publishes a textual payload built with the message builder's {@code json()} option.
     *
     * @param topic   topic name
     * @param message payload text
     * @return Mono produced by {@link #publish(HermesMessage)}
     */
    public Mono<HermesResponse> publishJSON(String topic, String message) {
        HermesMessage jsonMessage = HermesMessage.hermesMessage(topic, message).json().build();
        return this.publish(jsonMessage);
    }

    /**
     * Publishes a binary payload built with the message builder's {@code avro(schemaVersion)} option.
     *
     * @param topic         topic name
     * @param schemaVersion schema version passed to the builder
     * @param message       payload bytes
     * @return Mono produced by {@link #publish(HermesMessage)}
     */
    public Mono<HermesResponse> publishAvro(String topic, int schemaVersion, byte[] message) {
        HermesMessage avroMessage = HermesMessage.hermesMessage(topic, message).avro(schemaVersion).build();
        return this.publish(avroMessage);
    }

    /**
     * Publishes a textual payload using the builder's defaults (no content type is set here).
     *
     * @param topic   topic name
     * @param message payload text
     * @return Mono produced by {@link #publish(HermesMessage)}
     */
    public Mono<HermesResponse> publish(String topic, String message) {
        HermesMessage plainMessage = HermesMessage.hermesMessage(topic, message).build();
        return this.publish(plainMessage);
    }

    /**
     * Publishes a binary payload with an explicit content type.
     *
     * @param topic       topic name
     * @param contentType content type passed to the builder
     * @param message     payload bytes
     * @return Mono produced by {@link #publish(HermesMessage)}
     */
    public Mono<HermesResponse> publish(String topic, String contentType, byte[] message) {
        HermesMessage typedMessage = HermesMessage.hermesMessage(topic, message)
                .withContentType(contentType)
                .build();
        return this.publish(typedMessage);
    }

    /**
     * Publishes a textual payload with an explicit content type.
     *
     * @param topic       topic name
     * @param contentType content type passed to the builder
     * @param message     payload text
     * @return Mono produced by {@link #publish(HermesMessage)}
     */
    public Mono<HermesResponse> publish(String topic, String contentType, String message) {
        HermesMessage typedMessage = HermesMessage.hermesMessage(topic, message)
                .withContentType(contentType)
                .build();
        return this.publish(typedMessage);
    }

    /**
     * Publishes a binary payload with an explicit content type and schema version.
     *
     * @param topic         topic name
     * @param contentType   content type passed to the builder
     * @param schemaVersion schema version passed to the builder
     * @param message       payload bytes
     * @return Mono produced by {@link #publish(HermesMessage)}
     */
    public Mono<HermesResponse> publish(String topic, String contentType, int schemaVersion, byte[] message) {
        HermesMessage versionedMessage = HermesMessage.hermesMessage(topic, message)
                .withContentType(contentType)
                .withSchemaVersion(schemaVersion)
                .build();
        return this.publish(versionedMessage);
    }

    /**
     * Publishes a prepared message, retrying according to the client's retry settings.
     *
     * <p>Once {@link #closeAsync(long)} has marked the client as shut down, nothing is sent and the
     * returned Mono fails with a {@link HermesClientShutdownException}. Otherwise the client's
     * default headers are applied through {@code HermesMessage.appendDefaults} before sending.
     *
     * @param message message to publish
     * @return Mono emitting the final response
     */
    public Mono<HermesResponse> publish(HermesMessage message) {
        if (!this.shutdown) {
            HermesMessage.appendDefaults(message, this.defaultHeaders);
            return this.publishWithRetries(message);
        }
        return this.completedWithShutdownException();
    }

    /**
     * Registers a listener that is notified about sends, retries and failures.
     *
     * @param listener listener to append to the listener list
     * @return the result of {@link List#add(Object)} on the listener list
     */
    public boolean addMessageDeliveryListener(MessageDeliveryListener listener) {
        boolean registered = this.messageDeliveryListeners.add(listener);
        return registered;
    }

    /**
     * Sends the message, retrying as configured, and notifies listeners about the final outcome.
     *
     * <p>Every subscription to the returned Mono counts as one in-flight send: the
     * {@code currentlySending} counter is incremented when the subscription starts and decremented
     * when it terminates (success, error or cancellation). Keeping both steps inside the same
     * subscription keeps the counter balanced even if the Mono is never subscribed or is
     * subscribed more than once, which {@link #closeAsync(long)} relies on.
     *
     * @param message message to send, already carrying the default headers
     * @return Mono emitting the response of the last attempt
     */
    private Mono<HermesResponse> publishWithRetries(HermesMessage message) {
        // One attempt: send, classify the response, and turn unexpected errors into failed attempts.
        Mono<Result> sendOnceResult = this.sendOnce(message)
                .flatMap(this::testRetryCondition)
                .onErrorResume(exception -> this.mapExceptionToFailedAttempt(exception, message))
                .subscribeOn(this.scheduler);

        return Mono.defer(() -> {
            // Paired with the decrement in doFinally below, once per subscription.
            this.currentlySending.incrementAndGet();
            return this.retry(sendOnceResult)
                    .doOnSuccess(hr -> {
                        if (hr.response.isSuccess() || !hr.matchesRetryPolicy) {
                            this.handleSuccessfulRetry(hr.response, hr.attempt);
                        } else {
                            // Retries were exhausted on a response that still matched the retry condition.
                            this.handleFailure(hr.response, hr.attempt);
                        }
                    })
                    .map(result -> result.response)
                    .doFinally(s -> this.currentlySending.decrementAndGet());
        });
    }

    /**
     * Wraps a single-attempt Mono with the configured retry behaviour.
     *
     * <p>Failed attempts are converted to errors so that {@code retryWhen} can resubscribe; a fresh
     * {@link HermesRetryContext} is placed in the subscriber context for every subscription so
     * attempts can be numbered; errors are finally passed to
     * {@link #unwrapRetryExhaustedException(Exception)}.
     *
     * @param sendOnceResult Mono performing one send attempt
     * @return Mono emitting the attempt that ended the sequence
     */
    private Mono<Attempt> retry(Mono<Result> sendOnceResult) {
        Retry retrySpec = this.prepareRetrySpec();
        Mono<Attempt> attemptOrError = sendOnceResult.flatMap(this::unwrapFailedAttemptAsException);
        Mono<Attempt> retried = attemptOrError.retryWhen(retrySpec);
        // The lambda runs per subscription, so each subscription gets its own attempt counter.
        Mono<Attempt> withRetryContext = retried.subscriberContext(
                ctx -> ctx.put(RETRY_CONTEXT_KEY, HermesRetryContext.emptyRetryContext()));
        return withRetryContext.onErrorResume(Exception.class, this::unwrapRetryExhaustedException);
    }

    /**
     * Translates the error that ended the retry sequence into the final result.
     *
     * <p>When retries were exhausted by one of this client's failed attempts, listeners are told
     * that the retry limit was exceeded. If that last attempt failed only because its response
     * matched the retry condition, the response is emitted as a regular {@link Attempt} flagged as
     * matching the retry policy. Otherwise the failure listeners are notified (when the response
     * reports failure) and the original exception is propagated.
     *
     * <p>Exceptions that are not retry-exhaustion, or whose cause is not a
     * {@link RetryFailedException}, are propagated unchanged.
     *
     * @param exception error raised by the retried sequence
     * @return Mono emitting the last attempt, or failing with {@code exception}
     */
    private Mono<Attempt> unwrapRetryExhaustedException(Exception exception) {
        if (!Exceptions.isRetryExhausted(exception)) {
            return Mono.error(exception);
        }
        Throwable lastFailure = exception.getCause();
        if (!(lastFailure instanceof RetryFailedException)) {
            // Not one of our failed attempts: there is no response to report, so keep the
            // original error instead of masking it with a ClassCastException.
            return Mono.error(exception);
        }
        RetryFailedException rfe = (RetryFailedException) lastFailure;
        Failed failedAttempt = rfe.failed;
        HermesResponse hermesResponse = failedAttempt.hermesResponse;
        this.handleMaxRetriesExceeded(hermesResponse, failedAttempt.attempt);

        Throwable cause = rfe.getCause();
        if (cause instanceof ShouldRetryException) {
            ShouldRetryException sre = (ShouldRetryException) cause;
            return Mono.just((Attempt) Result.attempt(sre.hermesResponse, failedAttempt.attempt, true));
        }
        if (hermesResponse.isFailure()) {
            this.handleFailure(hermesResponse, failedAttempt.attempt);
        }
        return Mono.error(exception);
    }

    /**
     * Converts an attempt result into a Reactor signal.
     *
     * @param result outcome of a single send attempt
     * @return a Mono failing with {@link RetryFailedException} when the attempt failed (so that
     *         {@code retryWhen} can react to it), otherwise a Mono emitting the {@link Attempt}
     */
    private Mono<Attempt> unwrapFailedAttemptAsException(Result result) {
        if (result instanceof Failed) {
            return Mono.error(new RetryFailedException((Failed) result));
        }
        // Results that are not Failed are created through Result.attempt and are Attempt instances.
        return Mono.just((Attempt) result);
    }

    /**
     * Turns an error raised while sending into a failed attempt carrying the next attempt number.
     *
     * @param throwable     error raised by the send pipeline
     * @param hermesMessage message that was being sent
     * @return Mono emitting the failed attempt
     */
    private Mono<Result> mapExceptionToFailedAttempt(Throwable throwable, HermesMessage hermesMessage) {
        Mono<Integer> attemptNumber = this.getNextAttempt();
        return attemptNumber.map(number -> Result.failureByException(throwable, hermesMessage, number));
    }

    /**
     * Classifies a response using the configured retry condition.
     *
     * @param response response received from the sender
     * @return Mono emitting a retryable failure when the condition matches, otherwise a regular
     *         attempt that is not flagged for retry; both carry the next attempt number
     */
    private Mono<Result> testRetryCondition(HermesResponse response) {
        return this.getNextAttempt().map(attemptNumber ->
                this.retryCondition.test(response)
                        ? Result.retryableFailure(response, attemptNumber)
                        : Result.attempt(response, attemptNumber, false));
    }

    /**
     * Builds the Reactor retry specification from the client's settings.
     *
     * @return a backoff spec bounded by {@code maxRetrySleep} with the configured jitter when
     *         {@code retrySleep} is non-zero, otherwise a plain max-retries spec; both report each
     *         retry through {@link #handleRetryAttempt(Retry.RetrySignal)}
     */
    private Retry prepareRetrySpec() {
        boolean useBackoff = !this.retrySleep.isZero();
        if (useBackoff) {
            return Retry.backoff(this.maxRetries, this.retrySleep)
                    .maxBackoff(this.maxRetrySleep)
                    .jitter(this.jitterFactor.doubleValue())
                    .doAfterRetry(this::handleRetryAttempt);
        }
        return Retry.max(this.maxRetries).doAfterRetry(this::handleRetryAttempt);
    }

    /**
     * Retry-spec callback that reports a failed attempt to the delivery listeners.
     *
     * <p>Only failures produced by this client's own attempts ({@link RetryFailedException}) carry
     * a response to report. Any other failure is skipped so that the callback does not replace the
     * real error with a {@link ClassCastException}.
     *
     * @param retrySignal signal describing the failure that triggered the retry
     */
    private void handleRetryAttempt(Retry.RetrySignal retrySignal) {
        Throwable failure = retrySignal.failure();
        if (!(failure instanceof RetryFailedException)) {
            return;
        }
        RetryFailedException failedException = (RetryFailedException) failure;
        // totalRetries() is zero-based here, listeners receive a one-based attempt number.
        this.handleFailedAttempt(failedException.failed.hermesResponse, retrySignal.totalRetries() + 1L);
    }

    /**
     * Reads the retry context from the subscriber context and advances its attempt counter.
     *
     * @return Mono emitting the attempt number before the increment; a new context (starting at
     *         its initial value) is used when none is stored under the retry context key
     */
    private Mono<Integer> getNextAttempt() {
        return Mono.subscriberContext().map(ctx -> {
            HermesRetryContext retryContext =
                    ctx.getOrDefault(RETRY_CONTEXT_KEY, HermesRetryContext.emptyRetryContext());
            return retryContext.getAndIncrementAttempt();
        });
    }

    /**
     * Performs a single send attempt.
     *
     * <p>The work is deferred so that every (re)subscription sends again and measures its own
     * latency. Errors signalled by the sender are converted into failure responses; every emitted
     * response is reported to the listeners together with the elapsed time in nanoseconds.
     * Exceptions thrown synchronously while building the request are turned into an error signal.
     *
     * @param message message to send
     * @return Mono emitting the response of this attempt
     */
    private Mono<HermesResponse> sendOnce(HermesMessage message) {
        return Mono.defer(() -> {
            long startTime = System.nanoTime();
            try {
                return this.sender.sendReactively(URI.create(this.uri + message.getTopic()), message)
                        .onErrorResume(sendFailure ->
                                Mono.just(HermesResponseBuilder.hermesFailureResponse(sendFailure, message)))
                        .doOnNext(resp -> {
                            long latency = System.nanoTime() - startTime;
                            this.messageDeliveryListeners.forEach(l -> l.onSend(resp, latency));
                        });
            } catch (Exception ex) {
                return Mono.error(ex);
            }
        });
    }

    /**
     * Creates the result returned for publishes attempted after shutdown.
     *
     * @return Mono failing with a new {@link HermesClientShutdownException}
     */
    private Mono<HermesResponse> completedWithShutdownException() {
        HermesClientShutdownException shutdownException = new HermesClientShutdownException();
        return Mono.error(shutdownException);
    }

    /**
     * Starts shutting the client down.
     *
     * <p>The client is marked as shut down immediately, so later publishes are rejected. A
     * {@link HermesClientTermination} then observes the in-flight counter until it reads zero;
     * when that observation completes (normally or exceptionally) the scheduler is disposed.
     *
     * @param pollInterval value passed to {@link HermesClientTermination}
     *                     (presumably the polling period in milliseconds - confirm)
     * @return Mono completing when the termination future completes
     */
    public Mono<Void> closeAsync(long pollInterval) {
        this.shutdown = true;
        HermesClientTermination termination = new HermesClientTermination(pollInterval);
        CompletionStage terminationStage = termination
                .observe(() -> this.currentlySending.get() == 0)
                .whenComplete((response, ex) -> this.scheduler.dispose());
        return Mono.fromFuture((CompletableFuture)terminationStage);
    }

    /**
     * Shuts the client down and blocks the calling thread until termination or timeout.
     *
     * @param pollInterval value forwarded to {@link #closeAsync(long)}
     * @param timeout      maximum time to block, in milliseconds
     */
    public void close(long pollInterval, long timeout) {
        Mono<Void> termination = this.closeAsync(pollInterval);
        Duration limit = Duration.ofMillis(timeout);
        termination.block(limit);
    }

    /**
     * Notifies listeners that the retry limit was exceeded and then logs the failure.
     *
     * @param response     response of the last attempt
     * @param attemptCount number of the last attempt
     */
    private void handleMaxRetriesExceeded(HermesResponse response, int attemptCount) {
        this.messageDeliveryListeners.forEach(listener -> {
            listener.onMaxRetriesExceeded(response, attemptCount);
        });
        logger.error(
                "Failed to send message to topic {} after {} attempts",
                response.getHermesMessage().getTopic(),
                attemptCount);
    }

    /**
     * Notifies listeners about an attempt that failed and is about to be retried.
     *
     * @param response     response of the failed attempt
     * @param attemptCount attempt number, narrowed to {@code int} for the listener API
     */
    private void handleFailedAttempt(HermesResponse response, long attemptCount) {
        int attempt = (int) attemptCount;
        this.messageDeliveryListeners.forEach(listener -> listener.onFailedRetry(response, attempt));
    }

    /**
     * Notifies listeners about a final delivery failure.
     *
     * @param response     response of the last attempt
     * @param attemptCount attempt number, narrowed to {@code int} for the listener API
     */
    private void handleFailure(HermesResponse response, long attemptCount) {
        int attempt = (int) attemptCount;
        this.messageDeliveryListeners.forEach(listener -> listener.onFailure(response, attempt));
    }

    /**
     * Notifies listeners about an attempt that ended the sequence without needing further retries.
     *
     * @param response     response of that attempt
     * @param attemptCount attempt number, narrowed to {@code int} for the listener API
     */
    private void handleSuccessfulRetry(HermesResponse response, long attemptCount) {
        int attempt = (int) attemptCount;
        this.messageDeliveryListeners.forEach(listener -> listener.onSuccessfulRetry(response, attempt));
    }

    /**
     * Error signal carrying a failed attempt through the retry machinery.
     * Its cause is the cause recorded on the failed attempt.
     */
    private static class RetryFailedException extends Exception {
        /** The failed attempt this exception was created for. */
        private final Failed failed;

        /**
         * @param failedAttempt attempt to carry; its cause becomes this exception's cause
         */
        public RetryFailedException(Failed failedAttempt) {
            super(failedAttempt.cause);
            this.failed = failedAttempt;
        }
    }

    /**
     * Result of an attempt that should be retried: either the response matched the retry
     * condition or sending raised an error.
     */
    private static class Failed implements Result {
        /** Response associated with the failed attempt. */
        private final HermesResponse hermesResponse;
        /** Number of the attempt. */
        private final int attempt;
        /** Reason the attempt is considered failed. */
        private final Throwable cause;

        private Failed(HermesResponse response, int attemptNumber, Throwable reason) {
            this.hermesResponse = response;
            this.attempt = attemptNumber;
            this.cause = reason;
        }
    }

    /**
     * Cause attached to a failed attempt whose response matched the retry condition.
     * It carries that response so it can be returned once retries are exhausted.
     */
    private static class ShouldRetryException extends Exception {
        /** Response that matched the retry condition. */
        private final HermesResponse hermesResponse;

        /**
         * @param response response that matched the retry condition
         */
        public ShouldRetryException(HermesResponse response) {
            this.hermesResponse = response;
        }

        /**
         * @return the response this exception was created for
         */
        public HermesResponse getHermesResponse() {
            return hermesResponse;
        }
    }

    private static interface Result {
        public static Result attempt(HermesResponse response, int attempt, boolean qualifiedForRetry) {
            return new Attempt(response, attempt, qualifiedForRetry);
        }

        public static Result retryableFailure(HermesResponse response, int attempt) {
            return new Failed(response, attempt, new ShouldRetryException(response));
        }

        public static Result failureByException(Throwable throwable, HermesMessage hermesMessage, int attempt) {
            return new Failed(HermesResponseBuilder.hermesFailureResponse(throwable, hermesMessage), attempt, throwable);
        }
    }

    /**
     * Result of an attempt that ends the retry sequence and is handed back to the caller.
     */
    private static class Attempt implements Result {
        /** Response received for the attempt. */
        private final HermesResponse response;
        /** Number of the attempt. */
        private final int attempt;
        /** Whether the response matched the retry condition. */
        private final boolean matchesRetryPolicy;

        private Attempt(HermesResponse hermesResponse, int attemptNumber, boolean retryPolicyMatched) {
            this.matchesRetryPolicy = retryPolicyMatched;
            this.attempt = attemptNumber;
            this.response = hermesResponse;
        }
    }

    /**
     * Mutable attempt counter kept in the subscriber context; numbering starts at 1.
     */
    private static class HermesRetryContext {
        /** Number that the next call to {@link #getAndIncrementAttempt()} will return. */
        private int attempt = 1;

        HermesRetryContext() {
        }

        /**
         * @return a new context whose first attempt number is 1
         */
        static HermesRetryContext emptyRetryContext() {
            return new HermesRetryContext();
        }

        /**
         * @return the current attempt number; the stored number is advanced by one afterwards
         */
        int getAndIncrementAttempt() {
            int current = this.attempt;
            this.attempt = current + 1;
            return current;
        }
    }
}

