/*
 * Decompiled with CFR 0.152.
 */
package ch.squaredesk.nova.comm.http;

import ch.squaredesk.nova.comm.http.HttpRequestMethod;
import ch.squaredesk.nova.comm.http.ReplyInfo;
import ch.squaredesk.nova.comm.http.RequestInfo;
import ch.squaredesk.nova.comm.http.RequestMessageMetaData;
import ch.squaredesk.nova.comm.http.RpcInvocation;
import ch.squaredesk.nova.comm.retrieving.IncomingMessage;
import ch.squaredesk.nova.comm.retrieving.IncomingMessageMetaData;
import ch.squaredesk.nova.comm.retrieving.MessageUnmarshaller;
import ch.squaredesk.nova.comm.sending.MessageMarshaller;
import ch.squaredesk.nova.metrics.Metrics;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.glassfish.grizzly.ReadHandler;
import org.glassfish.grizzly.http.Method;
import org.glassfish.grizzly.http.io.NIOReader;
import org.glassfish.grizzly.http.io.NIOWriter;
import org.glassfish.grizzly.http.server.HttpHandler;
import org.glassfish.grizzly.http.server.HttpServer;
import org.glassfish.grizzly.http.server.Request;
import org.glassfish.grizzly.http.server.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RpcServer<InternalMessageType>
extends ch.squaredesk.nova.comm.rpc.RpcServer<String, RpcInvocation<InternalMessageType>> {
    private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);
    private final MessageMarshaller<InternalMessageType, String> messageMarshaller;
    private final MessageUnmarshaller<String, InternalMessageType> messageUnmarshaller;
    private final Map<String, Flowable<RpcInvocation<? extends InternalMessageType>>> mapDestinationToIncomingMessages = new ConcurrentHashMap<String, Flowable<RpcInvocation<? extends InternalMessageType>>>();
    private final HttpServer httpServer;

    public RpcServer(HttpServer httpServer, MessageMarshaller<InternalMessageType, String> messageMarshaller, MessageUnmarshaller<String, InternalMessageType> messageUnmarshaller, Metrics metrics) {
        this(null, httpServer, messageMarshaller, messageUnmarshaller, metrics);
    }

    public RpcServer(String identifier, HttpServer httpServer, MessageMarshaller<InternalMessageType, String> messageMarshaller, MessageUnmarshaller<String, InternalMessageType> messageUnmarshaller, Metrics metrics) {
        super(identifier, metrics);
        Objects.requireNonNull(httpServer, "httpServer must not be null");
        Objects.requireNonNull(messageMarshaller, "messageMarshaller must not be null");
        Objects.requireNonNull(messageUnmarshaller, "messageUnmarshaller must not be null");
        this.httpServer = httpServer;
        this.messageUnmarshaller = messageUnmarshaller;
        this.messageMarshaller = messageMarshaller;
    }

    public Flowable<RpcInvocation<InternalMessageType>> requests(String destination) {
        URL destinationAsLocalUrl;
        try {
            destinationAsLocalUrl = new URL("http", "localhost", destination);
        }
        catch (MalformedURLException e) {
            throw new RuntimeException(e);
        }
        Flowable retVal = this.mapDestinationToIncomingMessages.computeIfAbsent(destination, key -> {
            logger.info("Listening to requests on " + destination);
            PublishSubject stream = PublishSubject.create();
            stream = stream.toSerialized();
            NonBlockingHttpHandler httpHandler = new NonBlockingHttpHandler(destinationAsLocalUrl, (Subject)stream);
            this.httpServer.getServerConfiguration().addHttpHandler((HttpHandler)httpHandler, new String[]{destination});
            return stream.toFlowable(BackpressureStrategy.BUFFER).doFinally(() -> {
                this.mapDestinationToIncomingMessages.remove(destination);
                this.httpServer.getServerConfiguration().removeHttpHandler((HttpHandler)httpHandler);
                logger.info("Stopped listening to requests on " + destination);
            }).share();
        });
        return retVal;
    }

    private static RequestInfo httpSpecificInfoFrom(Request request) throws Exception {
        HashMap<String, String> parameters = new HashMap<String, String>();
        for (Map.Entry entry : request.getParameterMap().entrySet()) {
            String[] valueList = (String[])entry.getValue();
            String valueToPass = null;
            if (valueList != null && valueList.length > 0) {
                valueToPass = valueList[0];
            }
            parameters.put((String)entry.getKey(), valueToPass);
        }
        return new RequestInfo(RpcServer.convert(request.getMethod()), parameters);
    }

    private static HttpRequestMethod convert(Method method) {
        if (method == Method.POST) {
            return HttpRequestMethod.POST;
        }
        if (method == Method.DELETE) {
            return HttpRequestMethod.DELETE;
        }
        if (method == Method.PUT) {
            return HttpRequestMethod.PUT;
        }
        return HttpRequestMethod.GET;
    }

    private static <T> T convertRequestData(String objectAsString, MessageUnmarshaller<String, T> unmarshaller) throws Exception {
        return (T)unmarshaller.unmarshal((Object)objectAsString);
    }

    private static <T> String convertResponseData(T replyObject, MessageMarshaller<T, String> marshaller) throws Exception {
        return (String)marshaller.marshal(replyObject);
    }

    private static void writeResponse(String reply, NIOWriter out) throws Exception {
        out.write(reply);
        out.flush();
        out.close();
    }

    private static char[] appendAvailableDataToBuffer(NIOReader in, char[] currentBuffer) throws IOException {
        char[] readBuffer = new char[in.readyData()];
        int numRead = in.read(readBuffer);
        if (numRead <= 0) {
            return currentBuffer;
        }
        char[] retVal = new char[currentBuffer.length + numRead];
        System.arraycopy(currentBuffer, 0, retVal, 0, currentBuffer.length);
        System.arraycopy(readBuffer, 0, retVal, currentBuffer.length, numRead);
        return retVal;
    }

    void start() throws IOException {
        this.httpServer.start();
    }

    void shutdown() {
        try {
            this.httpServer.shutdown(2L, TimeUnit.SECONDS).get();
        }
        catch (Exception e) {
            logger.info("An error occurred, trying to shutdown REST HTTP server", (Throwable)e);
        }
    }

    private class NonBlockingHttpHandler
    extends HttpHandler {
        private final URL destination;
        private final Subject<RpcInvocation<? extends InternalMessageType>> stream;

        private NonBlockingHttpHandler(URL destination, Subject<RpcInvocation<? extends InternalMessageType>> stream) {
            this.destination = destination;
            this.stream = stream;
        }

        public void service(final Request request, final Response response) throws Exception {
            response.suspend();
            final NIOReader in = request.getNIOReader();
            in.notifyAvailable(new ReadHandler(){
                private char[] inputBuffer = new char[0];

                public void onDataAvailable() throws Exception {
                    this.inputBuffer = RpcServer.appendAvailableDataToBuffer(in, this.inputBuffer);
                    in.notifyAvailable((ReadHandler)this);
                }

                public void onError(Throwable t) {
                    logger.error("Error parsing request data", t);
                    response.setStatus(400, "Bad request");
                    response.resume();
                }

                public void onAllDataRead() throws Exception {
                    this.inputBuffer = RpcServer.appendAvailableDataToBuffer(in, this.inputBuffer);
                    String requestAsString = new String(this.inputBuffer);
                    try {
                        in.close();
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                    Object requestObject = requestAsString.trim().isEmpty() ? null : RpcServer.convertRequestData(requestAsString, RpcServer.this.messageUnmarshaller);
                    RequestInfo requestInfo = RpcServer.httpSpecificInfoFrom(request);
                    RequestMessageMetaData metaData = new RequestMessageMetaData(NonBlockingHttpHandler.this.destination, requestInfo);
                    RpcInvocation rpci = new RpcInvocation(new IncomingMessage(requestObject, (IncomingMessageMetaData)metaData), replyInfo -> {
                        try (NIOWriter out = response.getNIOWriter();){
                            String responseAsString = RpcServer.convertResponseData(replyInfo._1, RpcServer.this.messageMarshaller);
                            response.setContentType("application/json");
                            response.setContentLength(responseAsString.length());
                            int statusCode = replyInfo._2 == null ? 200 : ((ReplyInfo)replyInfo._2).statusCode;
                            response.setStatus(statusCode);
                            RpcServer.writeResponse(responseAsString, out);
                            RpcServer.this.metricsCollector.requestCompleted(requestObject, (Object)responseAsString);
                        }
                        catch (Exception e) {
                            RpcServer.this.metricsCollector.requestCompletedExceptionally(requestObject, (Throwable)e);
                            logger.error("An error occurred trying to send HTTP response " + replyInfo, (Throwable)e);
                            try {
                                response.sendError(500, "Internal server error");
                            }
                            catch (Exception any) {
                                logger.error("Failed to send error 500 back to client", (Throwable)any);
                            }
                        }
                        finally {
                            response.resume();
                        }
                    }, error -> {
                        logger.error("An error occurred trying to process HTTP request " + requestAsString, error);
                        try {
                            response.sendError(500, "Internal server error");
                        }
                        catch (Exception any) {
                            logger.error("Failed to send error 500 back to client", (Throwable)any);
                        }
                    });
                    RpcServer.this.metricsCollector.requestReceived((Object)rpci.request);
                    NonBlockingHttpHandler.this.stream.onNext(rpci);
                }
            });
        }
    }
}

