/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.client;

import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.IntStream;
import pl.allegro.tech.hermes.client.HermesClientShutdownException;
import pl.allegro.tech.hermes.client.HermesClientTermination;
import pl.allegro.tech.hermes.client.HermesMessage;
import pl.allegro.tech.hermes.client.HermesResponse;
import pl.allegro.tech.hermes.client.HermesResponseBuilder;
import pl.allegro.tech.hermes.client.HermesSender;

/**
 * Client that publishes messages to topics through a {@link HermesSender}.
 *
 * <p>Each publication is sent once and then re-sent up to {@code retries} more times for as long
 * as the configured retry condition matches the latest response. The client counts publications
 * that are still in flight so that {@link #closeAsync(long)} and {@link #close(long, long)} can
 * wait for them before reporting termination.
 */
public class HermesClient {
    private final HermesSender sender;
    /** Base address of the topics endpoint; always ends with {@code "topics/"}. */
    private final String uri;
    private final Map<String, String> defaultHeaders;
    /** Maximum number of re-sends performed after the initial attempt. */
    private final int retries;
    private final Predicate<HermesResponse> retryCondition;
    /** Number of publications started but not yet completed. */
    private final AtomicInteger currentlySending = new AtomicInteger(0);
    private volatile boolean shutdown = false;

    /**
     * Creates a client.
     *
     * @param sender         transport used to deliver every attempt
     * @param uri            base address; {@code "topics/"} is appended to it
     * @param defaultHeaders headers passed to {@code HermesMessage.appendDefaults} for every
     *                       published message; copied, so later changes to the map are not seen
     * @param retries        maximum number of re-sends after the first attempt
     * @param retryCondition decides, for a given response, whether the message is sent again
     */
    HermesClient(HermesSender sender, URI uri, Map<String, String> defaultHeaders, int retries, Predicate<HermesResponse> retryCondition) {
        this.sender = sender;
        this.uri = this.createUri(uri);
        this.defaultHeaders = Collections.unmodifiableMap(new HashMap<>(defaultHeaders));
        this.retries = retries;
        this.retryCondition = retryCondition;
    }

    /** Builds the topics endpoint prefix, inserting a {@code /} separator only when it is missing. */
    private String createUri(URI uri) {
        String uriString = uri.toString();
        return uriString + (uriString.endsWith("/") ? "" : "/") + "topics/";
    }

    /**
     * Publishes a message built with the {@code json()} option of the message builder.
     *
     * @param topic   target topic name
     * @param message message body
     * @return future response, see {@link #publish(HermesMessage)}
     */
    public CompletableFuture<HermesResponse> publishJSON(String topic, byte[] message) {
        return this.publish(HermesMessage.hermesMessage(topic, message).json().build());
    }

    /**
     * Publishes a message built with the {@code json()} option of the message builder.
     *
     * @param topic   target topic name
     * @param message message body
     * @return future response, see {@link #publish(HermesMessage)}
     */
    public CompletableFuture<HermesResponse> publishJSON(String topic, String message) {
        return this.publish(HermesMessage.hermesMessage(topic, message).json().build());
    }

    /**
     * Publishes a message built with the {@code avro(schemaVersion)} option of the message builder.
     *
     * @param topic         target topic name
     * @param schemaVersion schema version passed to the builder
     * @param message       message body
     * @return future response, see {@link #publish(HermesMessage)}
     */
    public CompletableFuture<HermesResponse> publishAvro(String topic, int schemaVersion, byte[] message) {
        return this.publish(HermesMessage.hermesMessage(topic, message).avro(schemaVersion).build());
    }

    /**
     * Publishes a message without setting a content type explicitly.
     *
     * @param topic   target topic name
     * @param message message body
     * @return future response, see {@link #publish(HermesMessage)}
     */
    public CompletableFuture<HermesResponse> publish(String topic, String message) {
        return this.publish(HermesMessage.hermesMessage(topic, message).build());
    }

    /**
     * Publishes a message with the given content type.
     *
     * @param topic       target topic name
     * @param contentType content type passed to the builder
     * @param message     message body
     * @return future response, see {@link #publish(HermesMessage)}
     */
    public CompletableFuture<HermesResponse> publish(String topic, String contentType, byte[] message) {
        return this.publish(HermesMessage.hermesMessage(topic, message).withContentType(contentType).build());
    }

    /**
     * Publishes a message with the given content type.
     *
     * @param topic       target topic name
     * @param contentType content type passed to the builder
     * @param message     message body
     * @return future response, see {@link #publish(HermesMessage)}
     */
    public CompletableFuture<HermesResponse> publish(String topic, String contentType, String message) {
        return this.publish(HermesMessage.hermesMessage(topic, message).withContentType(contentType).build());
    }

