/*
 * Decompiled with CFR 0.152.
 */
package org.miaixz.bus.vortex.support;

import jakarta.annotation.PreDestroy;
import jakarta.annotation.Resource;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import org.miaixz.bus.core.lang.Charset;
import org.miaixz.bus.extra.mq.MQConfig;
import org.miaixz.bus.extra.mq.MQFactory;
import org.miaixz.bus.extra.mq.Message;
import org.miaixz.bus.extra.mq.Producer;
import org.miaixz.bus.vortex.Assets;
import org.miaixz.bus.vortex.Context;
import org.miaixz.bus.vortex.Format;
import org.miaixz.bus.vortex.Router;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class MqRequestRouter
implements Router {
    @Resource
    private Properties mqProperties;
    private Producer producer;
    private final ExecutorService mqExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2, r -> {
        Thread t = new Thread(r, "mq-producer-pool");
        t.setDaemon(true);
        return t;
    });

    public void init() {
        String brokerUrl = this.mqProperties.getProperty("mq.broker.url");
        MQConfig config = MQConfig.of((String)brokerUrl);
        this.mqProperties.forEach((BiConsumer<? super Object, ? super Object>)((BiConsumer<Object, Object>)(key, value) -> {
            String k;
            if (key instanceof String && value instanceof String && !(k = (String)key).equals("mq.broker.url")) {
                config.addProperty(k, (String)value);
            }
        }));
        this.producer = MQFactory.createEngine((MQConfig)config).getProducer();
    }

    @PreDestroy
    public void destroy() {
        if (this.producer != null) {
            try {
                this.producer.close();
            }
            catch (Exception e) {
                Format.error(null, "MQ_PRODUCER_CLOSE_ERROR", "Failed to close MQ producer");
            }
        }
        this.mqExecutor.shutdown();
    }

    @Override
    public Mono<ServerResponse> route(ServerRequest request, Context context, final Assets assets) {
        Format.info(request.exchange(), "MQ_ROUTE_START", "Method: " + assets.getMethod() + ", Topic: " + assets.getMethod());
        long startTime = System.currentTimeMillis();
        return request.bodyToMono(String.class).flatMap(payload -> {
            Format.debug(request.exchange(), "MQ_MESSAGE_SEND", "Method: " + assets.getMethod() + ", Payload size: " + payload.length());
            Message message = new Message(){
                private final String topic;
                private final byte[] content;
                final /* synthetic */ String val$payload;
                {
                    this.val$payload = string;
                    this.topic = assets.getMethod();
                    this.content = this.val$payload.getBytes(Charset.UTF_8);
                }

                public String topic() {
                    return this.topic;
                }

                public byte[] content() {
                    return this.content;
                }
            };
            return Mono.fromRunnable(() -> this.producer.send(message)).subscribeOn(Schedulers.fromExecutor((Executor)this.mqExecutor)).timeout(Duration.ofMillis(assets.getTimeout())).thenReturn(payload);
        }).flatMap(payload -> {
            long duration = System.currentTimeMillis() - startTime;
            Format.info(request.exchange(), "MQ_ROUTE_SUCCESS", "Method: " + assets.getMethod() + ", Duration: " + duration + "ms");
            return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyValue((Object)"Request forwarded to MQ");
        }).doOnTerminate(() -> {
            long duration = System.currentTimeMillis() - startTime;
            Format.info(request.exchange(), "MQ_ROUTE_COMPLETE", "Method: " + assets.getMethod() + ", Duration: " + duration + "ms");
        }).onErrorResume(e -> {
            long duration = System.currentTimeMillis() - startTime;
            Format.error(request.exchange(), "MQ_ROUTE_ERROR", "Method: " + assets.getMethod() + ", Duration: " + duration + "ms, Error: " + e.getMessage());
            return ServerResponse.status((int)500).contentType(MediaType.APPLICATION_JSON).bodyValue((Object)("{\"error\":\"Failed to forward request to MQ: " + e.getMessage() + "\"}"));
        });
    }
}

