package kr.jm.utils.flow.processor;

import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.function.BiConsumer;
import kr.jm.utils.flow.TransformerInterface;
import kr.jm.utils.flow.publisher.JMSubmissionPublisher;
import kr.jm.utils.flow.subscriber.JMSubscriber;
import kr.jm.utils.flow.subscriber.JMSubscriberBuilder;
import kr.jm.utils.helper.JMLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kr/jm/utils/flow/processor/JMTransformProcessor.class */
/**
 * Processor that receives items of type {@code T} through an internal
 * {@link JMSubscriber} and hands each one, together with an internal
 * {@link JMSubmissionPublisher}, to a caller-supplied {@link BiConsumer}.
 * The consumer decides what (if anything) is submitted downstream.
 *
 * @param <T> type of the items consumed from upstream
 * @param <R> type of the items published downstream
 */
public class JMTransformProcessor<T, R> implements JMTransformProcessorInterface<T, R> {
    /** Logger named after the runtime class, so subclasses log under their own name. */
    protected final Logger log;
    /** Receives every upstream signal that this processor is given. */
    private final JMSubscriber<T> inputSubscriber;
    /** Publisher that downstream subscribers are attached to. */
    private final JMSubmissionPublisher<R> outputPublisher;

    /**
     * Creates a processor that submits exactly one transformed item per input item.
     *
     * @param transformerInterface maps each input item to the item to publish
     * @throws NullPointerException if {@code transformerInterface} is {@code null}
     */
    public JMTransformProcessor(TransformerInterface<T, R> transformerInterface) {
        this(getSingleInputPublisherBiConsumer(transformerInterface));
    }

    /**
     * Adapts a transformer into a consumer that submits the transformed item
     * to the publisher it is given.
     *
     * <p>NOTE(review): a decompiler marker indicates this method was originally
     * {@code protected}; it is kept {@code public} so the visible interface is unchanged.
     *
     * @param transformerInterface maps an input item to the item to submit
     * @param <I> input item type
     * @param <O> output item type
     * @return a consumer that transforms its first argument and submits the result
     *         to its second argument
     * @throws NullPointerException if {@code transformerInterface} is {@code null}
     */
    public static <I, O> BiConsumer<I, JMSubmissionPublisher<? super O>> getSingleInputPublisherBiConsumer(TransformerInterface<I, O> transformerInterface) {
        // Fail at wiring time rather than on the first item that flows through.
        Objects.requireNonNull(transformerInterface, "transformerInterface");
        return (input, publisher) -> publisher.submit(transformerInterface.transform(input));
    }

    /**
     * Creates a processor whose per-item behavior is fully defined by {@code biConsumer}.
     *
     * @param biConsumer invoked for each input item with that item and the output publisher
     * @throws NullPointerException if {@code biConsumer} is {@code null}
     */
    public JMTransformProcessor(BiConsumer<T, JMSubmissionPublisher<? super R>> biConsumer) {
        Objects.requireNonNull(biConsumer, "biConsumer");
        this.log = LoggerFactory.getLogger(getClass());
        // Capture the publisher in a local so the lambda does not read a field
        // through a partially constructed instance.
        JMSubmissionPublisher<R> publisher = new JMSubmissionPublisher<>();
        this.outputPublisher = publisher;
        this.inputSubscriber = JMSubscriberBuilder.build(item -> {
            biConsumer.accept(item, publisher);
        });
    }

    /**
     * Logs the subscription and forwards it to the internal subscriber.
     *
     * @param subscription the upstream subscription
     */
    @Override // java.util.concurrent.Flow.Subscriber
    public void onSubscribe(Flow.Subscription subscription) {
        JMLog.info(this.log, "onSubscribe", new Object[]{subscription});
        this.inputSubscriber.onSubscribe(subscription);
    }

    /**
     * Logs the item at debug level and forwards it to the internal subscriber.
     *
     * @param t the upstream item
     */
    @Override // java.util.concurrent.Flow.Subscriber
    public void onNext(T t) {
        JMLog.debug(this.log, "onNext", new Object[]{t});
        this.inputSubscriber.onNext(t);
    }

    /**
     * Forwards the error to the internal subscriber.
     *
     * <p>NOTE(review): unlike the other callbacks nothing is logged here;
     * presumably the delegate reports the error - confirm. The error is also
     * not visibly propagated to the output publisher - confirm that is intended.
     *
     * @param th the upstream error
     */
    @Override // java.util.concurrent.Flow.Subscriber
    public void onError(Throwable th) {
        this.inputSubscriber.onError(th);
    }

    /**
     * Logs completion and forwards it to the internal subscriber.
     *
     * <p>NOTE(review): the output publisher is not closed here; whether
     * downstream subscribers observe completion depends on code outside this class - confirm.
     */
    @Override // java.util.concurrent.Flow.Subscriber
    public void onComplete() {
        JMLog.info(this.log, "onComplete");
        this.inputSubscriber.onComplete();
    }

    /**
     * Logs the request and attaches {@code subscriber} to the output publisher.
     *
     * @param subscriber the downstream subscriber to attach
     */
    public void subscribe(Flow.Subscriber<? super R> subscriber) {
        JMLog.info(this.log, "subscribeWith", new Object[]{subscriber});
        this.outputPublisher.subscribe(subscriber);
    }
}