    /**
     * Publishes a message with the given content type and schema version.
     *
     * @param topic         target topic name
     * @param contentType   content type passed to the builder
     * @param schemaVersion schema version passed to the builder
     * @param message       message body
     * @return future response, see {@link #publish(HermesMessage)}
     */
    public CompletableFuture<HermesResponse> publish(String topic, String contentType, int schemaVersion, byte[] message) {
        return this.publish(HermesMessage.hermesMessage(topic, message).withContentType(contentType).withSchemaVersion(schemaVersion).build());
    }

    /**
     * Publishes the message, re-sending it while the retry condition matches the latest response,
     * up to {@code retries} times after the first attempt.
     *
     * <p>Failures reported through the sender's future are converted into a failure response by
     * {@code HermesResponseBuilder.hermesFailureResponse}, so they are also subject to the retry
     * condition.
     *
     * @param message message to publish; the client's default headers are applied to it via
     *                {@code HermesMessage.appendDefaults}
     * @return future with the response of the last attempt, or a future completed exceptionally
     *         with {@link HermesClientShutdownException} when the client has been closed
     */
    public CompletableFuture<HermesResponse> publish(HermesMessage message) {
        // Fast path: do not touch the message at all once the client is closed.
        if (this.shutdown) {
            return this.completedWithShutdownException();
        }
        HermesMessage.appendDefaults(message, this.defaultHeaders);
        return this.publish(message, response -> this.retryIfNeeded(message, response));
    }

    /** Sends the message again when the retry condition matches, otherwise passes the response through. */
    private CompletionStage<HermesResponse> retryIfNeeded(HermesMessage message, HermesResponse response) {
        if (this.retryCondition.test(response)) {
            return this.sendOnce(message);
        }
        return CompletableFuture.completedFuture(response);
    }

    /**
     * Runs the initial send followed by up to {@code retries} retry decisions while keeping the
     * in-flight counter accurate.
     */
    private CompletableFuture<HermesResponse> publish(HermesMessage message, Function<HermesResponse, CompletionStage<HermesResponse>> retryDecision) {
        // Register as in-flight BEFORE the final shutdown check: closeAsync() sets the flag and then
        // reads the counter, so either this publication sees the flag or termination sees the counter.
        this.currentlySending.incrementAndGet();
        CompletableFuture<HermesResponse> result;
        try {
            if (this.shutdown) {
                result = this.completedWithShutdownException();
            } else {
                result = this.sendOnce(message);
                for (int attempt = 0; attempt < this.retries; attempt++) {
                    result = result.thenCompose(retryDecision);
                }
            }
        } catch (RuntimeException | Error e) {
            // The sender failed synchronously (or returned null): no future will ever complete, so
            // release the slot here; otherwise termination would wait for it forever.
            this.currentlySending.decrementAndGet();
            throw e;
        }
        return result.whenComplete((response, ex) -> this.currentlySending.decrementAndGet());
    }

    /** Performs a single delivery attempt, mapping an exceptional completion to a failure response. */
    private CompletableFuture<HermesResponse> sendOnce(HermesMessage message) {
        return this.sender.send(URI.create(this.uri + message.getTopic()), message).exceptionally(HermesResponseBuilder::hermesFailureResponse);
    }

    /** Returns a future already failed with {@link HermesClientShutdownException}. */
    private CompletableFuture<HermesResponse> completedWithShutdownException() {
        CompletableFuture<HermesResponse> alreadyShutdown = new CompletableFuture<>();
        alreadyShutdown.completeExceptionally(new HermesClientShutdownException());
        return alreadyShutdown;
    }

    /**
     * Stops accepting new publications and starts waiting for the in-flight ones.
     *
     * @param pollInterval interval handed to {@link HermesClientTermination}
     *                     (presumably milliseconds, like {@link #close(long, long)} - confirm)
     * @return the future produced by {@link HermesClientTermination} for the condition
     *         "no publication is in flight"
     */
    public CompletableFuture<Void> closeAsync(long pollInterval) {
        this.shutdown = true;
        return new HermesClientTermination(pollInterval).observe(() -> this.currentlySending.get() == 0);
    }

    /**
     * Blocking variant of {@link #closeAsync(long)}.
     *
     * @param pollInterval interval handed to {@link HermesClientTermination}
     * @param timeout      maximum time to wait, in milliseconds
     * @throws InterruptedException  if the calling thread is interrupted while waiting, or if the
     *                               termination itself failed with an {@code InterruptedException}
     * @throws TimeoutException      if in-flight publications did not finish within {@code timeout}
     * @throws IllegalStateException if the termination failed with any other checked cause
     */
    public void close(long pollInterval, long timeout) throws InterruptedException, TimeoutException {
        try {
            this.closeAsync(pollInterval).get(timeout, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            Throwable cause = e.getCause();
            // Only an actual InterruptedException may be cast; a blind cast would replace any other
            // failure with a ClassCastException and hide the real cause.
            if (cause instanceof InterruptedException) {
                throw (InterruptedException) cause;
            }
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw new IllegalStateException("Hermes client termination failed", cause);
        }
    }
}

