/*
 * Decompiled with CFR 0.152.
 */
package org.noear.dami2.bus.receivable;

import java.util.function.Consumer;
import org.noear.dami2.bus.DamiBusExtension;
import org.noear.dami2.bus.Event;
import org.noear.dami2.bus.receivable.StreamEventHandler;
import org.noear.dami2.bus.receivable.StreamPayload;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

/**
 * Bus extension that exposes Reactive Streams style messaging on top of the bus
 * returned by {@code bus()}.
 *
 * <p>The sending side wraps the request data and the downstream {@link Subscriber}
 * into a {@link StreamPayload} and sends it to a topic; the listening side unwraps
 * that payload and hands the data and the sink to a {@link StreamEventHandler}.
 */
public interface StreamBusExtension
extends DamiBusExtension {
    /**
     * Creates a publisher for {@code topic} without a fallback.
     *
     * <p>Equivalent to {@code stream(topic, data, null)}.
     *
     * @param topic topic the payload is sent to
     * @param data  request data carried by the payload
     * @param <D>   request data type
     * @param <R>   type of the items emitted to the subscriber
     * @return a publisher that sends the payload each time it is subscribed to
     */
    default <D, R> Publisher<R> stream(String topic, D data) {
        return stream(topic, data, null);
    }

    /**
     * Creates a publisher for {@code topic}.
     *
     * <p>Nothing is sent when this method returns: the send happens inside the
     * returned publisher's {@code subscribe}, once per subscriber, with a new
     * {@link StreamPayload} holding {@code data} and that subscriber as its sink.
     *
     * @param topic    topic the payload is sent to
     * @param data     request data carried by the payload
     * @param fallback optional callback; when non-null it is passed to the bus and,
     *                 if the bus invokes it, receives the payload's sink. May be
     *                 {@code null}. NOTE(review): presumably the bus calls it when
     *                 no listener handles the event - confirm against the bus
     *                 {@code send} contract.
     * @param <D>      request data type
     * @param <R>      type of the items emitted to the subscriber
     * @return a publisher that sends the payload each time it is subscribed to
     */
    default <D, R> Publisher<R> stream(String topic, D data, Consumer<Subscriber<? super R>> fallback) {
        if (fallback == null) {
            return subscriber -> bus().send(topic, new StreamPayload<D, R>(data, subscriber));
        }
        return subscriber -> bus().send(
                topic,
                new StreamPayload<D, R>(data, subscriber),
                payload -> fallback.accept(payload.getSink()));
    }

    /**
     * Registers {@code handler} on {@code topic} with index {@code 0}.
     *
     * @param topic   topic to listen on
     * @param handler handler invoked for each received stream event
     * @param <D>     request data type
     * @param <R>     type of the items the handler emits to the sink
     */
    default <D, R> void listen(String topic, StreamEventHandler<D, R> handler) {
        listen(topic, 0, handler);
    }

    /**
     * Registers {@code handler} on {@code topic}.
     *
     * <p>For each event the handler receives the event itself, the event's
     * attachment, and the data and sink taken from the event's {@link StreamPayload}.
     *
     * @param topic   topic to listen on
     * @param index   listener index forwarded unchanged to the bus
     *                (NOTE(review): presumably an ordering position - confirm)
     * @param handler handler invoked for each received stream event
     * @param <D>     request data type
     * @param <R>     type of the items the handler emits to the sink
     */
    default <D, R> void listen(String topic, int index, StreamEventHandler<D, R> handler) {
        // The explicit parameter type pins the bus payload type to StreamPayload<D, R>,
        // so the payload can be read without casts.
        bus().listen(topic, index, (Event<StreamPayload<D, R>> event) -> {
            StreamPayload<D, R> payload = event.getPayload();
            handler.onStream(event, event.getAttach(), payload.getData(), payload.getSink());
        });
    }
}

