/*
 * Decompiled with CFR 0.152.
 */
package org.vertx.java.core.eventbus.impl;

import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import org.vertx.java.core.AsyncResult;
import org.vertx.java.core.AsyncResultHandler;
import org.vertx.java.core.Handler;
import org.vertx.java.core.SimpleHandler;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.eventbus.EventBus;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.eventbus.impl.BaseMessage;
import org.vertx.java.core.eventbus.impl.BooleanMessage;
import org.vertx.java.core.eventbus.impl.BufferMessage;
import org.vertx.java.core.eventbus.impl.ByteArrayMessage;
import org.vertx.java.core.eventbus.impl.ByteMessage;
import org.vertx.java.core.eventbus.impl.CharacterMessage;
import org.vertx.java.core.eventbus.impl.ClusterManager;
import org.vertx.java.core.eventbus.impl.DoubleMessage;
import org.vertx.java.core.eventbus.impl.FloatMessage;
import org.vertx.java.core.eventbus.impl.IntMessage;
import org.vertx.java.core.eventbus.impl.JsonArrayMessage;
import org.vertx.java.core.eventbus.impl.JsonObjectMessage;
import org.vertx.java.core.eventbus.impl.LongMessage;
import org.vertx.java.core.eventbus.impl.MessageFactory;
import org.vertx.java.core.eventbus.impl.PingMessage;
import org.vertx.java.core.eventbus.impl.ServerIDs;
import org.vertx.java.core.eventbus.impl.ShortMessage;
import org.vertx.java.core.eventbus.impl.StringMessage;
import org.vertx.java.core.eventbus.impl.SubsMap;
import org.vertx.java.core.eventbus.impl.hazelcast.HazelcastClusterManager;
import org.vertx.java.core.impl.Context;
import org.vertx.java.core.impl.VertxInternal;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.core.logging.Logger;
import org.vertx.java.core.logging.impl.LoggerFactory;
import org.vertx.java.core.net.NetClient;
import org.vertx.java.core.net.NetServer;
import org.vertx.java.core.net.NetSocket;
import org.vertx.java.core.net.impl.ServerID;
import org.vertx.java.core.parsetools.RecordParser;

