package com.sandpolis.core.instance.state.st.entangled;

import com.google.common.eventbus.Subscribe;
import com.sandpolis.core.instance.Messages;
import com.sandpolis.core.instance.state.STCmd;
import com.sandpolis.core.instance.state.oid.Oid;
import com.sandpolis.core.instance.state.st.AbstractSTObject;
import com.sandpolis.core.instance.state.st.STAttribute;
import com.sandpolis.core.instance.state.st.STDocument;
import com.sandpolis.core.instance.state.st.STObject;
import com.sandpolis.core.instance.stream.InboundStreamAdapter;
import com.sandpolis.core.instance.stream.OutboundStreamAdapter;
import com.sandpolis.core.instance.stream.StreamSink;
import com.sandpolis.core.instance.stream.StreamSource;
import com.sandpolis.core.instance.stream.StreamStore;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/sandpolis/core/instance/state/st/entangled/EntangledObject.class */
public abstract class EntangledObject extends AbstractSTObject {
    private static final Logger log = LoggerFactory.getLogger(EntangledObject.class);
    protected StreamSink<Messages.EV_STStreamData> sink;
    protected StreamSource<Messages.EV_STStreamData> source;
    private final CompletableFuture<Void> inactive;
    protected STObject container;

    public EntangledObject(STDocument sTDocument, String str) {
        super(sTDocument, str);
        this.inactive = new CompletableFuture<>();
    }

    public StreamSink<Messages.EV_STStreamData> getSink() {
        return this.sink;
    }

    public StreamSource<Messages.EV_STStreamData> getSource() {
        return this.source;
    }

    public boolean isActive() {
        return !this.inactive.isDone();
    }

    public CompletableFuture<Void> getInactiveFuture() {
        return this.inactive;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void startSink(STCmd.STSyncStruct sTSyncStruct) {
        this.sink = new StreamSink<Messages.EV_STStreamData>() { // from class: com.sandpolis.core.instance.state.st.entangled.EntangledObject.1
            @Override // com.sandpolis.core.instance.stream.StreamSink, java.util.concurrent.Flow.Subscriber
            public void onNext(Messages.EV_STStreamData eV_STStreamData) {
                EntangledObject.this.container.merge(eV_STStreamData);
            }

            @Override // com.sandpolis.core.instance.stream.StreamSink, com.sandpolis.core.instance.stream.StreamEndpoint
            public void close() {
                EntangledObject.this.inactive.complete(null);
            }
        };
        StreamStore.StreamStore.add(new InboundStreamAdapter(sTSyncStruct.streamId, sTSyncStruct.connection, Messages.EV_STStreamData.class), this.sink);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void startSource(STCmd.STSyncStruct sTSyncStruct) {
        this.source = new StreamSource<Messages.EV_STStreamData>() { // from class: com.sandpolis.core.instance.state.st.entangled.EntangledObject.2
            @Override // com.sandpolis.core.instance.stream.StreamSource
            public void start() {
                EntangledObject.this.container.addListener(EntangledObject.this);
            }

            @Override // java.util.concurrent.SubmissionPublisher, java.lang.AutoCloseable, com.sandpolis.core.instance.stream.StreamEndpoint
            public void close() {
                EntangledObject.this.container.removeListener(EntangledObject.this);
                EntangledObject.this.inactive.complete(null);
            }

            @Override // com.sandpolis.core.instance.stream.StreamEndpoint.StreamPublisher
            public String getStreamKey() {
                return EntangledObject.this.container.oid().toString();
            }
        };
        StreamStore.StreamStore.add(this.source, new OutboundStreamAdapter(sTSyncStruct.streamId, sTSyncStruct.connection));
        this.source.start();
        Stream<Messages.EV_STStreamData> snapshot = this.container.snapshot(sTSnapshotStruct -> {
            sTSnapshotStruct.oid = this.container.oid();
        });
        StreamSource<Messages.EV_STStreamData> streamSource = this.source;
        Objects.requireNonNull(streamSource);
        snapshot.forEach((v1) -> {
            r1.submit(v1);
        });
        if (sTSyncStruct.permanent) {
            return;
        }
        log.debug("Requested stream was not permanent");
        close();
    }

    public void close() {
        if (this.source != null) {
            StreamStore.StreamStore.stop(this.source.getStreamID());
        }
        if (this.sink != null) {
            StreamStore.StreamStore.stop(this.sink.getStreamID());
        }
    }

    @Override // com.sandpolis.core.instance.state.st.AbstractSTObject, com.sandpolis.core.instance.state.st.STObject
    public Oid oid() {
        return this.container.oid();
    }

    @Override // com.sandpolis.core.instance.state.st.AbstractSTObject, com.sandpolis.core.instance.state.st.STObject
    public STDocument parent() {
        return this.container.parent();
    }

    @Override // com.sandpolis.core.instance.state.st.AbstractSTObject, com.sandpolis.core.instance.state.st.STObject
    public void addListener(Object obj) {
        this.container.addListener(obj);
    }

    @Override // com.sandpolis.core.instance.state.st.AbstractSTObject, com.sandpolis.core.instance.state.st.STObject
    public void removeListener(Object obj) {
        this.container.removeListener(obj);
    }

    @Subscribe
    void handle(STAttribute.ChangeEvent changeEvent) {
        Stream<Messages.EV_STStreamData> snapshot = changeEvent.attribute().snapshot(sTSnapshotStruct -> {
            sTSnapshotStruct.oid = this.container.oid();
        });
        StreamSource<Messages.EV_STStreamData> streamSource = this.source;
        Objects.requireNonNull(streamSource);
        snapshot.forEach((v1) -> {
            r1.submit(v1);
        });
    }

    @Subscribe
    void handle(STDocument.DocumentAddedEvent documentAddedEvent) {
        Stream<Messages.EV_STStreamData> snapshot = documentAddedEvent.newDocument().snapshot(sTSnapshotStruct -> {
            sTSnapshotStruct.oid = this.container.oid();
        });
        StreamSource<Messages.EV_STStreamData> streamSource = this.source;
        Objects.requireNonNull(streamSource);
        snapshot.forEach((v1) -> {
            r1.submit(v1);
        });
    }

    @Subscribe
    void handle(STDocument.DocumentRemovedEvent documentRemovedEvent) {
        this.source.submit((Messages.EV_STStreamData) Messages.EV_STStreamData.newBuilder().setRemoved(true).setOid((String) Arrays.stream(documentRemovedEvent.oldDocument().oid().path()).skip(this.container.oid().path().length).collect(Collectors.joining("/"))).build());
    }

    @Override // com.sandpolis.core.instance.state.st.AbstractSTObject, com.sandpolis.core.instance.state.st.STObject
    public void replaceParent(STDocument sTDocument) {
        this.container.replaceParent(sTDocument);
    }
}
