/*
 * Decompiled with CFR 0.152.
 */
package org.streamingpool.core.service.streamfactory;

import io.reactivex.Flowable;
import java.util.function.Function;
import org.streamingpool.core.domain.ErrorDeflector;
import org.streamingpool.core.domain.ErrorStreamPair;
import org.streamingpool.core.service.DiscoveryService;
import org.streamingpool.core.service.StreamFactory;
import org.streamingpool.core.service.StreamId;
import org.streamingpool.core.service.streamid.DerivedStreamId;

/**
 * {@link StreamFactory} that handles {@link DerivedStreamId}s: it discovers the
 * source stream referenced by the id and maps every item through the id's
 * conversion function. Ids of any other type are answered with
 * {@link ErrorStreamPair#empty()}.
 */
public class DerivedStreamFactory
implements StreamFactory {
    /**
     * Creates the stream for the given id if (and only if) it is a
     * {@link DerivedStreamId}.
     *
     * @param id the id of the stream to create
     * @param discoveryService the service used to look up the source stream
     * @return the pair produced for the derived stream, or
     *         {@link ErrorStreamPair#empty()} if {@code id} is not a
     *         {@link DerivedStreamId} (which includes {@code null})
     */
    @Override
    public <T> ErrorStreamPair<T> create(StreamId<T> id, DiscoveryService discoveryService) {
        if (!(id instanceof DerivedStreamId)) {
            return ErrorStreamPair.empty();
        }
        // The instanceof guard above makes this cast safe. The source type is
        // unknown here, so it stays a wildcard and is captured by the helper.
        DerivedStreamId<?, T> derivedStreamId = (DerivedStreamId<?, T>) id;
        return this.createDerivedStream(derivedStreamId, discoveryService);
    }

    /**
     * Discovers the source stream of the given derived id and applies the id's
     * conversion to each of its items.
     *
     * @param id the derived stream id providing the source stream id and the conversion
     * @param discoveryService the service used to discover the source stream
     * @return the result of {@code ErrorDeflector.streamNonEmpty(..)} applied to the mapped source stream
     */
    private <S, T> ErrorStreamPair<T> createDerivedStream(DerivedStreamId<S, T> id, DiscoveryService discoveryService) {
        Flowable<S> sourceStream = Flowable.fromPublisher(discoveryService.discover(id.sourceStreamId()));
        Function<S, T> conversion = id.conversion();
        // NOTE(review): presumably the deflector turns exceptions thrown by the
        // conversion into empty values (reported on the error stream) and
        // streamNonEmpty drops those again - confirm against ErrorDeflector.
        ErrorDeflector ed = ErrorDeflector.create();
        return ed.streamNonEmpty(sourceStream.map(ed.emptyOnException(conversion)));
    }
}

