/*
 * Decompiled with CFR 0.152.
 */
package alluxio.membership;

import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.AlreadyExistsException;
import alluxio.exception.status.InvalidArgumentException;
import alluxio.membership.AlluxioEtcdClient;
import alluxio.membership.MembershipManager;
import alluxio.membership.WorkerClusterView;
import alluxio.membership.WorkerServiceEntity;
import alluxio.shaded.client.com.google.common.annotations.VisibleForTesting;
import alluxio.shaded.client.com.google.gson.JsonParseException;
import alluxio.shaded.client.io.etcd.jetcd.ByteSequence;
import alluxio.shaded.client.io.etcd.jetcd.KeyValue;
import alluxio.shaded.client.io.etcd.jetcd.Txn;
import alluxio.shaded.client.io.etcd.jetcd.kv.TxnResponse;
import alluxio.shaded.client.io.etcd.jetcd.op.Cmp;
import alluxio.shaded.client.io.etcd.jetcd.op.CmpTarget;
import alluxio.shaded.client.io.etcd.jetcd.op.Op;
import alluxio.shaded.client.io.etcd.jetcd.options.GetOption;
import alluxio.shaded.client.io.etcd.jetcd.options.PutOption;
import alluxio.util.CommonUtils;
import alluxio.wire.WorkerInfo;
import alluxio.wire.WorkerState;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link MembershipManager} implementation that keeps worker membership in etcd.
 *
 * <p>Every worker that joins gets a key under the ring path prefix
 * {@code /DHT/<cluster name>/AUTHORIZED/} whose value is the serialized
 * {@link WorkerServiceEntity}. Liveness is obtained separately from the service discovery
 * component of {@link AlluxioEtcdClient}: workers present on the ring but absent from the set of
 * live services are reported as {@link WorkerState#LOST}.
 */
public class EtcdMembershipManager implements MembershipManager {
    private static final Logger LOG = LoggerFactory.getLogger(EtcdMembershipManager.class);
    private final AlluxioConfiguration mConf;
    private final AlluxioEtcdClient mAlluxioEtcdClient;
    private final String mClusterName;
    /** Lazily computed ring path prefix; memoized so the format call happens at most once. */
    private final Supplier<String> mRingPathPrefix =
        CommonUtils.memoize(this::constructRingPathPrefix);

    /**
     * Creates a membership manager backed by the etcd client obtained for the configuration.
     *
     * @param conf the Alluxio configuration
     * @return a new manager instance
     */
    public static EtcdMembershipManager create(AlluxioConfiguration conf) {
        return new EtcdMembershipManager(conf);
    }

    /**
     * Creates a membership manager using {@link AlluxioEtcdClient#getInstance} for the given
     * configuration.
     *
     * @param conf the Alluxio configuration
     */
    public EtcdMembershipManager(AlluxioConfiguration conf) {
        this(conf, AlluxioEtcdClient.getInstance(conf));
    }

    /**
     * Creates a membership manager with an explicitly supplied etcd client.
     *
     * @param conf the Alluxio configuration; the cluster name is read from
     *        {@link PropertyKey#ALLUXIO_CLUSTER_NAME}
     * @param alluxioEtcdClient the etcd client wrapper to use for all etcd access
     */
    public EtcdMembershipManager(AlluxioConfiguration conf, AlluxioEtcdClient alluxioEtcdClient) {
        mConf = conf;
        mClusterName = conf.getString(PropertyKey.ALLUXIO_CLUSTER_NAME);
        mAlluxioEtcdClient = alluxioEtcdClient;
    }

    /** Builds the ring path prefix for this cluster. */
    private String constructRingPathPrefix() {
        return String.format("/DHT/%s/AUTHORIZED/", mClusterName);
    }

    /** Returns the (memoized) ring path prefix for this cluster. */
    private String getRingPathPrefix() {
        return mRingPathPrefix.get();
    }

    /**
     * Registers the worker on the ring and hands it to service discovery.
     *
     * <p>The ring entry is written with a single etcd transaction: if the key has never been
     * written (version 0) the serialized entity is put. Otherwise, in a k8s deployment the value
     * is overwritten unconditionally; in a non-k8s deployment the existing value is read back and
     * reconciled against this worker (see {@link #reconcileExistingEntry}).
     *
     * @param workerInfo the worker that wants to join
     * @throws AlreadyExistsException if a different worker is already registered under the same
     *         identity in a non-k8s deployment
     * @throws IOException if the existing entry is corrupted, or the etcd transaction fails or is
     *         interrupted
     */
    @Override
    public void join(WorkerInfo workerInfo) throws IOException {
        LOG.info("Try joining on etcd for worker:{} ", workerInfo);
        WorkerServiceEntity entity =
            new WorkerServiceEntity(workerInfo.getIdentity(), workerInfo.getAddress());
        String pathOnRing = getRingPathPrefix() + entity.getServiceEntityName();
        byte[] serializedEntity = entity.serialize();
        try {
            boolean isK8s = mConf.isSet(PropertyKey.K8S_ENV_DEPLOYMENT)
                && mConf.getBoolean(PropertyKey.K8S_ENV_DEPLOYMENT);
            ByteSequence keyToPut = ByteSequence.from(pathOnRing, StandardCharsets.UTF_8);
            ByteSequence valToPut = ByteSequence.from(serializedEntity);
            // When the key already exists: overwrite in k8s, otherwise fetch it for comparison.
            Op onExistingKey = isK8s
                ? Op.put(keyToPut, valToPut, PutOption.newBuilder().build())
                : Op.get(keyToPut, GetOption.DEFAULT);
            Txn txn = mAlluxioEtcdClient.getEtcdClient().getKVClient().txn();
            CompletableFuture<TxnResponse> txnResponseFut = txn
                .If(new Cmp(keyToPut, Cmp.Op.EQUAL, CmpTarget.version(0L)))
                .Then(Op.put(keyToPut, valToPut, PutOption.newBuilder().build()))
                .Else(onExistingKey)
                .commit();
            TxnResponse txnResponse = txnResponseFut.get();
            if (!isK8s && !txnResponse.isSucceeded()) {
                reconcileExistingEntry(txnResponse, entity, pathOnRing, serializedEntity);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } catch (ExecutionException e) {
            throw new IOException(e);
        }
        mAlluxioEtcdClient.mServiceDiscovery.registerAndStartSync(entity);
        LOG.info("Joined on etcd for worker:{} ", workerInfo);
    }

    /**
     * Handles the non-k8s case where the ring key already existed when joining.
     *
     * <p>Picks the key-value with the highest mod revision from the transaction's get responses
     * and then:
     * <ul>
     *   <li>does nothing if there is no key-value or its bytes equal the new serialized entity;</li>
     *   <li>rewrites the path with the new serialized entity if the stored entity equals this one
     *       ignoring optional fields;</li>
     *   <li>fails otherwise.</li>
     * </ul>
     *
     * @param txnResponse response of the failed compare transaction, carrying the get results
     * @param entity the entity of the worker that is joining
     * @param pathOnRing the ring key of the joining worker
     * @param serializedEntity the serialized form of {@code entity}
     * @throws AlreadyExistsException if the stored entity belongs to a different worker
     * @throws IOException if the stored entity cannot be parsed, or rewriting the path fails
     */
    private void reconcileExistingEntry(TxnResponse txnResponse, WorkerServiceEntity entity,
            String pathOnRing, byte[] serializedEntity) throws IOException {
        // Long.compare avoids the sign errors of narrowing a long difference to int.
        Optional<KeyValue> latestKV = txnResponse.getGetResponses().stream()
            .flatMap(response -> response.getKvs().stream())
            .max((kv1, kv2) -> Long.compare(kv1.getModRevision(), kv2.getModRevision()));
        if (!latestKV.isPresent()
            || Arrays.equals(latestKV.get().getValue().getBytes(), serializedEntity)) {
            return;
        }
        Optional<WorkerServiceEntity> existingEntity = parseWorkerServiceEntity(latestKV.get());
        if (!existingEntity.isPresent()) {
            throw new IOException(String.format(
                "Existing WorkerServiceEntity for path:%s corrupted", pathOnRing));
        }
        WorkerServiceEntity existing = existingEntity.get();
        if (!existing.equalsIgnoringOptionalFields(entity)) {
            throw new AlreadyExistsException(String.format(
                "Some other member with same id registered on the ring, bail."
                    + "Conflicting worker addr:%s, worker identity:%s."
                    + "Different workers can't assume same worker identity in non-k8s env,"
                    + "clean local worker identity settings to continue.",
                existing.getWorkerNetAddress().toString(), existing.getIdentity()));
        }
        // Same worker, only optional fields differ: refresh the stored value.
        mAlluxioEtcdClient.createForPath(pathOnRing, Optional.of(serializedEntity));
    }

    /**
     * Returns every worker registered on the ring, marked {@link WorkerState#LIVE} when its
     * identity is among the live services and {@link WorkerState#LOST} otherwise.
     *
     * @return a view over all registered workers
     * @throws IOException if reading from etcd fails
     */
    @Override
    public WorkerClusterView getAllMembers() throws IOException {
        Set<?> liveWorkerIds = fetchLiveWorkerIds();
        Stream<WorkerInfo> workers = parseWorkersFromEtcdKvPairs(
                mAlluxioEtcdClient.getChildren(getRingPathPrefix()))
            .map(w -> toWorkerInfo(w,
                liveWorkerIds.contains(w.getIdentity()) ? WorkerState.LIVE : WorkerState.LOST));
        Iterable<WorkerInfo> workerInfoIterable = workers::iterator;
        return new WorkerClusterView(workerInfoIterable);
    }

    /**
     * Returns the workers currently reported by service discovery, all marked
     * {@link WorkerState#LIVE}.
     *
     * @return a view over the live workers
     * @throws IOException if reading from etcd fails
     */
    @Override
    public WorkerClusterView getLiveMembers() throws IOException {
        Stream<WorkerInfo> workers = parseWorkersFromEtcdKvPairs(
                mAlluxioEtcdClient.mServiceDiscovery.getAllLiveServices())
            .map(w -> toWorkerInfo(w, WorkerState.LIVE));
        Iterable<WorkerInfo> workerInfoIterable = workers::iterator;
        return new WorkerClusterView(workerInfoIterable);
    }

    /**
     * Returns the workers registered on the ring whose identity is not among the live services,
     * all marked {@link WorkerState#LOST}.
     *
     * @return a view over the failed workers
     * @throws IOException if reading from etcd fails
     */
    @Override
    public WorkerClusterView getFailedMembers() throws IOException {
        Set<?> liveWorkerIds = fetchLiveWorkerIds();
        Stream<WorkerInfo> workers = parseWorkersFromEtcdKvPairs(
                mAlluxioEtcdClient.getChildren(getRingPathPrefix()))
            .filter(w -> !liveWorkerIds.contains(w.getIdentity()))
            .map(w -> toWorkerInfo(w, WorkerState.LOST));
        Iterable<WorkerInfo> failedWorkerIterable = workers::iterator;
        return new WorkerClusterView(failedWorkerIterable);
    }

    /**
     * Collects the identities of all workers currently reported live by service discovery.
     * The element type is left as a wildcard; the set is only used for membership checks.
     */
    private Set<?> fetchLiveWorkerIds() throws IOException {
        return parseWorkersFromEtcdKvPairs(
                mAlluxioEtcdClient.mServiceDiscovery.getAllLiveServices())
            .map(WorkerServiceEntity::getIdentity)
            .collect(Collectors.toSet());
    }

    /** Builds a {@link WorkerInfo} carrying the entity's identity and address and the given state. */
    private static WorkerInfo toWorkerInfo(WorkerServiceEntity entity, WorkerState state) {
        return new WorkerInfo()
            .setIdentity(entity.getIdentity())
            .setAddress(entity.getWorkerNetAddress())
            .setState(state);
    }

    /** Parses the given etcd key-values into worker entities, dropping the ones that fail to parse. */
    private Stream<WorkerServiceEntity> parseWorkersFromEtcdKvPairs(List<KeyValue> workerKvs) {
        return workerKvs.stream()
            .map(this::parseWorkerServiceEntity)
            .filter(Optional::isPresent)
            .map(Optional::get);
    }

    /**
     * Deserializes the value of an etcd key-value into a {@link WorkerServiceEntity}.
     *
     * @param etcdKvPair the key-value read from etcd
     * @return the parsed entity, or empty if the value is not parseable
     */
    private Optional<WorkerServiceEntity> parseWorkerServiceEntity(KeyValue etcdKvPair) {
        try {
            WorkerServiceEntity entity = new WorkerServiceEntity();
            entity.deserialize(etcdKvPair.getValue().getBytes());
            return Optional.of(entity);
        } catch (JsonParseException ex) {
            // Best effort: unparseable entries are skipped by callers, but leave a trace.
            LOG.debug("Failed to parse WorkerServiceEntity for key:{}",
                etcdKvPair.getKey().toString(StandardCharsets.UTF_8), ex);
            return Optional.empty();
        }
    }

    /**
     * Renders a tab-separated table of all registered workers with their address and whether
     * they are currently live.
     *
     * @return the table, or a message describing the failure if etcd could not be read
     */
    @Override
    @VisibleForTesting
    public String showAllMembers() {
        try {
            WorkerClusterView registeredWorkers = getAllMembers();
            WorkerClusterView liveWorkers = getLiveMembers();
            String printFormat = "%s\t%s\t%s%n";
            StringBuilder sb =
                new StringBuilder(String.format(printFormat, "WorkerId", "Address", "Status"));
            for (WorkerInfo entity : registeredWorkers) {
                String address =
                    entity.getAddress().getHost() + ":" + entity.getAddress().getRpcPort();
                String status =
                    liveWorkers.getWorkerById(entity.getIdentity()).isPresent()
                        ? "ONLINE" : "OFFLINE";
                sb.append(String.format(printFormat, entity.getIdentity(), address, status));
            }
            return sb.toString();
        } catch (IOException ex) {
            return String.format("Exception happened:%s", ex.getMessage());
        }
    }

    /**
     * Unregisters the worker's service entity from service discovery.
     *
     * @param worker the worker whose heartbeat should stop
     * @throws IOException if unregistering fails
     */
    @Override
    public void stopHeartBeat(WorkerInfo worker) throws IOException {
        WorkerServiceEntity entity =
            new WorkerServiceEntity(worker.getIdentity(), worker.getAddress());
        mAlluxioEtcdClient.mServiceDiscovery.unregisterService(entity.getServiceEntityName());
    }

    /**
     * Removes a worker from the ring. Only workers currently in state {@link WorkerState#LOST}
     * may be removed.
     *
     * @param worker the worker to remove
     * @throws InvalidArgumentException if the worker is not registered, or is not in LOST state
     * @throws IOException if etcd access fails
     */
    @Override
    public void decommission(WorkerInfo worker) throws IOException {
        Optional<WorkerInfo> targetWorker = getAllMembers().getWorkerById(worker.getIdentity());
        if (!targetWorker.isPresent()) {
            throw new InvalidArgumentException(String.format(
                "Unrecognized or non-existing worker: %s", worker.getIdentity()));
        }
        if (targetWorker.get().getState() != WorkerState.LOST) {
            throw new InvalidArgumentException(String.format(
                "Can't remove running worker: %s, stop the worker before removing",
                worker.getIdentity()));
        }
        // Unregister from service discovery before deleting the ring entry.
        stopHeartBeat(worker);
        String pathOnRing = getRingPathPrefix() + worker.getIdentity();
        mAlluxioEtcdClient.deleteForPath(pathOnRing, false);
        LOG.info("Successfully removed worker:{}", worker.getIdentity());
    }

    /**
     * No-op: this manager releases nothing on close.
     * NOTE(review): the etcd client is not closed here, presumably because it may be the
     * instance shared via {@code AlluxioEtcdClient.getInstance} - confirm.
     */
    @Override
    public void close() throws Exception {
    }
}

