/*
 * Decompiled with CFR 0.152.
 */
package ch.turic.builtins.functions;

import ch.turic.Context;
import ch.turic.ExecutionException;
import ch.turic.TuriFunction;
import ch.turic.builtins.functions.FunUtils;
import ch.turic.commands.Closure;
import ch.turic.commands.operators.Cast;
import ch.turic.memory.BlockingQueueChannel;
import ch.turic.memory.Channel;
import ch.turic.memory.ChannelIterator;
import ch.turic.memory.LngException;
import ch.turic.memory.LngList;
import ch.turic.memory.LngObject;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

public class TuriHttpServer
implements TuriFunction {
    @Override
    public String name() {
        return "server";
    }

    @Override
    public Object call(Context context, Object[] args) throws ExecutionException {
        HttpServer server;
        ch.turic.memory.Context ctx = FunUtils.ctx(context);
        FunUtils.oneArg(this.name(), args);
        Object object = args[0];
        if (!(object instanceof LngObject)) {
            throw new ExecutionException("%s needs an object as configuration argument", this.name());
        }
        LngObject conf = (LngObject)object;
        int port = Cast.toLong(conf.getField("port")).intValue();
        String host = conf.getField("host").toString();
        try {
            server = HttpServer.create(new InetSocketAddress(host, port), 0);
        }
        catch (Exception e) {
            throw new ExecutionException(e);
        }
        int concurrency = Cast.toLong(Objects.requireNonNullElse(conf.getField("concurrency"), 10)).intValue();
        LimitedVirtualThreadExecutor executor = new LimitedVirtualThreadExecutor(concurrency);
        ChannelIterator channel = (ChannelIterator)new BlockingQueueChannel(concurrency).iterator();
        Object routes = conf.getField("routes");
        if (!(routes instanceof Iterable)) {
            throw new ExecutionException("%s needs a list as routes as configuration argument", this.name());
        }
        Iterable r = (Iterable)routes;
        for (Object rr : r) {
            if (!(rr instanceof LngObject)) {
                throw new ExecutionException("%s got '%s' as route, this is not an object", this.name(), rr);
            }
            LngObject route = (LngObject)rr;
            String path = route.getField("path").toString();
            Object handler = route.getField("handler");
            if (!(handler instanceof Closure)) {
                throw new ExecutionException("Handler is '%s' not a closure or function", handler);
            }
            Closure closure = (Closure)handler;
            server.createContext(path, exchange -> {
                ch.turic.memory.Context handlerCtx = ctx.thread();
                LngObject request = this.mapRequest(exchange, handlerCtx);
                LngObject requestMessage = this.mapRequest(exchange, handlerCtx);
                LngObject response = this.mapResponse(exchange, handlerCtx);
                try {
                    channel.send(Channel.Message.of(requestMessage));
                    String result = closure.call(handlerCtx.open(), request, response).toString();
                    exchange.sendResponseHeaders(Cast.toLong(response.getField("code")).intValue(), result.length());
                    try (OutputStream out = exchange.getResponseBody();){
                        out.write(result.getBytes(StandardCharsets.UTF_8));
                        out.flush();
                    }
                }
                catch (Exception e) {
                    Channel.Message<LngException> exception = Channel.Message.exception(LngException.build(handlerCtx, e, handlerCtx.threadContext.getStackTrace()));
                    channel.send(exception);
                }
            });
        }
        server.setExecutor(executor);
        server.start();
        return channel;
    }

    private LngObject mapResponse(HttpExchange exchange, ch.turic.memory.Context ctx) {
        LngObject response = LngObject.newEmpty(ctx);
        response.setField("headers", exchange.getResponseHeaders());
        response.setField("code", exchange.getResponseCode());
        return response;
    }

    private LngObject mapRequest(HttpExchange exchange, ch.turic.memory.Context ctx) {
        LngObject request = LngObject.newEmpty(ctx);
        request.setField("method", exchange.getRequestMethod());
        LngObject client = LngObject.newEmpty(ctx);
        client.setField("host", exchange.getRemoteAddress().getAddress().getHostAddress());
        client.setField("port", exchange.getRemoteAddress().getPort());
        request.setField("client", client);
        request.setField("protocol", exchange.getProtocol());
        LngObject srv = LngObject.newEmpty(ctx);
        srv.setField("host", exchange.getLocalAddress().getAddress().getHostAddress());
        srv.setField("port", exchange.getLocalAddress().getPort());
        request.setField("server", srv);
        LngObject headers = LngObject.newEmpty(ctx);
        for (Map.Entry<String, List<String>> h : exchange.getRequestHeaders().entrySet()) {
            LngList headerValues = new LngList();
            headerValues.array.addAll((Collection<Object>)h.getValue());
            headers.setField(h.getKey(), headerValues);
        }
        request.setField("headers", headers);
        request.setField("uri", exchange.getRequestURI().toString());
        request.setField("body", exchange.getRequestBody());
        return request;
    }

    static class LimitedVirtualThreadExecutor
    implements Executor {
        private final Semaphore semaphore;

        public LimitedVirtualThreadExecutor(int maxConcurrent) {
            this.semaphore = new Semaphore(maxConcurrent);
        }

        @Override
        public void execute(Runnable command) {
            try {
                this.semaphore.acquire();
                Thread.startVirtualThread(() -> {
                    try {
                        command.run();
                    }
                    finally {
                        this.semaphore.release();
                    }
                });
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Interrupted while acquiring semaphore", e);
            }
        }
    }
}

