/*
 * Decompiled with CFR 0.152.
 */
package net.kuujo.copycat.cluster.internal.coordinator;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import net.kuujo.copycat.cluster.Cluster;
import net.kuujo.copycat.cluster.Member;
import net.kuujo.copycat.cluster.MembershipEvent;
import net.kuujo.copycat.cluster.internal.CoordinatedCluster;
import net.kuujo.copycat.cluster.internal.CoordinatedMember;
import net.kuujo.copycat.cluster.internal.MemberInfo;
import net.kuujo.copycat.cluster.internal.Router;
import net.kuujo.copycat.cluster.internal.coordinator.AbstractMemberCoordinator;
import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
import net.kuujo.copycat.cluster.internal.coordinator.CoordinatedResourceConfig;
import net.kuujo.copycat.cluster.internal.coordinator.CoordinatorCluster;
import net.kuujo.copycat.cluster.internal.coordinator.CoordinatorConfig;
import net.kuujo.copycat.cluster.internal.coordinator.DefaultLocalMemberCoordinator;
import net.kuujo.copycat.cluster.internal.coordinator.DefaultRemoteMemberCoordinator;
import net.kuujo.copycat.cluster.internal.coordinator.LocalMemberCoordinator;
import net.kuujo.copycat.cluster.internal.coordinator.MemberCoordinator;
import net.kuujo.copycat.cluster.internal.manager.ClusterManager;
import net.kuujo.copycat.cluster.internal.manager.MemberManager;
import net.kuujo.copycat.log.BufferedLog;
import net.kuujo.copycat.raft.RaftConfig;
import net.kuujo.copycat.raft.RaftContext;
import net.kuujo.copycat.raft.protocol.RaftProtocol;
import net.kuujo.copycat.raft.protocol.Request;
import net.kuujo.copycat.raft.protocol.Response;
import net.kuujo.copycat.resource.Resource;
import net.kuujo.copycat.resource.internal.ResourceManager;
import net.kuujo.copycat.util.ConfigurationException;
import net.kuujo.copycat.util.concurrent.Futures;
import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
import net.kuujo.copycat.util.internal.Assert;
import net.kuujo.copycat.util.serializer.KryoSerializer;
import net.kuujo.copycat.util.serializer.Serializer;

