package io.servicetalk.http.api;

import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.api.internal.BlockingUtils;
import io.servicetalk.concurrent.api.internal.SubscribableCompletable;
import io.servicetalk.concurrent.internal.ConcurrentSubscription;
import io.servicetalk.utils.internal.NumberUtils;
import io.servicetalk.utils.internal.ThrowableUtils;
import java.io.IOException;
import java.util.Objects;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/* loaded from: input_file:io/servicetalk/http/api/StreamingHttpServiceToBlockingStreamingHttpService.class */
final class StreamingHttpServiceToBlockingStreamingHttpService implements BlockingStreamingHttpService {
    private final StreamingHttpService original;
    private final int demandBatchSize;

    /* loaded from: input_file:io/servicetalk/http/api/StreamingHttpServiceToBlockingStreamingHttpService$MessageBodyToPayloadWriter.class */
    private static final class MessageBodyToPayloadWriter extends SubscribableCompletable {
        private final Publisher<Object> messageBody;
        private final HttpPayloadWriter<Buffer> payloadWriter;
        private final int demandBatchSize;

        /* loaded from: input_file:io/servicetalk/http/api/StreamingHttpServiceToBlockingStreamingHttpService$MessageBodyToPayloadWriter$PayloadPump.class */
        private static final class PayloadPump implements PublisherSource.Subscriber<Object> {
            private final CompletableSource.Subscriber subscriber;
            private final HttpPayloadWriter<Buffer> payloadWriter;

            @Nullable
            private PublisherSource.Subscription subscription;
            private final int demandBatchSize;
            private int itemsToNextRequest;
            static final /* synthetic */ boolean $assertionsDisabled;

            PayloadPump(CompletableSource.Subscriber subscriber, HttpPayloadWriter<Buffer> httpPayloadWriter, int i) {
                this.subscriber = subscriber;
                this.payloadWriter = httpPayloadWriter;
                this.demandBatchSize = i;
            }

            public void onSubscribe(PublisherSource.Subscription subscription) {
                this.subscription = ConcurrentSubscription.wrap(subscription);
                this.subscriber.onSubscribe(this.subscription);
                this.itemsToNextRequest = this.demandBatchSize;
                this.subscription.request(this.itemsToNextRequest);
            }

            public void onNext(@Nullable Object obj) {
                if (obj instanceof Buffer) {
                    try {
                        this.payloadWriter.write((Buffer) obj);
                    } catch (IOException e) {
                        ThrowableUtils.throwException(e);
                    }
                } else {
                    if (!(obj instanceof HttpHeaders)) {
                        throw new IllegalArgumentException("unsupported type: " + obj);
                    }
                    this.payloadWriter.setTrailers((HttpHeaders) obj);
                }
                requestMoreIfRequired();
            }

            public void onError(Throwable th) {
                try {
                    this.payloadWriter.close(th);
                    this.subscriber.onError(th);
                } catch (Throwable th2) {
                    this.subscriber.onError(th2);
                }
            }

            public void onComplete() {
                try {
                    this.payloadWriter.close();
                    this.subscriber.onComplete();
                } catch (Throwable th) {
                    this.subscriber.onError(th);
                }
            }

            private void requestMoreIfRequired() {
                int i = this.itemsToNextRequest - 1;
                this.itemsToNextRequest = i;
                if (i == (this.demandBatchSize >>> 1)) {
                    int i2 = this.demandBatchSize - this.itemsToNextRequest;
                    this.itemsToNextRequest = this.demandBatchSize;
                    if (!$assertionsDisabled && this.subscription == null) {
                        throw new AssertionError();
                    }
                    this.subscription.request(i2);
                }
            }

            static {
                $assertionsDisabled = !StreamingHttpServiceToBlockingStreamingHttpService.class.desiredAssertionStatus();
            }
        }

        MessageBodyToPayloadWriter(Publisher<Object> publisher, HttpPayloadWriter<Buffer> httpPayloadWriter, int i) {
            this.messageBody = publisher;
            this.payloadWriter = httpPayloadWriter;
            this.demandBatchSize = i;
        }

        protected void handleSubscribe(CompletableSource.Subscriber subscriber) {
            SourceAdapters.toSource(this.messageBody).subscribe(new PayloadPump(subscriber, this.payloadWriter, this.demandBatchSize));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamingHttpServiceToBlockingStreamingHttpService(StreamingHttpService streamingHttpService) {
        this(streamingHttpService, 64);
    }

    StreamingHttpServiceToBlockingStreamingHttpService(StreamingHttpService streamingHttpService, int i) {
        this.original = (StreamingHttpService) Objects.requireNonNull(streamingHttpService);
        this.demandBatchSize = NumberUtils.ensurePositive(i, "demandBatchSize");
    }

    @Override // io.servicetalk.http.api.BlockingStreamingHttpService
    public void handle(HttpServiceContext httpServiceContext, BlockingStreamingHttpRequest blockingStreamingHttpRequest, BlockingStreamingHttpServerResponse blockingStreamingHttpServerResponse) throws Exception {
        BlockingUtils.futureGetCancelOnInterrupt(handleBlockingRequest(httpServiceContext, blockingStreamingHttpRequest, blockingStreamingHttpServerResponse).toFuture());
    }

    @Nonnull
    private Completable handleBlockingRequest(HttpServiceContext httpServiceContext, BlockingStreamingHttpRequest blockingStreamingHttpRequest, BlockingStreamingHttpServerResponse blockingStreamingHttpServerResponse) {
        return this.original.handle(httpServiceContext, blockingStreamingHttpRequest.toStreamingRequest(), httpServiceContext.streamingResponseFactory()).flatMapCompletable(streamingHttpResponse -> {
            copyMeta(streamingHttpResponse, blockingStreamingHttpServerResponse);
            return new MessageBodyToPayloadWriter(streamingHttpResponse.messageBody(), blockingStreamingHttpServerResponse.sendMetaData(), this.demandBatchSize);
        });
    }

    private void copyMeta(StreamingHttpResponse streamingHttpResponse, BlockingStreamingHttpServerResponse blockingStreamingHttpServerResponse) {
        blockingStreamingHttpServerResponse.setHeaders(streamingHttpResponse.headers());
        blockingStreamingHttpServerResponse.status(streamingHttpResponse.status());
        blockingStreamingHttpServerResponse.version(streamingHttpResponse.version());
    }

    @Override // io.servicetalk.http.api.BlockingStreamingHttpService
    public void close() throws Exception {
        this.original.closeAsync().toFuture().get();
    }

    public void closeGracefully() throws Exception {
        this.original.closeAsyncGracefully().toFuture().get();
    }
}
