/*
 * Decompiled with CFR 0.152.
 */
package io.modelcontextprotocol.server.transport;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.server.DefaultMcpTransportContext;
import io.modelcontextprotocol.server.McpTransportContext;
import io.modelcontextprotocol.server.McpTransportContextExtractor;
import io.modelcontextprotocol.server.transport.IMcpHttpServerTransport;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpStreamableServerSession;
import io.modelcontextprotocol.spec.McpStreamableServerTransport;
import io.modelcontextprotocol.spec.McpStreamableServerTransportProvider;
import io.modelcontextprotocol.util.Assert;
import io.modelcontextprotocol.util.KeepAliveScheduler;
import io.modelcontextprotocol.util.Utils;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import org.noear.solon.SolonApp;
import org.noear.solon.core.handle.Context;
import org.noear.solon.core.handle.Entity;
import org.noear.solon.web.sse.SseEmitter;
import org.noear.solon.web.sse.SseEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class WebRxStreamableServerTransportProvider
implements McpStreamableServerTransportProvider,
IMcpHttpServerTransport {
    /** Logger for the provider; the inner session transport logs through it as well. */
    private static final Logger logger = LoggerFactory.getLogger(WebRxStreamableServerTransportProvider.class);
    /** SSE event name attached to every JSON-RPC message written by the session transport. */
    public static final String MESSAGE_EVENT_TYPE = "message";
    /** SSE event name for endpoint announcements; not referenced by this class itself. */
    public static final String ENDPOINT_EVENT_TYPE = "endpoint";
    /** Empty default base URL. The HTTP handlers also pass this value to {@code Context.contentType}. */
    public static final String DEFAULT_BASE_URL = "";
    /** Path on which the GET, POST and DELETE handlers are registered. */
    private final String mcpEndpoint;
    /** When {@code true}, DELETE requests are answered with 405 instead of deleting the session. */
    private final boolean disallowDelete;
    /** Serializes outgoing messages and deserializes incoming JSON-RPC payloads. */
    private final ObjectMapper objectMapper;
    /** Creates sessions for {@code initialize} requests; injected through {@link #setSessionFactory}. */
    private McpStreamableServerSession.Factory sessionFactory;
    /** Active sessions keyed by session id. */
    private final ConcurrentHashMap<String, McpStreamableServerSession> sessions = new ConcurrentHashMap<>();
    /** Builds the transport context from the incoming request; assigned once in the constructor. */
    private final McpTransportContextExtractor<Context> contextExtractor;
    /** Set when graceful shutdown starts; request handlers answer 503 while it is {@code true}. */
    private volatile boolean isClosing = false;
    /** Keep-alive scheduler; stays {@code null} when no keep-alive interval was configured. */
    private KeepAliveScheduler keepAliveScheduler;

    /**
     * Creates the provider and, when an interval is given, starts the keep-alive scheduler.
     *
     * @param objectMapper mapper used for JSON (de)serialization; must not be null
     * @param mcpEndpoint path the HTTP handlers are registered on; must not be null
     * @param disallowDelete whether DELETE requests are rejected
     * @param contextExtractor builds the transport context from a request; must not be null
     * @param keepAliveInterval initial delay and period of the keep-alive scheduler, or
     *        {@code null} to run without keep-alive
     */
    private WebRxStreamableServerTransportProvider(ObjectMapper objectMapper, String mcpEndpoint, boolean disallowDelete, McpTransportContextExtractor<Context> contextExtractor, Duration keepAliveInterval) {
        Assert.notNull(objectMapper, "ObjectMapper must not be null");
        Assert.notNull(mcpEndpoint, "MCP endpoint must not be null");
        Assert.notNull(contextExtractor, "McpTransportContextExtractor must not be null");

        this.objectMapper = objectMapper;
        this.mcpEndpoint = mcpEndpoint;
        this.disallowDelete = disallowDelete;
        this.contextExtractor = contextExtractor;

        if (keepAliveInterval == null) {
            logger.warn("Keep-alive interval is not set or invalid. No keep-alive will be scheduled.");
            return;
        }

        // The supplier is evaluated by the scheduler: no sessions once shutdown has begun,
        // otherwise the sessions currently registered.
        KeepAliveScheduler scheduler = KeepAliveScheduler
                .builder(() -> this.isClosing ? Flux.empty() : Flux.fromIterable(this.sessions.values()))
                .initialDelay(keepAliveInterval)
                .interval(keepAliveInterval)
                .build();
        this.keepAliveScheduler = scheduler;
        scheduler.start();
    }

    /**
     * Registers the GET, POST and DELETE handlers for the MCP endpoint on the given application.
     * Does nothing when {@code app} is {@code null}.
     *
     * @param app application to register the routes on; may be null
     */
    @Override
    public void toHttpHandler(SolonApp app) {
        if (app == null) {
            return;
        }
        final String path = this.mcpEndpoint;
        app.get(path, this::handleGet);
        app.post(path, this::handlePost);
        app.delete(path, this::handleDelete);
    }

    /**
     * Returns the endpoint path this provider was configured with.
     *
     * @return the MCP endpoint path
     */
    @Override
    public String getMcpEndpoint() {
        return mcpEndpoint;
    }

    /**
     * Lists the protocol version strings reported by this transport.
     *
     * @return a list containing {@code "2024-11-05"} and {@code "2025-03-26"}, in that order
     */
    @Override
    public List<String> protocolVersions() {
        List<String> versions = Utils.asList("2024-11-05", "2025-03-26");
        return versions;
    }

    /**
     * Stores the factory used to start a session when an {@code initialize} request arrives.
     *
     * @param sessionFactory factory to use for new sessions
     */
    @Override
    public void setSessionFactory(McpStreamableServerSession.Factory sessionFactory) {
        final McpStreamableServerSession.Factory factory = sessionFactory;
        this.sessionFactory = factory;
    }

    /**
     * Sends a notification to every session that is registered when the returned Mono runs.
     * The emptiness check and the debug logging happen when this method is called; the actual
     * sends happen on subscription. A failure for one session is logged and does not stop the rest.
     *
     * @param method notification method name
     * @param params notification parameters
     * @return an empty Mono when no session is registered at call time, otherwise a Mono that
     *         performs the broadcast when subscribed
     */
    @Override
    public Mono<Void> notifyClients(String method, Object params) {
        if (!this.sessions.isEmpty()) {
            logger.debug("Attempting to broadcast message to {} active sessions", this.sessions.size());
            return Mono.fromRunnable(() -> this.sessions.values()
                    .parallelStream()
                    .forEach(target -> this.notifyQuietly(target, method, params)));
        }
        logger.debug("No active sessions to broadcast message to");
        return Mono.empty();
    }

    /** Sends one notification and blocks until it is done; any exception is logged, not rethrown. */
    private void notifyQuietly(McpStreamableServerSession target, String method, Object params) {
        try {
            target.sendNotification(method, params).block();
        }
        catch (Exception e) {
            logger.error("Failed to send message to session {}: {}", target.getId(), e.getMessage());
        }
    }

    /**
     * Returns a Mono that, when subscribed, marks the provider as closing, closes every registered
     * session, clears the session map and finally shuts down the keep-alive scheduler if one exists.
     *
     * @return a Mono completing once shutdown has run
     */
    @Override
    public Mono<Void> closeGracefully() {
        return Mono.fromRunnable(this::closeAllSessions)
                .then()
                .doOnSuccess(ignored -> {
                    KeepAliveScheduler scheduler = this.keepAliveScheduler;
                    if (scheduler != null) {
                        scheduler.shutdown();
                    }
                });
    }

    /** Flags shutdown, closes each session (logging individual failures) and empties the map. */
    private void closeAllSessions() {
        this.isClosing = true;
        logger.debug("Initiating graceful shutdown with {} active sessions", this.sessions.size());
        this.sessions.values().parallelStream().forEach(active -> {
            try {
                active.closeGracefully().block();
            }
            catch (Exception e) {
                logger.error("Failed to close session {}: {}", active.getId(), e.getMessage());
            }
        });
        this.sessions.clear();
        logger.debug("Graceful shutdown completed");
    }

    /**
     * HTTP GET entry point: delegates to {@link #handleGetDo} and hands the result back to the
     * framework, after converting an {@code Entity} body into a string where needed.
     *
     * @param request current request context
     * @throws Throwable if serializing a JSON-RPC response body fails
     */
    private void handleGet(Context request) throws Throwable {
        // DEFAULT_BASE_URL is the empty string, so this sets an empty content type up front.
        request.contentType(DEFAULT_BASE_URL);
        Object result = this.handleGetDo(request);
        if (result instanceof Entity) {
            Entity response = (Entity)result;
            Object payload = response.body();
            if (payload instanceof McpError) {
                // Errors are reduced to their message text.
                response.body((Object)((McpError)payload).getMessage());
            } else if (payload instanceof McpSchema.JSONRPCResponse) {
                // JSON-RPC responses are written as JSON text.
                response.body((Object)this.objectMapper.writeValueAsString(payload));
            }
        }
        request.returnValue(result);
    }

    /**
     * Validates a GET request and, when acceptable, opens an SSE stream for an existing session.
     *
     * <p>Outcomes: 503 while shutting down; 400 when the Accept value is missing or does not
     * contain {@code text/event-stream}, or when the {@code mcp-session-id} header is absent;
     * 404 for an unknown session; 500 if setting up the emitter throws. Otherwise an
     * {@link SseEmitter} is returned. Once the emitter is initialised it either replays messages
     * after {@code Last-Event-ID} (when that header is present) or attaches a listening stream
     * that is closed when the emitter completes.
     *
     * @param request current request context
     * @return an {@code Entity} describing an error, or the {@code SseEmitter} for the stream
     */
    private Object handleGetDo(Context request) {
        if (this.isClosing) {
            return new Entity().status(503).body((Object)"Server is shutting down");
        }
        String acceptHeaders = request.acceptNew();
        // Guard against a missing Accept value: answer 400 like any other unacceptable value
        // instead of letting a NullPointerException escape the handler.
        if (acceptHeaders == null || !acceptHeaders.contains("text/event-stream")) {
            return new Entity().status(400).body((Object)"Invalid Accept header. Expected TEXT_EVENT_STREAM");
        }
        McpTransportContext transportContext = this.contextExtractor.extract(request, new DefaultMcpTransportContext());
        if (!request.headerMap().containsKey("mcp-session-id")) {
            return new Entity().status(400).body((Object)"Session ID required in mcp-session-id header");
        }
        String sessionId = request.header("mcp-session-id");
        McpStreamableServerSession session = this.sessions.get(sessionId);
        if (session == null) {
            return new Entity().status(404);
        }
        logger.debug("Handling GET request for session: {}", sessionId);
        try {
            // -1 is passed as the emitter timeout (presumably "no timeout"; confirm against SseEmitter).
            SseEmitter sseEmitter = new SseEmitter(-1L);
            sseEmitter.onTimeout(() -> logger.debug("SSE connection timed out for session: {}", sessionId));
            sseEmitter.onInited(emitter -> {
                WebRxStreamableMcpSessionTransport sessionTransport = new WebRxStreamableMcpSessionTransport(sessionId, (SseEmitter)emitter);
                if (request.headerMap().containsKey("Last-Event-ID")) {
                    // Resumption: replay what the session has stored after the given event id.
                    String lastId = request.header("Last-Event-ID");
                    try {
                        session.replay(lastId).contextWrite(ctx -> ctx.put("MCP_TRANSPORT_CONTEXT", transportContext)).toIterable().forEach(message -> {
                            try {
                                sessionTransport.sendMessage((McpSchema.JSONRPCMessage)message).contextWrite(ctx -> ctx.put("MCP_TRANSPORT_CONTEXT", transportContext)).block();
                            }
                            catch (Exception e) {
                                // Pass the exception as the last argument so the stack trace is logged too.
                                logger.error("Failed to replay message: {}", e.getMessage(), e);
                                emitter.error((Throwable)e);
                            }
                        });
                    }
                    catch (Exception e) {
                        logger.error("Failed to replay messages: {}", e.getMessage(), e);
                        emitter.error((Throwable)e);
                    }
                } else {
                    // Fresh stream: attach as the session's listening stream and detach on completion.
                    McpStreamableServerSession.McpStreamableServerSessionStream listeningStream = session.listeningStream(sessionTransport);
                    emitter.onCompletion(() -> {
                        logger.debug("SSE connection completed for session: {}", sessionId);
                        listeningStream.close();
                    });
                }
            });
            return sseEmitter;
        }
        catch (Exception e) {
            logger.error("Failed to handle GET request for session {}: {}", sessionId, e.getMessage(), e);
            return new Entity().status(500);
        }
    }

    /**
     * HTTP POST entry point: delegates to {@link #handlePostDo} and returns its result to the
     * framework. When the result is an {@code Entity}, an {@code McpError} body is replaced by its
     * message and a JSON-RPC response body by its JSON text.
     *
     * @param request current request context
     * @throws Throwable if serializing a JSON-RPC response body fails
     */
    private void handlePost(Context request) throws Throwable {
        // DEFAULT_BASE_URL is the empty string, so this sets an empty content type up front.
        request.contentType(DEFAULT_BASE_URL);
        Object outcome = this.handlePostDo(request);
        if (outcome instanceof Entity) {
            Entity reply = (Entity)outcome;
            Object content = reply.body();
            if (content instanceof McpError) {
                McpError failure = (McpError)content;
                reply.body((Object)failure.getMessage());
            } else if (content instanceof McpSchema.JSONRPCResponse) {
                String json = this.objectMapper.writeValueAsString(content);
                reply.body((Object)json);
            }
        }
        request.returnValue(outcome);
    }

    /**
     * Processes a POSTed JSON-RPC message.
     *
     * <p>Outcomes: 503 while shutting down; 400 when the Accept value is missing or lacks
     * {@code text/event-stream} or {@code application/json}; 400 when the body cannot be
     * deserialized. An {@code initialize} request starts a new session (see
     * {@link #handleInitialize}). Every other message needs an {@code mcp-session-id} header
     * (400 if absent, 404 if unknown): responses and notifications are handed to the session and
     * answered with 202; requests are answered with an {@link SseEmitter} that streams the
     * session's response. Any other failure yields 500 with the exception message.
     *
     * @param request current request context
     * @return an {@code Entity} for direct replies, or an {@code SseEmitter} for streamed replies
     */
    private Object handlePostDo(Context request) {
        if (this.isClosing) {
            return new Entity().status(503).body((Object)"Server is shutting down");
        }
        String acceptHeaders = request.acceptNew();
        // Guard against a missing Accept value: answer 400 instead of letting a
        // NullPointerException escape (this check runs outside the try block below).
        if (acceptHeaders == null || !acceptHeaders.contains("text/event-stream") || !acceptHeaders.contains("application/json")) {
            return new Entity().status(400).body((Object)new McpError((Object)"Invalid Accept headers. Expected TEXT_EVENT_STREAM and APPLICATION_JSON"));
        }
        McpTransportContext transportContext = this.contextExtractor.extract(request, new DefaultMcpTransportContext());
        try {
            String body = request.body();
            McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper, body);
            // Constant-first equals keeps a request without a method from throwing here.
            if (message instanceof McpSchema.JSONRPCRequest && "initialize".equals(((McpSchema.JSONRPCRequest)message).getMethod())) {
                return this.handleInitialize((McpSchema.JSONRPCRequest)message);
            }
            if (!request.headerMap().containsKey("mcp-session-id")) {
                return new Entity().status(400).body((Object)new McpError((Object)"Session ID missing"));
            }
            String sessionId = request.header("mcp-session-id");
            McpStreamableServerSession session = this.sessions.get(sessionId);
            if (session == null) {
                return new Entity().status(404).body((Object)new McpError((Object)("Session not found: " + sessionId)));
            }
            if (message instanceof McpSchema.JSONRPCResponse) {
                McpSchema.JSONRPCResponse jsonrpcResponse = (McpSchema.JSONRPCResponse)message;
                session.accept(jsonrpcResponse).contextWrite(ctx -> ctx.put("MCP_TRANSPORT_CONTEXT", transportContext)).block();
                return new Entity().status(202);
            }
            if (message instanceof McpSchema.JSONRPCNotification) {
                McpSchema.JSONRPCNotification jsonrpcNotification = (McpSchema.JSONRPCNotification)message;
                session.accept(jsonrpcNotification).contextWrite(ctx -> ctx.put("MCP_TRANSPORT_CONTEXT", transportContext)).block();
                return new Entity().status(202);
            }
            if (message instanceof McpSchema.JSONRPCRequest) {
                McpSchema.JSONRPCRequest jsonrpcRequest = (McpSchema.JSONRPCRequest)message;
                SseEmitter sseEmitter = new SseEmitter(-1L);
                sseEmitter.onCompletion(() -> logger.debug("Request response stream completed for session: {}", sessionId));
                sseEmitter.onTimeout(() -> logger.debug("Request response stream timed out for session: {}", sessionId));
                sseEmitter.onInited(emitter -> {
                    WebRxStreamableMcpSessionTransport sessionTransport = new WebRxStreamableMcpSessionTransport(sessionId, (SseEmitter)emitter);
                    try {
                        session.responseStream(jsonrpcRequest, sessionTransport).contextWrite(ctx -> ctx.put("MCP_TRANSPORT_CONTEXT", transportContext)).block();
                    }
                    catch (Exception e) {
                        // Pass the exception as the last argument so the stack trace is logged too.
                        logger.error("Failed to handle request stream: {}", e.getMessage(), e);
                        emitter.error((Throwable)e);
                    }
                });
                return sseEmitter;
            }
            return new Entity().status(500).body((Object)new McpError((Object)"Unknown message type"));
        }
        catch (IOException | IllegalArgumentException e) {
            logger.error("Failed to deserialize message: {}", e.getMessage(), e);
            return new Entity().status(400).body((Object)new McpError((Object)"Invalid message format"));
        }
        catch (Exception e) {
            logger.error("Error handling message: {}", e.getMessage(), e);
            return new Entity().status(500).body((Object)new McpError((Object)e.getMessage()));
        }
    }

    /**
     * Starts a session for an {@code initialize} request and builds the JSON reply carrying the
     * new session id in the {@code mcp-session-id} header.
     *
     * <p>The session is registered before initialization is awaited. If initialization fails the
     * session is removed again, because the client never receives its id and it could otherwise
     * never be addressed or deleted.
     *
     * @param jsonrpcRequest the {@code initialize} request
     * @return the success reply, or a 500 reply carrying the failure message
     * @throws IllegalArgumentException if the request params cannot be converted; handled by the caller
     */
    private Object handleInitialize(McpSchema.JSONRPCRequest jsonrpcRequest) {
        McpSchema.InitializeRequest initializeRequest = this.objectMapper.convertValue(jsonrpcRequest.getParams(), new TypeReference<McpSchema.InitializeRequest>(){});
        McpStreamableServerSession.McpStreamableServerSessionInit init = this.sessionFactory.startSession(initializeRequest);
        McpStreamableServerSession newSession = init.getSession();
        String newSessionId = newSession.getId();
        this.sessions.put(newSessionId, newSession);
        try {
            McpSchema.InitializeResult initResult = (McpSchema.InitializeResult)init.getInitResult().block();
            return new Entity().contentType("application/json").headerAdd("mcp-session-id", newSessionId).body((Object)new McpSchema.JSONRPCResponse("2.0", jsonrpcRequest.getId(), initResult, null));
        }
        catch (Exception e) {
            logger.error("Failed to initialize session: {}", e.getMessage(), e);
            // Remove only the instance registered above.
            this.sessions.remove(newSessionId, newSession);
            return new Entity().status(500).body((Object)new McpError((Object)e.getMessage()));
        }
    }

    /**
     * HTTP DELETE entry point: delegates to {@link #handleDeleteDo} and returns the resulting
     * {@code Entity} to the framework, turning an {@code McpError} body into its message and a
     * JSON-RPC response body into JSON text.
     *
     * @param request current request context
     * @throws Throwable if serializing a JSON-RPC response body fails
     */
    private void handleDelete(Context request) throws Throwable {
        // DEFAULT_BASE_URL is the empty string, so this sets an empty content type up front.
        request.contentType(DEFAULT_BASE_URL);
        Entity reply = this.handleDeleteDo(request);
        Object content = reply.body();
        if (content instanceof McpError) {
            reply.body((Object)((McpError)content).getMessage());
        } else if (content instanceof McpSchema.JSONRPCResponse) {
            reply.body((Object)this.objectMapper.writeValueAsString(content));
        }
        request.returnValue((Object)reply);
    }

    /**
     * Deletes the session named by the {@code mcp-session-id} header.
     *
     * <p>Outcomes: 503 while shutting down; 405 when deletion is disallowed; 400 when the header
     * is absent; 404 for an unknown session; 500 with the failure message when deleting throws;
     * otherwise a default {@code Entity} after the session was deleted and unregistered.
     *
     * @param request current request context
     * @return the reply entity
     */
    private Entity handleDeleteDo(Context request) {
        if (this.isClosing) {
            return new Entity().status(503).body((Object)"Server is shutting down");
        }
        if (this.disallowDelete) {
            return new Entity().status(405);
        }

        McpTransportContext transportContext = this.contextExtractor.extract(request, new DefaultMcpTransportContext());
        boolean hasSessionHeader = request.headerMap().containsKey("mcp-session-id");
        if (!hasSessionHeader) {
            return new Entity().status(400).body((Object)"Session ID required in mcp-session-id header");
        }

        String sessionId = request.header("mcp-session-id");
        McpStreamableServerSession target = this.sessions.get(sessionId);
        if (target == null) {
            return new Entity().status(404);
        }

        try {
            // Block until the session has finished deleting, then drop it from the registry.
            target.delete()
                    .contextWrite(ctx -> ctx.put("MCP_TRANSPORT_CONTEXT", transportContext))
                    .block();
            this.sessions.remove(sessionId);
            return new Entity();
        }
        catch (Exception e) {
            logger.error("Failed to delete session {}: {}", sessionId, e.getMessage());
            return new Entity().status(500).body((Object)new McpError((Object)e.getMessage()));
        }
    }

    /**
     * Creates a fresh {@link Builder} for this provider.
     *
     * @return a new builder instance
     */
    public static Builder builder() {
        Builder fresh = new Builder();
        return fresh;
    }

    /**
     * Fluent builder for {@link WebRxStreamableServerTransportProvider}. Only the object mapper
     * has no default; the endpoint defaults to {@code "/mcp"}, deletion is allowed, and the
     * default context extractor stores the request in the context under the {@code Context}
     * class name.
     */
    public static class Builder {
        private ObjectMapper objectMapper;
        private String mcpEndpoint = "/mcp";
        private boolean disallowDelete = false;
        private McpTransportContextExtractor<Context> contextExtractor = (req, transportCtx) -> {
            transportCtx.put(Context.class.getName(), req);
            return transportCtx;
        };
        private Duration keepAliveInterval;

        /**
         * Sets the mapper used for JSON handling.
         *
         * @param objectMapper mapper to use; must not be null
         * @return this builder
         */
        public Builder objectMapper(ObjectMapper objectMapper) {
            Assert.notNull(objectMapper, "ObjectMapper must not be null");
            this.objectMapper = objectMapper;
            return this;
        }

        /**
         * Sets the endpoint path the handlers are registered on.
         *
         * @param mcpEndpoint endpoint path; must not be null
         * @return this builder
         */
        public Builder mcpEndpoint(String mcpEndpoint) {
            Assert.notNull(mcpEndpoint, "MCP endpoint must not be null");
            this.mcpEndpoint = mcpEndpoint;
            return this;
        }

        /**
         * Chooses whether DELETE requests are rejected.
         *
         * @param disallowDelete {@code true} to reject DELETE requests
         * @return this builder
         */
        public Builder disallowDelete(boolean disallowDelete) {
            this.disallowDelete = disallowDelete;
            return this;
        }

        /**
         * Replaces the default transport-context extractor.
         *
         * @param contextExtractor extractor to use; must not be null
         * @return this builder
         */
        public Builder contextExtractor(McpTransportContextExtractor<Context> contextExtractor) {
            Assert.notNull(contextExtractor, "contextExtractor must not be null");
            this.contextExtractor = contextExtractor;
            return this;
        }

        /**
         * Sets the keep-alive interval; leaving it unset (or null) disables keep-alive.
         *
         * @param keepAliveInterval interval between keep-alive rounds, or null
         * @return this builder
         */
        public Builder keepAliveInterval(Duration keepAliveInterval) {
            this.keepAliveInterval = keepAliveInterval;
            return this;
        }

        /**
         * Builds the provider from the current settings.
         *
         * @return the configured provider
         */
        public WebRxStreamableServerTransportProvider build() {
            Assert.notNull(objectMapper, "ObjectMapper must be set");
            Assert.notNull(mcpEndpoint, "MCP endpoint must be set");
            return new WebRxStreamableServerTransportProvider(
                    objectMapper,
                    mcpEndpoint,
                    disallowDelete,
                    contextExtractor,
                    keepAliveInterval);
        }
    }

    private class WebRxStreamableMcpSessionTransport
    implements McpStreamableServerTransport {
        private final String sessionId;
        private final SseEmitter sseEmitter;
        private final ReentrantLock lock = new ReentrantLock();
        private volatile boolean closed = false;

        WebRxStreamableMcpSessionTransport(String sessionId, SseEmitter sseEmitter) {
            this.sessionId = sessionId;
            this.sseEmitter = sseEmitter;
            logger.debug("Streamable session transport {} initialized with SSE builder", (Object)sessionId);
        }

        @Override
        public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
            return this.sendMessage(message, null);
        }

        @Override
        public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message, String messageId) {
            return Mono.fromRunnable(() -> {
                if (this.closed) {
                    logger.debug("Attempted to send message to closed session: {}", (Object)this.sessionId);
                    return;
                }
                this.lock.lock();
                try {
                    if (this.closed) {
                        logger.debug("Session {} was closed during message send attempt", (Object)this.sessionId);
                        return;
                    }
                    String jsonText = WebRxStreamableServerTransportProvider.this.objectMapper.writeValueAsString((Object)message);
                    this.sseEmitter.send(new SseEvent().id(messageId != null ? messageId : this.sessionId).name(WebRxStreamableServerTransportProvider.MESSAGE_EVENT_TYPE).data((Object)jsonText));
                    logger.debug("Message sent to session {} with ID {}: {}", new Object[]{this.sessionId, messageId, jsonText});
                }
                catch (Exception e) {
                    logger.error("Failed to send message to session {}: {}", (Object)this.sessionId, (Object)e.getMessage());
                    try {
                        this.sseEmitter.error((Throwable)e);
                    }
                    catch (Exception errorException) {
                        logger.error("Failed to send error to SSE builder for session {}: {}", (Object)this.sessionId, (Object)errorException.getMessage());
                    }
                }
                finally {
                    this.lock.unlock();
                }
            });
        }

        @Override
        public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
            return (T)WebRxStreamableServerTransportProvider.this.objectMapper.convertValue(data, typeRef);
        }

        @Override
        public Mono<Void> closeGracefully() {
            return Mono.fromRunnable(() -> this.close());
        }

        @Override
        public void close() {
            this.lock.lock();
            try {
                if (this.closed) {
                    logger.debug("Session transport {} already closed", (Object)this.sessionId);
                    return;
                }
                this.closed = true;
                this.sseEmitter.complete();
                logger.debug("Successfully completed SSE builder for session {}", (Object)this.sessionId);
            }
            catch (Exception e) {
                logger.warn("Failed to complete SSE builder for session {}: {}", (Object)this.sessionId, (Object)e.getMessage());
            }
            finally {
                this.lock.unlock();
            }
        }
    }
}

