/*
 * Decompiled with CFR 0.152.
 */
package org.glassfish.jersey.microprofile.restclient;

import java.io.InputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.sse.InboundSseEvent;
import org.glassfish.jersey.internal.PropertiesDelegate;
import org.glassfish.jersey.internal.util.JerseyPublisher;
import org.glassfish.jersey.media.sse.EventInput;
import org.glassfish.jersey.media.sse.InboundEvent;
import org.glassfish.jersey.message.MessageBodyWorkers;
import org.glassfish.jersey.microprofile.restclient.SseEventSuscriber;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public class SseEventPublisher
extends EventInput
implements Publisher<InboundEvent> {
    private final Executor executor;
    private final Type genericType;
    private final JerseyPublisher<Object> publisher;
    private static final Logger LOG = Logger.getLogger(SseEventPublisher.class.getName());

    SseEventPublisher(InputStream inputStream, Type genericType, Annotation[] annotations, MediaType mediaType, MultivaluedMap<String, String> headers, MessageBodyWorkers messageBodyWorkers, PropertiesDelegate propertiesDelegate, ExecutorService executor) {
        super(inputStream, annotations, mediaType, headers, messageBodyWorkers, propertiesDelegate);
        this.executor = executor;
        this.genericType = genericType;
        this.publisher = new JerseyPublisher(executor::submit, JerseyPublisher.PublisherStrategy.BLOCKING);
    }

    public void subscribe(Subscriber subscriber) {
        if (subscriber == null) {
            throw new NullPointerException("The subscriber is `null`");
        }
        this.publisher.subscribe(new SseEventSuscriber(subscriber));
        Runnable readEventTask = () -> {
            if (this.genericType instanceof ParameterizedType) {
                Type typeArgument = ((ParameterizedType)this.genericType).getActualTypeArguments()[0];
                SseEventPublisher input = this;
                try {
                    if (typeArgument.equals(InboundSseEvent.class)) {
                        InboundSseEvent event;
                        while ((event = (InboundSseEvent)input.read()) != null) {
                            this.publisher.publish((Object)event);
                        }
                    } else {
                        InboundSseEvent event;
                        while ((event = (InboundSseEvent)input.read()) != null) {
                            this.publisher.publish(event.readData((Class)typeArgument));
                        }
                    }
                }
                catch (Throwable t) {
                    subscriber.onError(t);
                    return;
                }
                this.publisher.close();
            }
        };
        try {
            this.executor.execute(readEventTask);
        }
        catch (RejectedExecutionException ex) {
            LOG.log(Level.WARNING, "Executor {0} rejected emit event task", this.executor);
        }
    }
}

