/*
 * Decompiled with CFR 0.152.
 */
package net.kuujo.copycat.cluster.internal.coordinator;

import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import net.kuujo.copycat.cluster.MessageHandler;
import net.kuujo.copycat.cluster.internal.MemberInfo;
import net.kuujo.copycat.cluster.internal.coordinator.AbstractMemberCoordinator;
import net.kuujo.copycat.cluster.internal.coordinator.LocalMemberCoordinator;
import net.kuujo.copycat.cluster.internal.coordinator.MemberCoordinator;
import net.kuujo.copycat.protocol.Protocol;
import net.kuujo.copycat.protocol.ProtocolException;
import net.kuujo.copycat.protocol.ProtocolServer;
import net.kuujo.copycat.util.concurrent.Futures;

public class DefaultLocalMemberCoordinator
extends AbstractMemberCoordinator
implements LocalMemberCoordinator {
    private final ProtocolServer server;
    private final Executor executor;
    private final Map<Integer, Map<Integer, Map<Integer, MessageHandler<ByteBuffer, ByteBuffer>>>> handlers = new ConcurrentHashMap<Integer, Map<Integer, Map<Integer, MessageHandler<ByteBuffer, ByteBuffer>>>>(1024);

    public DefaultLocalMemberCoordinator(MemberInfo info, Protocol protocol, Executor executor) {
        super(info);
        try {
            URI realUri = new URI(info.uri());
            if (!protocol.isValidUri(realUri)) {
                throw new ProtocolException(String.format("Invalid protocol URI %s", info.uri()), new Object[0]);
            }
            this.server = protocol.createServer(realUri);
        }
        catch (URISyntaxException e) {
            throw new ProtocolException(e);
        }
        this.executor = executor;
    }

    @Override
    public CompletableFuture<ByteBuffer> send(String topic, int address, int id, ByteBuffer message) {
        MessageHandler<ByteBuffer, ByteBuffer> handler;
        Map<Integer, MessageHandler<ByteBuffer, ByteBuffer>> addressHandlers;
        Map<Integer, Map<Integer, MessageHandler<ByteBuffer, ByteBuffer>>> topicHandlers = this.handlers.get(topic.hashCode());
        if (topicHandlers != null && (addressHandlers = topicHandlers.get(address)) != null && (handler = addressHandlers.get(id)) != null) {
            CompletableFuture<ByteBuffer> future = new CompletableFuture<ByteBuffer>();
            this.executor.execute(() -> ((CompletableFuture)handler.apply(message)).whenComplete((result, error) -> this.executor.execute(() -> {
                if (error == null) {
                    future.complete((ByteBuffer)result);
                } else {
                    future.completeExceptionally((Throwable)error);
                }
            })));
            return future;
        }
        return Futures.exceptionalFuture(new IllegalStateException("No handlers"));
    }

    @Override
    public LocalMemberCoordinator register(String topic, int address, int id, MessageHandler<ByteBuffer, ByteBuffer> handler) {
        Map topicHandlers = this.handlers.computeIfAbsent(topic.hashCode(), t -> new ConcurrentHashMap());
        Map addressHandlers = topicHandlers.computeIfAbsent(address, a -> new ConcurrentHashMap());
        addressHandlers.put(id, handler);
        return this;
    }

    @Override
    public LocalMemberCoordinator unregister(String topic, int address, int id) {
        Map<Integer, MessageHandler<ByteBuffer, ByteBuffer>> addressHandlers;
        Map<Integer, Map<Integer, MessageHandler<ByteBuffer, ByteBuffer>>> topicHandlers = this.handlers.get(topic.hashCode());
        if (topicHandlers != null && (addressHandlers = topicHandlers.get(address)) != null) {
            addressHandlers.remove(id);
            if (addressHandlers.isEmpty()) {
                topicHandlers.remove(address);
                if (topicHandlers.isEmpty()) {
                    this.handlers.remove(topic.hashCode());
                }
            }
        }
        return this;
    }

    private CompletableFuture<ByteBuffer> handle(ByteBuffer request) {
        MessageHandler<ByteBuffer, ByteBuffer> handler;
        Map<Integer, MessageHandler<ByteBuffer, ByteBuffer>> addressHandlers;
        Map<Integer, Map<Integer, MessageHandler<ByteBuffer, ByteBuffer>>> topicHandlers = this.handlers.get(request.getInt());
        if (topicHandlers != null && (addressHandlers = topicHandlers.get(request.getInt())) != null && (handler = addressHandlers.get(request.getInt())) != null) {
            return CompletableFuture.runAsync(() -> {}, this.executor).thenCompose(v -> (CompletableFuture)handler.apply(request.slice()));
        }
        return Futures.exceptionalFuture(new IllegalStateException("No handlers"));
    }

    @Override
    public synchronized CompletableFuture<MemberCoordinator> open() {
        return ((CompletableFuture)((CompletableFuture)super.open().thenComposeAsync(v -> this.server.listen(), this.executor)).thenRun(() -> this.server.handler(this::handle))).thenApply(v -> this);
    }

    @Override
    public synchronized CompletableFuture<Void> close() {
        return ((CompletableFuture)super.close().thenComposeAsync(v -> this.server.close(), this.executor)).thenRun(() -> this.server.handler(null));
    }

    public String toString() {
        return String.format("%s[uri=%s]", this.getClass().getCanonicalName(), this.uri());
    }
}

