/*
 * Decompiled with CFR 0.152.
 */
package org.projectnessie.server.distcache;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.quarkus.runtime.Startup;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpMethod;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.projectnessie.nessie.networktools.AddressResolver;
import org.projectnessie.quarkus.config.QuarkusStoreConfig;
import org.projectnessie.quarkus.providers.ServerInstanceId;
import org.projectnessie.server.distcache.CacheInvalidations;
import org.projectnessie.versioned.storage.cache.DistributedCacheInvalidation;
import org.projectnessie.versioned.storage.common.persist.ObjId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Collects cache invalidation events and forwards them, in JSON batches, via HTTP {@code POST}
 * to every address that the configured service names resolve to.
 *
 * <p>Invalidations are queued by {@link #evictReference(String, String)} and
 * {@link #evictObj(String, ObjId)}. The first enqueued element starts a blocking Vert.x task that
 * drains the queue until it is empty. Service names are resolved once in the constructor and
 * re-resolved periodically using a Vert.x timer; addresses contained in
 * {@code AddressResolver.LOCAL_ADDRESSES} are never used as targets.
 *
 * <p>Nothing is sent when no service names or no token are configured.
 */
@ApplicationScoped
@Startup
public class CacheInvalidationSender
implements DistributedCacheInvalidation {
    private static final Logger LOGGER = LoggerFactory.getLogger(CacheInvalidationSender.class);
    /** Configuration key parts, only used to build log messages. */
    private static final String CONFIG_NAME_PREFIX = "nessie.version.store.persist";
    private static final String CONFIG_VALID_TOKENS = "cache-invalidations.valid-tokens";
    private static final String CONFIG_SERVICE_NAMES = "cache-invalidations.service-names";
    /** Request timeout applied when the configuration does not provide one. */
    private static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.of(30L, ChronoUnit.SECONDS);

    private final Vertx vertx;
    private final long serviceNameLookupIntervalMillis;
    private final HttpClient httpClient;
    private final AddressResolver addressResolver;
    private final List<String> serviceNames;
    private final int httpPort;
    private final String invalidationUri;
    private final long requestTimeout;
    private final ObjectMapper objectMapper = new ObjectMapper();
    /** Guards {@link #triggered} and the hand-over of queued invalidations to the sender task. */
    private final Lock lock = new ReentrantLock();
    private final int batchSize;
    private final BlockingQueue<CacheInvalidations.CacheInvalidation> invalidations = new LinkedBlockingQueue<>();
    /** {@code true} while a sender task is scheduled or running; only accessed while holding {@link #lock}. */
    private boolean triggered;
    /** Token sent with every request, or {@code null} if none is configured. */
    private final String token;
    private volatile List<String> resolvedAddresses = Collections.emptyList();

    /**
     * Reads the cache invalidation settings and, if service names are configured, blocks until
     * the first service name resolution has completed.
     *
     * @param vertx Vert.x instance used for the HTTP client, timers and blocking tasks
     * @param config source of the cache invalidation settings
     * @param httpPort port that invalidation requests are sent to
     * @param serverInstanceId appended to the invalidation URI as the {@code sender} query parameter
     * @throws RuntimeException if the initial service name resolution fails or is interrupted
     */
    @Inject
    public CacheInvalidationSender(Vertx vertx, QuarkusStoreConfig config, @ConfigProperty(name="quarkus.management.port") int httpPort, @ServerInstanceId String serverInstanceId) {
        this.vertx = vertx;
        this.addressResolver = new AddressResolver(vertx);
        this.requestTimeout = config.cacheInvalidationRequestTimeout().orElse(DEFAULT_REQUEST_TIMEOUT).toMillis();
        this.httpClient = vertx.createHttpClient();
        this.serviceNames = config.cacheInvalidationServiceNames().orElse(Collections.emptyList());
        this.httpPort = httpPort;
        this.invalidationUri = config.cacheInvalidationUri() + "?sender=" + serverInstanceId;
        this.serviceNameLookupIntervalMillis = config.cacheInvalidationServiceNameLookupInterval().toMillis();
        this.batchSize = config.cacheInvalidationBatchSize();
        // A present-but-empty token list is treated like "no token" instead of failing on get(0).
        this.token = config.cacheInvalidationValidTokens().filter(l -> !l.isEmpty()).map(l -> (String) l.get(0)).orElse(null);

        if (this.serviceNames.isEmpty()) {
            if (this.token != null) {
                LOGGER.warn("No service names are configured to send cache invalidation messages to - will not send any invalidation message. You need to configure the service name(s) via {}.{}", CONFIG_NAME_PREFIX, CONFIG_SERVICE_NAMES);
            }
            return;
        }

        try {
            LOGGER.info("Sending remote cache invalidations to service name(s) {}", this.serviceNames);
            // Block until the first resolution is done, so a broken setup fails construction.
            this.updateServiceNames().toCompletionStage().toCompletableFuture().get();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller's thread.
            Thread.currentThread().interrupt();
            throw this.resolutionFailure(e);
        } catch (ExecutionException e) {
            throw this.resolutionFailure(e.getCause());
        } catch (RuntimeException e) {
            throw this.resolutionFailure(e);
        }

        if (this.token == null) {
            LOGGER.warn("No token configured for cache invalidation messages - will not send any invalidation message. You need to configure the token(s) via {}.{}", CONFIG_NAME_PREFIX, CONFIG_VALID_TOKENS);
        }
    }

    /** Builds the exception thrown when the initial service name resolution does not succeed. */
    private RuntimeException resolutionFailure(Throwable cause) {
        return new RuntimeException("Failed to resolve service names " + this.serviceNames + " for remote cache invalidations", cause);
    }

    /**
     * Resolves the configured service names, stores the non-local addresses and schedules the
     * next resolution, regardless of whether this one succeeded.
     *
     * @return future completing with the resolved, non-local addresses
     */
    private Future<List<String>> updateServiceNames() {
        HashSet<String> previous = new HashSet<>(this.resolvedAddresses);
        return this.resolveServiceNames(this.serviceNames)
            .map(all -> all.stream().filter(adr -> !AddressResolver.LOCAL_ADDRESSES.contains(adr)).toList())
            .onSuccess(all -> {
                this.scheduleServiceNameResolution();
                // Only log when the set of target addresses actually changed.
                if (!new HashSet<>(all).equals(previous)) {
                    LOGGER.info("Service names for remote cache invalidations {} now resolve to {}", this.serviceNames, all);
                }
                this.updateResolvedAddresses(all);
            })
            .onFailure(t -> {
                this.scheduleServiceNameResolution();
                LOGGER.warn("Failed to resolve service names: {}", t.toString());
            });
    }

    /** Replaces the list of addresses that invalidations are sent to. */
    @VisibleForTesting
    void updateResolvedAddresses(List<String> all) {
        this.resolvedAddresses = all;
    }

    /** Arms a one-shot timer that triggers the next service name resolution. */
    private void scheduleServiceNameResolution() {
        this.vertx.setTimer(this.serviceNameLookupIntervalMillis, x -> this.updateServiceNames());
    }

    /** Resolves all given service names using the {@link AddressResolver}. */
    @VisibleForTesting
    Future<List<String>> resolveServiceNames(List<String> serviceNames) {
        return this.addressResolver.resolveAll(serviceNames);
    }

    /**
     * Queues an invalidation and starts the sender task if none is currently active.
     *
     * <p>Does nothing when no service names or no token are configured.
     */
    void enqueue(CacheInvalidations.CacheInvalidation invalidation) {
        if (this.serviceNames.isEmpty() || this.token == null) {
            return;
        }
        this.lock.lock();
        try {
            this.invalidations.add(invalidation);
            if (!this.triggered) {
                LOGGER.trace("Triggered invalidation submission");
                this.vertx.executeBlocking(this::sendInvalidations);
                this.triggered = true;
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Sender task: repeatedly drains up to {@code batchSize} queued invalidations and submits
     * them, until the queue is empty.
     *
     * @return always {@code null}
     */
    private Void sendInvalidations() {
        // drainTo() moves nothing for a non-positive limit, so always allow at least one element.
        int maxPerBatch = Math.max(1, this.batchSize);
        List<CacheInvalidations.CacheInvalidation> batch = new ArrayList<>(this.batchSize);
        try {
            while (true) {
                this.lock.lock();
                try {
                    this.invalidations.drainTo(batch, maxPerBatch);
                    if (batch.isEmpty()) {
                        LOGGER.trace("Done sending invalidations");
                        // Reset under the lock, so a concurrent enqueue() starts a new task.
                        this.triggered = false;
                        break;
                    }
                } finally {
                    this.lock.unlock();
                }
                this.submit(batch, this.resolvedAddresses);
                batch = new ArrayList<>(this.batchSize);
            }
        } finally {
            // A non-empty batch here means submit() threw: put the elements back into the queue
            // and allow the next enqueue() to start a new sender task.
            if (!batch.isEmpty()) {
                this.lock.lock();
                try {
                    this.invalidations.addAll(batch);
                    this.triggered = false;
                } finally {
                    this.lock.unlock();
                }
            }
        }
        return null;
    }

    /**
     * Serializes the batch to JSON once and posts it to every given address.
     *
     * @param batch invalidations to send
     * @param resolvedAddresses target addresses
     * @return one future per address, in iteration order of {@code resolvedAddresses}
     * @throws RuntimeException if the batch cannot be serialized to JSON
     */
    @VisibleForTesting
    List<Future<Map.Entry<HttpClientResponse, Buffer>>> submit(List<CacheInvalidations.CacheInvalidation> batch, List<String> resolvedAddresses) {
        LOGGER.trace("Submitting {} invalidations", batch.size());
        String json;
        try {
            json = this.objectMapper.writeValueAsString(CacheInvalidations.cacheInvalidations(batch));
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
        List<Future<Map.Entry<HttpClientResponse, Buffer>>> futures = new ArrayList<>(resolvedAddresses.size());
        for (String address : resolvedAddresses) {
            futures.add(this.postInvalidations(address, json, batch));
        }
        return futures;
    }

    /**
     * Posts the serialized batch to a single address and logs the outcome. Failures are only
     * logged, the returned future still carries them.
     */
    private Future<Map.Entry<HttpClientResponse, Buffer>> postInvalidations(String address, String json, List<CacheInvalidations.CacheInvalidation> batch) {
        return this.httpClient.request(HttpMethod.POST, this.httpPort, address, this.invalidationUri)
            .compose(req -> req.putHeader("Content-Type", "application/json").putHeader("Nessie-Cache-Invalidation-Token", this.token).send(json))
            .compose(resp -> resp.body().map(b -> Map.entry(resp, b)))
            .timeout(this.requestTimeout, TimeUnit.MILLISECONDS)
            .onComplete(success -> {
                HttpClientResponse resp = success.getKey();
                int statusCode = resp.statusCode();
                if (statusCode != 200 && statusCode != 204) {
                    LOGGER.warn("{} cache invalidations could not be sent to {}:{}{} - HTTP {}/{} - body: {}", batch.size(), address, this.httpPort, this.invalidationUri, statusCode, resp.statusMessage(), success.getValue());
                } else {
                    LOGGER.trace("{} cache invalidations sent to {}:{}", batch.size(), address, this.httpPort);
                }
            }, failure -> {
                if (failure instanceof SocketException || failure instanceof UnknownHostException) {
                    // Network level problems are logged without a stack trace.
                    LOGGER.warn("Technical network issue sending cache invalidations to {}:{}{} : {}", address, this.httpPort, this.invalidationUri, failure.getMessage());
                } else {
                    // The trailing throwable has no placeholder, SLF4J logs it as the exception.
                    LOGGER.error("Technical failure sending cache invalidations to {}:{}{}", address, this.httpPort, this.invalidationUri, failure);
                }
            });
    }

    /** Queues an invalidation for the given reference name of the given repository. */
    public void evictReference(String repositoryId, String refName) {
        this.enqueue(CacheInvalidations.CacheInvalidationEvictReference.cacheInvalidationEvictReference(repositoryId, refName));
    }

    /** Queues an invalidation for the given object ID of the given repository. */
    public void evictObj(String repositoryId, ObjId objId) {
        this.enqueue(CacheInvalidations.CacheInvalidationEvictObj.cacheInvalidationEvictObj(repositoryId, objId.asByteArray()));
    }
}

