/*
 * Decompiled with CFR 0.152.
 */
package io.modelcontextprotocol.server.transport;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpServerSession;
import io.modelcontextprotocol.spec.McpServerTransport;
import io.modelcontextprotocol.spec.McpServerTransportProvider;
import io.modelcontextprotocol.util.Assert;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.noear.solon.Solon;
import org.noear.solon.SolonApp;
import org.noear.solon.Utils;
import org.noear.solon.core.handle.Context;
import org.noear.solon.core.handle.Entity;
import org.noear.solon.core.util.PathUtil;
import org.noear.solon.web.sse.SseEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/**
 * MCP server transport provider that exposes an SSE endpoint and a message
 * endpoint through Solon handlers.
 *
 * <p>A GET on the SSE endpoint creates a new {@link McpServerSession} and streams
 * {@link SseEvent}s to the client; the first event (named {@code "endpoint"}) carries
 * the URL the client must POST its JSON-RPC messages to, including the generated
 * {@code sessionId}. A POST on the message endpoint looks the session up by that id
 * and hands the deserialized message to it.
 *
 * <p>Active sessions are tracked in a concurrent map keyed by session id.
 */
public class WebRxSseServerTransportProvider implements McpServerTransportProvider {
    private static final Logger logger = LoggerFactory.getLogger(WebRxSseServerTransportProvider.class);

    /** SSE event name used for JSON-RPC messages sent to the client. */
    public static final String MESSAGE_EVENT_TYPE = "message";

    /** SSE event name used for the initial event that announces the message endpoint. */
    public static final String ENDPOINT_EVENT_TYPE = "endpoint";

    /** SSE endpoint path used when none is supplied. */
    public static final String DEFAULT_SSE_ENDPOINT = "/sse";

    private final ObjectMapper objectMapper;
    private final String messageEndpoint;
    private final String sseEndpoint;
    private McpServerSession.Factory sessionFactory;

    /** Active sessions keyed by session id. */
    private final ConcurrentHashMap<String, McpServerSession> sessions = new ConcurrentHashMap<>();

    /** Set once {@link #closeGracefully()} is subscribed; new requests are then rejected with 503. */
    private volatile boolean isClosing = false;

