/*
 * Decompiled with CFR 0.152.
 */
package org.lable.oss.uniqueid.etcd;

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.kv.TxnResponse;
import io.etcd.jetcd.lease.LeaseGrantResponse;
import io.etcd.jetcd.lock.LockResponse;
import io.etcd.jetcd.op.Cmp;
import io.etcd.jetcd.op.CmpTarget;
import io.etcd.jetcd.op.Op;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;
import io.etcd.jetcd.options.WatchOption;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.lable.oss.uniqueid.etcd.ClusterID;
import org.lable.oss.uniqueid.etcd.ResourcePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RegistryBasedResourceClaim {
    private static final Logger logger = LoggerFactory.getLogger(RegistryBasedResourceClaim.class);
    static final String REGISTRY_PREFIX = "registry/";
    static final ByteSequence REGISTRY_KEY = ByteSequence.from((String)"registry/", (Charset)StandardCharsets.UTF_8);
    static final ByteSequence LOCK_NAME = ByteSequence.from((String)"unique-id-registry-lock", (Charset)StandardCharsets.UTF_8);
    final Supplier<Client> connectToEtcd;
    final String registryEntry;
    final int clusterId;
    final int generatorId;
    final int poolSize;

    RegistryBasedResourceClaim(Supplier<Client> connectToEtcd, int maxGeneratorCount, String registryEntry, Duration acquisitionTimeout, boolean waitWhenNoResourcesAvailable) throws IOException {
        this.registryEntry = registryEntry;
        this.connectToEtcd = connectToEtcd;
        logger.info("Acquiring resource-claim\u2026");
        Client etcd = connectToEtcd.get();
        List<Integer> clusterIds = ClusterID.get(etcd);
        Duration timeout = acquisitionTimeout == null ? Duration.ofMinutes(5L) : acquisitionTimeout;
        Instant giveUpAfter = Instant.now().plus(timeout);
        this.poolSize = maxGeneratorCount;
        ResourcePair resourcePair = null;
        try {
            LockResponse lock;
            logger.debug("Acquiring lock, timeout is set to {}.", (Object)timeout);
            LeaseGrantResponse lease = (LeaseGrantResponse)etcd.getLeaseClient().grant(timeout.plusSeconds(5L).getSeconds()).get();
            long leaseId = lease.getID();
            try {
                lock = (LockResponse)etcd.getLockClient().lock(LOCK_NAME, leaseId).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            }
            catch (TimeoutException e) {
                throw new IOException("Process timed out.");
            }
            etcd.getLeaseClient().keepAliveOnce(leaseId).get();
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired lock: {}.", (Object)lock.getKey().toString(StandardCharsets.UTF_8));
            }
            resourcePair = this.claimResource(etcd, maxGeneratorCount, clusterIds, giveUpAfter, waitWhenNoResourcesAvailable);
            this.clusterId = resourcePair.clusterId;
            this.generatorId = resourcePair.generatorId;
            etcd.getLockClient().unlock(lock.getKey()).get();
            if (logger.isDebugEnabled()) {
                logger.debug("Released lock: {}.", (Object)lock.getKey().toString(StandardCharsets.UTF_8));
            }
            etcd.getLeaseClient().revoke(leaseId).get();
        }
        catch (ExecutionException e) {
            if (resourcePair != null) {
                this.close();
            }
            throw new IOException(e);
        }
        catch (InterruptedException e) {
            if (resourcePair != null) {
                this.close();
            }
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
        logger.debug("Resource-claim acquired ({}/{}).", (Object)this.clusterId, (Object)this.generatorId);
    }

    public static RegistryBasedResourceClaim claim(Supplier<Client> connectToEtcd, int maxGeneratorCount, String registryEntry, Duration acquisitionTimeout, boolean waitWhenNoResourcesAvailable) throws IOException {
        return new RegistryBasedResourceClaim(connectToEtcd, maxGeneratorCount, registryEntry, acquisitionTimeout, waitWhenNoResourcesAvailable);
    }

    ResourcePair claimResource(Client etcd, int maxGeneratorCount, List<Integer> clusterIds, Instant giveUpAfter, boolean waitWhenNoResourcesAvailable) throws InterruptedException, IOException, ExecutionException {
        logger.debug("Trying to claim a resource.");
        int registrySize = maxGeneratorCount * clusterIds.size();
        GetOption getOptions = GetOption.newBuilder().withKeysOnly(true).withPrefix(REGISTRY_KEY).build();
        GetResponse get = (GetResponse)etcd.getKVClient().get(REGISTRY_KEY, getOptions).get();
        List claimedResources = get.getKvs().stream().map(KeyValue::getKey).collect(Collectors.toList());
        if (claimedResources.size() >= registrySize) {
            if (!waitWhenNoResourcesAvailable) {
                throw new IOException("No resources available. Giving up as requested. Registry size: " + registrySize + ".");
            }
            logger.warn("No resources available at the moment (registry size: {}), waiting.", (Object)registrySize);
            CountDownLatch latch = new CountDownLatch(1);
            Watch.Watcher watcher = etcd.getWatchClient().watch(REGISTRY_KEY, WatchOption.newBuilder().withPrefix(REGISTRY_KEY).build(), watchResponse -> latch.countDown());
            this.awaitLatchUnlessItTakesTooLong(latch, giveUpAfter);
            watcher.close();
            return this.claimResource(etcd, maxGeneratorCount, clusterIds, giveUpAfter, true);
        }
        for (Integer clusterId : clusterIds) {
            for (int generatorId = 0; generatorId < maxGeneratorCount; ++generatorId) {
                String resourcePathString = RegistryBasedResourceClaim.resourceKey(clusterId, generatorId);
                ByteSequence resourcePath = ByteSequence.from((String)resourcePathString, (Charset)StandardCharsets.UTF_8);
                if (claimedResources.contains(resourcePath)) continue;
                logger.debug("Trying to claim seemingly available resource {}.", (Object)resourcePathString);
                TxnResponse txnResponse = (TxnResponse)etcd.getKVClient().txn().If(new Cmp[]{new Cmp(resourcePath, Cmp.Op.EQUAL, (CmpTarget)CmpTarget.version((long)0L))}).Then(new Op[]{Op.put((ByteSequence)resourcePath, (ByteSequence)ByteSequence.from((String)this.registryEntry, (Charset)StandardCharsets.UTF_8), (PutOption)PutOption.newBuilder().build())}).commit().get();
                if (!txnResponse.isSucceeded()) continue;
                logger.info("Successfully claimed resource {}.", (Object)resourcePathString);
                return new ResourcePair(clusterId, generatorId);
            }
        }
        return this.claimResource(etcd, maxGeneratorCount, clusterIds, giveUpAfter, waitWhenNoResourcesAvailable);
    }

    static String resourceKey(Integer clusterId, int generatorId) {
        return REGISTRY_PREFIX + clusterId + ":" + generatorId;
    }

    private void awaitLatchUnlessItTakesTooLong(CountDownLatch latch, Instant giveUpAfter) throws IOException, InterruptedException {
        if (giveUpAfter == null) {
            latch.await();
        } else {
            Instant now = Instant.now();
            if (!giveUpAfter.isAfter(now)) {
                throw new IOException("Process timed out.");
            }
            boolean success = latch.await(Duration.between(now, giveUpAfter).toMillis(), TimeUnit.MILLISECONDS);
            if (!success) {
                this.close();
                throw new IOException("Process timed out.");
            }
        }
    }

    private void relinquishResource() {
        logger.debug("Relinquishing claimed registry resource {}:{}.", (Object)this.clusterId, (Object)this.generatorId);
        Client etcd = this.connectToEtcd.get();
        String resourcePathString = RegistryBasedResourceClaim.resourceKey(this.clusterId, this.generatorId);
        ByteSequence resourcePath = ByteSequence.from((String)resourcePathString, (Charset)StandardCharsets.UTF_8);
        try {
            etcd.getKVClient().delete(resourcePath).get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException e) {
            logger.error("Failed to revoke Etcd lease.", (Throwable)e);
        }
    }

    public int getClusterId() {
        return this.clusterId;
    }

    public int getGeneratorId() {
        return this.generatorId;
    }

    public void close() {
        this.relinquishResource();
    }

    public String getRegistryEntry() {
        return this.registryEntry;
    }
}

