package com.baidu.hugegraph.computer.core.bsp;

import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.concurrent.BarrierEvent;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.Log;
import com.google.common.annotations.VisibleForTesting;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.options.DeleteOption;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.watch.WatchEvent;
import io.etcd.jetcd.watch.WatchResponse;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.slf4j.Logger;

/* loaded from: input_file:com/baidu/hugegraph/computer/core/bsp/EtcdClient.class */
public class EtcdClient {
    private static final Logger LOG;
    private static final Charset ENCODING;
    private final Client client;
    private final Watch watch;
    private final KV kv;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/baidu/hugegraph/computer/core/bsp/EtcdClient$WaitEvent.class */
    public static class WaitEvent<V> {
        private final BarrierEvent barrierEvent = new BarrierEvent();
        private V result = null;
        static final /* synthetic */ boolean $assertionsDisabled;

        public void signalAll(V v) {
            this.result = v;
            this.barrierEvent.signalAll();
        }

        public V await(long j, long j2, Runnable runnable) throws InterruptedException {
            long currentTimeMillis = j + System.currentTimeMillis();
            for (long j3 = j; j3 > 0; j3 = currentTimeMillis - System.currentTimeMillis()) {
                j2 = Math.min(j3, j2);
                if (this.barrierEvent.await(j2)) {
                    if ($assertionsDisabled || this.result != null) {
                        return this.result;
                    }
                    throw new AssertionError();
                }
                runnable.run();
            }
            throw new ComputerException("Timeout(%sms) to wait event", Long.valueOf(j));
        }

        static {
            $assertionsDisabled = !EtcdClient.class.desiredAssertionStatus();
        }
    }

    public EtcdClient(String str, String str2) {
        E.checkArgumentNotNull(str, "The endpoints can't be null", new Object[0]);
        E.checkArgumentNotNull(str2, "The namespace can't be null", new Object[0]);
        this.client = Client.builder().endpoints(str).namespace(ByteSequence.from(str2.getBytes(ENCODING))).build();
        this.watch = this.client.getWatchClient();
        this.kv = this.client.getKVClient();
    }

    public void put(String str, byte[] bArr) {
        E.checkArgument(str != null, "The key can't be null.", new Object[0]);
        E.checkArgument(bArr != null, "The value can't be null.", new Object[0]);
        try {
            this.kv.put(ByteSequence.from(str, ENCODING), ByteSequence.from(bArr)).get();
        } catch (InterruptedException e) {
            throw new ComputerException("Interrupted while putting with key='%s'", e, str);
        } catch (ExecutionException e2) {
            throw new ComputerException("Error while putting with key='%s'", e2, str);
        }
    }

    public byte[] get(String str) {
        return get(str, false);
    }

    public byte[] get(String str, boolean z) {
        E.checkArgumentNotNull(str, "The key can't be null", new Object[0]);
        try {
            GetResponse getResponse = this.kv.get(ByteSequence.from(str, ENCODING)).get();
            if (getResponse.getCount() <= 0) {
                if (z) {
                    throw new ComputerException("Can't find value for key='%s'", str);
                }
                return null;
            }
            List<KeyValue> kvs = getResponse.getKvs();
            if ($assertionsDisabled || kvs.size() == 1) {
                return kvs.get(0).getValue().getBytes();
            }
            throw new AssertionError();
        } catch (InterruptedException e) {
            throw new ComputerException("Interrupted while getting with key='%s'", e, str);
        } catch (ExecutionException e2) {
            throw new ComputerException("Error while getting with key='%s'", e2, str);
        }
    }

    public byte[] get(String str, long j, long j2) {
        E.checkArgumentNotNull(str, "The key can't be null", new Object[0]);
        E.checkArgument(j > 0, "The timeout must be > 0, but got: %s", Long.valueOf(j));
        E.checkArgument(j2 > 0, "The logInterval must be > 0, but got: %s", Long.valueOf(j2));
        ByteSequence from = ByteSequence.from(str, ENCODING);
        try {
            GetResponse getResponse = this.kv.get(from).get();
            return getResponse.getCount() > 0 ? getResponse.getKvs().get(0).getValue().getBytes() : waitAndGetFromPutEvent(from, getResponse.getHeader().getRevision(), j, j2);
        } catch (InterruptedException e) {
            throw new ComputerException("Interrupted while getting with key='%s'", e, str);
        } catch (ExecutionException e2) {
            throw new ComputerException("Error while getting with key='%s'", e2, str);
        }
    }