    /**
     * Creates a provider with explicit endpoints.
     *
     * @param objectMapper    mapper used to (de)serialize JSON-RPC messages; must not be null
     * @param messageEndpoint path clients POST messages to; must not be null
     * @param sseEndpoint     path clients open the SSE stream on; must not be null
     */
    public WebRxSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint, String sseEndpoint) {
        Assert.notNull(objectMapper, "ObjectMapper must not be null");
        Assert.notNull(messageEndpoint, "Message endpoint must not be null");
        Assert.notNull(sseEndpoint, "SSE endpoint must not be null");
        this.objectMapper = objectMapper;
        this.messageEndpoint = messageEndpoint;
        this.sseEndpoint = sseEndpoint;
    }

    /**
     * Creates a provider using {@link #DEFAULT_SSE_ENDPOINT} as the SSE endpoint.
     *
     * @param objectMapper    mapper used to (de)serialize JSON-RPC messages; must not be null
     * @param messageEndpoint path clients POST messages to; must not be null
     */
    public WebRxSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint) {
        this(objectMapper, messageEndpoint, DEFAULT_SSE_ENDPOINT);
    }

    /** Emits a heartbeat SSE comment on every currently registered session. */
    public void sendHeartbeat() {
        for (McpServerSession session : this.sessions.values()) {
            // Every session in the map was created by handleSseConnection with this transport type.
            ((WebRxMcpSessionTransport) session.getTransport()).sendHeartbeat();
        }
    }

    /**
     * Registers the SSE (GET) and message (POST) handlers on the given application.
     * Does nothing when {@code app} is null.
     *
     * @param app the Solon application to register the routes on, may be null
     */
    public void toHttpHandler(SolonApp app) {
        if (app != null) {
            app.get(this.sseEndpoint, this::handleSseConnection);
            app.post(this.messageEndpoint, this::handleMessage);
        }
    }

    /** @return the configured SSE endpoint path */
    public String getSseEndpoint() {
        return this.sseEndpoint;
    }

    /** @return the configured message endpoint path */
    public String getMessageEndpoint() {
        return this.messageEndpoint;
    }

    /**
     * Sets the factory used to create a session for each new SSE connection.
     *
     * @param sessionFactory the session factory
     */
    @Override
    public void setSessionFactory(McpServerSession.Factory sessionFactory) {
        this.sessionFactory = sessionFactory;
    }

    /**
     * Sends a notification to every active session. A failure on one session is
     * logged and suppressed so that it does not prevent delivery to the others.
     *
     * @param method the notification method name
     * @param params the notification parameters
     * @return a Mono that completes once all sessions have been attempted
     */
    @Override
    public Mono<Void> notifyClients(String method, Map<String, Object> params) {
        if (this.sessions.isEmpty()) {
            logger.debug("No active sessions to broadcast message to");
            return Mono.empty();
        }
        logger.debug("Attempting to broadcast message to {} active sessions", this.sessions.size());
        // fromIterable (rather than fromStream) so the returned Mono can be subscribed more than
        // once: a java.util.stream.Stream can only be consumed a single time.
        return Flux.fromIterable(this.sessions.values())
            .flatMap(session -> session.sendNotification(method, params)
                .doOnError(e -> logger.error("Failed to send message to session {}: {}", session.getId(), e.getMessage()))
                .onErrorComplete())
            .then();
    }

    /**
     * Marks the provider as closing (so new SSE connections and messages are rejected
     * with 503) and gracefully closes every active session.
     *
     * @return a Mono that completes when all sessions have been closed
     */
    @Override
    public Mono<Void> closeGracefully() {
        return Flux.fromIterable(this.sessions.values())
            .doFirst(() -> {
                // Flip the flag at subscription time so the handlers' shutdown guards take effect.
                this.isClosing = true;
                logger.debug("Initiating graceful shutdown with {} active sessions", this.sessions.size());
            })
            .flatMap(McpServerSession::closeGracefully)
            .then();
    }

    /**
     * Handles a GET on the SSE endpoint: creates a session, registers it, and returns
     * an event stream whose first event announces the message endpoint URL.
     *
     * @param ctx the Solon request context
     * @throws Throwable declared for compatibility with the Solon handler signature
     */
    public void handleSseConnection(Context ctx) throws Throwable {
        if (this.isClosing) {
            ctx.status(503);
            ctx.output("Server is shutting down");
            return;
        }
        String sessionId = Utils.uuid();
        Flux<SseEvent> publisher = Flux.create(sink -> {
            WebRxMcpSessionTransport sessionTransport = new WebRxMcpSessionTransport(ctx, sink);
            McpServerSession session = this.sessionFactory.create(sessionTransport);
            logger.debug("Created new SSE connection for session: {}", sessionId);
            this.sessions.put(sessionId, session);

            // Register cleanup before emitting anything. onDispose fires on cancel as well as
            // on complete/error, so a session whose transport was closed from the server side
            // is also removed from the map instead of lingering there.
            sink.onCancel(() -> logger.debug("Session {} cancelled", sessionId));
            sink.onDispose(() -> this.sessions.remove(sessionId));

            // Prefix the message endpoint with the server context path when one is configured.
            String contextPath = Solon.cfg().serverContextPath();
            String messageEndpointNew = this.messageEndpoint;
            if (Utils.isNotEmpty(contextPath)) {
                messageEndpointNew = PathUtil.mergePath(contextPath, this.messageEndpoint);
            }
            logger.debug("Sending initial endpoint event to session: {}", sessionId);
            sink.next(new SseEvent().name(ENDPOINT_EVENT_TYPE).data(messageEndpointNew + "?sessionId=" + sessionId));
        });
        ctx.contentType("text/event-stream");
        ctx.returnValue(publisher);
    }

    /**
     * Handles a POST on the message endpoint: resolves the session from the
     * {@code sessionId} request parameter, deserializes the body as a JSON-RPC message
     * and passes it to the session.
     *
     * <p>Responds with 503 while closing, 400 when the session id is missing or the body
     * cannot be deserialized, 404 when the session is unknown, and 500 when the session
     * fails to handle the message.
     *
     * @param ctx the Solon request context
     * @throws Throwable declared for compatibility with the Solon handler signature
     */
    public void handleMessage(Context ctx) throws Throwable {
        if (this.isClosing) {
            ctx.status(503);
            ctx.output("Server is shutting down");
            return;
        }
        String sessionId = ctx.param("sessionId");
        if (Utils.isEmpty(sessionId)) {
            ctx.status(400);
            ctx.render(new McpError("Session ID missing in message endpoint"));
            return;
        }
        McpServerSession session = this.sessions.get(sessionId);
        if (session == null) {
            ctx.status(404);
            ctx.render(new McpError("Session not found: " + sessionId));
            return;
        }
        String body = ctx.body();
        try {
            McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper, body);
            Mono<Entity> response = session.handle(message)
                .flatMap(ignored -> Mono.just(new Entity()))
                .onErrorResume(error -> {
                    logger.error("Error processing  message: {}", error.getMessage());
                    return Mono.just(new Entity().status(500).body(new McpError(error.getMessage())));
                });
            ctx.returnValue(response);
        }
        catch (IOException | IllegalArgumentException e) {
            logger.error("Failed to deserialize message: {}", e.getMessage());
            ctx.status(400);
            ctx.render(new McpError("Invalid message format"));
        }
    }

    /** @return a new {@link Builder} */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Fluent builder for {@link WebRxSseServerTransportProvider}. The object mapper and
     * message endpoint are required; the SSE endpoint defaults to
     * {@link WebRxSseServerTransportProvider#DEFAULT_SSE_ENDPOINT}.
     */
    public static class Builder {
        private ObjectMapper objectMapper;
        private String messageEndpoint;
        private String sseEndpoint = DEFAULT_SSE_ENDPOINT;

        /**
         * @param objectMapper mapper used for JSON (de)serialization; must not be null
         * @return this builder
         */
        public Builder objectMapper(ObjectMapper objectMapper) {
            Assert.notNull(objectMapper, "ObjectMapper must not be null");
            this.objectMapper = objectMapper;
            return this;
        }

        /**
         * @param messageEndpoint path clients POST messages to; must not be null
         * @return this builder
         */
        public Builder messageEndpoint(String messageEndpoint) {
            Assert.notNull(messageEndpoint, "Message endpoint must not be null");
            this.messageEndpoint = messageEndpoint;
            return this;
        }

        /**
         * @param sseEndpoint path clients open the SSE stream on; must not be null
         * @return this builder
         */
        public Builder sseEndpoint(String sseEndpoint) {
            Assert.notNull(sseEndpoint, "SSE endpoint must not be null");
            this.sseEndpoint = sseEndpoint;
            return this;
        }

        /**
         * Builds the provider, asserting that the object mapper and message endpoint were set.
         *
         * @return the configured provider
         */
        public WebRxSseServerTransportProvider build() {
            Assert.notNull(this.objectMapper, "ObjectMapper must be set");
            Assert.notNull(this.messageEndpoint, "Message endpoint must be set");
            return new WebRxSseServerTransportProvider(this.objectMapper, this.messageEndpoint, this.sseEndpoint);
        }
    }

    /**
     * Per-connection transport that pushes {@link SseEvent}s into the sink backing one
     * SSE stream. It is an inner class because it uses the enclosing provider's
     * {@code ObjectMapper}.
     */
    public class WebRxMcpSessionTransport implements McpServerTransport {
        private final Context context;
        private final FluxSink<SseEvent> sink;

        /**
         * @param context the request context of the SSE connection
         * @param sink    the sink events for this connection are emitted into
         */
        public WebRxMcpSessionTransport(Context context, FluxSink<SseEvent> sink) {
            this.context = context;
            this.sink = sink;
        }

        /** @return the request context this transport was created with */
        public Context getContext() {
            return this.context;
        }

        /** Emits an SSE comment with the text {@code "heartbeat"}. */
        public void sendHeartbeat() {
            this.sink.next(new SseEvent().comment("heartbeat"));
        }

        /**
         * Serializes the message to JSON and emits it as a {@code "message"} SSE event.
         * If serialization fails, the unwrapped exception is signalled on the sink.
         *
         * @param message the JSON-RPC message to send
         * @return a Mono that completes once the event has been handed to the sink
         */
        @Override
        public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
            return Mono.fromSupplier(() -> {
                try {
                    return WebRxSseServerTransportProvider.this.objectMapper.writeValueAsString(message);
                }
                catch (IOException e) {
                    // Checked exceptions cannot cross the Supplier boundary; wrap for Reactor.
                    throw Exceptions.propagate(e);
                }
            }).doOnNext(jsonText -> {
                SseEvent event = new SseEvent().name(WebRxSseServerTransportProvider.MESSAGE_EVENT_TYPE).data(jsonText);
                this.sink.next(event);
            }).doOnError(e -> this.sink.error(Exceptions.unwrap(e))).then();
        }

        /**
         * Converts {@code data} to the requested type using the provider's object mapper.
         *
         * @param data    the source value
         * @param typeRef the target type
         * @param <T>     the target type
         * @return the converted value
         */
        @Override
        public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
            return WebRxSseServerTransportProvider.this.objectMapper.convertValue(data, typeRef);
        }

        /**
         * @return a Mono that completes the sink when subscribed
         */
        @Override
        public Mono<Void> closeGracefully() {
            return Mono.fromRunnable(this.sink::complete);
        }

        /** Completes the sink immediately. */
        @Override
        public void close() {
            this.sink.complete();
        }
    }
}

