/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.remoting.http;

import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.Headers;
import io.undertow.util.Methods;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.remoting.base.ActorServer;
import org.nustaq.kontraktor.remoting.base.ActorServerConnector;
import org.nustaq.kontraktor.remoting.base.ObjectSink;
import org.nustaq.kontraktor.remoting.base.ObjectSocket;
import org.nustaq.kontraktor.remoting.http.HttpObjectSocket;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.kontraktor.util.Pair;
import org.nustaq.serialization.FSTConfiguration;
import org.xnio.channels.StreamSinkChannel;
import org.xnio.channels.StreamSourceChannel;

/**
 * Undertow {@link HttpHandler} that exposes an actor facade over HTTP POST requests.
 *
 * <p>Request routing as implemented below:
 * <ul>
 *   <li>A POST to the empty relative path opens a new session and answers with the
 *       serialized session id.</li>
 *   <li>A POST to {@code /<sessionId>[/<lastSeenSequence>]} is routed to the matching
 *       {@link HttpObjectSocket}. A payload consisting of a one-element array holding a
 *       number is treated as a long poll; anything else is handed to the session's sink.</li>
 *   <li>Any other HTTP method, or an unknown session id, is answered with 404.</li>
 * </ul>
 *
 * <p>{@link #handleRequest} only reads the request body; all further processing is scheduled
 * on the facade via {@code facade.execute(..)}. The session table is a plain {@link HashMap}
 * and is only touched from code scheduled on the facade ({@code execute}/{@code delayed}).
 */
