/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.client;

import java.net.URI;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.Policy;
import net.jodah.failsafe.RetryPolicy;
import net.jodah.failsafe.event.ExecutionAttemptedEvent;
import net.jodah.failsafe.event.ExecutionCompletedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.client.HermesClientShutdownException;
import pl.allegro.tech.hermes.client.HermesClientTermination;
import pl.allegro.tech.hermes.client.HermesMessage;
import pl.allegro.tech.hermes.client.HermesResponse;
import pl.allegro.tech.hermes.client.HermesResponseBuilder;
import pl.allegro.tech.hermes.client.HermesSender;
import pl.allegro.tech.hermes.client.MessageDeliveryListener;

/**
 * Client that publishes messages to topics through a {@link HermesSender}, re-sending
 * according to the configured {@link RetryPolicy}.
 *
 * <p>Registered {@link MessageDeliveryListener}s are notified about every send attempt and
 * about retry outcomes. Once {@link #closeAsync(long)} or {@link #close(long, long)} has been
 * called, {@link #publish(HermesMessage)} returns a future completed exceptionally with
 * {@link HermesClientShutdownException}.
 */
public class HermesClient {
    private static final Logger logger = LoggerFactory.getLogger(HermesClient.class);
    private final HermesSender sender;
    /** Base address of the topics endpoint; always ends with {@code "topics/"}. */
    private final String uri;
    private final Map<String, String> defaultHeaders;
    /** Number of publish operations (including their retries) that have not completed yet. */
    private final AtomicInteger currentlySending = new AtomicInteger(0);
    private final RetryPolicy<HermesResponse> retryPolicy;
    private final ScheduledExecutorService scheduler;
    private volatile boolean shutdown = false;
    // Listeners are added through a public method while being iterated from asynchronous
    // completion/retry callbacks, so the list must tolerate concurrent add + iteration.
    private final List<MessageDeliveryListener> messageDeliveryListeners = new CopyOnWriteArrayList<>();

    /**
     * Creates a client.
     *
     * @param sender                transport used to perform a single send
     * @param uri                   base URI; {@code "topics/"} is appended to it
     * @param defaultHeaders        headers passed to {@code HermesMessage.appendDefaults} for
     *                              every published message; copied defensively
     * @param retries               maximum number of retries
     * @param retryCondition        decides, based on the response, whether to retry
     * @param retrySleepInMillis    initial backoff delay; backoff is configured only when &gt; 0
     * @param maxRetrySleepInMillis maximum backoff delay
     * @param scheduler             executor handed to Failsafe; shut down when the client is closed
     */
    HermesClient(HermesSender sender, URI uri, Map<String, String> defaultHeaders, int retries, Predicate<HermesResponse> retryCondition, long retrySleepInMillis, long maxRetrySleepInMillis, ScheduledExecutorService scheduler) {
        this.sender = sender;
        this.uri = this.createUri(uri);
        this.defaultHeaders = Collections.unmodifiableMap(new HashMap<String, String>(defaultHeaders));
        this.retryPolicy = this.createRetryPolicy(retries, retryCondition, retrySleepInMillis, maxRetrySleepInMillis);
        this.scheduler = scheduler;
    }

    /**
     * Builds the retry policy and wires its events to the listener notification methods.
     * Backoff is only configured when {@code retrySleepInMillis} is positive.
     */
    private RetryPolicy<HermesResponse> createRetryPolicy(int retries, Predicate<HermesResponse> retryCondition, long retrySleepInMillis, long maxRetrySleepInMillis) {
        RetryPolicy<HermesResponse> policy = new RetryPolicy<HermesResponse>()
                .withMaxRetries(retries)
                .handleIf((resp, cause) -> retryCondition.test(resp))
                .onRetriesExceeded(e -> this.handleMaxRetriesExceeded(e))
                .onRetry(e -> this.handleFailedAttempt(e))
                .onFailure(e -> this.handleFailure(e))
                .onSuccess(e -> this.handleSuccessfulRetry(e));
        if (retrySleepInMillis > 0L) {
            policy.withBackoff(retrySleepInMillis, maxRetrySleepInMillis, ChronoUnit.MILLIS);
        }
        return policy;
    }

