/*
 * Decompiled with CFR 0.152.
 */
package org.lable.oss.uniqueid.etcd;

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.CloseableClient;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.kv.TxnResponse;
import io.etcd.jetcd.lease.LeaseGrantResponse;
import io.etcd.jetcd.lock.LockResponse;
import io.etcd.jetcd.op.Cmp;
import io.etcd.jetcd.op.CmpTarget;
import io.etcd.jetcd.op.Op;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;
import io.etcd.jetcd.options.WatchOption;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.lable.oss.uniqueid.etcd.ClusterID;
import org.lable.oss.uniqueid.etcd.EtcdHelper;
import org.lable.oss.uniqueid.etcd.ResourcePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RegistryBasedResourceClaim {
    private static final Logger logger = LoggerFactory.getLogger(RegistryBasedResourceClaim.class);
    static final String REGISTRY_PREFIX = "registry/";
    static final ByteSequence REGISTRY_KEY = ByteSequence.from("registry/", StandardCharsets.UTF_8);
    static final ByteSequence LOCK_NAME = ByteSequence.from("unique-id-registry-lock", StandardCharsets.UTF_8);
    final Supplier<Client> connectToEtcd;
    final String registryEntry;
    final int clusterId;
    final int generatorId;
    final int poolSize;

    RegistryBasedResourceClaim(Supplier<Client> connectToEtcd, int maxGeneratorCount, String registryEntry) throws IOException {
        this.registryEntry = registryEntry;
        this.connectToEtcd = connectToEtcd;
        logger.debug("Acquiring resource-claim\u2026");
        Client etcd = connectToEtcd.get();
        List<Integer> clusterIds = ClusterID.get(etcd);
        Duration timeout = Duration.ofMinutes(5L);
        Instant giveUpAfter = Instant.now().plus(timeout);
        this.poolSize = maxGeneratorCount;
        try {
            LockResponse lock;
            LeaseGrantResponse lease = etcd.getLeaseClient().grant(5L).get();
            long leaseId = lease.getID();
            CloseableClient leaseKeepAlive = EtcdHelper.keepLeaseAlive(etcd, leaseId, null);
            try {
                lock = etcd.getLockClient().lock(LOCK_NAME, leaseId).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            }
            catch (TimeoutException e) {
                throw new IOException("Process timed out.");
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired lock: {}.", (Object)lock.getKey().toString(StandardCharsets.UTF_8));
            }
            ResourcePair resourcePair = this.claimResource(etcd, maxGeneratorCount, clusterIds, giveUpAfter);
            this.clusterId = resourcePair.clusterId;
            this.generatorId = resourcePair.generatorId;
            etcd.getLockClient().unlock(lock.getKey()).get();
            leaseKeepAlive.close();
        }
        catch (ExecutionException e) {
            this.close();
            throw new IOException(e);
        }
        catch (InterruptedException e) {
            this.close();
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
        logger.debug("Resource-claim acquired ({}/{}).", (Object)this.clusterId, (Object)this.generatorId);
    }

    public static RegistryBasedResourceClaim claim(Supplier<Client> connectToEtcd, int maxGeneratorCount, String registryEntry) throws IOException {
        return new RegistryBasedResourceClaim(connectToEtcd, maxGeneratorCount, registryEntry);
    }

    ResourcePair claimResource(Client etcd, int maxGeneratorCount, List<Integer> clusterIds, Instant giveUpAfter) throws InterruptedException, IOException, ExecutionException {
        logger.debug("Trying to claim a resource.");
        int registrySize = maxGeneratorCount * clusterIds.size();
        GetOption getOptions = GetOption.newBuilder().withKeysOnly(true).withPrefix(REGISTRY_KEY).build();
        GetResponse get = etcd.getKVClient().get(REGISTRY_KEY, getOptions).get();
        List claimedResources = get.getKvs().stream().map(KeyValue::getKey).collect(Collectors.toList());
        if (claimedResources.size() >= registrySize) {
            logger.debug("No resources available at the moment (registry size: {}), waiting.", (Object)registrySize);
            CountDownLatch latch = new CountDownLatch(1);
            Watch.Watcher watcher = etcd.getWatchClient().watch(REGISTRY_KEY, WatchOption.newBuilder().withPrefix(REGISTRY_KEY).build(), watchResponse -> latch.countDown());
            this.awaitLatchUnlessItTakesTooLong(latch, giveUpAfter);
            watcher.close();
            return this.claimResource(etcd, maxGeneratorCount, clusterIds, giveUpAfter);
        }
        for (Integer clusterId : clusterIds) {
            for (int generatorId = 0; generatorId < maxGeneratorCount; ++generatorId) {
                String resourcePathString = RegistryBasedResourceClaim.resourceKey(clusterId, generatorId);
                ByteSequence resourcePath = ByteSequence.from(resourcePathString, StandardCharsets.UTF_8);
                if (claimedResources.contains(resourcePath)) continue;
                logger.debug("Trying to claim seemingly available resource {}.", (Object)resourcePathString);
                TxnResponse txnResponse = etcd.getKVClient().txn().If(new Cmp(resourcePath, Cmp.Op.EQUAL, CmpTarget.version(0L))).Then(Op.put(resourcePath, ByteSequence.from(this.registryEntry, StandardCharsets.UTF_8), PutOption.newBuilder().build())).commit().get();
                if (!txnResponse.isSucceeded()) continue;
                logger.debug("Successfully claimed resource {}.", (Object)resourcePathString);
                return new ResourcePair(clusterId, generatorId);
            }
        }
        return this.claimResource(etcd, maxGeneratorCount, clusterIds, giveUpAfter);
    }

    static String resourceKey(Integer clusterId, int generatorId) {
        return REGISTRY_PREFIX + clusterId + ":" + generatorId;
    }

    private void awaitLatchUnlessItTakesTooLong(CountDownLatch latch, Instant giveUpAfter) throws IOException, InterruptedException {
        if (giveUpAfter == null) {
            latch.await();
        } else {
            Instant now = Instant.now();
            if (!giveUpAfter.isAfter(now)) {
                throw new IOException("Process timed out.");
            }
            boolean success = latch.await(Duration.between(now, giveUpAfter).toMillis(), TimeUnit.MILLISECONDS);
            if (!success) {
                this.close();
                throw new IOException("Process timed out.");
            }
        }
    }

    private void relinquishResource() {
        logger.debug("Relinquishing claimed registry resource {}:{}.", (Object)this.clusterId, (Object)this.generatorId);
        Client etcd = this.connectToEtcd.get();
        String resourcePathString = RegistryBasedResourceClaim.resourceKey(this.clusterId, this.generatorId);
        ByteSequence resourcePath = ByteSequence.from(resourcePathString, StandardCharsets.UTF_8);
        try {
            etcd.getKVClient().delete(resourcePath).get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException e) {
            logger.error("Failed to revoke Etcd lease.", e);
        }
    }

    public int getClusterId() {
        return this.clusterId;
    }

    public int getGeneratorId() {
        return this.generatorId;
    }

    public void close() {
        this.relinquishResource();
    }

    public String getRegistryEntry() {
        return this.registryEntry;
    }
}