public class UndertowHttpServerConnector
implements ActorServerConnector,
HttpHandler {
    /** Milliseconds to wait for the promises produced by a request before replying anyway. */
    public static int REQUEST_RESULTING_FUTURE_TIMEOUT = 3000;
    /** Default idle time in milliseconds after which a session is closed by {@link #houseKeeping()}. */
    public static long SESSION_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(30L);

    /** Source of session ids; the id is the only thing identifying a session, so it must not be guessable. */
    private static final SecureRandom SESSION_ID_RANDOM = new SecureRandom();

    Actor facade;
    HashMap<String, HttpObjectSocket> sessions = new HashMap<>();
    FSTConfiguration conf = FSTConfiguration.createJsonConfiguration();
    Function<ObjectSocket, ObjectSink> factory;
    long sessionTimeout = SESSION_TIMEOUT_MS;
    volatile boolean isClosed = false;
    private ActorServer actorServer;

    /**
     * Creates the connector and schedules the first {@link #houseKeeping()} run on the facade.
     *
     * @param facade actor used to schedule request processing and housekeeping
     */
    public UndertowHttpServerConnector(Actor facade) {
        this.facade = facade;
        facade.delayed((long)(HttpObjectSocket.LP_TIMEOUT / 2), () -> this.houseKeeping());
    }

    /**
     * Periodic maintenance: triggers long polls that have been pending for at least half of
     * {@code HttpObjectSocket.LP_TIMEOUT}, closes sessions idle for longer than
     * {@link #getSessionTimeout()}, and reschedules itself until the server is closed.
     */
    public void houseKeeping() {
        try {
            long now = System.currentTimeMillis();
            ArrayList<String> expired = new ArrayList<>();
            this.sessions.forEach((sessionId, socket) -> {
                if (now - socket.getLongPollTaskTime() >= (long)(HttpObjectSocket.LP_TIMEOUT / 2)) {
                    socket.triggerLongPoll();
                }
                if (now - socket.getLastUse() > this.getSessionTimeout()) {
                    expired.add(sessionId);
                }
            });
            // closing mutates the session map, so it happens after the iteration
            expired.forEach(this::closeSession);
        } finally {
            // reschedule even if a single sweep failed, otherwise sessions would never expire again
            if (!this.isClosed) {
                this.facade.delayed((long)(HttpObjectSocket.LP_TIMEOUT / 4), () -> this.houseKeeping());
            }
        }
    }

    /**
     * @param sessionTimeout idle time in milliseconds after which a session is closed
     */
    public void setSessionTimeout(long sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    /**
     * @return idle time in milliseconds after which a session is closed
     */
    public long getSessionTimeout() {
        return this.sessionTimeout;
    }

    /**
     * Entry point called by Undertow. Reads exactly {@code Content-Length} bytes of the POST
     * body via a read listener and then schedules {@link #requestReceived} on the facade.
     *
     * <p>Non-POST requests are answered with 404. A missing {@code Content-Length} is answered
     * with 411, an unparsable or negative one with 400. If the body cannot be read completely
     * the exchange is ended.
     *
     * @param exchange the Undertow exchange of the incoming request
     */
    @Override
    public void handleRequest(HttpServerExchange exchange) throws Exception {
        // HttpString instances must be compared by value, not by reference
        if (!Methods.POST.equals(exchange.getRequestMethod())) {
            UndertowHttpServerConnector.reject(exchange, 404);
            return;
        }
        String rpath = exchange.getRelativePath();
        String lengthHeader = exchange.getRequestHeaders().getFirst(Headers.CONTENT_LENGTH);
        if (lengthHeader == null) {
            UndertowHttpServerConnector.reject(exchange, 411);
            return;
        }
        int len;
        try {
            len = Integer.parseInt(lengthHeader);
        }
        catch (NumberFormatException e) {
            UndertowHttpServerConnector.reject(exchange, 400);
            return;
        }
        if (len < 0) {
            UndertowHttpServerConnector.reject(exchange, 400);
            return;
        }
        // NOTE(review): the buffer size is taken from the client-supplied header without an upper bound
        ByteBuffer buf = ByteBuffer.allocate(len);
        StreamSourceChannel requestChannel = exchange.getRequestChannel();
        requestChannel.getReadSetter().set(channel -> {
            int read;
            try {
                read = channel.read(buf);
            }
            catch (IOException e) {
                Log.Warn((Object)this, (Throwable)e);
                exchange.endExchange();
                return;
            }
            if (!buf.hasRemaining()) {
                try {
                    requestChannel.shutdownReads();
                }
                catch (IOException e) {
                    Log.Warn((Object)this, (Throwable)e);
                }
                this.facade.execute(() -> this.requestReceived(exchange, buf.array(), rpath));
            } else if (read < 0) {
                // end of stream before the announced body length arrived: the request can never complete
                exchange.endExchange();
            }
        });
        requestChannel.resumeReads();
    }

    /**
     * Routes a fully received request. An empty path (ignoring leading slashes) opens a new
     * session; otherwise the first path segment is looked up as session id and the optional
     * second segment is passed on as the last sequence number seen by the client.
     *
     * @param exchange the exchange to answer
     * @param postData the raw request body
     * @param path     the relative request path
     */
    protected void requestReceived(HttpServerExchange exchange, byte[] postData, String path) {
        String sessionPath = path;
        while (sessionPath.startsWith("/")) {
            sessionPath = sessionPath.substring(1);
        }
        if (sessionPath.trim().isEmpty()) {
            // the body of a session-opening request is not evaluated
            this.openSession(exchange);
            return;
        }
        String[] segments = sessionPath.split("/");
        HttpObjectSocket httpObjectSocket = this.sessions.get(segments[0]);
        if (httpObjectSocket == null) {
            UndertowHttpServerConnector.reject(exchange, 404);
            return;
        }
        this.handleClientRequest(exchange, httpObjectSocket, postData, segments.length > 1 ? segments[1] : null);
    }

    /** Registers a new session, wires its sink via the factory and replies with the serialized session id. */
    private void openSession(HttpServerExchange exchange) {
        // acquire the channel first so no session is registered for a request that cannot be answered
        StreamSinkChannel sinkchannel = exchange.getResponseChannel();
        if (sinkchannel == null) {
            Log.Error((Object)this, (String)"could not aquire response channel. rejecting request.");
            exchange.endExchange();
            return;
        }
        String sessionId = UndertowHttpServerConnector.newSessionId();
        HttpObjectSocket sock = new HttpObjectSocket(sessionId, () -> this.facade.execute(() -> this.closeSession(sessionId))){

            @Override
            protected int getObjectMaxBatchSize() {
                return HttpObjectSocket.HTTP_BATCH_SIZE;
            }
        };
        this.sessions.put(sock.getSessionId(), sock);
        ObjectSink sink = this.factory.apply(sock);
        sock.setSink(sink);
        byte[] response = this.conf.asByteArray((Object)sock.getSessionId());
        exchange.setResponseCode(200);
        this.sendAsync(exchange, sinkchannel, response, () -> Log.Info((Object)this, (String)("client connected " + sessionId)));
    }

    /** Creates a random hexadecimal session id from 128 bits of {@link SecureRandom} output. */
    private static String newSessionId() {
        return Long.toHexString(SESSION_ID_RANDOM.nextLong()) + Long.toHexString(SESSION_ID_RANDOM.nextLong());
    }

    /** Ends the exchange with the given HTTP status code and no body. */
    private static void reject(HttpServerExchange exchange, int status) {
        exchange.setResponseCode(status);
        exchange.endExchange();
    }

    /**
     * Removes the session from the session table and notifies its socket that the sink is closed.
     *
     * @param sessionId id of the session to close
     * @return the removed socket, or {@code null} if no such session was registered
     */
    protected HttpObjectSocket closeSession(String sessionId) {
        Log.Info((Object)this, (String)(sessionId + " closed"));
        HttpObjectSocket removed = this.sessions.remove(sessionId);
        if (removed != null) {
            removed.sinkClosed();
        }
        return removed;
    }

    /**
     * Handles a request addressed to an existing session. Called from {@link #requestReceived},
     * which {@link #handleRequest} schedules on the facade.
     *
     * <p>A payload that decodes to a one-element array holding a {@link Number} is a long poll:
     * if the client reported a positive last seen sequence and the follow-up message is still
     * stored, it is replayed; otherwise the exchange is parked as the socket's long poll task.
     * Every other array payload is passed to {@link #handleRegularRequest}. Payloads that cannot
     * be decoded to an array are answered with 400.
     *
     * @param exchange         the exchange to answer
     * @param httpObjectSocket the session the request belongs to
     * @param postData         the raw request body
     * @param lastSeenSequence last sequence number reported by the client, may be {@code null}
     */
    public void handleClientRequest(HttpServerExchange exchange, HttpObjectSocket httpObjectSocket, byte[] postData, String lastSeenSequence) {
        StreamSinkChannel sinkchannel = exchange.getResponseChannel();
        if (sinkchannel == null) {
            Log.Error((Object)this, (String)"could not aquire response channel. rejecting request.");
            exchange.endExchange();
            return;
        }
        httpObjectSocket.updateTimeStamp();
        Object decoded;
        try {
            decoded = httpObjectSocket.getConf().asObject(postData);
        }
        catch (RuntimeException e) {
            Log.Warn((Object)this, (Throwable)e);
            UndertowHttpServerConnector.reject(exchange, 400);
            return;
        }
        if (!(decoded instanceof Object[])) {
            // a blind cast would throw here and leave the exchange unanswered
            Log.Error((Object)this, (String)"request payload is not an array. rejecting request.");
            UndertowHttpServerConnector.reject(exchange, 400);
            return;
        }
        Object[] received = (Object[])decoded;
        boolean isEmptyLP = received.length == 1 && received[0] instanceof Number;
        if (!isEmptyLP) {
            this.handleRegularRequest(exchange, httpObjectSocket, received, sinkchannel);
            return;
        }
        int lastClientSeq = -1;
        if (lastSeenSequence != null) {
            try {
                lastClientSeq = Integer.parseInt(lastSeenSequence);
            }
            catch (NumberFormatException e) {
                Log.Warn((Object)this, (Throwable)e);
            }
        }
        if (lastClientSeq > 0) {
            byte[] msg = (byte[])httpObjectSocket.takeStoredLPMessage(lastClientSeq + 1);
            if (msg != null) {
                this.replyFromHistory(exchange, sinkchannel, msg);
                return;
            }
        }
        sinkchannel.resumeWrites();
        Pair<Runnable, HttpServerExchange> lpTask = this.createLongPollTask(exchange, httpObjectSocket, sinkchannel);
        httpObjectSocket.cancelLongPoll();
        httpObjectSocket.setLongPollTask(lpTask);
    }

    /**
     * Builds the long poll task for a parked exchange. When run, the task sends the socket's
     * next queued message on the exchange, unless the response channel is no longer open.
     *
     * @return the task paired with the exchange it answers
     */
    protected Pair<Runnable, HttpServerExchange> createLongPollTask(HttpServerExchange exchange, HttpObjectSocket httpObjectSocket, StreamSinkChannel sinkchannel) {
        Runnable task = () -> {
            if (sinkchannel.isOpen()) {
                this.sendNextQueuedMessage(exchange, httpObjectSocket, sinkchannel);
            }
        };
        return new Pair<>(task, exchange);
    }

    /**
     * Replays a previously stored message to the client using a write listener.
     *
     * @param exchange    the exchange to answer
     * @param sinkchannel the exchange's response channel
     * @param msg         the stored message bytes
     */
    protected void replyFromHistory(HttpServerExchange exchange, StreamSinkChannel sinkchannel, byte[] msg) {
        this.sendAsync(exchange, sinkchannel, msg, null);
    }

    /**
     * Passes the received objects to the session's sink and replies with the next queued
     * message. If the sink produced promises, the reply is deferred until they are all settled
     * or {@link #REQUEST_RESULTING_FUTURE_TIMEOUT} milliseconds have passed.
     *
     * @param exchange         the exchange to answer
     * @param httpObjectSocket the session the request belongs to
     * @param received         the decoded request payload
     * @param sinkchannel      the exchange's response channel
     */
    protected void handleRegularRequest(HttpServerExchange exchange, HttpObjectSocket httpObjectSocket, Object[] received, StreamSinkChannel sinkchannel) {
        ArrayList futures = new ArrayList();
        httpObjectSocket.getSink().receiveObject((Object)received, futures);
        // both the completion and the timeout callback call reply; make sure only one response is
        // produced, since a second run would drain another queued message onto a finished exchange
        AtomicBoolean replied = new AtomicBoolean(false);
        Runnable reply = () -> {
            if (replied.compareAndSet(false, true)) {
                this.sendNextQueuedMessage(exchange, httpObjectSocket, sinkchannel);
            }
        };
        if (futures.isEmpty()) {
            reply.run();
        } else {
            Actors.all(futures).timeoutIn((long)REQUEST_RESULTING_FUTURE_TIMEOUT).then(() -> reply.run()).onTimeout(() -> reply.run());
            sinkchannel.resumeWrites();
        }
    }

    /**
     * Takes the socket's next queued message, stores it for later replay and writes it to the
     * response channel, then ends the exchange. An empty message ends the exchange without a body.
     * If the write fails, the channel is closed instead of retrying.
     */
    private void sendNextQueuedMessage(HttpServerExchange exchange, HttpObjectSocket httpObjectSocket, StreamSinkChannel sinkchannel) {
        Pair<byte[], Integer> nextQueuedMessage = httpObjectSocket.getNextQueuedMessage();
        byte[] response = nextQueuedMessage.getFirst();
        exchange.setResponseContentLength((long)response.length);
        if (response.length == 0) {
            exchange.endExchange();
            return;
        }
        httpObjectSocket.storeLPMessage(nextQueuedMessage.getSecond(), response);
        ByteBuffer responseBuf = ByteBuffer.wrap(response);
        try {
            while (responseBuf.hasRemaining()) {
                sinkchannel.write(responseBuf);
            }
        }
        catch (IOException | RuntimeException e) {
            // a failed write never drains the buffer, so the loop must be left here
            Log.Error((Object)this, (String)("failed to write response, buffer size:" + response.length));
            Log.Warn((Object)this, (Throwable)e);
            try {
                sinkchannel.close();
            }
            catch (IOException closeFailure) {
                Log.Warn((Object)this, (Throwable)closeFailure);
            }
        }
        exchange.endExchange();
    }

    /**
     * Writes {@code payload} through a write listener on the response channel and ends the
     * exchange once everything is written or a write fails.
     *
     * @param onComplete run right before the exchange is ended after a successful write; may be {@code null}
     */
    private void sendAsync(HttpServerExchange exchange, StreamSinkChannel sinkchannel, byte[] payload, Runnable onComplete) {
        ByteBuffer responseBuf = ByteBuffer.wrap(payload);
        exchange.setResponseContentLength((long)payload.length);
        sinkchannel.getWriteSetter().set(channel -> {
            try {
                if (responseBuf.hasRemaining()) {
                    sinkchannel.write(responseBuf);
                }
            }
            catch (Exception e) {
                Log.Warn((Object)this, (Throwable)e);
                exchange.endExchange();
                return;
            }
            if (responseBuf.hasRemaining()) {
                // partial write: ask to be called again when the channel accepts more data
                sinkchannel.resumeWrites();
                return;
            }
            if (onComplete != null) {
                onComplete.run();
            }
            exchange.endExchange();
        });
        sinkchannel.resumeWrites();
    }

    /**
     * Stores the facade and the factory used to create an {@link ObjectSink} for each new session.
     *
     * @param facade  actor used to schedule request processing
     * @param factory creates the sink for a newly opened session socket
     */
    public void connect(Actor facade, Function<ObjectSocket, ObjectSink> factory) throws Exception {
        this.facade = facade;
        this.factory = factory;
    }

    /**
     * Marks the connector as closed, which stops {@link #houseKeeping()} from rescheduling itself.
     *
     * @return a promise created with a {@code null} result
     */
    public IPromise closeServer() {
        this.isClosed = true;
        return new Promise(null);
    }

    /**
     * @param actorServer the server this connector belongs to
     */
    public void setActorServer(ActorServer actorServer) {
        this.actorServer = actorServer;
    }

    /**
     * @return the server previously set via {@link #setActorServer(ActorServer)}, or {@code null}
     */
    public ActorServer getActorServer() {
        return this.actorServer;
    }
}

