/*
 * Decompiled with CFR 0.152.
 */
package alluxio.membership;

import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.AlreadyExistsException;
import alluxio.membership.AlluxioEtcdClient;
import alluxio.membership.MembershipManager;
import alluxio.membership.WorkerServiceEntity;
import alluxio.shaded.client.com.google.common.annotations.VisibleForTesting;
import alluxio.shaded.client.io.etcd.jetcd.KeyValue;
import alluxio.shaded.client.org.apache.zookeeper.server.ByteBufferInputStream;
import alluxio.wire.WorkerInfo;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link MembershipManager} implementation that keeps worker membership in etcd.
 *
 * <p>Every worker that joins is recorded under the ring path
 * {@code /DHT/<cluster name>/AUTHORIZED/<service entity name>} and is additionally registered
 * with the etcd client's service discovery. "Full" membership is read from the ring path,
 * "live" membership from service discovery; "failed" members are those present in the former
 * but not in the latter.
 */
public class EtcdMembershipManager
implements MembershipManager {
    private static final Logger LOG = LoggerFactory.getLogger(EtcdMembershipManager.class);
    private static final String RING_PATH_FORMAT = "/DHT/%s/AUTHORIZED/";
    private final AlluxioConfiguration mConf;
    private final AlluxioEtcdClient mAlluxioEtcdClient;
    private final String mClusterName;
    private final String mRingPathPrefix;

    /**
     * Static factory equivalent to {@link #EtcdMembershipManager(AlluxioConfiguration)}.
     *
     * @param conf the Alluxio configuration
     * @return a new membership manager
     */
    public static EtcdMembershipManager create(AlluxioConfiguration conf) {
        return new EtcdMembershipManager(conf);
    }

    /**
     * Creates a manager using the etcd client returned by
     * {@link AlluxioEtcdClient#getInstance(AlluxioConfiguration)}.
     *
     * @param conf the Alluxio configuration
     */
    public EtcdMembershipManager(AlluxioConfiguration conf) {
        this(conf, AlluxioEtcdClient.getInstance(conf));
    }

    /**
     * Creates a manager on top of the given etcd client.
     *
     * @param conf the Alluxio configuration; the cluster name is read from
     *        {@link PropertyKey#ALLUXIO_CLUSTER_NAME}
     * @param alluxioEtcdClient the etcd client used for all ring and service-discovery calls
     */
    public EtcdMembershipManager(AlluxioConfiguration conf, AlluxioEtcdClient alluxioEtcdClient) {
        this.mConf = conf;
        this.mClusterName = conf.getString(PropertyKey.ALLUXIO_CLUSTER_NAME);
        this.mRingPathPrefix = String.format(RING_PATH_FORMAT, this.mClusterName);
        this.mAlluxioEtcdClient = alluxioEtcdClient;
    }

    /**
     * Registers a worker on the ring and starts its service-discovery sync.
     *
     * <p>If the ring already holds a value for this worker's path, it must be byte-identical
     * to the serialized form of the joining worker; otherwise the join is rejected. If no value
     * exists yet, the serialized entity is written to the path.
     *
     * @param wkrAddr the worker to register; only its address is used
     * @throws AlreadyExistsException if a different entity is already stored at the worker's path
     * @throws IOException if serialization or an etcd operation fails
     */
    @Override
    public void join(WorkerInfo wkrAddr) throws IOException {
        WorkerServiceEntity entity = new WorkerServiceEntity(wkrAddr.getAddress());
        String pathOnRing = this.mRingPathPrefix + entity.getServiceEntityName();
        byte[] existing = this.mAlluxioEtcdClient.getForPath(pathOnRing);
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataOutputStream dos = new DataOutputStream(baos)) {
            entity.serialize(dos);
            byte[] serializedEntity = baos.toByteArray();
            if (existing == null) {
                // First time this worker is seen on the ring: persist it.
                this.mAlluxioEtcdClient.createForPath(pathOnRing, Optional.of(serializedEntity));
            } else if (!Arrays.equals(serializedEntity, existing)) {
                throw new AlreadyExistsException("Some other member with same id registered on the ring, bail.");
            }
            this.mAlluxioEtcdClient.mServiceDiscovery.registerAndStartSync(entity);
        }
    }

    /**
     * Lists every worker recorded on the ring.
     *
     * @return one {@link WorkerInfo} per decodable ring entry, carrying only the address
     * @throws IOException if the ring cannot be read
     */
    @Override
    public List<WorkerInfo> getAllMembers() throws IOException {
        return EtcdMembershipManager.toWorkerInfos(this.retrieveFullMembers());
    }

    /**
     * Reads and decodes all entries stored under the ring path prefix.
     *
     * <p>Decoding is best effort: an entry that fails with an {@link IOException} is logged and
     * skipped. Any other exception propagates to the caller.
     *
     * @return a mutable list of the decoded entities
     * @throws IOException if listing the children of the ring path fails
     */
    private List<WorkerServiceEntity> retrieveFullMembers() throws IOException {
        List<WorkerServiceEntity> fullMembers = new ArrayList<>();
        List<KeyValue> childrenKvs = this.mAlluxioEtcdClient.getChildren(this.mRingPathPrefix);
        for (KeyValue kv : childrenKvs) {
            try (ByteArrayInputStream bais = new ByteArrayInputStream(kv.getValue().getBytes());
                 DataInputStream dis = new DataInputStream(bais)) {
                WorkerServiceEntity entity = new WorkerServiceEntity();
                entity.deserialize(dis);
                fullMembers.add(entity);
            } catch (IOException ex) {
                // Best effort: one corrupt entry must not hide the remaining members.
                LOG.warn("Skipping undecodable ring entry with key {}", kv.getKey(), ex);
            }
        }
        return fullMembers;
    }

    /**
     * Decodes all entries currently reported by service discovery.
     *
     * <p>Decoding is best effort: an entry that fails with an {@link IOException} is logged and
     * skipped. Any other exception propagates to the caller.
     *
     * @return a mutable list of the decoded entities
     * @throws IOException declared for symmetry with {@link #retrieveFullMembers()}
     */
    private List<WorkerServiceEntity> retrieveLiveMembers() throws IOException {
        List<WorkerServiceEntity> liveMembers = new ArrayList<>();
        for (Map.Entry<String, ByteBuffer> entry
                : this.mAlluxioEtcdClient.mServiceDiscovery.getAllLiveServices().entrySet()) {
            try (ByteBufferInputStream bbis = new ByteBufferInputStream(entry.getValue());
                 DataInputStream dis = new DataInputStream(bbis)) {
                WorkerServiceEntity entity = new WorkerServiceEntity();
                entity.deserialize(dis);
                liveMembers.add(entity);
            } catch (IOException ex) {
                // Best effort: one corrupt entry must not hide the remaining members.
                LOG.warn("Skipping undecodable live service entry {}", entry.getKey(), ex);
            }
        }
        return liveMembers;
    }

    /**
     * Lists the workers currently reported by service discovery.
     *
     * @return one {@link WorkerInfo} per decodable live entry, carrying only the address
     * @throws IOException if membership cannot be retrieved
     */
    @Override
    @VisibleForTesting
    public List<WorkerInfo> getLiveMembers() throws IOException {
        return EtcdMembershipManager.toWorkerInfos(this.retrieveLiveMembers());
    }

    /**
     * Lists the workers that are recorded on the ring but not reported by service discovery.
     * Workers are matched by service entity name.
     *
     * @return one {@link WorkerInfo} per such worker, carrying only the address
     * @throws IOException if membership cannot be retrieved
     */
    @Override
    @VisibleForTesting
    public List<WorkerInfo> getFailedMembers() throws IOException {
        List<WorkerServiceEntity> registeredWorkers = this.retrieveFullMembers();
        Set<String> liveNames = this.liveServiceEntityNames();
        List<WorkerServiceEntity> failedWorkers = registeredWorkers.stream()
            .filter(e -> !liveNames.contains(e.getServiceEntityName()))
            .collect(Collectors.toList());
        return EtcdMembershipManager.toWorkerInfos(failedWorkers);
    }

    /**
     * Renders a tab-separated table (header plus one line per ring entry) with the worker id,
     * its {@code host:rpcPort} address and an {@code ONLINE}/{@code OFFLINE} status derived from
     * service discovery.
     *
     * @return the table, or a one-line message containing the exception text if membership
     *         could not be retrieved
     */
    @Override
    @VisibleForTesting
    public String showAllMembers() {
        try {
            List<WorkerServiceEntity> registeredWorkers = this.retrieveFullMembers();
            Set<String> liveNames = this.liveServiceEntityNames();
            String printFormat = "%s\t%s\t%s%n";
            StringBuilder sb = new StringBuilder(String.format(printFormat, "WorkerId", "Address", "Status"));
            for (WorkerServiceEntity entity : registeredWorkers) {
                String address = entity.getWorkerNetAddress().getHost() + ":"
                    + entity.getWorkerNetAddress().getRpcPort();
                String status = liveNames.contains(entity.getServiceEntityName()) ? "ONLINE" : "OFFLINE";
                sb.append(String.format(printFormat, entity.getServiceEntityName(), address, status));
            }
            return sb.toString();
        }
        catch (IOException ex) {
            return String.format("Exception happened:%s", ex.getMessage());
        }
    }

    /**
     * Unregisters the worker from service discovery. The worker's ring entry is not touched.
     *
     * @param worker the worker to unregister; only its address is used
     * @throws IOException if the unregister call fails
     */
    @Override
    public void stopHeartBeat(WorkerInfo worker) throws IOException {
        WorkerServiceEntity entity = new WorkerServiceEntity(worker.getAddress());
        this.mAlluxioEtcdClient.mServiceDiscovery.unregisterService(entity.getServiceEntityName());
    }

    /**
     * Currently a no-op.
     *
     * @param worker ignored
     */
    @Override
    public void decommission(WorkerInfo worker) throws IOException {
    }

    /** Currently a no-op; the etcd client is not closed here. */
    @Override
    public void close() throws Exception {
    }

    /** Collects the service entity names of all live members for constant-time lookup. */
    private Set<String> liveServiceEntityNames() throws IOException {
        return this.retrieveLiveMembers().stream()
            .map(WorkerServiceEntity::getServiceEntityName)
            .collect(Collectors.toSet());
    }

    /** Maps each entity to a fresh {@link WorkerInfo} that carries only the worker address. */
    private static List<WorkerInfo> toWorkerInfos(List<WorkerServiceEntity> entities) {
        return entities.stream()
            .map(e -> new WorkerInfo().setAddress(e.getWorkerNetAddress()))
            .collect(Collectors.toList());
    }
}

