/*
 * Decompiled with CFR 0.152.
 */
package alluxio.membership;

import alluxio.exception.runtime.UnavailableRuntimeException;
import alluxio.exception.status.AlreadyExistsException;
import alluxio.exception.status.NotFoundException;
import alluxio.membership.AlluxioEtcdClient;
import alluxio.membership.DefaultServiceEntity;
import alluxio.resource.LockResource;
import alluxio.shaded.client.com.google.common.base.Preconditions;
import alluxio.shaded.client.io.etcd.jetcd.ByteSequence;
import alluxio.shaded.client.io.etcd.jetcd.KeyValue;
import alluxio.shaded.client.io.etcd.jetcd.Txn;
import alluxio.shaded.client.io.etcd.jetcd.kv.TxnResponse;
import alluxio.shaded.client.io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import alluxio.shaded.client.io.etcd.jetcd.op.Cmp;
import alluxio.shaded.client.io.etcd.jetcd.op.CmpTarget;
import alluxio.shaded.client.io.etcd.jetcd.op.Op;
import alluxio.shaded.client.io.etcd.jetcd.options.GetOption;
import alluxio.shaded.client.io.etcd.jetcd.options.PutOption;
import alluxio.shaded.client.io.etcd.jetcd.support.CloseableClient;
import alluxio.shaded.client.io.grpc.stub.StreamObserver;
import alluxio.util.ThreadFactoryUtils;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServiceDiscoveryRecipe
implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(ServiceDiscoveryRecipe.class);
    final AlluxioEtcdClient mAlluxioEtcdClient;
    private final ScheduledExecutorService mExecutor;
    private final String mRegisterPathPrefix;
    private final ConcurrentHashMap<String, DefaultServiceEntity> mRegisteredServices = new ConcurrentHashMap();

    public ServiceDiscoveryRecipe(AlluxioEtcdClient client, String pathPrefix) {
        this.mAlluxioEtcdClient = client;
        this.mRegisterPathPrefix = pathPrefix;
        this.mExecutor = Executors.newSingleThreadScheduledExecutor(ThreadFactoryUtils.build("service-discovery-checker", true));
        this.mExecutor.scheduleWithFixedDelay(this::checkAllForReconnect, 5L, 5L, TimeUnit.SECONDS);
    }

    private void newLeaseInternal(DefaultServiceEntity service, boolean forceOverwriteOldLease) throws IOException {
        try (LockResource lockResource = new LockResource(service.getLock());){
            if (!forceOverwriteOldLease && service.getLease() != null && !this.mAlluxioEtcdClient.isLeaseExpired(service.getLease())) {
                LOG.info("Lease attached with service:{} is not expired, bail from here.", (Object)service.getServiceEntityName());
                return;
            }
            String path = service.getServiceEntityName();
            String fullPath = new StringBuffer().append(this.mRegisterPathPrefix).append("/").append(path).toString();
            try {
                AlluxioEtcdClient.Lease oldLease = service.getLease();
                AlluxioEtcdClient.Lease lease = this.mAlluxioEtcdClient.createLease(service.getLeaseTTLInSec(), service.getLeaseTimeoutInSec(), TimeUnit.SECONDS);
                Txn txn = this.mAlluxioEtcdClient.getEtcdClient().getKVClient().txn();
                ByteSequence keyToPut = ByteSequence.from(fullPath, StandardCharsets.UTF_8);
                ByteSequence valToPut = ByteSequence.from(service.serialize());
                CompletableFuture<TxnResponse> txnResponseFut = txn.If(new Cmp[0]).Then(Op.put(keyToPut, valToPut, PutOption.newBuilder().withLeaseId(lease.mLeaseId).build())).Then(Op.get(keyToPut, GetOption.DEFAULT)).Else(Op.get(keyToPut, GetOption.DEFAULT)).commit();
                TxnResponse txnResponse = txnResponseFut.get();
                ArrayList kvs = new ArrayList();
                txnResponse.getGetResponses().stream().map(r -> kvs.addAll(r.getKvs())).collect(Collectors.toList());
                if (!txnResponse.isSucceeded()) {
                    throw new IOException("Failed to new a lease for service:" + service.toString());
                }
                Preconditions.checkState(!kvs.isEmpty(), "No such service entry found.");
                long latestRevision = kvs.stream().mapToLong(kv -> kv.getModRevision()).max().getAsLong();
                service.setRevisionNumber(latestRevision);
                service.setLease(lease);
                this.startHeartBeat(service);
            }
            catch (InterruptedException | ExecutionException ex) {
                throw new IOException("Exception in new-ing lease for service:" + service, ex);
            }
        }
    }

    public void registerAndStartSync(DefaultServiceEntity service) throws IOException {
        LOG.info("registering service : {}", (Object)service);
        if (this.mRegisteredServices.containsKey(service.getServiceEntityName())) {
            throw new AlreadyExistsException("Service " + service.getServiceEntityName() + " already registered.");
        }
        this.newLeaseInternal(service, false);
        DefaultServiceEntity existEntity = this.mRegisteredServices.putIfAbsent(service.getServiceEntityName(), service);
        if (existEntity != null) {
            DefaultServiceEntity entity = service;
            Throwable throwable = null;
            if (entity != null) {
                if (throwable != null) {
                    try {
                        entity.close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                } else {
                    entity.close();
                }
            }
            throw new AlreadyExistsException("Service " + service.getServiceEntityName() + " already registered.");
        }
    }

    public void unregisterService(String serviceIdentifier) throws IOException {
        DefaultServiceEntity entity = this.mRegisteredServices.remove(serviceIdentifier);
        if (entity != null) {
            try (DefaultServiceEntity service = entity;){
                LOG.info("Service unregistered:{}", (Object)service);
            }
        } else {
            LOG.info("Service already unregistered:{}", (Object)serviceIdentifier);
        }
    }

    public void unregisterAll() {
        for (Map.Entry<String, DefaultServiceEntity> entry : this.mRegisteredServices.entrySet()) {
            try {
                this.unregisterService(entry.getKey());
            }
            catch (IOException ex) {
                LOG.error("Unregister all services failed unregistering for:{}.", (Object)entry.getKey(), (Object)ex);
            }
        }
    }

    public ByteBuffer getRegisteredServiceDetail(String DefaultServiceEntityName) {
        String fullPath = new StringBuffer().append(this.mRegisterPathPrefix).append("/").append(DefaultServiceEntityName).toString();
        byte[] val = this.mAlluxioEtcdClient.getForPath(fullPath);
        return ByteBuffer.wrap(val);
    }

    public void updateService(DefaultServiceEntity service) throws IOException {
        LOG.info("Updating service : {}", (Object)service);
        if (!this.mRegisteredServices.containsKey(service.getServiceEntityName())) {
            Preconditions.checkNotNull(service.getLease(), "Service not attach with lease");
            throw new NoSuchElementException("Service " + service.getServiceEntityName() + " not registered, please register first.");
        }
        String fullPath = new StringBuffer().append(this.mRegisterPathPrefix).append("/").append(service.getServiceEntityName()).toString();
        try (LockResource lockResource = new LockResource(service.getLock());){
            long latestRevision;
            Txn txn = this.mAlluxioEtcdClient.getEtcdClient().getKVClient().txn();
            ByteSequence keyToPut = ByteSequence.from(fullPath, StandardCharsets.UTF_8);
            ByteSequence valToPut = ByteSequence.from(service.serialize());
            CompletableFuture<TxnResponse> txnResponseFut = txn.If(new Cmp(keyToPut, Cmp.Op.EQUAL, CmpTarget.modRevision(service.getRevisionNumber()))).Then(Op.put(keyToPut, valToPut, PutOption.newBuilder().withLeaseId(service.getLease().mLeaseId).build())).Then(Op.get(keyToPut, GetOption.DEFAULT)).Else(Op.get(keyToPut, GetOption.DEFAULT)).commit();
            TxnResponse txnResponse = txnResponseFut.get();
            ArrayList kvs = new ArrayList();
            txnResponse.getGetResponses().stream().map(r -> kvs.addAll(r.getKvs())).collect(Collectors.toList());
            if (!txnResponse.isSucceeded()) {
                if (kvs.isEmpty()) {
                    throw new NotFoundException("Such service kv pair is not in etcd anymore.");
                }
                throw new IOException("Failed to update service:" + service.toString());
            }
            service.mRevision = latestRevision = kvs.stream().mapToLong(kv -> kv.getModRevision()).max().getAsLong();
            if (service.getKeepAliveClient() == null) {
                this.startHeartBeat(service);
            }
            this.mRegisteredServices.put(service.getServiceEntityName(), service);
        }
        catch (ExecutionException ex) {
            throw new IOException("ExecutionException in registering service:" + service, ex);
        }
        catch (InterruptedException ex) {
            LOG.info("InterruptedException caught, bail.");
        }
    }

    private void startHeartBeat(DefaultServiceEntity service) {
        CloseableClient keepAliveClient = this.mAlluxioEtcdClient.getEtcdClient().getLeaseClient().keepAlive(service.getLease().mLeaseId, new RetryKeepAliveObserver(service));
        service.setKeepAliveClient(keepAliveClient);
    }

    public List<KeyValue> getAllLiveServices() {
        return this.mAlluxioEtcdClient.getChildren(this.mRegisterPathPrefix);
    }

    private void checkAllForReconnect() {
        LOG.debug("instance {} - Checking if any service needs reconnection ...", (Object)this);
        for (Map.Entry<String, DefaultServiceEntity> entry : this.mRegisteredServices.entrySet()) {
            DefaultServiceEntity entity = entry.getValue();
            if (!entity.mNeedReconnect.get()) continue;
            try {
                LOG.info("Start reconnect for service:{}", (Object)entity.getServiceEntityName());
                this.newLeaseInternal(entity, true);
                entity.mNeedReconnect.set(false);
            }
            catch (UnavailableRuntimeException | IOException e) {
                LOG.warn("Failed trying to new the lease for service:{}", (Object)entity, (Object)e);
            }
        }
    }

    @Override
    public void close() {
        if (this.mExecutor != null) {
            this.mExecutor.shutdown();
            try {
                this.mExecutor.awaitTermination(5L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                LOG.warn("interrupted while terminating the thread used to poll ETCD");
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    class RetryKeepAliveObserver
    implements StreamObserver<LeaseKeepAliveResponse> {
        public DefaultServiceEntity mService;

        public RetryKeepAliveObserver(DefaultServiceEntity service) {
            this.mService = service;
        }

        @Override
        public void onNext(LeaseKeepAliveResponse value) {
            LOG.debug("onNext keepalive response:id:{}:ttl:{}", (Object)value.getID(), (Object)value.getTTL());
        }

        @Override
        public void onError(Throwable t) {
            LOG.error("onError for Lease for service:{}, leaseId:{}. Setting status to reconnect", new Object[]{this.mService, this.mService.getLease().mLeaseId, t});
            this.mService.mNeedReconnect.compareAndSet(false, true);
        }

        @Override
        public void onCompleted() {
            LOG.warn("onCompleted for Lease for service:{}, leaseId:{}. Setting status to reconnect", (Object)this.mService, (Object)this.mService.getLease().mLeaseId);
            this.mService.mNeedReconnect.compareAndSet(false, true);
        }
    }
}