    /** Returns the given URI as a string, with a trailing slash ensured, followed by {@code "topics/"}. */
    private String createUri(URI uri) {
        String uriString = uri.toString();
        return uriString + (uriString.endsWith("/") ? "" : "/") + "topics/";
    }

    /** Publishes {@code message} to {@code topic} as a message built with {@code json()}. */
    public CompletableFuture<HermesResponse> publishJSON(String topic, byte[] message) {
        return this.publish(HermesMessage.hermesMessage(topic, message).json().build());
    }

    /** Publishes {@code message} to {@code topic} as a message built with {@code json()}. */
    public CompletableFuture<HermesResponse> publishJSON(String topic, String message) {
        return this.publish(HermesMessage.hermesMessage(topic, message).json().build());
    }

    /** Publishes {@code message} to {@code topic} as a message built with {@code avro(schemaVersion)}. */
    public CompletableFuture<HermesResponse> publishAvro(String topic, int schemaVersion, byte[] message) {
        return this.publish(HermesMessage.hermesMessage(topic, message).avro(schemaVersion).build());
    }

    /** Publishes {@code message} to {@code topic} without setting a content type explicitly. */
    public CompletableFuture<HermesResponse> publish(String topic, String message) {
        return this.publish(HermesMessage.hermesMessage(topic, message).build());
    }

    /** Publishes {@code message} to {@code topic} with the given content type. */
    public CompletableFuture<HermesResponse> publish(String topic, String contentType, byte[] message) {
        return this.publish(HermesMessage.hermesMessage(topic, message).withContentType(contentType).build());
    }

    /** Publishes {@code message} to {@code topic} with the given content type. */
    public CompletableFuture<HermesResponse> publish(String topic, String contentType, String message) {
        return this.publish(HermesMessage.hermesMessage(topic, message).withContentType(contentType).build());
    }

    /** Publishes {@code message} to {@code topic} with the given content type and schema version. */
    public CompletableFuture<HermesResponse> publish(String topic, String contentType, int schemaVersion, byte[] message) {
        return this.publish(HermesMessage.hermesMessage(topic, message).withContentType(contentType).withSchemaVersion(schemaVersion).build());
    }

    /**
     * Publishes a prepared message, applying the client's default headers and retry policy.
     *
     * @param message message to publish
     * @return future with the final response, or a future completed exceptionally with
     *         {@link HermesClientShutdownException} when the client has already been closed
     */
    public CompletableFuture<HermesResponse> publish(HermesMessage message) {
        if (this.shutdown) {
            return this.completedWithShutdownException();
        }
        HermesMessage.appendDefaults(message, this.defaultHeaders);
        return this.publishWithRetries(message);
    }

    /**
     * Registers a listener notified about delivery events.
     *
     * @param listener listener to add
     * @return result of adding the listener to the internal list
     */
    public boolean addMessageDeliveryListener(MessageDeliveryListener listener) {
        return this.messageDeliveryListeners.add(listener);
    }

    /**
     * Runs {@link #sendOnce(HermesMessage)} under the retry policy on the scheduler, tracking the
     * operation in {@code currentlySending} until Failsafe reports completion.
     */
    private CompletableFuture<HermesResponse> publishWithRetries(HermesMessage message) {
        this.currentlySending.incrementAndGet();
        return Failsafe.with(this.retryPolicy)
                .with(this.scheduler)
                .onComplete(e -> this.currentlySending.decrementAndGet())
                .getStageAsync(() -> this.sendOnce(message));
    }

    /**
     * Performs a single send. An exceptional completion of the sender's future is converted to
     * a failure response, and listeners are told the latency in nanoseconds.
     */
    private CompletableFuture<HermesResponse> sendOnce(HermesMessage message) {
        long startTime = System.nanoTime();
        return this.sender.send(URI.create(this.uri + message.getTopic()), message)
                .exceptionally(e -> HermesResponseBuilder.hermesFailureResponse(e, message))
                .whenComplete((resp, cause) -> {
                    long latency = System.nanoTime() - startTime;
                    this.messageDeliveryListeners.forEach(l -> l.onSend(resp, latency));
                });
    }