public class DefaultEventBus
implements EventBus {
    /** Class-wide logger. */
    private static final Logger log = LoggerFactory.getLogger(DefaultEventBus.class);
    /** One-byte buffer written back on the socket when a message whose type() is 0 arrives. */
    private static final Buffer PONG = new Buffer(new byte[]{1});
    // Ping timing values. NOTE(review): presumably milliseconds — the delay before pinging a peer and
    // the time allowed for its reply, respectively; confirm against the timer API.
    private static final long PING_INTERVAL = 20000L;
    private static final long PING_REPLY_INTERVAL = 20000L;
    /** Port value 2550, the same port the constructors fall back to when none is supplied. */
    public static final int DEFAULT_CLUSTER_PORT = 2550;
    /** Owning Vert.x instance; used here for contexts, timers and creating net clients/servers. */
    private final VertxInternal vertx;
    /** Identity (port and host) of this node; stamped on outgoing messages as their sender. */
    private final ServerID serverID;
    /** Server accepting connections from other nodes; null when the bus is local-only. */
    private NetServer server;
    /** Subscription map obtained from the cluster manager; null when the bus is local-only. */
    private SubsMap subs;
    /** Outbound connections, one holder per remote server. */
    private final ConcurrentMap<ServerID, ConnectionHolder> connections = new ConcurrentHashMap<ServerID, ConnectionHolder>();
    /** Handlers registered on this node, keyed by address. */
    private final ConcurrentMap<String, Handlers> handlerMap = new ConcurrentHashMap<String, Handlers>();
    /** Counter appended to {@link #prefix} to build reply addresses. */
    private final AtomicInteger seq = new AtomicInteger(0);
    /** Random per-instance prefix for generated reply addresses. */
    private final String prefix = UUID.randomUUID().toString();
    /** Cluster manager; null when the bus is local-only. */
    private final ClusterManager clusterMgr;

    /**
     * Creates a local-only event bus: no cluster manager, no subscription map and no
     * network server are set up.
     * <p>
     * The node identity is {@code localhost} on {@link #DEFAULT_CLUSTER_PORT}.
     *
     * @param vertx the owning Vert.x instance
     */
    public DefaultEventBus(VertxInternal vertx) {
        this.vertx = vertx;
        this.clusterMgr = null;
        this.subs = null;
        this.server = null;
        this.serverID = new ServerID(DEFAULT_CLUSTER_PORT, "localhost");
    }

    /**
     * Creates a clustered event bus listening on {@link #DEFAULT_CLUSTER_PORT}.
     *
     * @param vertx    the owning Vert.x instance
     * @param hostname host name this node listens on
     */
    public DefaultEventBus(VertxInternal vertx, String hostname) {
        this(vertx, DEFAULT_CLUSTER_PORT, hostname);
    }

    public DefaultEventBus(VertxInternal vertx, int port, String hostname) {
        this.vertx = vertx;
        this.serverID = new ServerID(port, hostname);
        this.clusterMgr = this.createClusterManager(vertx);
        this.subs = this.clusterMgr.getSubsMap("subs");
        this.server = this.setServer();
    }

    /**
     * Factory for the cluster manager used by the clustered constructor.
     * Protected so that subclasses can supply a different implementation.
     *
     * @param vertx the owning Vert.x instance
     * @return a new {@link HazelcastClusterManager}
     */
    protected ClusterManager createClusterManager(VertxInternal vertx) {
        ClusterManager manager = new HazelcastClusterManager(vertx);
        return manager;
    }

    /*
     * send(...) overloads.
     *
     * Every three-argument variant wraps the payload in the matching message class with
     * its first constructor flag set to true (send, as opposed to publish) and passes it,
     * together with the optional reply handler, to sendOrPub. Every two-argument variant
     * delegates to its three-argument sibling with a null reply handler.
     */

    /**
     * Sends a JSON object to {@code address}.
     *
     * @param address      destination address
     * @param message      payload
     * @param replyHandler handler for the reply, or {@code null} if none is wanted
     */
    @Override
    public void send(String address, JsonObject message, Handler<Message<JsonObject>> replyHandler) {
        sendOrPub(new JsonObjectMessage(true, address, message), replyHandler);
    }

    /** Sends a JSON object to {@code address} without a reply handler. */
    @Override
    public void send(String address, JsonObject message) {
        send(address, message, null);
    }

    /** Sends a JSON array to {@code address}; {@code replyHandler} may be {@code null}. */
    @Override
    public void send(String address, JsonArray message, Handler<Message<JsonArray>> replyHandler) {
        sendOrPub(new JsonArrayMessage(true, address, message), replyHandler);
    }

    /** Sends a JSON array to {@code address} without a reply handler. */
    @Override
    public void send(String address, JsonArray message) {
        send(address, message, null);
    }

    /** Sends a buffer to {@code address}; {@code replyHandler} may be {@code null}. */
    @Override
    public void send(String address, Buffer message, Handler<Message<Buffer>> replyHandler) {
        sendOrPub(new BufferMessage(true, address, message), replyHandler);
    }

    /** Sends a buffer to {@code address} without a reply handler. */
    @Override
    public void send(String address, Buffer message) {
        send(address, message, null);
    }

    /** Sends a byte array to {@code address}; {@code replyHandler} may be {@code null}. */
    @Override
    public void send(String address, byte[] message, Handler<Message<byte[]>> replyHandler) {
        sendOrPub(new ByteArrayMessage(true, address, message), replyHandler);
    }

    /** Sends a byte array to {@code address} without a reply handler. */
    @Override
    public void send(String address, byte[] message) {
        send(address, message, null);
    }

    /** Sends a string to {@code address}; {@code replyHandler} may be {@code null}. */
    @Override
    public void send(String address, String message, Handler<Message<String>> replyHandler) {
        sendOrPub(new StringMessage(true, address, message), replyHandler);
    }

    /** Sends a string to {@code address} without a reply handler. */
    @Override
    public void send(String address, String message) {
        send(address, message, null);
    }

    /** Sends an integer to {@code address}; {@code replyHandler} may be {@code null}. */
    @Override
    public void send(String address, Integer message, Handler<Message<Integer>> replyHandler) {
        sendOrPub(new IntMessage(true, address, message), replyHandler);
    }

    /** Sends an integer to {@code address} without a reply handler. */
    @Override
    public void send(String address, Integer message) {
        send(address, message, null);
    }

    /** Sends a long to {@code address}; {@code replyHandler} may be {@code null}. */
    @Override
    public void send(String address, Long message, Handler<Message<Long>> replyHandler) {
        sendOrPub(new LongMessage(true, address, message), replyHandler);
    }

    /** Sends a long to {@code address} without a reply handler. */
    @Override
    public void send(String address, Long message) {
        send(address, message, null);
    }

    /** Sends a float to {@code address}; {@code replyHandler} may be {@code null}. */
    @Override
    public void send(String address, Float message, Handler<Message<Float>> replyHandler) {
        sendOrPub(new FloatMessage(true, address, message), replyHandler);
    }

    /** Sends a float to {@code address} without a reply handler. */
    @Override
    public void send(String address, Float message) {
        send(address, message, null);
    }

    /** Sends a double to {@code address}; {@code replyHandler} may be {@code null}. */
    @Override
    public void send(String address, Double message, Handler<Message<Double>> replyHandler) {
        sendOrPub(new DoubleMessage(true, address, message), replyHandler);
    }

    /** Sends a double to {@code address} without a reply handler. */
    @Override
    public void send(String address, Double message) {
        send(address, message, null);
    }

    /** Sends a boolean to {@code address}; {@code replyHandler} may be {@code null}. */
    @Override
    public void send(String address, Boolean message, Handler<Message<Boolean>> replyHandler) {
        sendOrPub(new BooleanMessage(true, address, message), replyHandler);
    }

    /** Sends a boolean to {@code address} without a reply handler. */
    @Override
    public void send(String address, Boolean message) {
        send(address, message, null);
    }

    /** Sends a short to {@code address}; {@code replyHandler} may be {@code null}. */
    @Override
    public void send(String address, Short message, Handler<Message<Short>> replyHandler) {
        sendOrPub(new ShortMessage(true, address, message), replyHandler);
    }

    /** Sends a short to {@code address} without a reply handler. */
    @Override
    public void send(String address, Short message) {
        send(address, message, null);
    }

    /** Sends a character to {@code address}; {@code replyHandler} may be {@code null}. */
    @Override
    public void send(String address, Character message, Handler<Message<Character>> replyHandler) {
        sendOrPub(new CharacterMessage(true, address, message), replyHandler);
    }

    /** Sends a character to {@code address} without a reply handler. */
    @Override
    public void send(String address, Character message) {
        send(address, message, null);
    }

    /** Sends a byte to {@code address}; {@code replyHandler} may be {@code null}. */
    @Override
    public void send(String address, Byte message, Handler<Message<Byte>> replyHandler) {
        sendOrPub(new ByteMessage(true, address, message), replyHandler);
    }

    /** Sends a byte to {@code address} without a reply handler. */
    @Override
    public void send(String address, Byte message) {
        send(address, message, null);
    }

    /*
     * publish(...) overloads.
     *
     * Each wraps the payload in the matching message class with its first constructor
     * flag set to false (publish, as opposed to send) and passes it to sendOrPub with no
     * reply handler.
     */

    /**
     * Publishes a JSON object to {@code address}.
     *
     * @param address destination address
     * @param message payload
     */
    @Override
    public void publish(String address, JsonObject message) {
        sendOrPub(new JsonObjectMessage(false, address, message), null);
    }

    /** Publishes a JSON array to {@code address}. */
    @Override
    public void publish(String address, JsonArray message) {
        sendOrPub(new JsonArrayMessage(false, address, message), null);
    }

    /** Publishes a buffer to {@code address}. */
    @Override
    public void publish(String address, Buffer message) {
        sendOrPub(new BufferMessage(false, address, message), null);
    }

    /** Publishes a byte array to {@code address}. */
    @Override
    public void publish(String address, byte[] message) {
        sendOrPub(new ByteArrayMessage(false, address, message), null);
    }

    /** Publishes a string to {@code address}. */
    @Override
    public void publish(String address, String message) {
        sendOrPub(new StringMessage(false, address, message), null);
    }

    /** Publishes an integer to {@code address}. */
    @Override
    public void publish(String address, Integer message) {
        sendOrPub(new IntMessage(false, address, message), null);
    }

    /** Publishes a long to {@code address}. */
    @Override
    public void publish(String address, Long message) {
        sendOrPub(new LongMessage(false, address, message), null);
    }

    /** Publishes a float to {@code address}. */
    @Override
    public void publish(String address, Float message) {
        sendOrPub(new FloatMessage(false, address, message), null);
    }

    /** Publishes a double to {@code address}. */
    @Override
    public void publish(String address, Double message) {
        sendOrPub(new DoubleMessage(false, address, message), null);
    }

    /** Publishes a boolean to {@code address}. */
    @Override
    public void publish(String address, Boolean message) {
        sendOrPub(new BooleanMessage(false, address, message), null);
    }

    /** Publishes a short to {@code address}. */
    @Override
    public void publish(String address, Short message) {
        sendOrPub(new ShortMessage(false, address, message), null);
    }

    /** Publishes a character to {@code address}. */
    @Override
    public void publish(String address, Character message) {
        sendOrPub(new CharacterMessage(false, address, message), null);
    }

    /** Publishes a byte to {@code address}. */
    @Override
    public void publish(String address, Byte message) {
        sendOrPub(new ByteMessage(false, address, message), null);
    }

    /**
     * Registers {@code handler} at {@code address} as a normal (non-reply, non-local-only)
     * handler.
     *
     * @param address           address to listen on
     * @param handler           handler receiving messages for the address
     * @param completionHandler notified when registration completes; may be {@code null}
     */
    @Override
    public void registerHandler(String address, Handler<? extends Message> handler, AsyncResultHandler<Void> completionHandler) {
        final boolean isReplyHandler = false;
        final boolean isLocalOnly = false;
        registerHandler(address, handler, completionHandler, isReplyHandler, isLocalOnly);
    }

    /**
     * Registers {@code handler} at {@code address} without a completion callback.
     *
     * @param address address to listen on
     * @param handler handler receiving messages for the address
     */
    @Override
    public void registerHandler(String address, Handler<? extends Message> handler) {
        registerHandler(address, handler, null);
    }

    /**
     * Registers {@code handler} at {@code address} with the local-only flag set, so the
     * registration is not written to the cluster subscription map.
     *
     * @param address address to listen on
     * @param handler handler receiving messages for the address
     */
    @Override
    public void registerLocalHandler(String address, Handler<? extends Message> handler) {
        final boolean isReplyHandler = false;
        final boolean isLocalOnly = true;
        registerHandler(address, handler, null, isReplyHandler, isLocalOnly);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes the first registration of {@code handler} (compared by identity) at
     * {@code address}.
     * <p>
     * When the removed registration was the last one for the address, the address is
     * dropped from the local map and, for a clustered non-local-only registration, from
     * the cluster subscription map as well. If nothing matches, the method returns
     * silently and {@code completionHandler} is not invoked.
     *
     * @param address           address the handler was registered at
     * @param handler           the handler instance to remove
     * @param completionHandler notified once removal is done; may be {@code null}
     */
    @Override
    public void unregisterHandler(String address, Handler<? extends Message> handler, AsyncResultHandler<Void> completionHandler) {
        Context context = vertx.getOrAssignContext();
        Handlers registered = handlerMap.get(address);
        if (registered == null) {
            return;
        }
        synchronized (registered) {
            // Index-based scan over a size snapshot; we return straight after the removal.
            int count = registered.list.size();
            for (int idx = 0; idx < count; idx++) {
                HandlerHolder candidate = registered.list.get(idx);
                if (candidate.handler == handler) {
                    registered.list.remove(idx);
                    // Flag checked by deliveries that were already queued for this holder.
                    candidate.removed = true;
                    boolean wasLast = registered.list.isEmpty();
                    if (wasLast) {
                        handlerMap.remove(address);
                    }
                    if (wasLast && subs != null && !candidate.localOnly) {
                        // removeSub takes care of notifying the completion handler.
                        removeSub(address, serverID, completionHandler);
                    } else if (completionHandler != null) {
                        callCompletionHandler(completionHandler);
                    }
                    getHandlerCloseHook(context).entries.remove(new HandlerEntry(address, handler));
                    return;
                }
            }
        }
    }

    /**
     * Removes {@code handler} from {@code address} without a completion callback.
     *
     * @param address address the handler was registered at
     * @param handler the handler instance to remove
     */
    @Override
    public void unregisterHandler(String address, Handler<? extends Message> handler) {
        unregisterHandler(address, handler, null);
    }

    /**
     * Shuts the event bus down: closes the cluster manager (if any) and the listening
     * server (if any).
     * <p>
     * For a clustered bus {@code doneHandler} is handed to the server's close call. For a
     * local-only bus there is no server, so {@code doneHandler} is invoked directly;
     * previously it was never called in that case, leaving callers waiting forever.
     *
     * @param doneHandler notified when the bus is closed; may be {@code null}
     */
    @Override
    public void close(Handler<Void> doneHandler) {
        if (clusterMgr != null) {
            clusterMgr.close();
        }
        if (server != null) {
            server.close(doneHandler);
        } else if (doneHandler != null) {
            // Nothing asynchronous to wait for: complete immediately.
            doneHandler.handle(null);
        }
    }

    /**
     * Sends a reply message straight to the node identified by {@code dest}.
     *
     * @param dest         node that should receive the reply
     * @param message      the reply message
     * @param replyHandler handler for a reply to this reply, or {@code null}
     */
    void sendReply(ServerID dest, BaseMessage message, Handler replyHandler) {
        sendOrPub(dest, message, replyHandler);
    }

    /**
     * Creates the server that accepts connections from other nodes and starts it
     * listening on this node's port and host.
     * <p>
     * Each socket gets a record parser that alternates between two states: a 4-byte
     * record holding the length of the next message, then a record of exactly that length
     * holding the message. A message whose {@code type()} is 0 is answered with
     * {@code PONG} on the same socket; anything else is dispatched to local handlers.
     *
     * @return the value returned by the server's {@code listen} call
     */
    private NetServer setServer() {
        return vertx.createNetServer().connectHandler(new Handler<NetSocket>() {

            @Override
            public void handle(final NetSocket socket) {
                final RecordParser parser = RecordParser.newFixed(4, null);
                parser.setOutput(new Handler<Buffer>() {
                    // -1 means the next record is a length header.
                    int bodyLength = -1;

                    @Override
                    public void handle(Buffer buff) {
                        if (bodyLength == -1) {
                            bodyLength = buff.getInt(0);
                            parser.fixedSizeMode(bodyLength);
                            return;
                        }
                        BaseMessage received = MessageFactory.read(buff);
                        if (received.type() != 0) {
                            DefaultEventBus.this.receiveMessage(received);
                        } else {
                            socket.write(PONG);
                        }
                        // Back to expecting a 4-byte length header.
                        parser.fixedSizeMode(4);
                        bodyLength = -1;
                    }
                });
                socket.dataHandler(parser);
            }
        }).listen(serverID.port, serverID.host);
    }

    /**
     * Routes {@code message} to the subscribed nodes.
     * <p>
     * A send goes to the single node picked by {@code subs.choose()}; a publish goes to
     * every node in {@code subs}. Whenever the target is this node the message is
     * delivered locally instead of over the network.
     *
     * @param subs    nodes subscribed to the message's address
     * @param message message to route
     */
    private void sendToSubs(ServerIDs subs, BaseMessage message) {
        if (!message.send) {
            for (ServerID target : subs) {
                if (target.equals(serverID)) {
                    receiveMessage(message);
                } else {
                    sendRemote(target, message);
                }
            }
            return;
        }
        ServerID chosen = subs.choose();
        if (chosen.equals(serverID)) {
            receiveMessage(message);
        } else {
            sendRemote(chosen, message);
        }
    }

    /**
     * Sends or publishes {@code message} with no explicit reply destination.
     *
     * @param message      message to dispatch
     * @param replyHandler handler for the reply, or {@code null}
     */
    private void sendOrPub(BaseMessage message, Handler replyHandler) {
        final ServerID noReplyDest = null;
        sendOrPub(noReplyDest, message, replyHandler);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Core dispatch routine behind every send, publish and reply.
     * <p>
     * Stamps the message with this node as sender, registers a one-shot local reply
     * handler under a generated address when {@code replyHandler} is given, and routes:
     * <ul>
     *   <li>to {@code replyDest} when one is supplied (locally if it is this node);</li>
     *   <li>otherwise, on a local-only bus, straight to local handlers;</li>
     *   <li>otherwise via an asynchronous subscription lookup, falling back to local
     *       delivery when no node is subscribed.</li>
     * </ul>
     * The context captured on entry is set again on exit.
     *
     * @param replyDest    node to deliver to directly, or {@code null} for normal routing
     * @param message      message to dispatch
     * @param replyHandler handler for the reply, or {@code null}
     */
    private void sendOrPub(ServerID replyDest, final BaseMessage message, Handler replyHandler) {
        Context context = vertx.getOrAssignContext();
        try {
            message.sender = serverID;
            if (replyHandler != null) {
                // Reply address: per-instance prefix followed by a sequence number.
                message.replyAddress = prefix + seq.incrementAndGet();
                registerHandler(message.replyAddress, replyHandler, null, true, true);
            }
            if (replyDest != null) {
                if (replyDest.equals(serverID)) {
                    receiveMessage(message);
                } else {
                    sendRemote(replyDest, message);
                }
            } else if (subs == null) {
                receiveMessage(message);
            } else {
                subs.get(message.address, new AsyncResultHandler<ServerIDs>() {

                    @Override
                    public void handle(AsyncResult<ServerIDs> event) {
                        if (event.exception != null) {
                            log.error("Failed to send message", event.exception);
                            return;
                        }
                        ServerIDs subscribers = (ServerIDs) event.result;
                        if (subscribers.isEmpty()) {
                            DefaultEventBus.this.receiveMessage(message);
                        } else {
                            DefaultEventBus.this.sendToSubs(subscribers, message);
                        }
                    }
                });
            }
        } finally {
            if (context != null) {
                vertx.setContext(context);
            }
        }
    }

    /**
     * Registers {@code handler} at {@code address} and records it in the current
     * context's close hook so it is unregistered when the context closes.
     * <p>
     * When this call creates the first entry for the address on this node and the
     * registration is neither a reply handler nor local-only, the address is also written
     * to the cluster subscription map; {@code completionHandler} is then passed to that
     * asynchronous put. In every other case the completion handler, if any, is called
     * immediately.
     *
     * @param address           address to listen on; must not be {@code null}
     * @param handler           handler receiving messages for the address
     * @param completionHandler notified when registration completes; may be {@code null}
     * @param replyHandler      {@code true} for a one-shot reply handler
     * @param localOnly         {@code true} to keep the registration out of the cluster map
     * @throws NullPointerException if {@code address} is {@code null}
     */
    private void registerHandler(String address, Handler<? extends Message> handler, AsyncResultHandler<Void> completionHandler, boolean replyHandler, boolean localOnly) {
        if (address == null) {
            throw new NullPointerException("address");
        }
        Context context = vertx.getOrAssignContext();
        Handlers handlers = handlerMap.get(address);
        if (handlers == null) {
            handlers = new Handlers();
            // Another thread may have registered the address in the meantime; use its list.
            Handlers prevHandlers = handlerMap.putIfAbsent(address, handlers);
            if (prevHandlers != null) {
                handlers = prevHandlers;
            }
            if (completionHandler == null) {
                // Default callback so a failed cluster registration is at least logged.
                completionHandler = new AsyncResultHandler<Void>() {

                    @Override
                    public void handle(AsyncResult<Void> event) {
                        if (event.exception != null) {
                            // Was "Failed to remove entry", which mislabelled a registration failure.
                            log.error("Failed to register handler", event.exception);
                        }
                    }
                };
            }
            handlers.list.add(new HandlerHolder(handler, replyHandler, localOnly, context));
            if (subs != null && !replyHandler && !localOnly) {
                subs.put(address, serverID, completionHandler);
            } else {
                callCompletionHandler(completionHandler);
            }
        } else {
            handlers.list.add(new HandlerHolder(handler, replyHandler, localOnly, context));
            if (completionHandler != null) {
                callCompletionHandler(completionHandler);
            }
        }
        getHandlerCloseHook(context).entries.add(new HandlerEntry(address, handler));
    }

    /**
     * Returns the close hook this event bus has stored on {@code context}, creating and
     * storing a new one on first use.
     *
     * @param context the context whose hook is wanted
     * @return the existing or newly created hook
     */
    private HandlerCloseHook getHandlerCloseHook(Context context) {
        HandlerCloseHook existing = (HandlerCloseHook) context.getCloseHook(this);
        if (existing != null) {
            return existing;
        }
        HandlerCloseHook created = new HandlerCloseHook();
        context.putCloseHook(this, created);
        return created;
    }

    /**
     * Invokes {@code completionHandler} with a successful, valueless result.
     *
     * @param completionHandler handler to notify; must not be {@code null}
     */
    private void callCompletionHandler(AsyncResultHandler<Void> completionHandler) {
        completionHandler.handle(new AsyncResult<Void>((Void) null));
    }

    /**
     * Removes every subscription belonging to {@code theServerID} from the cluster
     * subscription map. Does nothing on a local-only bus; the outcome of the removal is
     * ignored.
     *
     * @param theServerID node whose subscriptions should be dropped
     */
    private void cleanSubsForServerID(ServerID theServerID) {
        if (subs == null) {
            return;
        }
        AsyncResultHandler<Void> ignoreOutcome = new AsyncResultHandler<Void>() {

            @Override
            public void handle(AsyncResult<Void> event) {
            }
        };
        subs.removeAllForServerID(theServerID, ignoreOutcome);
    }

    /**
     * Tears down the outbound connection to {@code theServerID}: cancels its ping timers,
     * closes the socket on a best-effort basis and removes the holder from the connection
     * map.
     * <p>
     * When the holder was still the registered one and {@code failed} is set, the remote
     * node's subscriptions are also purged from the cluster map.
     *
     * @param theServerID remote node the connection belongs to
     * @param holder      the connection holder to clean up
     * @param failed      {@code true} if the connection is being dropped because of an error
     *                    or a missed pong, {@code false} for a normal close
     */
    private void cleanupConnection(ServerID theServerID, ConnectionHolder holder, boolean failed) {
        if (holder.timeoutID != -1L) {
            vertx.cancelTimer(holder.timeoutID);
        }
        if (holder.pingTimeoutID != -1L) {
            vertx.cancelTimer(holder.pingTimeoutID);
        }
        // The socket is still null when the connect attempt itself failed; check for that
        // explicitly instead of relying on a swallowed NullPointerException.
        NetSocket socket = holder.socket;
        if (socket != null) {
            try {
                socket.close();
            } catch (Exception ignored) {
                // Best effort: the connection is being discarded anyway.
            }
        }
        if (connections.remove(theServerID, holder)) {
            log.debug("Cluster connection closed: " + theServerID + " holder " + holder);
            if (failed) {
                cleanSubsForServerID(theServerID);
            }
        }
    }

    /**
     * Writes {@code message} to the node {@code theServerID}, opening a connection to it
     * first if none exists yet.
     * <p>
     * If two threads race to create the connection, only the one whose holder wins the
     * {@code putIfAbsent} starts connecting; the other reuses the winner's holder.
     *
     * @param theServerID remote node to send to
     * @param message     message to write
     */
    private void sendRemote(ServerID theServerID, BaseMessage message) {
        ConnectionHolder target = connections.get(theServerID);
        if (target == null) {
            NetClient client = vertx.createNetClient();
            client.setConnectTimeout(60000L);
            ConnectionHolder fresh = new ConnectionHolder(client);
            ConnectionHolder existing = connections.putIfAbsent(theServerID, fresh);
            if (existing == null) {
                fresh.connect(client, theServerID);
                target = fresh;
            } else {
                target = existing;
            }
        }
        target.writeMessage(message);
    }

    /**
     * Schedules the next ping on {@code holder}'s connection.
     * <p>
     * After {@code PING_INTERVAL} a ping message is written to the socket and a second
     * timer of {@code PING_REPLY_INTERVAL} is armed. If that second timer fires, the remote
     * node is treated as dead and the connection is cleaned up as failed. The timer ids
     * are stored on the holder so they can be cancelled.
     *
     * @param holder connection to keep alive
     */
    private void schedulePing(final ConnectionHolder holder) {
        holder.pingTimeoutID = vertx.setTimer(PING_INTERVAL, new Handler<Long>() {

            @Override
            public void handle(Long ignore) {
                holder.timeoutID = DefaultEventBus.this.vertx.setTimer(PING_REPLY_INTERVAL, new Handler<Long>() {

                    @Override
                    public void handle(Long timerID) {
                        // Report the peer that failed to answer, not this node's own id.
                        log.info("No pong from server " + holder.theServerID + " - will consider it dead, timerID: " + timerID + " holder " + holder);
                        DefaultEventBus.this.cleanupConnection(holder.theServerID, holder, true);
                    }
                });
                new PingMessage(DefaultEventBus.this.serverID).write(holder.socket);
            }
        });
    }

    /**
     * Removes the subscription of {@code theServerID} to {@code subName} from the cluster
     * subscription map.
     * <p>
     * The outcome is forwarded to {@code completionHandler} as a {@code Void} result
     * (carrying the exception on failure). Without a completion handler a failure is only
     * logged.
     *
     * @param subName           address being unsubscribed
     * @param theServerID       node whose subscription is removed
     * @param completionHandler notified of the outcome; may be {@code null}
     */
    private void removeSub(String subName, ServerID theServerID, final AsyncResultHandler<Void> completionHandler) {
        subs.remove(subName, theServerID, new AsyncResultHandler<Boolean>() {

            @Override
            public void handle(AsyncResult<Boolean> event) {
                if (completionHandler != null) {
                    // Properly typed result instead of a raw AsyncResult / AsyncResult<Object>.
                    AsyncResult<Void> result = event.exception != null
                            ? new AsyncResult<Void>(event.exception)
                            : new AsyncResult<Void>((Void) null);
                    completionHandler.handle(result);
                } else if (event.exception != null) {
                    log.error("Failed to remove subscription", event.exception);
                }
            }
        });
    }

    /**
     * Delivers {@code msg} to the handlers registered on this node for its address.
     * <p>
     * A send is handed to the one handler picked by {@code Handlers.choose()}; a publish
     * is handed to every registered handler. Messages for an address with no local
     * handlers are dropped.
     *
     * @param msg the message to deliver; its {@code bus} field is set to this instance
     */
    private void receiveMessage(BaseMessage msg) {
        msg.bus = this;
        Handlers registered = handlerMap.get(msg.address);
        if (registered == null) {
            return;
        }
        if (!msg.send) {
            for (HandlerHolder each : registered.list) {
                doReceive(msg, each);
            }
            return;
        }
        HandlerHolder chosen = registered.choose();
        if (chosen != null) {
            doReceive(msg, chosen);
        }
    }

    /**
     * Hands a copy of {@code msg} to {@code holder}'s handler on the holder's context.
     * <p>
     * The handler is skipped if the holder was unregistered before the task runs. A reply
     * handler is unregistered after the task runs, whether or not the handler threw.
     *
     * @param msg    message to deliver (copied before dispatch)
     * @param holder registration to deliver to
     */
    private void doReceive(final BaseMessage msg, final HandlerHolder holder) {
        final Message copied = msg.copy();
        Runnable delivery = new Runnable() {

            @Override
            public void run() {
                try {
                    if (!holder.removed) {
                        holder.handler.handle(copied);
                    }
                } finally {
                    // Reply handlers are one-shot.
                    if (holder.replyHandler) {
                        DefaultEventBus.this.unregisterHandler(msg.address, holder.handler);
                    }
                }
            }
        };
        holder.context.execute(delivery);
    }

    /**
     * Per-context close hook that remembers which handlers were registered from that
     * context and unregisters all of them when run.
     */
    private class HandlerCloseHook
    implements Runnable {
        /** Registrations made from the owning context. */
        final Set<HandlerEntry> entries = new HashSet<HandlerEntry>();

        private HandlerCloseHook() {
        }

        /**
         * Unregisters every recorded handler. Works on a copy of {@link #entries} because
         * unregistering removes from that set.
         */
        @Override
        public void run() {
            Set<HandlerEntry> snapshot = new HashSet<HandlerEntry>(entries);
            for (HandlerEntry registered : snapshot) {
                DefaultEventBus.this.unregisterHandler(registered.address, registered.handler);
            }
        }
    }

    /**
     * Immutable (address, handler) pair used as the element type of a close hook's
     * registration set. Equality is based on both fields.
     */
    private class HandlerEntry {
        final String address;
        final Handler<? extends Message> handler;

        private HandlerEntry(String address, Handler<? extends Message> handler) {
            this.address = address;
            this.handler = handler;
        }

        /**
         * Two entries are equal when they are of the same class and both address and
         * handler are equal. Returns {@code false} for {@code null}, as the
         * {@link Object#equals(Object)} contract requires, and tolerates {@code null}
         * fields in the same way {@link #hashCode()} does.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            HandlerEntry entry = (HandlerEntry) o;
            boolean sameAddress = address == null ? entry.address == null : address.equals(entry.address);
            if (!sameAddress) {
                return false;
            }
            return handler == null ? entry.handler == null : handler.equals(entry.handler);
        }

        /** Hash combining address and handler; {@code null} fields contribute 0. */
        @Override
        public int hashCode() {
            int result = address != null ? address.hashCode() : 0;
            result = 31 * result + (handler != null ? handler.hashCode() : 0);
            return result;
        }
    }

    /**
     * The handlers registered for one address, plus a cursor used to hand out handlers
     * in rotation.
     */
    private static class Handlers {
        /** Registered handlers, in registration order. */
        final List<HandlerHolder> list = new CopyOnWriteArrayList<HandlerHolder>();
        /** Index of the next handler to hand out. */
        final AtomicInteger pos = new AtomicInteger(0);

        private Handlers() {
        }

        /**
         * Picks the next handler in rotation.
         * <p>
         * The list can shrink between reading its size and fetching the element; in that
         * case the cursor is reset and the pick is retried.
         *
         * @return the chosen holder, or {@code null} if no handler is registered
         */
        HandlerHolder choose() {
            while (true) {
                int size = list.size();
                if (size == 0) {
                    return null;
                }
                int p = pos.getAndIncrement();
                // Wrap the cursor once the last index has been handed out.
                if (p >= size - 1) {
                    pos.set(0);
                }
                try {
                    return list.get(p);
                } catch (IndexOutOfBoundsException e) {
                    // A concurrent removal made p stale: start over from the front.
                    pos.set(0);
                }
            }
        }
    }

    /**
     * Outbound connection to one remote node.
     * <p>
     * Messages written before the socket is established are queued in {@link #pending}
     * and flushed, in order, once the connection comes up.
     */
    private class ConnectionHolder {
        final NetClient client;
        /** Set once connected; null until then. */
        volatile NetSocket socket;
        /** Messages queued while the connection is still being established. */
        final Queue<BaseMessage> pending = new ConcurrentLinkedQueue<BaseMessage>();
        /** True once the backlog has been flushed and direct writes are allowed. */
        volatile boolean connected;
        /** Id of the "no pong received" timer, or -1 if none has been set. */
        long timeoutID = -1L;
        /** Id of the "send next ping" timer, or -1 if none has been set. */
        long pingTimeoutID = -1L;
        ServerID theServerID;

        private ConnectionHolder(NetClient client) {
            this.client = client;
        }

        /**
         * Writes {@code message} to the socket, or queues it if the connection is not up
         * yet. The flag is re-checked under the lock so a message cannot be queued after
         * the backlog has been flushed.
         *
         * @param message message to write
         */
        void writeMessage(BaseMessage message) {
            if (connected) {
                message.write(socket);
                return;
            }
            synchronized (this) {
                if (connected) {
                    message.write(socket);
                } else {
                    pending.add(message);
                }
            }
        }

        /**
         * Called when the socket to the remote node has been established: installs the
         * socket handlers, starts the ping cycle and flushes queued messages.
         * <p>
         * {@code connected} is only set to {@code true} after the backlog has been written.
         * Setting it earlier let a concurrent {@link #writeMessage} take the lock-free path
         * and write ahead of messages that were queued before it.
         *
         * @param theServerID remote node this connection belongs to
         * @param socket      the established socket
         */
        synchronized void connected(final ServerID theServerID, NetSocket socket) {
            this.socket = socket;
            this.theServerID = theServerID;
            socket.exceptionHandler(new Handler<Exception>(){

                @Override
                public void handle(Exception e) {
                    DefaultEventBus.this.cleanupConnection(theServerID, ConnectionHolder.this, true);
                }
            });
            socket.closedHandler(new SimpleHandler(){

                @Override
                public void handle() {
                    DefaultEventBus.this.cleanupConnection(theServerID, ConnectionHolder.this, false);
                }
            });
            socket.dataHandler(new Handler<Buffer>(){

                @Override
                public void handle(Buffer data) {
                    // Any inbound data counts as a pong: cancel the timeout and re-arm the ping.
                    DefaultEventBus.this.vertx.cancelTimer(ConnectionHolder.this.timeoutID);
                    DefaultEventBus.this.schedulePing(ConnectionHolder.this);
                }
            });
            DefaultEventBus.this.schedulePing(this);
            try {
                // Writers that still see connected == false block on this monitor until
                // the flush is done, so queue order is preserved.
                for (BaseMessage message : pending) {
                    message.write(socket);
                }
                pending.clear();
            } finally {
                // Publish the flag even if a write failed, as the original code did,
                // so later messages are not queued forever.
                this.connected = true;
            }
        }

        /**
         * Starts connecting to {@code theServerID}. On success {@link #connected} is
         * invoked; a client-level exception cleans the connection up as failed.
         *
         * @param client      client to connect with
         * @param theServerID remote node to connect to
         */
        void connect(NetClient client, final ServerID theServerID) {
            client.connect(theServerID.port, theServerID.host, new Handler<NetSocket>(){

                @Override
                public void handle(NetSocket socket) {
                    ConnectionHolder.this.connected(theServerID, socket);
                }
            });
            client.exceptionHandler(new Handler<Exception>(){

                @Override
                public void handle(Exception e) {
                    DefaultEventBus.this.cleanupConnection(theServerID, ConnectionHolder.this, true);
                }
            });
        }
    }

    /**
     * One handler registration: the handler, the context it runs on, and its flags.
     * Equality and hash code are based on the handler alone.
     */
    private static class HandlerHolder {
        /** Context on which the handler is executed. */
        final Context context;
        final Handler handler;
        /** True for a one-shot reply handler. */
        final boolean replyHandler;
        /** True if the registration is kept out of the cluster subscription map. */
        final boolean localOnly;
        /** Set when the registration is removed; checked before a queued delivery runs. */
        boolean removed;

        HandlerHolder(Handler handler, boolean replyHandler, boolean localOnly, Context context) {
            this.context = context;
            this.handler = handler;
            this.replyHandler = replyHandler;
            this.localOnly = localOnly;
        }

        /**
         * Two holders are equal when they wrap equal handlers. Returns {@code false} for
         * {@code null} or an object of another class instead of throwing.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            HandlerHolder that = (HandlerHolder) o;
            return handler.equals(that.handler);
        }

        @Override
        public int hashCode() {
            return handler.hashCode();
        }
    }
}

