/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.service.impl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.BiConsumer;
import net.kuujo.copycat.protocol.PingRequest;
import net.kuujo.copycat.protocol.PollRequest;
import net.kuujo.copycat.protocol.RequestHandler;
import net.kuujo.copycat.protocol.SubmitRequest;
import net.kuujo.copycat.protocol.SyncRequest;
import net.kuujo.copycat.spi.protocol.ProtocolServer;
import org.onlab.util.Tools;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.service.impl.ClusterMessagingProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ProtocolServer} that receives Copycat protocol requests over the
 * ONOS cluster communication service.
 *
 * <p>While listening, one subscriber is registered per Copycat message subject
 * (ping, sync, poll, submit). Each incoming message is decoded with
 * {@code ClusterMessagingProtocol.DB_SERIALIZER}, handed to the registered
 * {@link RequestHandler} on a worker thread, and the handler's result is
 * encoded and sent back through {@link ClusterMessage#respond}.
 */
public class ClusterMessagingProtocolServer implements ProtocolServer {

    /** Maximum number of back-off sleeps while waiting for a handler to be registered. */
    private static final int HANDLER_WAIT_ATTEMPTS = 10;

    private final Logger log = LoggerFactory.getLogger(getClass());
    private final ClusterCommunicationService clusterCommunicator;

    /** Set via {@link #requestHandler}; read from worker threads. */
    private volatile RequestHandler handler;

    /**
     * Worker pool; created in {@link #listen()} and discarded in {@link #close()}.
     * Volatile because message subscribers read it from threads other than the
     * one calling listen/close.
     */
    private volatile ExecutorService pool;

    /**
     * Creates a server that is not yet listening.
     *
     * @param clusterCommunicator service used to subscribe to, and unsubscribe
     *                            from, the Copycat message subjects
     */
    public ClusterMessagingProtocolServer(ClusterCommunicationService clusterCommunicator) {
        this.clusterCommunicator = clusterCommunicator;
    }

    /**
     * Registers the handler that incoming requests are forwarded to.
     *
     * @param handler request handler; replaces any previously registered one
     */
    public void requestHandler(RequestHandler handler) {
        this.handler = handler;
    }

    /**
     * Creates the worker pool if there is none (or the previous one was shut
     * down) and subscribes to the four Copycat message subjects.
     *
     * @return an already-completed future
     */
    public CompletableFuture<Void> listen() {
        ExecutorService executor = pool;
        if (executor == null || executor.isShutdown()) {
            pool = Executors.newCachedThreadPool(
                    Tools.namedThreads("copycat-netty-messaging-server-%d"));
        }
        clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_PING, new PingHandler());
        clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_SYNC, new SyncHandler());
        clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_POLL, new PollHandler());
        clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_SUBMIT, new SubmitHandler());
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Unsubscribes from the four Copycat message subjects and shuts the worker
     * pool down immediately.
     *
     * @return an already-completed future
     */
    public CompletableFuture<Void> close() {
        clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_PING);
        clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_SYNC);
        clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_POLL);
        clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_SUBMIT);
        ExecutorService executor = pool;
        if (executor != null) {
            // Clear the field first so subscribers that are still running see
            // "not listening" rather than a pool that rejects their task.
            pool = null;
            executor.shutdownNow();
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Returns the registered handler, waiting briefly if none is set yet.
     *
     * <p>Sleeps with a doubling back-off (2 ms, 4 ms, ...) for at most
     * {@link #HANDLER_WAIT_ATTEMPTS} rounds, re-reading the handler after each.
     *
     * @return the handler, or {@code null} if none was registered in time or
     *         the wait was interrupted before one appeared
     */
    private RequestHandler awaitHandler() {
        RequestHandler current = handler;
        long sleepMs = 1L;
        for (int attempt = 0; current == null && attempt < HANDLER_WAIT_ATTEMPTS; attempt++) {
            sleepMs <<= 1;
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                log.error("Interrupted", e);
                // Keep the interrupt visible to the pool / caller.
                Thread.currentThread().interrupt();
                return handler;
            }
            current = handler;
        }
        if (current == null) {
            log.error("There was no handler registered!");
        }
        return current;
    }

    /**
     * Completion callback that sends a request's result back to the sender of
     * the originating message. Failures are logged; no response is sent for them.
     */
    private final class PostExecutionTask implements BiConsumer<Object, Throwable> {
        private final ClusterMessage message;

        PostExecutionTask(ClusterMessage message) {
            this.message = message;
        }

        @Override
        public void accept(Object response, Throwable error) {
            if (error != null) {
                log.error("Processing {} failed.", message.subject(), error);
                return;
            }
            try {
                log.trace("responding to {}", message.subject());
                message.respond(ClusterMessagingProtocol.DB_SERIALIZER.encode(response));
            } catch (Exception e) {
                // response may legitimately be null; do not NPE while reporting.
                String responseType = response == null ? "null" : response.getClass().getName();
                log.error("Failed responding with {}", responseType, e);
            }
        }
    }

    /**
     * Common subscriber logic: decode the payload, run the request on the
     * worker pool and reply once the handler's future completes.
     *
     * @param <T> request type carried by the subscribed subject
     */
    private abstract class CopycatMessageHandler<T> implements ClusterMessageHandler {

        /**
         * Forwards the request to the matching {@link RequestHandler} operation.
         *
         * @param target  non-null handler to call
         * @param request decoded request
         * @return the future returned by the handler operation
         */
        abstract CompletableFuture<?> invoke(RequestHandler target, T request);

        /**
         * Decodes the message payload and schedules its processing on the
         * worker pool. Messages arriving while the server is not listening
         * are logged and dropped.
         *
         * @param message incoming cluster message
         */
        public void handle(ClusterMessage message) {
            // The subject a handler is subscribed to determines the payload type.
            @SuppressWarnings("unchecked")
            T request = (T) ClusterMessagingProtocol.DB_SERIALIZER.decode(message.payload());

            ExecutorService executor = pool;
            if (executor == null) {
                log.warn("Ignoring {} message: server is not listening", message.subject());
                return;
            }
            try {
                // execute() rather than submit(): no Future is kept, so nothing
                // thrown by the task can be silently parked in one.
                executor.execute(() -> process(request, message));
            } catch (RejectedExecutionException e) {
                log.warn("Ignoring {} message: worker pool rejected the task", message.subject(), e);
            }
        }

        /** Runs on a worker thread: resolve the handler, invoke it, wire up the reply. */
        private void process(T request, ClusterMessage message) {
            RequestHandler target = awaitHandler();
            if (target == null) {
                log.error("Dropping {} request: no request handler available", message.subject());
                return;
            }
            try {
                invoke(target, request).whenComplete(new PostExecutionTask(message));
            } catch (RuntimeException e) {
                log.error("Processing {} failed.", message.subject(), e);
            }
        }
    }

    /** Subscriber for {@code COPYCAT_SUBMIT}; delegates to {@link RequestHandler#submit}. */
    private final class SubmitHandler extends CopycatMessageHandler<SubmitRequest> {
        @Override
        CompletableFuture<?> invoke(RequestHandler target, SubmitRequest request) {
            return target.submit(request);
        }
    }

    /** Subscriber for {@code COPYCAT_POLL}; delegates to {@link RequestHandler#poll}. */
    private final class PollHandler extends CopycatMessageHandler<PollRequest> {
        @Override
        CompletableFuture<?> invoke(RequestHandler target, PollRequest request) {
            return target.poll(request);
        }
    }

    /** Subscriber for {@code COPYCAT_SYNC}; delegates to {@link RequestHandler#sync}. */
    private final class SyncHandler extends CopycatMessageHandler<SyncRequest> {
        @Override
        CompletableFuture<?> invoke(RequestHandler target, SyncRequest request) {
            return target.sync(request);
        }
    }

    /** Subscriber for {@code COPYCAT_PING}; delegates to {@link RequestHandler#ping}. */
    private final class PingHandler extends CopycatMessageHandler<PingRequest> {
        @Override
        CompletableFuture<?> invoke(RequestHandler target, PingRequest request) {
            return target.ping(request);
        }
    }
}

