/*
 * Decompiled with CFR 0.152.
 */
package org.lable.oss.uniqueid.etcd;

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.kv.TxnResponse;
import io.etcd.jetcd.lease.LeaseGrantResponse;
import io.etcd.jetcd.lock.LockResponse;
import io.etcd.jetcd.op.Cmp;
import io.etcd.jetcd.op.CmpTarget;
import io.etcd.jetcd.op.Op;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.OptionsUtil;
import io.etcd.jetcd.options.PutOption;
import io.etcd.jetcd.options.WatchOption;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.lable.oss.uniqueid.etcd.ClusterID;
import org.lable.oss.uniqueid.etcd.ResourcePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Claims a (cluster ID, generator ID) pair by writing a key below {@code registry/} in Etcd.
 *
 * <p>The claim is made while holding the Etcd lock named {@code unique-id-registry-lock}. Each claimed
 * resource is represented by a key of the form {@code registry/<clusterId>:<generatorId>} whose value is
 * the registry entry string passed to the constructor. Calling {@link #close()} deletes that key.
 */
public class RegistryBasedResourceClaim {
    private static final Logger logger = LoggerFactory.getLogger(RegistryBasedResourceClaim.class);
    static final String REGISTRY_PREFIX = "registry/";
    static final ByteSequence REGISTRY_KEY = ByteSequence.from(REGISTRY_PREFIX, StandardCharsets.UTF_8);
    static final ByteSequence LOCK_NAME = ByteSequence.from("unique-id-registry-lock", StandardCharsets.UTF_8);
    private static final String TIMED_OUT_MESSAGE = "Process timed out.";

    final Supplier<Client> connectToEtcd;
    final String registryEntry;
    final int clusterId;
    final int generatorId;
    /** Set from the {@code maxGeneratorCount} constructor argument. */
    final int poolSize;
    final KV kvClient;

    /**
     * Acquires the registry lock, claims a free resource and releases the lock again.
     *
     * @param connectToEtcd                supplies the Etcd client used for all operations
     * @param maxGeneratorCount            number of generator IDs available per cluster ID
     * @param registryEntry                value stored under the claimed registry key
     * @param acquisitionTimeout           how long to keep trying; five minutes when {@code null}
     * @param waitWhenNoResourcesAvailable whether to wait for a resource to be freed when all are claimed
     * @throws IOException when the lock or a resource could not be acquired (in time), or the thread was
     *                     interrupted while trying
     */
    RegistryBasedResourceClaim(Supplier<Client> connectToEtcd, int maxGeneratorCount, String registryEntry, Duration acquisitionTimeout, boolean waitWhenNoResourcesAvailable) throws IOException {
        this.registryEntry = registryEntry;
        this.connectToEtcd = connectToEtcd;
        Duration timeout = acquisitionTimeout == null ? Duration.ofMinutes(5L) : acquisitionTimeout;
        logger.info("Acquiring resource-claim; timeout is set to {}.", timeout);
        Client etcd = connectToEtcd.get();
        this.kvClient = etcd.getKVClient();
        List<Integer> clusterIds = ClusterID.get(etcd);
        Instant giveUpAfter = Instant.now().plus(timeout);
        this.poolSize = maxGeneratorCount;

        ResourcePair resourcePair = this.claimWhileHoldingLock(etcd, timeout, maxGeneratorCount, clusterIds, giveUpAfter, waitWhenNoResourcesAvailable);
        this.clusterId = resourcePair.clusterId;
        this.generatorId = resourcePair.generatorId;
        logger.debug("Resource-claim acquired ({}/{}).", this.clusterId, this.generatorId);
    }

    /**
     * Static factory equivalent of the constructor.
     *
     * @param connectToEtcd                supplies the Etcd client used for all operations
     * @param maxGeneratorCount            number of generator IDs available per cluster ID
     * @param registryEntry                value stored under the claimed registry key
     * @param acquisitionTimeout           how long to keep trying; five minutes when {@code null}
     * @param waitWhenNoResourcesAvailable whether to wait for a resource to be freed when all are claimed
     * @return the acquired claim
     * @throws IOException when no resource could be claimed
     */
    public static RegistryBasedResourceClaim claim(Supplier<Client> connectToEtcd, int maxGeneratorCount, String registryEntry, Duration acquisitionTimeout, boolean waitWhenNoResourcesAvailable) throws IOException {
        return new RegistryBasedResourceClaim(connectToEtcd, maxGeneratorCount, registryEntry, acquisitionTimeout, waitWhenNoResourcesAvailable);
    }

    /**
     * Grants a lease, takes the registry lock under it, claims a resource, and always cleans up the lock
     * and lease afterwards, whether or not the claim succeeded.
     */
    private ResourcePair claimWhileHoldingLock(Client etcd, Duration timeout, int maxGeneratorCount, List<Integer> clusterIds, Instant giveUpAfter, boolean waitWhenNoResourcesAvailable) throws IOException {
        // Wait in milliseconds so that sub-second timeouts are not truncated to a zero-length wait.
        long timeoutMillis = timeout.toMillis();
        long leaseId;
        try {
            logger.debug("Acquiring lock.");
            // The lease TTL is a few seconds longer than the acquisition timeout.
            LeaseGrantResponse lease = etcd.getLeaseClient().grant(timeout.getSeconds() + 5L).get(timeoutMillis, TimeUnit.MILLISECONDS);
            leaseId = lease.getID();
        }
        catch (ExecutionException | TimeoutException e) {
            throw new IOException(e);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
        logger.debug("Got lease {}.", leaseId);

        LockResponse lock = null;
        try {
            try {
                lock = etcd.getLockClient().lock(LOCK_NAME, leaseId).get(timeoutMillis, TimeUnit.MILLISECONDS);
            }
            catch (TimeoutException e) {
                throw new IOException(TIMED_OUT_MESSAGE, e);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired lock: {}.", lock.getKey().toString(StandardCharsets.UTF_8));
            }
            etcd.getLeaseClient().keepAliveOnce(leaseId).get(timeoutMillis, TimeUnit.MILLISECONDS);
            logger.debug("Lease renewed.");
            return this.claimResource(etcd, maxGeneratorCount, clusterIds, giveUpAfter, waitWhenNoResourcesAvailable);
        }
        catch (ExecutionException | TimeoutException e) {
            throw new IOException(e);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
        finally {
            // Runs on failure too, so a failed attempt does not keep other claimants locked out
            // until the lease expires.
            RegistryBasedResourceClaim.releaseLock(etcd, lock, leaseId, timeoutMillis);
        }
    }

    /**
     * Best-effort release of the lock (when one was obtained) and of its lease. Failures are logged and
     * never propagated.
     */
    private static void releaseLock(Client etcd, LockResponse lock, long leaseId, long timeoutMillis) {
        try {
            if (lock != null) {
                String lockKey = lock.getKey().toString(StandardCharsets.UTF_8);
                try {
                    etcd.getLockClient().unlock(lock.getKey()).get(timeoutMillis, TimeUnit.MILLISECONDS);
                    logger.debug("Released lock: {}.", lockKey);
                }
                catch (ExecutionException | TimeoutException e) {
                    logger.warn("Failed to release lock {} (will be released automatically by Etcd server).", lockKey, e);
                }
            }
            // Attempted even when unlocking failed or the lock request timed out.
            etcd.getLeaseClient().revoke(leaseId).get(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException | TimeoutException e) {
            logger.warn("Failed to revoke lease {} (will expire automatically on the Etcd server).", leaseId, e);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Tries to claim a free resource by creating its registry key, retrying until it succeeds, fails or
     * times out.
     *
     * @param etcd                         Etcd client
     * @param maxGeneratorCount            number of generator IDs available per cluster ID
     * @param clusterIds                   cluster IDs to pick from
     * @param giveUpAfter                  deadline; {@code null} means wait indefinitely
     * @param waitWhenNoResourcesAvailable whether to wait for a change in the registry when it is full
     * @return the claimed cluster ID and generator ID
     * @throws IOException          when the registry is full and waiting was not requested, or the deadline passed
     * @throws InterruptedException when interrupted while waiting
     * @throws ExecutionException   when an Etcd request failed
     */
    ResourcePair claimResource(Client etcd, int maxGeneratorCount, List<Integer> clusterIds, Instant giveUpAfter, boolean waitWhenNoResourcesAvailable) throws InterruptedException, IOException, ExecutionException {
        int registrySize = maxGeneratorCount * clusterIds.size();
        GetOption getOptions = GetOption.builder().withKeysOnly(true).withRange(OptionsUtil.prefixEndOf(REGISTRY_KEY)).build();

        // A loop rather than recursion, so a long wait for a resource cannot exhaust the stack.
        while (true) {
            logger.debug("Trying to claim a resource.");
            GetResponse get = etcd.getKVClient().get(REGISTRY_KEY, getOptions).get();
            List<ByteSequence> claimedResources = get.getKvs().stream().map(KeyValue::getKey).collect(Collectors.toList());

            if (claimedResources.size() >= registrySize) {
                if (!waitWhenNoResourcesAvailable) {
                    throw new IOException("No resources available. Giving up as requested. Registry size: " + registrySize + ".");
                }
                logger.warn("No resources available at the moment (registry size: {}), waiting.", registrySize);
                // Watch from the revision right after the read, so a resource released between the
                // read and the watch registration is not missed.
                this.awaitRegistryChange(etcd, get.getHeader().getRevision() + 1L, giveUpAfter);
                continue;
            }

            ByteSequence registryValue = ByteSequence.from(this.registryEntry, StandardCharsets.UTF_8);
            for (Integer clusterId : clusterIds) {
                for (int generatorId = 0; generatorId < maxGeneratorCount; ++generatorId) {
                    String resourcePathString = RegistryBasedResourceClaim.resourceKey(clusterId, generatorId);
                    ByteSequence resourcePath = ByteSequence.from(resourcePathString, StandardCharsets.UTF_8);
                    if (claimedResources.contains(resourcePath)) {
                        continue;
                    }
                    logger.debug("Trying to claim seemingly available resource {}.", resourcePathString);
                    // The put only happens when the key's version is still 0.
                    TxnResponse txnResponse = etcd.getKVClient().txn()
                            .If(new Cmp(resourcePath, Cmp.Op.EQUAL, CmpTarget.version(0L)))
                            .Then(Op.put(resourcePath, registryValue, PutOption.builder().build()))
                            .commit()
                            .get();
                    if (!txnResponse.isSucceeded()) {
                        continue;
                    }
                    logger.info("Successfully claimed resource {}.", resourcePathString);
                    return new ResourcePair(clusterId, generatorId);
                }
            }

            // Every seemingly free resource was taken in the meantime; retry unless the deadline passed.
            if (giveUpAfter != null && !giveUpAfter.isAfter(Instant.now())) {
                throw new IOException(TIMED_OUT_MESSAGE);
            }
        }
    }

    /**
     * Blocks until a watch on the registry prefix delivers a response, or the deadline passes. The
     * watcher is always closed.
     */
    private void awaitRegistryChange(Client etcd, long fromRevision, Instant giveUpAfter) throws IOException, InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        WatchOption watchOptions = WatchOption.builder()
                .withRange(OptionsUtil.prefixEndOf(REGISTRY_KEY))
                .withRevision(fromRevision)
                .build();
        Watch.Watcher watcher = etcd.getWatchClient().watch(REGISTRY_KEY, watchOptions, watchResponse -> latch.countDown());
        try {
            this.awaitLatchUnlessItTakesTooLong(latch, giveUpAfter);
        }
        finally {
            watcher.close();
        }
    }

    /**
     * Builds the registry key string for a resource.
     *
     * @param clusterId   cluster ID
     * @param generatorId generator ID
     * @return {@code registry/<clusterId>:<generatorId>}
     */
    static String resourceKey(Integer clusterId, int generatorId) {
        return REGISTRY_PREFIX + clusterId + ":" + generatorId;
    }

    /**
     * Waits for the latch to reach zero, at most until {@code giveUpAfter}.
     *
     * @param latch       latch to wait on
     * @param giveUpAfter deadline; {@code null} means wait indefinitely
     * @throws IOException          when the deadline has already passed or passes while waiting
     * @throws InterruptedException when interrupted while waiting
     */
    private void awaitLatchUnlessItTakesTooLong(CountDownLatch latch, Instant giveUpAfter) throws IOException, InterruptedException {
        if (giveUpAfter == null) {
            latch.await();
            return;
        }
        Instant now = Instant.now();
        if (!giveUpAfter.isAfter(now)) {
            throw new IOException(TIMED_OUT_MESSAGE);
        }
        boolean success = latch.await(Duration.between(now, giveUpAfter).toMillis(), TimeUnit.MILLISECONDS);
        if (!success) {
            // Deliberately no close() here: nothing has been claimed yet while waiting, and the
            // still-unassigned IDs (0/0) would make close() delete the key of resource 0:0.
            throw new IOException(TIMED_OUT_MESSAGE);
        }
    }

    /** Deletes the registry key of the claimed resource, waiting at most five seconds. */
    private void relinquishResource() {
        logger.debug("Relinquishing claimed registry resource {}:{}.", this.clusterId, this.generatorId);
        String resourcePathString = RegistryBasedResourceClaim.resourceKey(this.clusterId, this.generatorId);
        ByteSequence resourcePath = ByteSequence.from(resourcePathString, StandardCharsets.UTF_8);
        try {
            this.kvClient.delete(resourcePath).get(5L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException | TimeoutException e) {
            logger.error("Failed to relinquish registry resource {}.", resourcePathString, e);
        }
    }

    /**
     * @return the claimed cluster ID
     */
    public int getClusterId() {
        return this.clusterId;
    }

    /**
     * @return the claimed generator ID
     */
    public int getGeneratorId() {
        return this.generatorId;
    }

    /** Relinquishes the claimed resource by deleting its registry key. */
    public void close() {
        this.relinquishResource();
    }

    /**
     * @return the registry entry value passed in at construction
     */
    public String getRegistryEntry() {
        return this.registryEntry;
    }
}

