package com.sandpolis.core.instance.stream;

import com.google.common.base.Preconditions;
import com.google.protobuf.MessageLite;
import com.google.protobuf.MessageLiteOrBuilder;
import com.sandpolis.core.instance.Messages;
import com.sandpolis.core.instance.connection.Connection;
import com.sandpolis.core.instance.state.InstanceOids;
import com.sandpolis.core.instance.stream.StreamEndpoint;
import com.sandpolis.core.instance.util.S7SMsg;
import java.util.concurrent.Flow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/sandpolis/core/instance/stream/OutboundStreamAdapter.class */
public class OutboundStreamAdapter<E extends MessageLiteOrBuilder> implements Flow.Subscriber<E>, StreamEndpoint.StreamSubscriber<E> {
    private static final Logger log = LoggerFactory.getLogger(OutboundStreamAdapter.class);
    private final int sid;
    private final int id;
    private final Connection connection;
    private Flow.Subscription subscription;

    public OutboundStreamAdapter(int i, Connection connection) {
        this.id = i;
        this.connection = (Connection) Preconditions.checkNotNull(connection);
        this.sid = this.connection.get(InstanceOids.ConnectionOid.REMOTE_SID).asInt();
    }

    public OutboundStreamAdapter(int i, Connection connection, int i2) {
        this.id = i;
        this.connection = (Connection) Preconditions.checkNotNull(connection);
        this.sid = i2;
    }

    public Connection getSock() {
        return this.connection;
    }

    @Override // com.sandpolis.core.instance.stream.StreamEndpoint
    public int getStreamID() {
        return this.id;
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onComplete() {
        log.debug("onComplete");
        StreamStore.StreamStore.stop(this.id);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onError(Throwable th) {
        StreamStore.StreamStore.stop(this.id);
        log.error("Publish or subscription failure", th);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onNext(E e) {
        this.connection.send(S7SMsg.ev(this.id).pack(e).setTo(this.sid));
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(Long.MAX_VALUE);
    }

    @Override // com.sandpolis.core.instance.stream.StreamEndpoint
    public void close() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (this.connection.channel().isActive()) {
            log.debug("Sending stream closed event");
            this.connection.send(S7SMsg.ev(this.id).pack((MessageLite.Builder) Messages.RQ_StopStream.newBuilder()).setTo(this.sid));
        }
    }
}