    private byte[] waitAndGetFromPutEvent(ByteSequence byteSequence, long j, long j2, long j3) throws InterruptedException {
        WaitEvent waitEvent = new WaitEvent();
        Watch.Watcher watch = this.watch.watch(byteSequence, WatchOption.newBuilder().withRevision(j).withNoDelete(true).build(), watchResponse -> {
            Iterator<WatchEvent> it = watchResponse.getEvents().iterator();
            if (it.hasNext()) {
                WatchEvent next = it.next();
                if (!WatchEvent.EventType.PUT.equals(next.getEventType())) {
                    if (!$assertionsDisabled) {
                        throw new AssertionError();
                    }
                    throw new ComputerException("Unexpected event type '%s'", next.getEventType());
                }
                KeyValue keyValue = next.getKeyValue();
                if (byteSequence.equals(keyValue.getKey())) {
                    waitEvent.signalAll(next.getKeyValue().getValue().getBytes());
                } else {
                    if (!$assertionsDisabled) {
                        throw new AssertionError();
                    }
                    throw new ComputerException("Expect event key '%s', found '%s'", byteSequence.toString(ENCODING), keyValue.getKey().toString(ENCODING));
                }
            }
        });
        Throwable th = null;
        try {
            try {
                byte[] bArr = (byte[]) waitEvent.await(j2, j3, () -> {
                    LOG.info("Wait for key '{}' with timeout {}ms", byteSequence.toString(ENCODING), Long.valueOf(j2));
                });
                if (watch != null) {
                    if (0 != 0) {
                        try {
                            watch.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        watch.close();
                    }
                }
                return bArr;
            } finally {
            }
        } catch (Throwable th3) {
            if (watch != null) {
                if (th != null) {
                    try {
                        watch.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    watch.close();
                }
            }
            throw th3;
        }
    }

    public List<byte[]> getWithPrefix(String str) {
        E.checkArgumentNotNull(str, "The prefix can't be null", new Object[0]);
        try {
            ByteSequence from = ByteSequence.from(str, ENCODING);
            GetResponse getResponse = this.kv.get(from, GetOption.newBuilder().withPrefix(from).withSortOrder(GetOption.SortOrder.ASCEND).build()).get();
            return getResponse.getCount() > 0 ? getResponseValues(getResponse) : Collections.emptyList();
        } catch (InterruptedException e) {
            throw new ComputerException("Interrupted while getting with prefix='%s'", e, str);
        } catch (ExecutionException e2) {
            throw new ComputerException("Error while getting with prefix='%s'", e2, str);
        }
    }

    public List<byte[]> getWithPrefix(String str, int i) {
        E.checkArgumentNotNull(str, "The prefix can't be null", new Object[0]);
        E.checkArgument(i >= 0, "The count must be >= 0, but got: %s", Integer.valueOf(i));
        try {
            ByteSequence from = ByteSequence.from(str, ENCODING);
            GetResponse getResponse = this.kv.get(from, GetOption.newBuilder().withPrefix(from).withLimit(i).withSortOrder(GetOption.SortOrder.ASCEND).build()).get();
            if (getResponse.getCount() == i) {
                return getResponseValues(getResponse);
            }
            throw new ComputerException("Expect %s elements, only find %s elements with prefix='%s'", Integer.valueOf(i), Long.valueOf(getResponse.getCount()), str);
        } catch (InterruptedException e) {
            throw new ComputerException("Interrupted while getting with prefix='%s', count=%s", e, str, Integer.valueOf(i));
        } catch (ExecutionException e2) {
            throw new ComputerException("Error while getting with prefix='%s', count=%s", e2, str, Integer.valueOf(i));
        }
    }

    public List<byte[]> getWithPrefix(String str, int i, long j, long j2) {
        E.checkArgumentNotNull(str, "The prefix can't be null", new Object[0]);
        E.checkArgument(i >= 0, "The count must be >= 0, but got: %s", Integer.valueOf(i));
        E.checkArgument(j2 >= 0, "The logInterval must be >= 0, but got: %s", Long.valueOf(j2));
        ByteSequence from = ByteSequence.from(str, ENCODING);
        try {
            GetResponse getResponse = this.kv.get(from, GetOption.newBuilder().withPrefix(from).withSortOrder(GetOption.SortOrder.ASCEND).withLimit(i).build()).get();
            return getResponse.getCount() == ((long) i) ? getResponseValues(getResponse) : waitAndPrefixGetFromPutEvent(from, i, getResponse.getKvs(), getResponse.getHeader().getRevision(), j, j2);
        } catch (InterruptedException e) {
            throw new ComputerException("Interrupted while getting with prefix='%s', count=%s, timeout=%s", e, str, Integer.valueOf(i), Long.valueOf(j));
        } catch (ExecutionException e2) {
            throw new ComputerException("Error while getting with prefix='%s', count=%s, timeout=%s", e2, str, Integer.valueOf(i), Long.valueOf(j));
        }
    }

    private List<byte[]> waitAndPrefixGetFromPutEvent(ByteSequence byteSequence, int i, List<KeyValue> list, long j, long j2, long j3) throws InterruptedException {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        for (KeyValue keyValue : list) {
            concurrentHashMap.put(keyValue.getKey(), keyValue.getValue());
        }
        WaitEvent waitEvent = new WaitEvent();
        Consumer<WatchResponse> consumer = watchResponse -> {
            for (WatchEvent watchEvent : watchResponse.getEvents()) {
                if (WatchEvent.EventType.PUT.equals(watchEvent.getEventType())) {
                    KeyValue keyValue2 = watchEvent.getKeyValue();
                    concurrentHashMap.put(keyValue2.getKey(), keyValue2.getValue());
                    if (concurrentHashMap.size() == i) {
                        ArrayList arrayList = new ArrayList(i);
                        Iterator it = concurrentHashMap.values().iterator();
                        while (it.hasNext()) {
                            arrayList.add(((ByteSequence) it.next()).getBytes());
                        }
                        waitEvent.signalAll(arrayList);
                    }
                } else {
                    if (!WatchEvent.EventType.DELETE.equals(watchEvent.getEventType())) {
                        throw new ComputerException("Unexpected event type '%s'", watchEvent.getEventType());
                    }
                    concurrentHashMap.remove(watchEvent.getKeyValue().getKey());
                }
            }
        };
        Watch.Watcher watch = this.watch.watch(byteSequence, WatchOption.newBuilder().withPrefix(byteSequence).withRevision(j).build(), consumer);
        Throwable th = null;
        try {
            List<byte[]> list2 = (List) waitEvent.await(j2, j3, () -> {
                LOG.info("Wait for keys with prefix '{}' and timeout {}ms, expect {} keys but actual got {} keys", byteSequence.toString(ENCODING), Long.valueOf(j2), Integer.valueOf(i), Integer.valueOf(concurrentHashMap.size()));
            });
            if (watch != null) {
                if (0 != 0) {
                    try {
                        watch.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    watch.close();
                }
            }
            return list2;
        } catch (Throwable th3) {
            if (watch != null) {
                if (0 != 0) {
                    try {
                        watch.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    watch.close();
                }
            }
            throw th3;
        }
    }

    public long delete(String str) {
        E.checkArgumentNotNull(str, "The key can't be null", new Object[0]);
        try {
            return this.client.getKVClient().delete(ByteSequence.from(str, ENCODING)).get().getDeleted();
        } catch (InterruptedException e) {
            throw new ComputerException("Interrupted while deleting '%s'", e, str);
        } catch (ExecutionException e2) {
            throw new ComputerException("Error while deleting '%s'", e2, str);
        }
    }

    public long deleteWithPrefix(String str) {
        E.checkArgumentNotNull(str, "The prefix can't be null", new Object[0]);
        ByteSequence from = ByteSequence.from(str, ENCODING);
        try {
            return this.client.getKVClient().delete(from, DeleteOption.newBuilder().withPrefix(from).build()).get().getDeleted();
        } catch (InterruptedException e) {
            throw new ComputerException("Interrupted while deleting with prefix '%s'", e, str);
        } catch (ExecutionException e2) {
            throw new ComputerException("ExecutionException is thrown while deleting with prefix '%s'", e2, str);
        }
    }

    public long deleteAllKvsInNamespace() {
        return deleteWithPrefix("");
    }

    public void close() {
        this.client.close();
    }

    @VisibleForTesting
    protected KV getKv() {
        return this.kv;
    }

    private static List<byte[]> getResponseValues(GetResponse getResponse) {
        ArrayList arrayList = new ArrayList((int) getResponse.getCount());
        Iterator<KeyValue> it = getResponse.getKvs().iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getValue().getBytes());
        }
        return arrayList;
    }

    static {
        $assertionsDisabled = !EtcdClient.class.desiredAssertionStatus();
        LOG = Log.logger((Class<?>) EtcdClient.class);
        ENCODING = StandardCharsets.UTF_8;
    }
}
