/*
 * Decompiled with CFR 0.152.
 */
package ch.squaredesk.nova.comm.http;

import ch.squaredesk.nova.comm.DefaultMessageTranscriberForStringAsTransportType;
import ch.squaredesk.nova.comm.MessageTranscriber;
import ch.squaredesk.nova.comm.http.HttpRequestMethod;
import ch.squaredesk.nova.comm.http.ReplyInfo;
import ch.squaredesk.nova.comm.http.RequestInfo;
import ch.squaredesk.nova.comm.http.RequestMessageMetaData;
import ch.squaredesk.nova.comm.http.RpcInvocation;
import ch.squaredesk.nova.comm.retrieving.IncomingMessage;
import ch.squaredesk.nova.comm.retrieving.IncomingMessageMetaData;
import ch.squaredesk.nova.metrics.Metrics;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.Writer;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.glassfish.grizzly.ReadHandler;
import org.glassfish.grizzly.http.Method;
import org.glassfish.grizzly.http.io.NIOReader;
import org.glassfish.grizzly.http.io.NIOWriter;
import org.glassfish.grizzly.http.server.HttpHandler;
import org.glassfish.grizzly.http.server.HttpServer;
import org.glassfish.grizzly.http.server.Request;
import org.glassfish.grizzly.http.server.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * RPC server that receives requests over HTTP via a Grizzly {@link HttpServer}.
 *
 * <p>For every destination passed to {@link #requests(String, Class)} a non-blocking
 * {@link HttpHandler} is registered with the HTTP server. Each fully read request is
 * wrapped in an {@link RpcInvocation} and pushed to the subscribers of the returned
 * {@link Flowable}.
 */
public class RpcServer
extends ch.squaredesk.nova.comm.rpc.RpcServer<String, String> {
    private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);
    /** One shared request stream per destination; an entry is removed when its stream terminates. */
    private final Map<String, Flowable<RpcInvocation>> mapDestinationToIncomingMessages = new ConcurrentHashMap<>();
    private final HttpServer httpServer;

    /**
     * Creates a server without identifier.
     *
     * @param httpServer         the Grizzly server to register handlers with, must not be null
     * @param messageTranscriber transcriber used to convert request bodies
     * @param metrics            metrics sink handed to the superclass
     */
    public RpcServer(HttpServer httpServer, MessageTranscriber<String> messageTranscriber, Metrics metrics) {
        this(null, httpServer, messageTranscriber, metrics);
    }

    /**
     * Creates a server without identifier, using a
     * {@link DefaultMessageTranscriberForStringAsTransportType}.
     *
     * @param httpServer the Grizzly server to register handlers with, must not be null
     * @param metrics    metrics sink handed to the superclass
     */
    public RpcServer(HttpServer httpServer, Metrics metrics) {
        this(null, httpServer, new DefaultMessageTranscriberForStringAsTransportType(), metrics);
    }

    /**
     * Creates a server using a {@link DefaultMessageTranscriberForStringAsTransportType}.
     *
     * @param identifier identifier handed to the superclass
     * @param httpServer the Grizzly server to register handlers with, must not be null
     * @param metrics    metrics sink handed to the superclass
     */
    public RpcServer(String identifier, HttpServer httpServer, Metrics metrics) {
        this(identifier, httpServer, new DefaultMessageTranscriberForStringAsTransportType(), metrics);
    }

    /**
     * Creates a server.
     *
     * @param identifier         identifier handed to the superclass
     * @param httpServer         the Grizzly server to register handlers with, must not be null
     * @param messageTranscriber transcriber used to convert request bodies
     * @param metrics            metrics sink handed to the superclass
     * @throws NullPointerException if {@code httpServer} is null
     */
    public RpcServer(String identifier, HttpServer httpServer, MessageTranscriber<String> messageTranscriber, Metrics metrics) {
        super(identifier, messageTranscriber, metrics);
        this.httpServer = Objects.requireNonNull(httpServer, "httpServer must not be null");
    }

    /**
     * Returns the stream of RPC invocations arriving on the passed destination.
     *
     * <p>The first call for a destination registers an HTTP handler for it; later calls
     * return the already cached stream (note that the cached stream was built with the
     * {@code targetType} of the first call). When the stream terminates or its last
     * subscriber goes away, the handler is removed again and the cache entry is dropped.
     *
     * @param destination the path to listen on
     * @param targetType  the type request bodies are converted to
     * @param <T>         the type of the incoming messages
     * @return the shared stream of invocations for the destination
     * @throws IllegalArgumentException if no URL can be built from the destination
     */
    // The per-destination cache stores raw RpcInvocation streams, so handing one out as
    // Flowable<RpcInvocation<T>> is inherently unchecked.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <T> Flowable<RpcInvocation<T>> requests(String destination, Class<T> targetType) {
        URL destinationAsLocalUrl = RpcServer.toLocalUrl(destination);
        Flowable retVal = this.mapDestinationToIncomingMessages.computeIfAbsent(destination, key -> {
            logger.info("Listening to requests on {}", destination);
            // serialized, since the HTTP handler may emit from different server threads
            Subject<RpcInvocation> stream = PublishSubject.<RpcInvocation>create().toSerialized();
            NonBlockingHttpHandler<T> httpHandler =
                    new NonBlockingHttpHandler<>(destinationAsLocalUrl, this.messageTranscriber, targetType, stream);
            this.httpServer.getServerConfiguration().addHttpHandler(httpHandler, destination);
            return stream.toFlowable(BackpressureStrategy.BUFFER).doFinally(() -> {
                this.mapDestinationToIncomingMessages.remove(destination);
                this.httpServer.getServerConfiguration().removeHttpHandler(httpHandler);
                logger.info("Stopped listening to requests on {}", destination);
            }).share();
        });
        return retVal;
    }

    /** Builds the {@code http://localhost<destination>} URL that is reported in the request meta data. */
    private static URL toLocalUrl(String destination) {
        try {
            return new URL("http", "localhost", destination);
        }
        catch (MalformedURLException e) {
            throw new IllegalArgumentException("Invalid destination " + destination, e);
        }
    }

    /**
     * Extracts the HTTP method and the request parameters from the passed request.
     * For parameters with multiple values only the first value is kept; parameters
     * without any value are mapped to {@code null}.
     */
    private static RequestInfo httpSpecificInfoFrom(Request request) throws Exception {
        Map<String, String> parameters = new HashMap<>();
        for (Map.Entry<String, String[]> entry : request.getParameterMap().entrySet()) {
            String[] valueList = entry.getValue();
            String valueToPass = (valueList != null && valueList.length > 0) ? valueList[0] : null;
            parameters.put(entry.getKey(), valueToPass);
        }
        return new RequestInfo(RpcServer.convert(request.getMethod()), parameters);
    }

    /**
     * Maps a Grizzly {@link Method} constant to the matching {@link HttpRequestMethod}.
     *
     * @throws IllegalArgumentException if the method is none of the known constants
     */
    private static HttpRequestMethod convert(Method method) {
        if (method == Method.CONNECT) {
            return HttpRequestMethod.CONNECT;
        }
        if (method == Method.DELETE) {
            return HttpRequestMethod.DELETE;
        }
        if (method == Method.GET) {
            return HttpRequestMethod.GET;
        }
        if (method == Method.HEAD) {
            return HttpRequestMethod.HEAD;
        }
        if (method == Method.OPTIONS) {
            return HttpRequestMethod.OPTIONS;
        }
        if (method == Method.PATCH) {
            return HttpRequestMethod.PATCH;
        }
        if (method == Method.PRI) {
            return HttpRequestMethod.PRI;
        }
        if (method == Method.POST) {
            return HttpRequestMethod.POST;
        }
        if (method == Method.PUT) {
            return HttpRequestMethod.PUT;
        }
        if (method == Method.TRACE) {
            return HttpRequestMethod.TRACE;
        }
        throw new IllegalArgumentException("Unsupported HTTP method " + method);
    }

    /** Writes the reply to the passed writer and closes it. */
    private static void writeResponse(String reply, NIOWriter out) throws Exception {
        try (BufferedWriter writer = new BufferedWriter((Writer)out)) {
            writer.write(reply);
        }
    }

    /**
     * Reads whatever the reader reports as ready and returns a new buffer containing
     * the current buffer followed by the newly read characters. If nothing could be
     * read, the current buffer is returned unchanged.
     */
    private static char[] appendAvailableDataToBuffer(NIOReader in, char[] currentBuffer) throws IOException {
        char[] readBuffer = new char[in.readyData()];
        int numRead = in.read(readBuffer);
        if (numRead <= 0) {
            return currentBuffer;
        }
        char[] retVal = new char[currentBuffer.length + numRead];
        System.arraycopy(currentBuffer, 0, retVal, 0, currentBuffer.length);
        System.arraycopy(readBuffer, 0, retVal, currentBuffer.length, numRead);
        return retVal;
    }

    /**
     * Resumes the passed response if it is still suspended. Used on the error path, where
     * it is not known whether the reply path has already resumed the response.
     */
    private static void resumeIfSuspended(Response response) {
        try {
            if (response.isSuspended()) {
                response.resume();
            }
        }
        catch (IllegalStateException e) {
            // lost a race with another resume, or the response is no longer usable
            logger.debug("Unable to resume HTTP response", e);
        }
    }

    /** Starts the underlying HTTP server. */
    void start() throws IOException {
        this.httpServer.start();
    }

    /**
     * Shuts the underlying HTTP server down with a grace period of two seconds and waits
     * for the shutdown to finish. Failures are logged and not propagated.
     */
    void shutdown() {
        try {
            this.httpServer.shutdown(2L, TimeUnit.SECONDS).get();
        }
        catch (InterruptedException e) {
            // restore the interrupt flag so that callers can still react to the interruption
            Thread.currentThread().interrupt();
            logger.info("An error occurred, trying to shutdown REST HTTP server", e);
        }
        catch (Exception e) {
            logger.info("An error occurred, trying to shutdown REST HTTP server", e);
        }
    }

    /**
     * HTTP handler that reads the request body without blocking, converts it to the
     * target type and emits an {@link RpcInvocation} on the stream it was created with.
     *
     * @param <IncomingMessageType> the type request bodies are converted to
     */
    private class NonBlockingHttpHandler<IncomingMessageType>
    extends HttpHandler {
        private final URL destination;
        private final MessageTranscriber<String> messageTranscriber;
        private final Class<IncomingMessageType> targetType;
        private final Subject<RpcInvocation> stream;

        private NonBlockingHttpHandler(URL destination, MessageTranscriber<String> messageTranscriber, Class<IncomingMessageType> targetType, Subject<RpcInvocation> stream) {
            this.destination = destination;
            this.messageTranscriber = messageTranscriber;
            this.targetType = targetType;
            this.stream = stream;
        }

        /**
         * Suspends the response and registers a read handler that collects the request
         * body. The response is resumed once a reply or an error has been sent.
         */
        @Override
        public void service(final Request request, final Response response) throws Exception {
            response.suspend();
            final NIOReader in = request.getNIOReader();
            in.notifyAvailable(new ReadHandler(){
                private char[] inputBuffer = new char[0];

                @Override
                public void onDataAvailable() throws Exception {
                    this.inputBuffer = RpcServer.appendAvailableDataToBuffer(in, this.inputBuffer);
                    // re-register to get notified about the next chunk
                    in.notifyAvailable(this);
                }

                @Override
                public void onError(Throwable t) {
                    logger.error("Error parsing request data", t);
                    response.setStatus(400, "Bad request");
                    response.resume();
                }

                @Override
                public void onAllDataRead() throws Exception {
                    this.inputBuffer = RpcServer.appendAvailableDataToBuffer(in, this.inputBuffer);
                    String incomingMessageAsString = new String(this.inputBuffer);
                    try {
                        in.close();
                    }
                    catch (Exception ignored) {
                        // best effort: the body has been read completely, a failing close does not matter
                    }
                    // blank bodies are passed on as null instead of being fed to the transcriber
                    IncomingMessageType incomingMessage = incomingMessageAsString.trim().isEmpty()
                            ? null
                            : NonBlockingHttpHandler.this.messageTranscriber
                                    .getIncomingMessageTranscriber(NonBlockingHttpHandler.this.targetType)
                                    .apply(incomingMessageAsString);
                    RequestInfo requestInfo = RpcServer.httpSpecificInfoFrom(request);
                    RequestMessageMetaData metaData = new RequestMessageMetaData(NonBlockingHttpHandler.this.destination, requestInfo);
                    RpcInvocation<IncomingMessageType> rpci = new RpcInvocation<>(new IncomingMessage<>(incomingMessage, metaData), replyInfo -> {
                        response.setCharacterEncoding("utf-8");
                        try (NIOWriter out = response.getNIOWriter()) {
                            String replyText = replyInfo._1;
                            ReplyInfo replySpecifics = replyInfo._2;
                            response.setContentType("application/json");
                            // Content-Length counts bytes of the encoded body, not UTF-16 chars
                            response.setContentLength(replyText.getBytes(StandardCharsets.UTF_8).length);
                            int statusCode;
                            if (replySpecifics == null) {
                                statusCode = 200;
                            } else {
                                for (Map.Entry<String, String> header : replySpecifics.headerParams.entrySet()) {
                                    response.setHeader(header.getKey(), header.getValue());
                                }
                                statusCode = replySpecifics.statusCode;
                            }
                            response.setStatus(statusCode);
                            RpcServer.writeResponse(replyText, out);
                            RpcServer.this.metricsCollector.requestCompleted(incomingMessage, replyText);
                        }
                        catch (Exception e) {
                            RpcServer.this.metricsCollector.requestCompletedExceptionally(incomingMessage, e);
                            logger.error("An error occurred trying to send HTTP response {}", replyInfo, e);
                            try {
                                response.sendError(500, "Internal server error");
                            }
                            catch (Exception any) {
                                logger.error("Failed to send error 500 back to client", any);
                            }
                        }
                        finally {
                            response.resume();
                        }
                    }, error -> {
                        logger.error("An error occurred trying to process HTTP request {}", incomingMessageAsString, error);
                        try {
                            response.sendError(500, "Internal server error");
                        }
                        catch (Exception any) {
                            logger.error("Failed to send error 500 back to client", any);
                        }
                        finally {
                            // the response was suspended in service(); without resuming it here
                            // the request processing would never be completed
                            RpcServer.resumeIfSuspended(response);
                        }
                    }, NonBlockingHttpHandler.this.messageTranscriber);
                    RpcServer.this.metricsCollector.requestReceived(rpci.request);
                    NonBlockingHttpHandler.this.stream.onNext(rpci);
                }
            });
        }
    }
}

