/*
 * Decompiled with CFR 0.152.
 */
package net.kuujo.copycat.cluster.internal.coordinator;

import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import net.kuujo.copycat.cluster.internal.MemberInfo;
import net.kuujo.copycat.cluster.internal.coordinator.AbstractMemberCoordinator;
import net.kuujo.copycat.cluster.internal.coordinator.MemberCoordinator;
import net.kuujo.copycat.protocol.Protocol;
import net.kuujo.copycat.protocol.ProtocolClient;
import net.kuujo.copycat.protocol.ProtocolException;

public class DefaultRemoteMemberCoordinator
extends AbstractMemberCoordinator {
    private final ProtocolClient client;
    private final ScheduledExecutorService executor;

    public DefaultRemoteMemberCoordinator(MemberInfo info, Protocol protocol, ScheduledExecutorService executor) {
        super(info);
        try {
            URI realUri = new URI(info.uri());
            if (!protocol.isValidUri(realUri)) {
                throw new ProtocolException(String.format("Invalid protocol URI %s", info.uri()), new Object[0]);
            }
            this.client = protocol.createClient(realUri);
        }
        catch (URISyntaxException e) {
            throw new ProtocolException(e);
        }
        this.executor = executor;
    }

    @Override
    public CompletableFuture<ByteBuffer> send(String topic, int address, int id, ByteBuffer message) {
        ByteBuffer request = ByteBuffer.allocateDirect(message.capacity() + 12);
        request.putInt(topic.hashCode());
        request.putInt(address);
        request.putInt(id);
        request.put(message);
        request.flip();
        CompletableFuture<ByteBuffer> future = new CompletableFuture<ByteBuffer>();
        this.client.write(request).whenCompleteAsync((result, error) -> {
            if (error == null) {
                future.complete((ByteBuffer)result);
            } else {
                future.completeExceptionally((Throwable)error);
            }
        }, (Executor)this.executor);
        return future;
    }

    @Override
    public synchronized CompletableFuture<MemberCoordinator> open() {
        return ((CompletableFuture)super.open().thenComposeAsync(v -> this.connect(), (Executor)this.executor)).thenApply(v -> this);
    }

    private CompletableFuture<Void> connect() {
        return this.connect(new CompletableFuture<Void>());
    }

    private CompletableFuture<Void> connect(CompletableFuture<Void> future) {
        if (this.isOpen()) {
            this.client.connect().whenComplete((result, error) -> {
                if (error == null) {
                    future.complete(null);
                } else {
                    this.executor.schedule(() -> this.connect(future), 100L, TimeUnit.MILLISECONDS);
                }
            });
        } else {
            future.completeExceptionally(new IllegalStateException("Member closed"));
        }
        return future;
    }

    @Override
    public synchronized CompletableFuture<Void> close() {
        return super.close().thenComposeAsync(v -> this.client.close(), (Executor)this.executor);
    }

    public String toString() {
        return String.format("%s[uri=%s]", this.getClass().getCanonicalName(), this.uri());
    }
}

