/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.ext.mcp.transport;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpServerSession;
import io.modelcontextprotocol.spec.McpServerTransport;
import io.modelcontextprotocol.spec.McpServerTransportProvider;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.mcp.transport.KeepAliveService;
import io.vertx.ext.mcp.transport.VertxMcpTransport;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Generated;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class VertxMcpSseServerTransportProvider
implements McpServerTransportProvider,
VertxMcpTransport {
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger(VertxMcpSseServerTransportProvider.class);
    /** SSE event name used when a JSON-RPC message is written to a session's stream. */
    public static final String MESSAGE_EVENT_TYPE = "message";
    /** SSE event name used for the first event of a stream, which carries the message endpoint URL. */
    public static final String ENDPOINT_EVENT_TYPE = "endpoint";
    /** Route used for the SSE stream when the caller supplies no SSE endpoint. */
    public static final String DEFAULT_SSE_ENDPOINT = "/sse";
    /** Route used for client-to-server messages when the caller supplies no message endpoint. */
    public static final String DEFAULT_MESSAGE_ENDPOINT = "/message";
    /** Interval handed to the keep-alive service when the caller supplies none. */
    public static final Duration DEFAULT_KEEP_ALIVE_INTERVAL = Duration.ofSeconds(30L);
    /** Prefix prepended to the message endpoint when it is advertised to clients. */
    private final String baseUrl;
    /** Path of the POST route that receives client messages. */
    private final String messageEndpoint;
    /** Path of the GET route that opens an SSE stream. */
    private final String sseEndpoint;
    /** Vert.x instance used to create the router and the keep-alive service. */
    private final Vertx vertx;
    /** Mapper used to deserialize incoming messages and serialize outgoing ones. */
    private final ObjectMapper objectMapper;
    /** Factory that creates one session per SSE connection; assigned through {@code setSessionFactory}. */
    private McpServerSession.Factory sessionFactory;
    /** Active sessions keyed by session id. */
    private final ConcurrentHashMap<String, McpServerSession> sessions = new ConcurrentHashMap<>();
    /** Set once graceful shutdown has begun; request handlers answer 503 while it is true. */
    private volatile boolean isClosing = false;
    /** Router created on the first call to {@code getRouter()}. */
    private Router router;
    /** Service that triggers the keep-alive pings. */
    private final KeepAliveService keepAliveService;

    /**
     * Creates a transport provider.
     *
     * @param baseUrl prefix prepended to the message endpoint when it is advertised to clients; required
     * @param messageEndpoint path of the POST route; {@link #DEFAULT_MESSAGE_ENDPOINT} when {@code null}
     * @param sseEndpoint path of the SSE route; {@link #DEFAULT_SSE_ENDPOINT} when {@code null}
     * @param keepAliveInterval interval passed to the keep-alive service;
     *        {@link #DEFAULT_KEEP_ALIVE_INTERVAL} when {@code null}
     * @param objectMapper mapper used for message (de)serialization; required
     * @param vertx Vert.x instance; required
     * @throws IllegalArgumentException if {@code baseUrl}, {@code objectMapper} or {@code vertx} is {@code null}
     */
    public VertxMcpSseServerTransportProvider(String baseUrl, String messageEndpoint, String sseEndpoint, Duration keepAliveInterval, ObjectMapper objectMapper, Vertx vertx) {
        // Required arguments are checked in this order before any field is assigned.
        if (baseUrl == null) {
            throw new IllegalArgumentException("baseUrl cannot be null");
        }
        if (objectMapper == null) {
            throw new IllegalArgumentException("objectMapper cannot be null");
        }
        if (vertx == null) {
            throw new IllegalArgumentException("vertx cannot be null");
        }

        // Optional arguments fall back to the class defaults.
        String resolvedMessageEndpoint = messageEndpoint;
        if (resolvedMessageEndpoint == null) {
            resolvedMessageEndpoint = DEFAULT_MESSAGE_ENDPOINT;
        }
        String resolvedSseEndpoint = sseEndpoint;
        if (resolvedSseEndpoint == null) {
            resolvedSseEndpoint = DEFAULT_SSE_ENDPOINT;
        }
        Duration resolvedInterval = keepAliveInterval;
        if (resolvedInterval == null) {
            resolvedInterval = DEFAULT_KEEP_ALIVE_INTERVAL;
        }

        this.vertx = vertx;
        this.objectMapper = objectMapper;
        this.baseUrl = baseUrl;
        this.messageEndpoint = resolvedMessageEndpoint;
        this.sseEndpoint = resolvedSseEndpoint;
        this.keepAliveService = new KeepAliveService(vertx, resolvedInterval);
    }

    /**
     * Creates the router, installs a body handler on every route, registers the
     * SSE (GET) and message (POST) handlers, and then starts the keep-alive service.
     */
    private void initializeRouter() {
        Router created = Router.router(this.vertx);
        this.router = created;
        // The body handler is created with its boolean argument set to false, as in the original.
        created.route().handler(BodyHandler.create(false));
        created.get(this.sseEndpoint).handler(this::handleSseConnection);
        created.post(this.messageEndpoint).handler(this::handleMessage);
        this.startKeepAlive();
    }

    /**
     * Starts the keep-alive service with a callback that pings every active session.
     * The callback argument is not used.
     */
    private void startKeepAlive() {
        this.keepAliveService.start(ignored -> this.sendKeepAlivePing());
    }

    /**
     * Sends a {@code ping} request to every active session.
     *
     * <p>Does nothing while the provider is shutting down or when there are no
     * sessions. A failing ping is logged at warn level and suppressed for that
     * session, so one broken session does not stop the remaining pings.
     */
    private void sendKeepAlivePing() {
        if (this.isClosing || this.sessions.isEmpty()) {
            return;
        }
        logger.trace("Sending keep-alive ping to {} active sessions", this.sessions.size());
        // Anonymous subclass so Jackson can capture the target type; the ping result itself is discarded.
        TypeReference<Object> typeRef = new TypeReference<Object>() {};
        Flux.fromIterable(this.sessions.values())
            .flatMap(session -> session.sendRequest("ping", null, typeRef)
                .doOnError(e -> logger.warn("Failed to send keep-alive ping to session {}: {}", session.getId(), e.getMessage()))
                .onErrorComplete())
            .subscribe(result -> {}, error -> logger.error("Error during keep-alive ping", error));
    }

    /** Stops the keep-alive service; called when graceful shutdown begins. */
    private void stopKeepAlive() {
        keepAliveService.stop();
    }

    /**
     * Returns the router exposing the SSE and message endpoints, creating it
     * (and starting the keep-alive service) on the first call.
     *
     * @return the router for this transport
     */
    @Override
    public Router getRouter() {
        if (this.router != null) {
            return this.router;
        }
        this.initializeRouter();
        return this.router;
    }

    /**
     * Lists the protocol versions this transport reports.
     *
     * @return an immutable single-element list containing {@code "2024-11-05"}
     */
    public List<String> protocolVersions() {
        final String supportedVersion = "2024-11-05";
        return List.of(supportedVersion);
    }

    /**
     * Stores the factory that the SSE handler uses to create a session for each new connection.
     *
     * @param sessionFactory factory to use for subsequent SSE connections
     */
    public void setSessionFactory(McpServerSession.Factory sessionFactory) {
        this.sessionFactory = sessionFactory;
    }

    /**
     * Broadcasts a notification to every active session.
     *
     * <p>A failure for one session is logged and suppressed, so it does not
     * prevent delivery to the remaining sessions.
     *
     * @param method notification method name
     * @param params notification parameters
     * @return a {@code Mono} that completes once all sends have finished, or an
     *         empty {@code Mono} when there are no sessions
     */
    public Mono<Void> notifyClients(String method, Object params) {
        if (this.sessions.isEmpty()) {
            logger.debug("No active sessions to broadcast message to");
            return Mono.empty();
        }

        logger.debug("Attempting to broadcast message to {} active sessions", this.sessions.size());
        return Flux.fromIterable(this.sessions.values())
            .flatMap(target -> {
                return target.sendNotification(method, params)
                    .doOnError(failure -> logger.error("Failed to send message to session {}: {}", target.getId(), failure.getMessage()))
                    .onErrorComplete();
            })
            .then();
    }

    /**
     * Begins graceful shutdown.
     *
     * <p>The closing flag is set and the keep-alive service is stopped as soon
     * as this method is called. Closing the sessions happens when the returned
     * {@code Mono} is subscribed; the session map is cleared only when every
     * session closes successfully.
     *
     * @return a {@code Mono} that completes when all sessions have been closed
     */
    @Override
    public Mono<Void> closeGracefully() {
        this.isClosing = true;
        this.stopKeepAlive();

        Flux<McpServerSession> activeSessions = Flux.fromIterable(this.sessions.values())
            .doFirst(() -> logger.debug("Initiating graceful shutdown with {} active sessions", this.sessions.size()));
        Mono<Void> allClosed = activeSessions
            .flatMap(McpServerSession::closeGracefully)
            .then();
        return allClosed.doOnSuccess(ignored -> {
            logger.debug("Graceful shutdown completed");
            this.sessions.clear();
        });
    }

    /**
     * Handles a GET on the SSE endpoint: opens the event stream, creates a
     * session bound to it, and sends the initial {@code endpoint} event telling
     * the client where to POST its messages.
     *
     * <p>Answers 503 when the provider is shutting down or when no session
     * factory has been set yet. The session is removed from the registry when
     * the response's close handler fires or when the initial event cannot be
     * written.
     *
     * @param context routing context of the incoming request
     */
    private void handleSseConnection(RoutingContext context) {
        HttpServerResponse response = context.response();
        if (this.isClosing) {
            response.setStatusCode(503).putHeader(HttpHeaders.CONTENT_TYPE, "text/plain").end("Server is shutting down");
            return;
        }
        // Read the field once; without a factory the original code failed with an NPE
        // after the SSE headers had already been configured.
        McpServerSession.Factory factory = this.sessionFactory;
        if (factory == null) {
            logger.error("Rejecting SSE connection: no session factory has been set");
            response.setStatusCode(503).putHeader(HttpHeaders.CONTENT_TYPE, "text/plain").end("Server is not ready");
            return;
        }

        response.putHeader(HttpHeaders.CONTENT_TYPE, "text/event-stream")
            .putHeader(HttpHeaders.CACHE_CONTROL, "no-cache")
            .putHeader(HttpHeaders.CONNECTION, "keep-alive")
            .setChunked(true);

        VertxMcpSessionTransport sessionTransport = new VertxMcpSessionTransport(response);
        McpServerSession session = factory.create(sessionTransport);
        String sessionId = session.getId();
        logger.debug("Created new SSE connection for session: {}", sessionId);
        this.sessions.put(sessionId, session);

        // Register cleanup before the first write so the session is dropped however the stream ends.
        response.closeHandler(v -> {
            logger.debug("Session {} closed", sessionId);
            this.sessions.remove(sessionId);
        });

        logger.debug("Sending initial endpoint event to session: {}", sessionId);
        this.sendSseEvent(response, ENDPOINT_EVENT_TYPE, this.baseUrl + this.messageEndpoint + "?sessionId=" + sessionId)
            .onFailure(err -> {
                // The client never learned its message endpoint, so the session is unusable.
                logger.warn("Failed to send endpoint event to session {}: {}", sessionId, err.getMessage());
                this.sessions.remove(sessionId);
            });
    }

    /**
     * Handles a POST on the message endpoint: resolves the session from the
     * {@code sessionId} query parameter, deserializes the JSON-RPC message from
     * the request body and hands it to the session.
     *
     * <p>Responses: 503 while shutting down, 400 when the session id is missing
     * or the body cannot be deserialized, 404 for an unknown session, 500 when
     * the session fails to process the message, and 200 with an empty body
     * otherwise.
     *
     * @param context routing context of the incoming request
     */
    private void handleMessage(RoutingContext context) {
        if (this.isClosing) {
            context.response().setStatusCode(503).putHeader(HttpHeaders.CONTENT_TYPE, "text/plain").end("Server is shutting down");
            return;
        }
        String sessionId = context.queryParams().get("sessionId");
        if (sessionId == null) {
            this.sendJsonError(context, 400, "Session ID missing in message endpoint");
            return;
        }
        McpServerSession session = this.sessions.get(sessionId);
        if (session == null) {
            this.sendJsonError(context, 404, "Session not found: " + sessionId);
            return;
        }
        String body = context.body().asString();
        try {
            McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper, body);
            // Outcomes are handled in subscribe(...) itself: a bare subscribe() has no error
            // consumer, so Reactor would additionally report each failure as unhandled.
            session.handle(message).subscribe(
                ignored -> {},
                error -> {
                    logger.error("Error processing message: {}", error.getMessage());
                    this.sendJsonError(context, 500, error.getMessage());
                },
                () -> {
                    HttpServerResponse response = context.response();
                    // Skip the write if the response was already finished or the connection is gone.
                    if (!response.ended() && !response.closed()) {
                        response.putHeader(HttpHeaders.CONTENT_TYPE, "application/json").end();
                    }
                });
        }
        catch (IOException | IllegalArgumentException e) {
            logger.error("Failed to deserialize message: {}", e.getMessage());
            this.sendJsonError(context, 400, "Invalid message format");
        }
    }

    /**
     * Writes a JSON body of the form {@code {"error": message}} with the given
     * status code, unless the response has already ended or its connection is closed.
     */
    private void sendJsonError(RoutingContext context, int statusCode, String message) {
        HttpServerResponse response = context.response();
        if (response.ended() || response.closed()) {
            return;
        }
        response.setStatusCode(statusCode)
            .putHeader(HttpHeaders.CONTENT_TYPE, "application/json")
            .end(new JsonObject().put("error", message).encode());
    }

    /**
     * Writes one server-sent event to the response.
     *
     * @param response response carrying the event stream
     * @param eventType value of the {@code event:} field
     * @param data event payload; may span several lines
     * @return the future returned by the underlying write
     */
    private Future<Void> sendSseEvent(HttpServerResponse response, String eventType, String data) {
        return response.write(formatSseEvent(eventType, data));
    }

    /**
     * Builds the wire form of a server-sent event. Each line of {@code data}
     * gets its own {@code data:} field, so a payload containing line breaks
     * (for example pretty-printed JSON) cannot terminate the event early.
     * Single-line payloads produce exactly {@code "event: T\ndata: D\n\n"}.
     */
    private static String formatSseEvent(String eventType, String data) {
        StringBuilder frame = new StringBuilder("event: ").append(eventType).append('\n');
        // limit -1 keeps trailing empty lines so the payload round-trips unchanged
        for (String line : String.valueOf(data).split("\r\n|\r|\n", -1)) {
            frame.append("data: ").append(line).append('\n');
        }
        return frame.append('\n').toString();
    }

    /**
     * Creates an empty builder for {@link VertxMcpSseServerTransportProvider}.
     *
     * @return a new builder with no properties set
     */
    @Generated
    public static VertxMcpSseServerTransportProviderBuilder builder() {
        VertxMcpSseServerTransportProviderBuilder freshBuilder = new VertxMcpSseServerTransportProviderBuilder();
        return freshBuilder;
    }

    private static /* synthetic */ Publisher lambda$sendKeepAlivePing$2(1 typeRef, McpServerSession session) {
        return session.sendRequest("ping", null, (TypeReference)typeRef).doOnError(e -> logger.warn("Failed to send keep-alive ping to session {}: {}", (Object)session.getId(), (Object)e.getMessage())).onErrorComplete();
    }

    /**
     * Per-connection transport that delivers JSON-RPC messages to one client as
     * server-sent events on that client's HTTP response.
     */
    private class VertxMcpSessionTransport
    implements McpServerTransport {
        /** Response carrying this session's event stream. */
        private final HttpServerResponse response;

        /**
         * @param response response carrying the session's event stream
         */
        public VertxMcpSessionTransport(HttpServerResponse response) {
            this.response = response;
        }

        /**
         * Serializes the message to JSON and writes it as a {@code message} event.
         * If serialization or the write fails, the error is logged, the response
         * is ended if still open, and the error is propagated to the subscriber.
         *
         * @param message message to deliver
         * @return a {@code Mono} that completes when the write completes
         */
        public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
            return Mono.<String>fromSupplier(() -> {
                try {
                    return VertxMcpSseServerTransportProvider.this.objectMapper.writeValueAsString(message);
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }).flatMap(jsonText -> Mono.fromCompletionStage(VertxMcpSseServerTransportProvider.this.sendSseEvent(this.response, VertxMcpSseServerTransportProvider.MESSAGE_EVENT_TYPE, jsonText).toCompletionStage())).doOnError(e -> {
                logger.error("Failed to send message to session", e);
                this.endIfOpen();
            });
        }

        /**
         * Converts {@code data} to the type described by {@code typeRef} using the provider's mapper.
         *
         * @param data value to convert
         * @param typeRef target type
         * @return the converted value
         */
        public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
            return VertxMcpSseServerTransportProvider.this.objectMapper.convertValue(data, typeRef);
        }

        /**
         * Ends the response, if still open, when the returned {@code Mono} is subscribed.
         *
         * @return a {@code Mono} that completes after the response has been ended
         */
        public Mono<Void> closeGracefully() {
            return Mono.fromRunnable(this::endIfOpen);
        }

        /** Ends the response immediately if it is still open. */
        public void close() {
            this.endIfOpen();
        }

        /**
         * Ends the response unless it has already ended or its connection is closed.
         * Checking {@code ended()} as well as {@code closed()} makes repeated close
         * calls harmless while the underlying connection is still open.
         */
        private void endIfOpen() {
            if (!this.response.ended() && !this.response.closed()) {
                this.response.end();
            }
        }
    }

    /**
     * Fluent builder for {@link VertxMcpSseServerTransportProvider}. Each setter
     * stores its argument and returns this builder; {@link #build()} passes the
     * stored values to the provider's constructor.
     */
    @Generated
    public static class VertxMcpSseServerTransportProviderBuilder {
        /** Value for the constructor's {@code baseUrl} argument. */
        @Generated
        private String baseUrl;
        /** Value for the constructor's {@code messageEndpoint} argument. */
        @Generated
        private String messageEndpoint;
        /** Value for the constructor's {@code sseEndpoint} argument. */
        @Generated
        private String sseEndpoint;
        /** Value for the constructor's {@code keepAliveInterval} argument. */
        @Generated
        private Duration keepAliveInterval;
        /** Value for the constructor's {@code objectMapper} argument. */
        @Generated
        private ObjectMapper objectMapper;
        /** Value for the constructor's {@code vertx} argument. */
        @Generated
        private Vertx vertx;

        /** Package-private; instances are obtained through the enclosing class's {@code builder()}. */
        @Generated
        VertxMcpSseServerTransportProviderBuilder() {
        }

        /**
         * @param baseUrl base URL to use
         * @return this builder
         */
        @Generated
        public VertxMcpSseServerTransportProviderBuilder baseUrl(String baseUrl) {
            this.baseUrl = baseUrl;
            return this;
        }

        /**
         * @param messageEndpoint message endpoint path to use
         * @return this builder
         */
        @Generated
        public VertxMcpSseServerTransportProviderBuilder messageEndpoint(String messageEndpoint) {
            this.messageEndpoint = messageEndpoint;
            return this;
        }

        /**
         * @param sseEndpoint SSE endpoint path to use
         * @return this builder
         */
        @Generated
        public VertxMcpSseServerTransportProviderBuilder sseEndpoint(String sseEndpoint) {
            this.sseEndpoint = sseEndpoint;
            return this;
        }

        /**
         * @param keepAliveInterval keep-alive interval to use
         * @return this builder
         */
        @Generated
        public VertxMcpSseServerTransportProviderBuilder keepAliveInterval(Duration keepAliveInterval) {
            this.keepAliveInterval = keepAliveInterval;
            return this;
        }

        /**
         * @param objectMapper object mapper to use
         * @return this builder
         */
        @Generated
        public VertxMcpSseServerTransportProviderBuilder objectMapper(ObjectMapper objectMapper) {
            this.objectMapper = objectMapper;
            return this;
        }

        /**
         * @param vertx Vert.x instance to use
         * @return this builder
         */
        @Generated
        public VertxMcpSseServerTransportProviderBuilder vertx(Vertx vertx) {
            this.vertx = vertx;
            return this;
        }

        /**
         * Creates the provider from the values collected so far.
         *
         * @return a new provider
         * @throws IllegalArgumentException if {@code baseUrl}, {@code objectMapper} or {@code vertx} was never set
         */
        @Generated
        public VertxMcpSseServerTransportProvider build() {
            return new VertxMcpSseServerTransportProvider(this.baseUrl, this.messageEndpoint, this.sseEndpoint, this.keepAliveInterval, this.objectMapper, this.vertx);
        }

        /**
         * @return a textual listing of every builder property and its current value
         */
        @Generated
        public String toString() {
            StringBuilder text = new StringBuilder("VertxMcpSseServerTransportProvider.VertxMcpSseServerTransportProviderBuilder(");
            text.append("baseUrl=").append(this.baseUrl);
            text.append(", messageEndpoint=").append(this.messageEndpoint);
            text.append(", sseEndpoint=").append(this.sseEndpoint);
            text.append(", keepAliveInterval=").append(String.valueOf(this.keepAliveInterval));
            text.append(", objectMapper=").append(String.valueOf(this.objectMapper));
            text.append(", vertx=").append(String.valueOf(this.vertx));
            return text.append(")").toString();
        }
    }
}