    /** Returns a future already completed exceptionally with {@link HermesClientShutdownException}. */
    private CompletableFuture<HermesResponse> completedWithShutdownException() {
        CompletableFuture<HermesResponse> alreadyShutdown = new CompletableFuture<HermesResponse>();
        alreadyShutdown.completeExceptionally(new HermesClientShutdownException());
        return alreadyShutdown;
    }

    /**
     * Marks the client as shut down and observes, via {@link HermesClientTermination}, until no
     * publish operation is in flight. The scheduler is shut down when that observation
     * completes, whether normally or exceptionally.
     *
     * @param pollInterval interval handed to {@link HermesClientTermination}
     * @return future completed once termination has been observed
     */
    public CompletableFuture<Void> closeAsync(long pollInterval) {
        this.shutdown = true;
        return new HermesClientTermination(pollInterval).observe(() -> this.currentlySending.get() == 0).whenComplete((response, ex) -> this.scheduler.shutdown());
    }

    /**
     * Blocking variant of {@link #closeAsync(long)}.
     *
     * @param pollInterval interval handed to {@link HermesClientTermination}
     * @param timeout      maximum time to wait, in milliseconds
     * @throws InterruptedException if the waiting thread is interrupted, or termination failed
     *                              with an {@code InterruptedException}
     * @throws TimeoutException     if termination did not complete within {@code timeout}
     * @throws IllegalStateException if termination failed with a checked exception other than
     *                              the two above (the original failure is kept as the cause)
     */
    public void close(long pollInterval, long timeout) throws InterruptedException, TimeoutException {
        try {
            this.closeAsync(pollInterval).get(timeout, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            // Rethrow by actual type: a blind cast to InterruptedException would turn any other
            // failure into a ClassCastException and hide the real cause.
            Throwable cause = e.getCause();
            if (cause instanceof InterruptedException) {
                throw (InterruptedException)cause;
            }
            if (cause instanceof TimeoutException) {
                throw (TimeoutException)cause;
            }
            if (cause instanceof RuntimeException) {
                throw (RuntimeException)cause;
            }
            if (cause instanceof Error) {
                throw (Error)cause;
            }
            throw new IllegalStateException("Hermes client termination failed", e);
        }
    }

    /**
     * Notifies listeners and logs an error when retries ran out and the last response is not
     * a success.
     */
    private void handleMaxRetriesExceeded(ExecutionCompletedEvent<HermesResponse> event) {
        HermesResponse result = event.getResult();
        if (result.isSuccess()) {
            return;
        }
        HermesMessage message = result.getHermesMessage();
        this.messageDeliveryListeners.forEach(l -> l.onMaxRetriesExceeded(event.getResult(), event.getAttemptCount()));
        logger.error("Failed to send message to topic {} after {} attempts", message.getTopic(), event.getAttemptCount());
    }

    /** Notifies listeners that an attempt failed and a retry follows. */
    private void handleFailedAttempt(ExecutionAttemptedEvent<HermesResponse> event) {
        this.messageDeliveryListeners.forEach(l -> l.onFailedRetry(event.getLastResult(), event.getAttemptCount()));
    }

    /** Notifies listeners that the retry policy reported a failure. */
    private void handleFailure(ExecutionCompletedEvent<HermesResponse> event) {
        this.messageDeliveryListeners.forEach(l -> l.onFailure(event.getResult(), event.getAttemptCount()));
    }

    /** Notifies listeners that the retry policy reported a success. */
    private void handleSuccessfulRetry(ExecutionCompletedEvent<HermesResponse> event) {
        this.messageDeliveryListeners.forEach(l -> l.onSuccessfulRetry(event.getResult(), event.getAttemptCount()));
    }
}

