package io.scalecube.gateway.http;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.scalecube.gateway.GatewayMetrics;
import io.scalecube.gateway.ReferenceCountUtil;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.api.ErrorData;
import io.scalecube.services.api.Qualifier;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.codec.DataCodec;
import io.scalecube.services.exceptions.ExceptionProcessor;
import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import java.util.function.BiFunction;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.http.server.HttpServerRequest;
import reactor.ipc.netty.http.server.HttpServerResponse;

/* loaded from: input_file:io/scalecube/gateway/http/GatewayHttpAcceptor.class */
public class GatewayHttpAcceptor implements BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(GatewayHttpAcceptor.class);
    private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(10);
    private final ServiceCall serviceCall;
    private final GatewayMetrics metrics;

    /* JADX INFO: Access modifiers changed from: package-private */
    public GatewayHttpAcceptor(ServiceCall serviceCall, GatewayMetrics gatewayMetrics) {
        this.serviceCall = serviceCall;
        this.metrics = gatewayMetrics;
    }

    @Override // java.util.function.BiFunction
    public Publisher<Void> apply(HttpServerRequest httpServerRequest, HttpServerResponse httpServerResponse) {
        LOGGER.debug("Accepted request: {}, headers: {}, params: {}", new Object[]{httpServerRequest, httpServerRequest.requestHeaders(), httpServerRequest.params()});
        if (httpServerRequest.method() != HttpMethod.POST) {
            LOGGER.error("Unsupported HTTP method. Expected POST, actual {}", httpServerRequest.method());
            return methodNotAllowed(httpServerResponse);
        }
        Mono flatMap = httpServerRequest.receive().aggregate().map((v0) -> {
            return v0.retain();
        }).doOnNext(byteBuf -> {
            this.metrics.markRequest();
        }).flatMap(byteBuf2 -> {
            return handleRequest(byteBuf2, httpServerRequest, httpServerResponse);
        });
        GatewayMetrics gatewayMetrics = this.metrics;
        gatewayMetrics.getClass();
        return flatMap.doOnTerminate(gatewayMetrics::markResponse).timeout(DEFAULT_TIMEOUT).onErrorResume(th -> {
            return error(httpServerResponse, ExceptionProcessor.toMessage(th));
        });
    }

    private Mono<Void> handleRequest(ByteBuf byteBuf, HttpServerRequest httpServerRequest, HttpServerResponse httpServerResponse) {
        ServiceMessage.Builder qualifier = ServiceMessage.builder().header("gw-recv-from-client-time", String.valueOf(System.currentTimeMillis())).qualifier(httpServerRequest.uri());
        return this.serviceCall.requestOne(qualifier.data(byteBuf).build()).switchIfEmpty(Mono.defer(() -> {
            return Mono.just(qualifier.data((Object) null).build());
        })).flatMap(serviceMessage -> {
            enrichResponse(httpServerRequest, httpServerResponse, serviceMessage);
            return Mono.defer(() -> {
                return ExceptionProcessor.isError(serviceMessage) ? error(httpServerResponse, serviceMessage) : serviceMessage.hasData() ? ok(httpServerResponse, serviceMessage) : noContent(httpServerResponse);
            });
        });
    }

    private Publisher<Void> methodNotAllowed(HttpServerResponse httpServerResponse) {
        return httpServerResponse.addHeader(HttpHeaderNames.ALLOW, HttpMethod.POST.name()).status(HttpResponseStatus.METHOD_NOT_ALLOWED).send();
    }

    private Mono<Void> error(HttpServerResponse httpServerResponse, ServiceMessage serviceMessage) {
        return httpServerResponse.status(HttpResponseStatus.valueOf(Integer.parseInt(Qualifier.getQualifierAction(serviceMessage.qualifier())))).sendObject(serviceMessage.hasData(ErrorData.class) ? encodeData(serviceMessage.data(), serviceMessage.dataFormatOrDefault()) : (ByteBuf) serviceMessage.data()).then();
    }

    private Mono<Void> noContent(HttpServerResponse httpServerResponse) {
        return httpServerResponse.status(HttpResponseStatus.NO_CONTENT).send();
    }

    private Mono<Void> ok(HttpServerResponse httpServerResponse, ServiceMessage serviceMessage) {
        return httpServerResponse.status(HttpResponseStatus.OK).sendObject(serviceMessage.hasData(ByteBuf.class) ? (ByteBuf) serviceMessage.data() : encodeData(serviceMessage.data(), serviceMessage.dataFormatOrDefault())).then();
    }

    private ByteBuf encodeData(Object obj, String str) {
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
        try {
            DataCodec.getInstance(str).encode(new ByteBufOutputStream(buffer), obj);
            return buffer;
        } catch (IOException e) {
            ReferenceCountUtil.safestRelease(buffer);
            LOGGER.error("Failed to encode data: {}", obj, e);
            return Unpooled.EMPTY_BUFFER;
        }
    }

    private void enrichResponse(HttpServerRequest httpServerRequest, HttpServerResponse httpServerResponse, ServiceMessage serviceMessage) {
        Optional.ofNullable(httpServerRequest.requestHeaders().get("client-send-time")).ifPresent(str -> {
            httpServerResponse.header("client-send-time", str);
        });
        Optional.ofNullable(serviceMessage.header("service-recv-time")).ifPresent(str2 -> {
            httpServerResponse.header("service-recv-time", str2);
        });
        Optional.ofNullable(serviceMessage.header("gw-recv-from-client-time")).ifPresent(str3 -> {
            httpServerResponse.header("gw-recv-from-client-time", str3);
        });
        httpServerResponse.header("gw-recv-from-service-time", String.valueOf(System.currentTimeMillis()));
    }
}
