/*
 * Decompiled with CFR 0.152.
 */
package org.lable.oss.uniqueid.etcd;

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.CloseableClient;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.kv.TxnResponse;
import io.etcd.jetcd.lease.LeaseGrantResponse;
import io.etcd.jetcd.lock.LockResponse;
import io.etcd.jetcd.op.Cmp;
import io.etcd.jetcd.op.CmpTarget;
import io.etcd.jetcd.op.Op;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.watch.WatchEvent;
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.lable.oss.uniqueid.etcd.EtcdHelper;
import org.lable.oss.uniqueid.etcd.ResourcePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ResourceClaim
implements Closeable {
    static final String POOL_PREFIX = "pool/";
    static final ByteSequence POOL_KEY = ByteSequence.from((String)"pool/", (Charset)StandardCharsets.UTF_8);
    static final ByteSequence LOCK_NAME = ByteSequence.from((String)"unique-id-resource-lock", (Charset)StandardCharsets.UTF_8);
    static final Logger logger = LoggerFactory.getLogger(ResourceClaim.class);
    final int clusterId;
    final int generatorId;
    final int poolSize;
    final Client etcd;
    long leaseId;
    protected State state;
    protected List<Closeable> closeables = new ArrayList<Closeable>();

    ResourceClaim(Client etcd, int maxGeneratorCount, List<Integer> clusterIds, Duration timeout) throws IOException {
        this.state = State.INITIALIZING;
        logger.debug("Acquiring resource-claim\u2026");
        timeout = timeout == null ? Duration.ofMinutes(5L) : timeout;
        Instant giveUpAfter = Instant.now().plus(timeout);
        this.poolSize = maxGeneratorCount;
        this.etcd = etcd;
        try {
            LockResponse lock;
            LeaseGrantResponse lease = (LeaseGrantResponse)etcd.getLeaseClient().grant(5L).get();
            this.leaseId = lease.getID();
            CloseableClient leaseKeepAlive = EtcdHelper.keepLeaseAlive(etcd, this.leaseId, this::close);
            this.closeables.add(() -> ((CloseableClient)leaseKeepAlive).close());
            try {
                lock = (LockResponse)etcd.getLockClient().lock(LOCK_NAME, this.leaseId).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            }
            catch (TimeoutException e) {
                this.close();
                throw new IOException("Process timed out.");
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired lock: {}.", (Object)lock.getKey().toString(StandardCharsets.UTF_8));
            }
            ResourcePair resourcePair = this.claimResource(etcd, maxGeneratorCount, clusterIds, giveUpAfter);
            this.clusterId = resourcePair.clusterId;
            this.generatorId = resourcePair.generatorId;
            etcd.getLockClient().unlock(lock.getKey()).get();
        }
        catch (ExecutionException e) {
            this.close();
            throw new IOException(e);
        }
        catch (InterruptedException e) {
            this.close();
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
        this.state = State.HAS_CLAIM;
        logger.debug("Resource-claim acquired ({}/{}).", (Object)this.clusterId, (Object)this.generatorId);
    }

    public static ResourceClaim claim(Client etcd, int maxGeneratorCount, List<Integer> clusterIds) throws IOException {
        return new ResourceClaim(etcd, maxGeneratorCount, clusterIds, Duration.ofMinutes(10L));
    }

    public static ResourceClaim claim(Client etcd, int maxGeneratorCount, List<Integer> clusterIds, Duration timeout) throws IOException {
        return new ResourceClaim(etcd, maxGeneratorCount, clusterIds, timeout);
    }

    public int getClusterId() {
        if (this.state != State.HAS_CLAIM) {
            throw new IllegalStateException("Resource claim not held.");
        }
        return this.clusterId;
    }

    public int getGeneratorId() {
        if (this.state != State.HAS_CLAIM) {
            throw new IllegalStateException("Resource claim not held.");
        }
        return this.generatorId;
    }

    @Override
    public void close() {
        this.close(false);
    }

    public void close(boolean nodeAlreadyDeleted) {
        if (this.state == State.CLAIM_RELINQUISHED) {
            return;
        }
        logger.debug("Closing resource-claim ({}).", (Object)ResourceClaim.resourceKey(this.clusterId, this.generatorId));
        if (nodeAlreadyDeleted) {
            this.state = State.CLAIM_RELINQUISHED;
            return;
        }
        if (this.state == State.HAS_CLAIM) {
            this.state = State.CLAIM_RELINQUISHED;
            new Timer().schedule(new TimerTask(){

                @Override
                public void run() {
                    ResourceClaim.this.relinquishResource();
                }
            }, TimeUnit.SECONDS.toMillis(2L));
        } else {
            this.state = State.CLAIM_RELINQUISHED;
        }
        for (Closeable closeable : this.closeables) {
            try {
                closeable.close();
            }
            catch (IOException e) {
                logger.warn("Failed to close resource properly.", (Throwable)e);
            }
        }
    }

    ResourcePair claimResource(Client etcd, int maxGeneratorCount, List<Integer> clusterIds, Instant giveUpAfter) throws InterruptedException, IOException, ExecutionException {
        logger.debug("Trying to claim a resource.");
        int poolSize = maxGeneratorCount * clusterIds.size();
        GetOption getOptions = GetOption.newBuilder().withKeysOnly(true).withPrefix(POOL_KEY).build();
        GetResponse get = (GetResponse)etcd.getKVClient().get(POOL_KEY, getOptions).get();
        List claimedResources = get.getKvs().stream().map(KeyValue::getKey).collect(Collectors.toList());
        if (claimedResources.size() >= poolSize) {
            logger.debug("No resources available at the moment (pool size: {}), waiting.", (Object)poolSize);
            CountDownLatch latch = new CountDownLatch(1);
            Watch.Watcher watcher = etcd.getWatchClient().watch(POOL_KEY, WatchOption.newBuilder().withPrefix(POOL_KEY).build(), watchResponse -> latch.countDown());
            this.awaitLatchUnlessItTakesTooLong(latch, giveUpAfter);
            watcher.close();
            return this.claimResource(etcd, maxGeneratorCount, clusterIds, giveUpAfter);
        }
        for (Integer clusterId : clusterIds) {
            for (int generatorId = 0; generatorId < maxGeneratorCount; ++generatorId) {
                String resourcePathString = ResourceClaim.resourceKey(clusterId, generatorId);
                ByteSequence resourcePath = ByteSequence.from((String)resourcePathString, (Charset)StandardCharsets.UTF_8);
                if (claimedResources.contains(resourcePath)) continue;
                logger.debug("Trying to claim seemingly available resource {}.", (Object)resourcePathString);
                TxnResponse txnResponse = (TxnResponse)etcd.getKVClient().txn().If(new Cmp[]{new Cmp(resourcePath, Cmp.Op.EQUAL, (CmpTarget)CmpTarget.version((long)0L))}).Then(new Op[]{Op.put((ByteSequence)resourcePath, (ByteSequence)ByteSequence.EMPTY, (PutOption)PutOption.newBuilder().withLeaseId(this.leaseId).build())}).commit().get();
                if (!txnResponse.isSucceeded()) continue;
                this.closeables.add((Closeable)etcd.getWatchClient().watch(resourcePath, watchResponse -> {
                    for (WatchEvent event : watchResponse.getEvents()) {
                        if (event.getEventType() != WatchEvent.EventType.DELETE) continue;
                        logger.debug("Resource-claim node unexpectedly deleted ({})", (Object)resourcePathString);
                        this.close(true);
                    }
                }));
                logger.debug("Successfully claimed resource {}.", (Object)resourcePathString);
                return new ResourcePair(clusterId, generatorId);
            }
        }
        return this.claimResource(etcd, maxGeneratorCount, clusterIds, giveUpAfter);
    }

    private void relinquishResource() {
        logger.debug("Relinquishing claimed resource {}:{}.", (Object)this.clusterId, (Object)this.generatorId);
        try {
            this.etcd.getLeaseClient().revoke(this.leaseId).get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException e) {
            logger.error("Failed to revoke Etcd lease.", (Throwable)e);
        }
    }

    static String resourceKey(Integer clusterId, int generatorId) {
        return POOL_PREFIX + clusterId + ":" + generatorId;
    }

    private void awaitLatchUnlessItTakesTooLong(CountDownLatch latch, Instant giveUpAfter) throws IOException, InterruptedException {
        if (giveUpAfter == null) {
            latch.await();
        } else {
            Instant now = Instant.now();
            if (!giveUpAfter.isAfter(now)) {
                throw new IOException("Process timed out.");
            }
            boolean success = latch.await(Duration.between(now, giveUpAfter).toMillis(), TimeUnit.MILLISECONDS);
            if (!success) {
                this.close();
                throw new IOException("Process timed out.");
            }
        }
    }

    public static enum State {
        INITIALIZING,
        HAS_CLAIM,
        CLAIM_RELINQUISHED;

    }
}

