/*
 * Decompiled with CFR 0.152.
 */
package pl.gsmservice.gateway.utils.reactive;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import pl.gsmservice.gateway.utils.AsyncResponse;
import pl.gsmservice.gateway.utils.Blob;
import pl.gsmservice.gateway.utils.EventStreamMessage;
import pl.gsmservice.gateway.utils.StreamingParser;
import pl.gsmservice.gateway.utils.Utils;

public class EventStream<ResponseT extends AsyncResponse, ItemT>
implements Publisher<ItemT> {
    private final CompletableFuture<ResponseT> asyncResponseFuture;
    private final TypeReference<ItemT> typeReference;
    private final ObjectMapper objectMapper;
    private final Protocol<?, ItemT> protocol;

    private EventStream(CompletableFuture<ResponseT> asyncResponseFuture, TypeReference<ItemT> typeReference, ObjectMapper objectMapper, Protocol<?, ItemT> protocol) {
        this.asyncResponseFuture = asyncResponseFuture;
        this.typeReference = typeReference;
        this.objectMapper = objectMapper;
        this.protocol = protocol;
    }

    public static <ResponseT extends AsyncResponse, ItemT> EventStream<ResponseT, ItemT> forSSE(CompletableFuture<ResponseT> asyncResponseFuture, TypeReference<ItemT> typeReference, ObjectMapper objectMapper, String terminalMessage) {
        return new EventStream<ResponseT, ItemT>(asyncResponseFuture, typeReference, objectMapper, new SSEProtocol(terminalMessage));
    }

    public static <ResponseT extends AsyncResponse, ItemT> EventStream<ResponseT, ItemT> forJsonL(CompletableFuture<ResponseT> asyncResponseFuture, TypeReference<ItemT> typeReference, ObjectMapper objectMapper) {
        return new EventStream<ResponseT, ItemT>(asyncResponseFuture, typeReference, objectMapper, new JsonLProtocol());
    }

    public CompletableFuture<String> contentType() {
        return this.asyncResponseFuture.thenApply(AsyncResponse::contentType);
    }

    public CompletableFuture<Integer> statusCode() {
        return this.asyncResponseFuture.thenApply(AsyncResponse::statusCode);
    }

    public CompletableFuture<HttpResponse<Blob>> rawResponse() {
        return this.asyncResponseFuture.thenApply(AsyncResponse::rawResponse);
    }

    public CompletableFuture<ResponseT> body() {
        return this.asyncResponseFuture;
    }

    public void subscribe(Subscriber<? super ItemT> subscriber) {
        if (subscriber == null) {
            throw new NullPointerException("Subscriber cannot be null");
        }
        EventStreamSubscription subscription = new EventStreamSubscription(subscriber);
        subscriber.onSubscribe((Subscription)subscription);
        subscription.start(this.rawResponse());
    }

    private static class JsonLProtocol<ItemT>
    implements Protocol<String, ItemT> {
        private JsonLProtocol() {
        }

        @Override
        public StreamingParser<String> createParser() {
            return StreamingParser.forJsonLines();
        }

        @Override
        public ItemT processItem(String jsonLine, ObjectMapper objectMapper, TypeReference<ItemT> typeReference) throws Exception {
            return (ItemT)objectMapper.readValue(jsonLine, typeReference);
        }

        @Override
        public boolean shouldStop(String jsonLine) {
            return false;
        }
    }

    private static class SSEProtocol<ItemT>
    implements Protocol<EventStreamMessage, ItemT> {
        private final String terminalMessage;

        public SSEProtocol(String terminalMessage) {
            this.terminalMessage = terminalMessage;
        }

        @Override
        public StreamingParser<EventStreamMessage> createParser() {
            return StreamingParser.forSSE();
        }

        @Override
        public ItemT processItem(EventStreamMessage message, ObjectMapper objectMapper, TypeReference<ItemT> typeReference) {
            if (message.data().isEmpty()) {
                return null;
            }
            return Utils.asType(message, objectMapper, typeReference);
        }

        @Override
        public boolean shouldStop(EventStreamMessage message) {
            return this.terminalMessage != null && this.terminalMessage.equals(message.data());
        }
    }

    private class EventStreamSubscription
    implements Subscription {
        private final Subscriber<? super ItemT> subscriber;
        private final AtomicLong demand = new AtomicLong(0L);
        private final StreamingParser<?> parser;
        private Flow.Subscription upstreamSubscription;
        private volatile boolean cancelled = false;
        private volatile boolean completed = false;

        public EventStreamSubscription(Subscriber<? super ItemT> subscriber) {
            this.subscriber = subscriber;
            this.parser = EventStream.this.protocol.createParser();
        }

        public void start(CompletableFuture<HttpResponse<Blob>> httpResponseFuture) {
            httpResponseFuture.whenComplete((httpResponse, throwable) -> {
                Flow.Publisher<ByteBuffer> flowPublisher;
                if (this.cancelled) {
                    return;
                }
                if (throwable != null) {
                    this.signalError((Throwable)throwable);
                    return;
                }
                Blob blob = (Blob)httpResponse.body();
                try {
                    flowPublisher = blob.asPublisher();
                }
                catch (Exception e) {
                    this.signalError(e);
                    return;
                }
                flowPublisher.subscribe(new Flow.Subscriber<ByteBuffer>(){

                    @Override
                    public void onSubscribe(Flow.Subscription subscription) {
                        if (EventStreamSubscription.this.cancelled) {
                            subscription.cancel();
                            return;
                        }
                        EventStreamSubscription.this.upstreamSubscription = subscription;
                        EventStreamSubscription.this.requestMoreIfNeeded();
                    }

                    @Override
                    public void onNext(ByteBuffer byteBuffer) {
                        if (EventStreamSubscription.this.cancelled || EventStreamSubscription.this.completed) {
                            return;
                        }
                        try {
                            EventStreamSubscription.this.processBuffer(byteBuffer);
                            if (!EventStreamSubscription.this.completed) {
                                EventStreamSubscription.this.requestMoreIfNeeded();
                            }
                        }
                        catch (Exception e) {
                            EventStreamSubscription.this.signalError(e);
                        }
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        EventStreamSubscription.this.signalError(throwable);
                    }

                    @Override
                    public void onComplete() {
                        try {
                            EventStreamSubscription.this.processEndOfStream();
                            EventStreamSubscription.this.signalComplete();
                        }
                        catch (Exception e) {
                            EventStreamSubscription.this.signalError(e);
                        }
                    }
                });
            });
        }

        public void request(long n) {
            if (n <= 0L) {
                this.signalError(new IllegalArgumentException("Request amount must be positive"));
                return;
            }
            if (this.cancelled || this.completed) {
                return;
            }
            this.demand.addAndGet(n);
            this.requestMoreIfNeeded();
        }

        public void cancel() {
            if (!this.cancelled) {
                this.cancelled = true;
                if (this.upstreamSubscription != null) {
                    this.upstreamSubscription.cancel();
                }
            }
        }

        private void processBuffer(ByteBuffer byteBuffer) {
            Optional<?> parsedOpt = this.parser.add(byteBuffer);
            while (parsedOpt.isPresent()) {
                if (!this.processItem(parsedOpt.get())) {
                    return;
                }
                parsedOpt = this.parser.next();
            }
        }

        private boolean processItem(Object parsed) {
            Protocol typedProtocol = EventStream.this.protocol;
            if (typedProtocol.shouldStop(parsed)) {
                this.signalComplete();
                return false;
            }
            if (this.demand.get() > 0L) {
                try {
                    Object item = typedProtocol.processItem(parsed, EventStream.this.objectMapper, EventStream.this.typeReference);
                    if (item != null) {
                        this.demand.decrementAndGet();
                        this.subscriber.onNext(item);
                    }
                }
                catch (Exception e) {
                    this.signalError(e);
                    return false;
                }
            }
            return true;
        }

        private void requestMoreIfNeeded() {
            if (this.cancelled || this.completed) {
                return;
            }
            if (this.upstreamSubscription != null && this.demand.get() > 0L) {
                this.upstreamSubscription.request(1L);
            }
        }

        private void processEndOfStream() {
            Optional<?> parsedOpt = this.parser.finish();
            parsedOpt.ifPresent(this::processItem);
        }

        private void signalError(Throwable t) {
            if (!this.cancelled && !this.completed) {
                this.completed = true;
                this.subscriber.onError(t);
            }
        }

        private void signalComplete() {
            if (!this.cancelled && !this.completed) {
                this.completed = true;
                this.subscriber.onComplete();
            }
        }
    }

    public static interface Protocol<ParsedT, ItemT> {
        public StreamingParser<ParsedT> createParser();

        public ItemT processItem(ParsedT var1, ObjectMapper var2, TypeReference<ItemT> var3) throws Exception;

        public boolean shouldStop(ParsedT var1);
    }
}

