/*
 * Decompiled with CFR 0.152.
 */
package io.modelcontextprotocol.client.transport;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.util.Assert;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.noear.solon.net.http.HttpUtilsBuilder;
import org.noear.solon.rx.SimpleSubscriber;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

public class WebRxSseClientTransport
implements McpClientTransport {
    private static final Logger logger = LoggerFactory.getLogger(WebRxSseClientTransport.class);
    private static final String MESSAGE_EVENT_TYPE = "message";
    private static final String ENDPOINT_EVENT_TYPE = "endpoint";
    private static final String DEFAULT_SSE_ENDPOINT = "/sse";
    private final HttpUtilsBuilder webBuilder;
    private final String sseEndpoint;
    protected ObjectMapper objectMapper;
    private volatile boolean isClosing = false;
    private final CountDownLatch closeLatch = new CountDownLatch(1);
    private final AtomicReference<String> messageEndpoint = new AtomicReference();
    private final AtomicReference<CompletableFuture<Void>> connectionFuture = new AtomicReference();

    public WebRxSseClientTransport(HttpUtilsBuilder webBuilder) {
        this(webBuilder, new ObjectMapper());
    }

    public WebRxSseClientTransport(HttpUtilsBuilder webBuilder, ObjectMapper objectMapper) {
        this(webBuilder, DEFAULT_SSE_ENDPOINT, objectMapper);
    }

    public WebRxSseClientTransport(HttpUtilsBuilder webBuilder, String sseEndpoint, ObjectMapper objectMapper) {
        Assert.notNull(objectMapper, "ObjectMapper must not be null");
        Assert.notNull(webBuilder, "baseUri must not be empty");
        Assert.hasText(sseEndpoint, "sseEndpoint must not be empty");
        this.webBuilder = webBuilder;
        this.sseEndpoint = sseEndpoint;
        this.objectMapper = objectMapper;
    }

    public static Builder builder(HttpUtilsBuilder webBuilder) {
        return new Builder(webBuilder);
    }

    @Override
    public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler) {
        CompletableFuture future = new CompletableFuture();
        this.connectionFuture.set(future);
        this.webBuilder.build(this.sseEndpoint).execAsSseStream("GET").subscribe((Subscriber)new SimpleSubscriber().doOnNext(event -> {
            if (this.isClosing) {
                return;
            }
            try {
                if (ENDPOINT_EVENT_TYPE.equals(event.getEvent())) {
                    String endpoint = event.data();
                    this.messageEndpoint.set(endpoint);
                    this.closeLatch.countDown();
                    future.complete(null);
                } else if (MESSAGE_EVENT_TYPE.equals(event.getEvent())) {
                    McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper, event.data());
                    ((Mono)handler.apply(Mono.just((Object)message))).subscribe();
                } else {
                    logger.error("Received unrecognized SSE event type: {}", (Object)event.getEvent());
                }
            }
            catch (IOException e) {
                logger.error("Error processing SSE event", (Throwable)e);
                future.completeExceptionally(e);
            }
        }).doOnError(error -> {
            if (!this.isClosing) {
                logger.warn("SSE connection error", error);
                future.completeExceptionally((Throwable)error);
            }
        }));
        return Mono.fromFuture(future);
    }

    @Override
    public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
        if (this.isClosing) {
            return Mono.empty();
        }
        try {
            if (!this.closeLatch.await(10L, TimeUnit.SECONDS)) {
                return Mono.error((Throwable)new McpError((Object)"Failed to wait for the message endpoint"));
            }
        }
        catch (InterruptedException e) {
            return Mono.error((Throwable)new McpError((Object)"Failed to wait for the message endpoint"));
        }
        String endpoint = this.messageEndpoint.get();
        if (endpoint == null) {
            return Mono.error((Throwable)new McpError((Object)"No message endpoint available"));
        }
        try {
            String jsonText = this.objectMapper.writeValueAsString((Object)message);
            CompletableFuture future = this.webBuilder.build(endpoint).header("Content-Type", "application/json").bodyOfJson(jsonText).execAsync("POST");
            return Mono.fromFuture((CompletableFuture)future.thenAccept(response -> {
                if (response.code() != 200 && response.code() != 201 && response.code() != 202 && response.code() != 206) {
                    logger.error("Error sending message: {}", (Object)response.code());
                }
            }));
        }
        catch (IOException e) {
            if (!this.isClosing) {
                return Mono.error((Throwable)new RuntimeException("Failed to serialize message", e));
            }
            return Mono.empty();
        }
    }

    @Override
    public Mono<Void> closeGracefully() {
        return Mono.fromRunnable(() -> {
            this.isClosing = true;
            CompletableFuture<Void> future = this.connectionFuture.get();
            if (future != null && !future.isDone()) {
                future.cancel(true);
            }
        });
    }

    @Override
    public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
        return (T)this.objectMapper.convertValue(data, typeRef);
    }

    public static class Builder {
        private final HttpUtilsBuilder webBuilder;
        private String sseEndpoint = "/sse";
        private ObjectMapper objectMapper = new ObjectMapper();

        public Builder(HttpUtilsBuilder webBuilder) {
            Assert.notNull(webBuilder, "webBuilder must not be empty");
            this.webBuilder = webBuilder;
        }

        public Builder sseEndpoint(String sseEndpoint) {
            Assert.hasText(sseEndpoint, "sseEndpoint must not be null");
            this.sseEndpoint = sseEndpoint;
            return this;
        }

        public Builder objectMapper(ObjectMapper objectMapper) {
            Assert.notNull(objectMapper, "objectMapper must not be null");
            this.objectMapper = objectMapper;
            return this;
        }

        public WebRxSseClientTransport build() {
            return new WebRxSseClientTransport(this.webBuilder, this.sseEndpoint, this.objectMapper);
        }
    }
}