public class DefaultClusterCoordinator
implements ClusterCoordinator {
    private final ThreadFactory threadFactory = new NamedThreadFactory("copycat-coordinator-%d");
    private final ScheduledExecutorService executor;
    private final CoordinatorConfig config;
    private final DefaultLocalMemberCoordinator localMember;
    final Map<String, AbstractMemberCoordinator> members = new ConcurrentHashMap<String, AbstractMemberCoordinator>(128);
    private final RaftContext context;
    private final ClusterManager cluster;
    private final Map<String, ResourceHolder> resources = new ConcurrentHashMap<String, ResourceHolder>(1024);
    private volatile boolean open;

    public DefaultClusterCoordinator(CoordinatorConfig config) {
        this.config = config.copy();
        this.executor = Executors.newSingleThreadScheduledExecutor(this.threadFactory);
        this.localMember = new DefaultLocalMemberCoordinator(new MemberInfo(config.getClusterConfig().getLocalMember(), config.getClusterConfig().getMembers().contains(config.getClusterConfig().getLocalMember()) ? Member.Type.ACTIVE : Member.Type.PASSIVE, Member.Status.ALIVE), config.getClusterConfig().getProtocol(), Executors.newSingleThreadExecutor(this.threadFactory));
        this.members.put(config.getClusterConfig().getLocalMember(), this.localMember);
        for (String member : config.getClusterConfig().getMembers()) {
            if (this.members.containsKey(member)) continue;
            this.members.put(member, new DefaultRemoteMemberCoordinator(new MemberInfo(member, Member.Type.ACTIVE, Member.Status.ALIVE), config.getClusterConfig().getProtocol(), Executors.newSingleThreadScheduledExecutor(this.threadFactory)));
        }
        CoordinatedResourceConfig resourceConfig = new CoordinatedResourceConfig().withElectionTimeout(config.getClusterConfig().getElectionTimeout()).withHeartbeatInterval(config.getClusterConfig().getHeartbeatInterval()).withReplicas(config.getClusterConfig().getMembers()).withLog(new BufferedLog());
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("copycat-coordinator"));
        this.context = new RaftContext(config.getName(), config.getClusterConfig().getLocalMember(), new RaftConfig(resourceConfig.toMap()), executor);
        this.cluster = new CoordinatorCluster(0, this, this.context, new ResourceRouter(executor), new KryoSerializer(), executor, config.getExecutor());
    }

    @Override
    public Cluster cluster() {
        return this.cluster;
    }

    private void handleMembershipEvent(MembershipEvent event) {
        if (event.type() == MembershipEvent.Type.JOIN && !this.members.containsKey(event.member().uri())) {
            MemberCoordinator coordinator = ((CoordinatedMember)event.member()).coordinator();
            this.members.put(coordinator.uri(), (AbstractMemberCoordinator)coordinator);
        } else if (event.type() == MembershipEvent.Type.LEAVE) {
            this.members.remove(event.member().uri());
        }
    }

    @Override
    public CoordinatorConfig config() {
        return this.config;
    }

    @Override
    public LocalMemberCoordinator member() {
        return this.localMember;
    }

    @Override
    public MemberCoordinator member(String uri) {
        return this.members.get(uri);
    }

    @Override
    public Collection<MemberCoordinator> members() {
        return Collections.unmodifiableCollection(this.members.values());
    }

    @Override
    public <T extends Resource<T>> T getResource(String name) {
        return this.getResource(name, new CoordinatedResourceConfig());
    }

    @Override
    public <T extends Resource<T>> T getResource(String name, CoordinatedResourceConfig config) {
        ResourceHolder resource = this.resources.computeIfAbsent(name, n -> {
            ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("copycat-" + name + "-%d"));
            RaftContext state = new RaftContext(name, this.member().uri(), new RaftConfig(config.toMap()), executor);
            CoordinatedCluster cluster = new CoordinatedCluster(name.hashCode(), this, state, new ResourceRouter(executor), config.getSerializer(), executor, config.getExecutor());
            ResourceManager context = new ResourceManager(name, config, cluster, state, this);
            try {
                return new ResourceHolder(config.getResourceType().getConstructor(ResourceManager.class).newInstance(context), cluster, state);
            }
            catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
                throw new ConfigurationException("Failed to instantiate resource", new Object[]{e});
            }
        });
        return (T)resource.resource;
    }

    public CompletableFuture<Void> acquireResource(String name) {
        Assert.state(this.isOpen(), "coordinator not open", new Object[0]);
        ResourceHolder resource = this.resources.get(name);
        if (resource != null) {
            if (resource.cluster.isClosed()) {
                return resource.cluster.open().thenCompose(v -> resource.state.open());
            }
            return CompletableFuture.completedFuture(null);
        }
        return Futures.exceptionalFuture(new IllegalStateException("Invalid resource " + name));
    }

    public CompletableFuture<Void> releaseResource(String name) {
        ResourceHolder resource = this.resources.get(name);
        if (resource != null) {
            if (resource.cluster.isOpen()) {
                return resource.state.close().thenCompose(v -> resource.cluster.close());
            }
            return CompletableFuture.completedFuture(null);
        }
        return Futures.exceptionalFuture(new IllegalStateException("Invalid resource " + name));
    }

    private synchronized CompletableFuture<Void> closeResources() {
        ArrayList<CompletionStage> futures = new ArrayList<CompletionStage>(this.resources.size());
        for (ResourceHolder resource : this.resources.values()) {
            if (!resource.cluster.isOpen()) continue;
            futures.add(((CompletableFuture)resource.state.close().thenCompose(v -> resource.cluster.close())).thenRun(() -> resource.state.executor().shutdown()));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
    }

    @Override
    public synchronized CompletableFuture<ClusterCoordinator> open() {
        if (this.open) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture[] futures = new CompletableFuture[this.members.size()];
        int i = 0;
        for (MemberCoordinator memberCoordinator : this.members.values()) {
            futures[i++] = memberCoordinator.open();
        }
        return ((CompletableFuture)((CompletableFuture)((CompletableFuture)((CompletableFuture)CompletableFuture.allOf(futures).thenRun(() -> this.cluster.addMembershipListener(this::handleMembershipEvent))).thenComposeAsync(v -> this.cluster.open(), (Executor)this.executor)).thenComposeAsync(v -> this.context.open(), (Executor)this.executor)).thenRun(() -> {
            this.open = true;
        })).thenApply(v -> this);
    }

    @Override
    public boolean isOpen() {
        return this.open;
    }

    @Override
    public synchronized CompletableFuture<Void> close() {
        if (!this.open) {
            return CompletableFuture.completedFuture(null);
        }
        this.open = false;
        CompletableFuture[] futures = new CompletableFuture[this.members.size()];
        int i = 0;
        for (MemberCoordinator memberCoordinator : this.members.values()) {
            futures[i++] = memberCoordinator.close();
        }
        this.cluster.removeMembershipListener(this::handleMembershipEvent);
        return ((CompletableFuture)((CompletableFuture)((CompletableFuture)this.closeResources().thenComposeAsync(v -> this.context.close(), (Executor)this.executor)).thenComposeAsync(v -> this.cluster.close(), (Executor)this.executor)).thenComposeAsync(v -> CompletableFuture.allOf(futures))).thenRun(this.executor::shutdown);
    }

    @Override
    public boolean isClosed() {
        return !this.open;
    }

    public String toString() {
        return String.format("%s[members=%s]", this.getClass().getCanonicalName(), this.members.values());
    }

    private static class ResourceHolder {
        private final Resource resource;
        private final ClusterManager cluster;
        private final RaftContext state;

        private ResourceHolder(Resource resource, ClusterManager cluster, RaftContext state) {
            this.resource = resource;
            this.cluster = cluster;
            this.state = state;
        }
    }

    private static class ResourceRouter
    implements Router {
        private static final int PROTOCOL_ID = 1;
        private final Serializer serializer = new KryoSerializer();
        private final Executor executor;

        private ResourceRouter(Executor executor) {
            this.executor = executor;
        }

        @Override
        public void createRoutes(ClusterManager cluster, RaftProtocol protocol) {
            cluster.member().registerHandler("sync", 1, protocol::sync, this.serializer, this.executor);
            cluster.member().registerHandler("poll", 1, protocol::poll, this.serializer, this.executor);
            cluster.member().registerHandler("vote", 1, protocol::vote, this.serializer, this.executor);
            cluster.member().registerHandler("append", 1, protocol::append, this.serializer, this.executor);
            cluster.member().registerHandler("query", 1, protocol::query, this.serializer, this.executor);
            cluster.member().registerHandler("commit", 1, protocol::commit, this.serializer, this.executor);
            protocol.syncHandler(request -> this.handleOutboundRequest("sync", request, cluster));
            protocol.pollHandler(request -> this.handleOutboundRequest("poll", request, cluster));
            protocol.voteHandler(request -> this.handleOutboundRequest("vote", request, cluster));
            protocol.appendHandler(request -> this.handleOutboundRequest("append", request, cluster));
            protocol.queryHandler(request -> this.handleOutboundRequest("query", request, cluster));
            protocol.commitHandler(request -> this.handleOutboundRequest("commit", request, cluster));
        }

        private <T extends Request, U extends Response> CompletableFuture<U> handleOutboundRequest(String topic, T request, ClusterManager cluster) {
            MemberManager member = cluster.member(request.uri());
            if (member != null) {
                return member.send(topic, 1, request, this.serializer, this.executor);
            }
            return Futures.exceptionalFuture(new IllegalStateException(String.format("Invalid member URI %s", request.uri())));
        }

        @Override
        public void destroyRoutes(ClusterManager cluster, RaftProtocol protocol) {
            cluster.member().unregisterHandler("sync", 1);
            cluster.member().unregisterHandler("poll", 1);
            cluster.member().unregisterHandler("vote", 1);
            cluster.member().unregisterHandler("append", 1);
            cluster.member().unregisterHandler("query", 1);
            cluster.member().unregisterHandler("commit", 1);
            protocol.syncHandler(null);
            protocol.pollHandler(null);
            protocol.voteHandler(null);
            protocol.appendHandler(null);
            protocol.queryHandler(null);
            protocol.commitHandler(null);
        }
    }
}

