/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.ext.mcp.transport;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpStreamableServerSession;
import io.modelcontextprotocol.spec.McpStreamableServerTransport;
import io.modelcontextprotocol.spec.McpStreamableServerTransportProvider;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.mcp.transport.KeepAliveService;
import io.vertx.ext.mcp.transport.VertxMcpTransport;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.handler.CorsHandler;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class VertxMcpStreamableServerTransportProvider
implements McpStreamableServerTransportProvider,
VertxMcpTransport {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(VertxMcpStreamableServerTransportProvider.class);
    /** SSE event name used when replayed events are written during stream resumption. */
    public static final String MESSAGE_EVENT_TYPE = "message";
    /** Mapper used to parse incoming POST bodies and to serialize outgoing payloads. */
    private final ObjectMapper objectMapper;
    /** Path on which the GET, POST and DELETE routes are mounted; the constructor falls back to "/mcp" when given null. */
    private final String mcpEndpoint;
    /** When true, the DELETE handler answers 405 instead of terminating the session. */
    private final boolean disallowDelete;
    /** Vert.x instance used to create the router and the keep-alive service. */
    private final Vertx vertx;
    /** Periodic service started when the router is initialized and stopped by closeGracefully(). */
    private final KeepAliveService keepAliveService;
    /** Factory used to start a session on an "initialize" request; null until setSessionFactory is called. */
    private McpStreamableServerSession.Factory sessionFactory;
    /** Active sessions keyed by session id (the value carried in the MCP-Session-Id header). */
    private final Map<String, McpStreamableServerSession> sessions = new ConcurrentHashMap<String, McpStreamableServerSession>();
    /** Set to true by closeGracefully(); while set, the HTTP handlers answer 503. */
    private volatile boolean isClosing = false;
    /** Router created on the first getRouter() call. */
    private Router router;

    /**
     * Creates a transport provider bound to the given Vert.x instance.
     *
     * @param objectMapper      mapper used for JSON (de)serialization
     * @param mcpEndpoint       route path for the MCP handlers; {@code null} selects "/mcp"
     * @param disallowDelete    whether DELETE requests are rejected with 405
     * @param vertx             Vert.x instance used for the router and the keep-alive service
     * @param keepAliveInterval interval handed to the {@link KeepAliveService}
     */
    public VertxMcpStreamableServerTransportProvider(ObjectMapper objectMapper, String mcpEndpoint, boolean disallowDelete, Vertx vertx, Duration keepAliveInterval) {
        // Resolve the endpoint up front so the final field is assigned exactly once.
        String resolvedEndpoint = "/mcp";
        if (mcpEndpoint != null) {
            resolvedEndpoint = mcpEndpoint;
        }
        this.vertx = vertx;
        this.objectMapper = objectMapper;
        this.disallowDelete = disallowDelete;
        this.mcpEndpoint = resolvedEndpoint;
        this.keepAliveService = new KeepAliveService(vertx, keepAliveInterval);
    }

    /**
     * Builds the router: a CORS handler on every route, the GET/POST/DELETE handlers on the
     * MCP endpoint, and a "/health" route. Also starts the keep-alive service.
     */
    private void initializeRouter() {
        this.router = Router.router(this.vertx);

        // CORS: any origin is accepted; only the listed headers and methods are allowed.
        Set<String> corsHeaders = Set.of("Content-Type", "Authorization", "X-Requested-With", "Accept", "MCP-Session-Id", "MCP-Protocol-Version", "Last-Event-ID");
        Set<HttpMethod> corsMethods = Set.of(HttpMethod.GET, HttpMethod.POST, HttpMethod.DELETE, HttpMethod.OPTIONS);
        CorsHandler cors = CorsHandler.create()
            .addOriginWithRegex(".*")
            .allowedHeaders(corsHeaders)
            .allowedMethods(corsMethods);
        this.router.route().handler(cors);

        // MCP endpoint routes.
        this.router.get(this.mcpEndpoint)
            .produces("text/event-stream")
            .handler(this::handleGet);
        this.router.post(this.mcpEndpoint)
            .consumes("application/json")
            .handler(BodyHandler.create())
            .handler(this::handlePost);
        this.router.delete(this.mcpEndpoint)
            .handler(this::handleDelete);

        // Health probe returning {"status":"ok"}.
        this.router.get("/health").handler(healthCtx -> {
            String healthJson = new JsonObject().put("status", "ok").encode();
            healthCtx.response().putHeader("Content-Type", "application/json").end(healthJson);
        });

        this.keepAliveService.start(tick -> this.sendKeepAlivePing());
    }

    /**
     * Returns the router, building it on the first call.
     *
     * @return the router exposing the MCP endpoint and health route
     */
    @Override
    public Router getRouter() {
        if (this.router != null) {
            return this.router;
        }
        this.initializeRouter();
        return this.router;
    }

    /**
     * Lists the protocol version identifiers reported by this transport.
     *
     * @return an immutable two-element list of version strings
     */
    public List<String> protocolVersions() {
        String first = "2024-11-05";
        String second = "2025-03-26";
        return List.of(first, second);
    }

    /**
     * Stores the factory used to start a session when an "initialize" request arrives.
     * Until a factory is set, initialize requests are answered with a 500 error.
     *
     * @param sessionFactory factory that creates streamable server sessions
     */
    public void setSessionFactory(McpStreamableServerSession.Factory sessionFactory) {
        this.sessionFactory = sessionFactory;
    }

    /**
     * Sends a notification to every registered session.
     * A failure for one session is logged and suppressed so the others are still notified.
     *
     * @param method notification method name
     * @param params notification parameters
     * @return a Mono completing once all sends have finished; empty right away when no session exists
     */
    public Mono<Void> notifyClients(String method, Object params) {
        if (!this.sessions.isEmpty()) {
            log.debug("Attempting to broadcast message to {} active sessions", this.sessions.size());
            return Flux.fromIterable(this.sessions.values())
                .flatMap(target -> target.sendNotification(method, params)
                    .doOnError(failure -> log.error("Failed to send message to session {}: {}", target.getId(), failure.getMessage()))
                    .onErrorComplete())
                .then();
        }
        log.debug("No active sessions to broadcast message to");
        return Mono.empty();
    }

    /**
     * Shuts the transport down: flags it as closing, stops the keep-alive service, closes every
     * session gracefully and finally empties the session registry. Nothing happens until the
     * returned Mono is subscribed.
     *
     * @return a Mono completing when all sessions have been closed
     */
    @Override
    public Mono<Void> closeGracefully() {
        return Mono.defer(() -> {
            log.info("Initiating graceful shutdown with {} active sessions", this.sessions.size());
            this.isClosing = true;
            this.keepAliveService.stop();

            Mono<Void> allSessionsClosed = Flux.fromIterable(this.sessions.values())
                .flatMap(McpStreamableServerSession::closeGracefully)
                .then();

            return allSessionsClosed.doOnSuccess(ignored -> {
                log.debug("Graceful shutdown completed");
                this.sessions.clear();
            });
        });
    }

    /**
     * Sends a "ping" request to every registered session.
     * A failure for one session is logged as a warning and suppressed so the remaining sessions
     * are still pinged. Does nothing when there are no sessions.
     */
    private void sendKeepAlivePing() {
        if (this.sessions.isEmpty()) {
            return;
        }
        // TypeReference only has a no-arg constructor; one token is enough for all sessions.
        TypeReference<Object> pingResultType = new TypeReference<Object>() {};
        Flux.fromIterable(this.sessions.values())
            .flatMap(session -> session.sendRequest("ping", null, pingResultType)
                .doOnError(e -> log.warn("Failed to send keep-alive ping to session {}: {}", session.getId(), e.getMessage()))
                .onErrorComplete())
            .subscribe(result -> {}, error -> log.error("Error during keep-alive ping", error));
    }

    /**
     * Handles GET on the MCP endpoint, which opens the server-to-client SSE stream.
     *
     * <p>Responses: 503 while shutting down, 405 when the Accept header does not contain
     * text/event-stream, 400 without an MCP-Session-Id header, 404 for an unknown session.
     * When a Last-Event-ID header is present the request is delegated to stream resumption;
     * otherwise the response is turned into a chunked event stream and attached to the session
     * as its listening stream.
     *
     * @param ctx routing context of the GET request
     */
    private void handleGet(RoutingContext ctx) {
        HttpServerResponse response = ctx.response();

        if (this.isClosing) {
            response.setStatusCode(503).putHeader("Content-Type", "text/plain").end("Server is shutting down");
            return;
        }

        String accept = ctx.request().getHeader("Accept");
        boolean acceptsEventStream = accept != null && accept.contains("text/event-stream");
        if (!acceptsEventStream) {
            response.setStatusCode(405).putHeader("Content-Type", "text/plain").end("Method Not Allowed - SSE stream not supported");
            return;
        }

        String sessionId = ctx.request().getHeader("MCP-Session-Id");
        if (sessionId == null) {
            response.setStatusCode(400).putHeader("Content-Type", "text/plain").end("Session ID required");
            return;
        }

        McpStreamableServerSession session = this.sessions.get(sessionId);
        if (session == null) {
            response.setStatusCode(404).putHeader("Content-Type", "text/plain").end("Session not found");
            return;
        }

        String lastEventId = ctx.request().getHeader("Last-Event-ID");
        if (lastEventId != null) {
            this.handleStreamResumption(ctx, session, lastEventId);
            return;
        }

        // Fresh stream: switch to a chunked SSE response and hand it to the session.
        response.putHeader("Content-Type", "text/event-stream");
        response.putHeader("Cache-Control", "no-cache");
        response.putHeader("Connection", "keep-alive");
        response.setChunked(true);
        session.listeningStream(new VertxMcpStreamableSessionTransport(response));
    }

    /**
     * Handles POST on the MCP endpoint, which carries one client-to-server JSON-RPC message.
     *
     * <p>Responses: 503 while shutting down, 400 when the Accept header does not list both
     * application/json and text/event-stream. The body is parsed as a JSON-RPC message and
     * dispatched to the initialize handler or the regular-message handler; any exception raised
     * along the way is logged and answered with a 400 JSON error.
     *
     * @param ctx routing context of the POST request
     */
    private void handlePost(RoutingContext ctx) {
        if (this.isClosing) {
            ctx.response().setStatusCode(503).putHeader("Content-Type", "text/plain").end("Server is shutting down");
            return;
        }

        String accept = ctx.request().getHeader("Accept");
        boolean acceptsBoth = accept != null
            && accept.contains("application/json")
            && accept.contains("text/event-stream");
        if (!acceptsBoth) {
            ctx.response().setStatusCode(400).putHeader("Content-Type", "text/plain").end("Accept header must include application/json and text/event-stream");
            return;
        }

        try {
            String payload = ctx.body().asString();
            log.debug("Received POST message: {}", payload);
            McpSchema.JSONRPCMessage parsed = McpSchema.deserializeJsonRpcMessage(this.objectMapper, payload);
            if (!this.isInitializeRequest(parsed)) {
                this.handleRegularMessage(ctx, parsed);
                return;
            }
            this.handleInitializeRequest(ctx, parsed);
        }
        catch (Exception e) {
            log.error("Failed to process POST message", e);
            String errorJson = new JsonObject().put("error", "Invalid message format").encode();
            ctx.response().setStatusCode(400).putHeader("Content-Type", "application/json").end(errorJson);
        }
    }

    /**
     * Handles DELETE on the MCP endpoint, which terminates a session.
     *
     * <p>Responses: 503 while shutting down, 405 when deletes are disallowed, 400 without an
     * MCP-Session-Id header, 404 for an unknown session, 200 once the session has been deleted
     * and unregistered, 500 when deletion fails.
     *
     * @param ctx routing context of the DELETE request
     */
    private void handleDelete(RoutingContext ctx) {
        if (this.isClosing) {
            ctx.response().setStatusCode(503).putHeader("Content-Type", "text/plain").end("Server is shutting down");
            return;
        }
        if (this.disallowDelete) {
            ctx.response().setStatusCode(405).putHeader("Content-Type", "text/plain").end("Method Not Allowed");
            return;
        }
        String sessionId = ctx.request().getHeader("MCP-Session-Id");
        if (sessionId == null) {
            ctx.response().setStatusCode(400).putHeader("Content-Type", "text/plain").end("Session ID required");
            return;
        }
        McpStreamableServerSession session = this.sessions.get(sessionId);
        if (session == null) {
            ctx.response().setStatusCode(404).putHeader("Content-Type", "text/plain").end("Session not found");
            return;
        }
        // The deletion publisher completes without emitting a value, so the success path must
        // live in the completion callback; an onNext consumer would never run.
        session.delete().subscribe(
            ignored -> { },
            error -> {
                log.error("Error terminating session {}", sessionId, error);
                ctx.response().setStatusCode(500).putHeader("Content-Type", "text/plain").end("Error terminating session");
            },
            () -> {
                this.sessions.remove(sessionId);
                ctx.response().setStatusCode(200).putHeader("Content-Type", "text/plain").end("Session terminated");
            });
    }

    /**
     * Handles an "initialize" JSON-RPC request: starts a new session, registers it, and answers
     * with the initialization result wrapped in a JSON-RPC response. The new session id is
     * returned in the MCP-Session-Id response header.
     *
     * <p>Responds with 500 when no session factory is configured, when session creation or
     * initialization fails, or when the result cannot be serialized. On any failure after
     * registration the session is removed again, since the client never learned its id.
     *
     * @param ctx     routing context of the POST request
     * @param message the parsed message; expected to be a {@code McpSchema.JSONRPCRequest}
     */
    private void handleInitializeRequest(RoutingContext ctx, Object message) {
        try {
            if (this.sessionFactory == null) {
                ctx.response().setStatusCode(500).putHeader("Content-Type", "application/json").end(new JsonObject().put("error", "Session factory not configured").encode());
                return;
            }
            // The message is the JSON-RPC envelope; the InitializeRequest lives in its params
            // and has to be converted, not cast.
            McpSchema.JSONRPCRequest request = (McpSchema.JSONRPCRequest) message;
            McpSchema.InitializeRequest initializeRequest = this.objectMapper.convertValue(request.params(), McpSchema.InitializeRequest.class);

            McpStreamableServerSession.McpStreamableServerSessionInit init = this.sessionFactory.startSession(initializeRequest);
            McpStreamableServerSession session = init.session();
            String sessionId = session.getId();
            this.sessions.put(sessionId, session);

            init.initResult().subscribe(initResult -> {
                String payload;
                try {
                    // Serialize before touching the response so a failure leaves no stray headers.
                    McpSchema.JSONRPCResponse envelope = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), initResult, null);
                    payload = this.objectMapper.writeValueAsString(envelope);
                }
                catch (Exception e) {
                    log.error("Failed to serialize init result", e);
                    this.sessions.remove(sessionId);
                    ctx.response().setStatusCode(500).putHeader("Content-Type", "application/json").end(new JsonObject().put("error", "Failed to serialize response").encode());
                    return;
                }
                ctx.response().putHeader("Content-Type", "application/json").putHeader("MCP-Session-Id", sessionId).end(payload);
            }, error -> {
                log.error("Failed to get init result", error);
                this.sessions.remove(sessionId);
                ctx.response().setStatusCode(500).putHeader("Content-Type", "application/json").end(new JsonObject().put("error", "Initialization failed").encode());
            });
        }
        catch (Exception e) {
            log.error("Failed to handle initialization", e);
            ctx.response().setStatusCode(500).putHeader("Content-Type", "application/json").end(new JsonObject().put("error", "Initialization failed").encode());
        }
    }

    /**
     * Handles a non-initialize JSON-RPC message for an existing session.
     *
     * <ul>
     *   <li>Requests are answered over a chunked SSE response driven by the session's response stream.</li>
     *   <li>Responses and notifications are handed to the session and acknowledged with 202.</li>
     *   <li>Any other message type is rejected with 400.</li>
     * </ul>
     *
     * <p>Responds with 400 when the MCP-Session-Id header is missing and 404 when the session
     * is unknown.
     *
     * @param ctx     routing context of the POST request
     * @param message the parsed JSON-RPC message
     */
    private void handleRegularMessage(RoutingContext ctx, Object message) {
        String sessionId = ctx.request().getHeader("MCP-Session-Id");
        if (sessionId == null) {
            ctx.response().setStatusCode(400).putHeader("Content-Type", "text/plain").end("Session ID required");
            return;
        }
        McpStreamableServerSession session = this.sessions.get(sessionId);
        if (session == null) {
            ctx.response().setStatusCode(404).putHeader("Content-Type", "text/plain").end("Session not found");
            return;
        }

        if (message instanceof McpSchema.JSONRPCRequest) {
            // Chunked encoding is required: the body is written incrementally with no Content-Length.
            ctx.response()
                .putHeader("Content-Type", "text/event-stream")
                .putHeader("Cache-Control", "no-cache")
                .putHeader("Connection", "keep-alive")
                .setChunked(true);
            VertxMcpStreamableSessionTransport sessionTransport = new VertxMcpStreamableSessionTransport(ctx.response());
            // The returned publisher is lazy; without a subscription the request is never processed.
            session.responseStream((McpSchema.JSONRPCRequest) message, sessionTransport).subscribe(
                ignored -> { },
                error -> {
                    log.error("Error handling request", error);
                    HttpServerResponse response = ctx.response();
                    if (!response.ended() && !response.closed()) {
                        if (!response.headWritten()) {
                            response.setStatusCode(500);
                        }
                        response.end();
                    }
                });
        } else if (message instanceof McpSchema.JSONRPCResponse) {
            // Client replies to server-initiated requests (for example keep-alive pings).
            this.acknowledgeAccepted(ctx, session.accept((McpSchema.JSONRPCResponse) message));
        } else if (message instanceof McpSchema.JSONRPCNotification) {
            this.acknowledgeAccepted(ctx, session.accept((McpSchema.JSONRPCNotification) message));
        } else {
            ctx.response().setStatusCode(400).putHeader("Content-Type", "text/plain").end("Unsupported message type");
        }
    }

    /**
     * Subscribes to an accept operation and answers 202 on completion or 500 on failure.
     * The 202 is sent from the completion callback because the publisher emits no value.
     */
    private void acknowledgeAccepted(RoutingContext ctx, Mono<Void> accepted) {
        accepted.subscribe(
            ignored -> { },
            error -> {
                log.error("Error accepting message", error);
                ctx.response().setStatusCode(500).putHeader("Content-Type", "text/plain").end("Error processing message");
            },
            () -> ctx.response().setStatusCode(202).end());
    }

    /**
     * Replays the session's events following {@code lastEventId} as SSE "message" events on a
     * chunked response, then ends the response. The response is also ended when replay or
     * serialization fails.
     *
     * @param ctx         routing context of the GET request
     * @param session     session whose events are replayed
     * @param lastEventId value of the Last-Event-ID request header
     */
    private void handleStreamResumption(RoutingContext ctx, McpStreamableServerSession session, String lastEventId) {
        HttpServerResponse response = ctx.response();
        response.putHeader("Content-Type", "text/event-stream");
        response.putHeader("Cache-Control", "no-cache");
        response.putHeader("Connection", "keep-alive");
        response.setChunked(true);

        // Shared by the error and completion paths.
        Runnable endIfOpen = () -> {
            if (!ctx.response().closed()) {
                ctx.response().end();
            }
        };

        session.replay(lastEventId)
            .flatMap(replayed -> {
                try {
                    String json = this.objectMapper.writeValueAsString(replayed);
                    return Mono.fromCompletionStage(this.sendSseEvent(ctx.response(), MESSAGE_EVENT_TYPE, json).toCompletionStage());
                }
                catch (Exception e) {
                    log.error("Failed to serialize replayed event", e);
                    return Mono.<Void>error(e);
                }
            })
            .doOnError(failure -> {
                log.error("Error during stream resumption", failure);
                endIfOpen.run();
            })
            .doOnComplete(endIfOpen)
            .subscribe();
    }

    /**
     * Tells whether the message is a JSON-RPC request whose method is "initialize".
     *
     * @param message parsed message, may be any object
     * @return true only for a JSON-RPC request with method "initialize"
     */
    private boolean isInitializeRequest(Object message) {
        if (!(message instanceof McpSchema.JSONRPCRequest)) {
            return false;
        }
        String method = ((McpSchema.JSONRPCRequest) message).method();
        return "initialize".equals(method);
    }

    /**
     * Tells whether the message is a JSON-RPC request.
     *
     * @param message parsed message, may be any object
     * @return true when the message is a JSON-RPC request
     */
    private boolean isRequestMessage(Object message) {
        boolean isRequest = message instanceof McpSchema.JSONRPCRequest;
        return isRequest;
    }

    /**
     * Writes one SSE frame made of an "event:" line, a "data:" line and a blank line.
     *
     * @param response  response to write to
     * @param eventType value of the event field
     * @param data      value of the data field
     * @return the future returned by the write
     */
    private Future<Void> sendSseEvent(HttpServerResponse response, String eventType, String data) {
        StringBuilder frame = new StringBuilder();
        frame.append("event: ").append(eventType).append("\n");
        frame.append("data: ").append(data).append("\n\n");
        return response.write(frame.toString());
    }

    /**
     * Creates an empty builder for this provider.
     *
     * @return a new builder with no properties set
     */
    @Generated
    public static VertxMcpStreamableServerTransportProviderBuilder builder() {
        return new VertxMcpStreamableServerTransportProviderBuilder();
    }

    /**
     * Session transport that writes JSON-RPC messages as SSE frames to a single HTTP response.
     */
    private class VertxMcpStreamableSessionTransport
    implements McpStreamableServerTransport {
        private final HttpServerResponse response;

        /**
         * @param response the HTTP response the SSE frames are written to
         */
        public VertxMcpStreamableSessionTransport(HttpServerResponse response) {
            this.response = response;
        }

        /**
         * Sends a message without an event id.
         *
         * @param message message to send
         * @return a Mono completing when the frame has been written
         */
        public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
            return this.sendMessage(message, null);
        }

        /**
         * Serializes the message and writes it as one SSE "message" frame. On failure the error
         * is logged, the response is ended, and the error is propagated to the subscriber.
         *
         * @param message   message to send
         * @param messageId event id for the frame, or {@code null} to send no id field
         * @return a Mono completing when the frame has been written
         */
        public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message, String messageId) {
            return Mono.fromSupplier(() -> {
                try {
                    return VertxMcpStreamableServerTransportProvider.this.objectMapper.writeValueAsString((Object)message);
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }).flatMap(jsonText -> {
                StringBuilder frame = new StringBuilder();
                // An "id:" field with an empty value resets the client's last event id,
                // so the field is left out entirely when there is no id.
                if (messageId != null) {
                    frame.append("id: ").append(messageId).append("\n");
                }
                frame.append("event: ").append(MESSAGE_EVENT_TYPE).append("\n");
                frame.append("data: ").append(jsonText).append("\n\n");
                return Mono.fromCompletionStage(this.response.write(frame.toString()).toCompletionStage());
            }).doOnError(e -> {
                log.error("Failed to send message", e);
                this.endIfOpen();
            });
        }

        /**
         * Converts a loosely typed value into the requested type using the provider's mapper.
         *
         * @param data    value to convert
         * @param typeRef target type
         * @return the converted value
         */
        public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
            return (T)VertxMcpStreamableServerTransportProvider.this.objectMapper.convertValue(data, typeRef);
        }

        /**
         * Ends the response when the returned Mono is subscribed.
         *
         * @return a Mono completing after the response has been ended
         */
        public Mono<Void> closeGracefully() {
            return Mono.fromRunnable(this::endIfOpen);
        }

        /** Ends the response immediately. */
        public void close() {
            this.endIfOpen();
        }

        /**
         * Ends the response unless it has already been ended or its connection is closed,
         * which makes repeated close calls harmless.
         */
        private void endIfOpen() {
            if (!this.response.ended() && !this.response.closed()) {
                this.response.end();
            }
        }
    }

    /**
     * Fluent builder for {@link VertxMcpStreamableServerTransportProvider}.
     * Every property is optional here; unset properties are passed to the constructor as
     * {@code null} (or {@code false} for the boolean).
     */
    @Generated
    public static class VertxMcpStreamableServerTransportProviderBuilder {
        @Generated
        private ObjectMapper objectMapper;
        @Generated
        private String mcpEndpoint;
        @Generated
        private boolean disallowDelete;
        @Generated
        private Vertx vertx;
        @Generated
        private Duration keepAliveInterval;

        /** Package-private; instances are obtained through the enclosing class's builder(). */
        @Generated
        VertxMcpStreamableServerTransportProviderBuilder() {
        }

        /**
         * @param value mapper used for JSON (de)serialization
         * @return this builder
         */
        @Generated
        public VertxMcpStreamableServerTransportProviderBuilder objectMapper(ObjectMapper objectMapper) {
            this.objectMapper = objectMapper;
            return this;
        }

        /**
         * @param mcpEndpoint route path for the MCP handlers
         * @return this builder
         */
        @Generated
        public VertxMcpStreamableServerTransportProviderBuilder mcpEndpoint(String mcpEndpoint) {
            this.mcpEndpoint = mcpEndpoint;
            return this;
        }

        /**
         * @param disallowDelete whether DELETE requests are rejected
         * @return this builder
         */
        @Generated
        public VertxMcpStreamableServerTransportProviderBuilder disallowDelete(boolean disallowDelete) {
            this.disallowDelete = disallowDelete;
            return this;
        }

        /**
         * @param vertx Vert.x instance for the provider
         * @return this builder
         */
        @Generated
        public VertxMcpStreamableServerTransportProviderBuilder vertx(Vertx vertx) {
            this.vertx = vertx;
            return this;
        }

        /**
         * @param keepAliveInterval interval handed to the keep-alive service
         * @return this builder
         */
        @Generated
        public VertxMcpStreamableServerTransportProviderBuilder keepAliveInterval(Duration keepAliveInterval) {
            this.keepAliveInterval = keepAliveInterval;
            return this;
        }

        /**
         * Creates the provider from the collected properties.
         *
         * @return a new provider instance
         */
        @Generated
        public VertxMcpStreamableServerTransportProvider build() {
            VertxMcpStreamableServerTransportProvider provider = new VertxMcpStreamableServerTransportProvider(
                this.objectMapper,
                this.mcpEndpoint,
                this.disallowDelete,
                this.vertx,
                this.keepAliveInterval);
            return provider;
        }

        /**
         * @return a textual dump of the builder's current property values
         */
        @Generated
        public String toString() {
            StringBuilder text = new StringBuilder("VertxMcpStreamableServerTransportProvider.VertxMcpStreamableServerTransportProviderBuilder(");
            text.append("objectMapper=").append(this.objectMapper);
            text.append(", mcpEndpoint=").append(this.mcpEndpoint);
            text.append(", disallowDelete=").append(this.disallowDelete);
            text.append(", vertx=").append(this.vertx);
            text.append(", keepAliveInterval=").append(this.keepAliveInterval);
            text.append(")");
            return text.toString();
        }
    }
}

