package com.twitter.heron.simulator.instance;

import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.common.basics.Communicator;
import com.twitter.heron.common.basics.SingletonRegistry;
import com.twitter.heron.common.config.SystemConfig;
import com.twitter.heron.common.utils.misc.PhysicalPlanHelper;
import com.twitter.heron.proto.system.HeronTuples;

/* loaded from: input_file:com/twitter/heron/simulator/instance/OutgoingTupleCollection.class */
/**
 * Accumulates outgoing data, ack and fail tuples into tuple sets and offers each completed set
 * to an output queue.
 *
 * <p>At most one data tuple set and one control tuple set are under construction at a time.
 * A data tuple set only holds tuples of a single stream id, and a control tuple set only holds
 * acks or only holds fails. Whenever a new set of either kind has to be started, everything that
 * is pending (data and control) is flushed to the queue first.
 */
public class OutgoingTupleCollection {
    /** Supplies the component name that is stamped on every outgoing stream id. */
    protected final PhysicalPlanHelper helper;

    /** Destination for every completed tuple set. */
    private final Communicator<HeronTuples.HeronTupleSet> outQueue;

    /** Data tuple set currently being filled, or {@code null} when none is open. */
    private HeronTuples.HeronDataTupleSet.Builder currentDataTuple;

    /** Control tuple set currently being filled, or {@code null} when none is open. */
    private HeronTuples.HeronControlTupleSet.Builder currentControlTuple;

    /** System configuration looked up from the singleton registry at construction time. */
    private final SystemConfig systemConfig;

    /** Sum of the byte sizes reported for every data, ack and fail tuple added so far. */
    private long totalDataEmittedInBytes;

    /** Sum of the byte sizes reported for the tuples in the open data tuple set. */
    private long currentDataTupleSizeInBytes;

    /** Tuple count at which the open data tuple set is no longer reused. */
    private int dataTupleSetCapacity;

    /** Accumulated byte size at which the open data tuple set is no longer reused. */
    private long maxDataTupleSizeInBytes;

    /** Ack (or fail) count at which the open control tuple set is no longer reused. */
    private int controlTupleSetCapacity;

    /**
     * Creates a collection that pushes completed tuple sets to {@code communicator}.
     *
     * @param physicalPlanHelper helper used to obtain this instance's component name
     * @param communicator queue that receives the completed tuple sets
     */
    public OutgoingTupleCollection(PhysicalPlanHelper physicalPlanHelper, Communicator<HeronTuples.HeronTupleSet> communicator) {
        // Batching limits are read once, up front, from the registered system configuration.
        this.systemConfig = (SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
        this.totalDataEmittedInBytes = 0L;
        this.currentDataTupleSizeInBytes = 0L;
        this.dataTupleSetCapacity = this.systemConfig.getInstanceSetDataTupleCapacity();
        this.maxDataTupleSizeInBytes = this.systemConfig.getInstanceSetDataTupleSizeBytes();
        this.controlTupleSetCapacity = this.systemConfig.getInstanceSetControlTupleCapacity();

        this.outQueue = communicator;
        this.helper = physicalPlanHelper;
    }

    /** Pushes whatever data and control tuple sets are currently open to the output queue. */
    public void sendOutTuples() {
        flushRemaining();
    }

    /**
     * Appends a data tuple to the open data tuple set, starting a new set first when needed.
     *
     * @param str id of the stream the tuple is emitted on
     * @param builder the tuple to append
     * @param j byte size attributed to the tuple; added to the per-set and the total counters
     */
    public void addDataTuple(String str, HeronTuples.HeronDataTuple.Builder builder, long j) {
        if (mustStartNewDataTupleSet(str)) {
            initNewDataTuple(str);
        }
        this.currentDataTuple.addTuples(builder);
        this.currentDataTupleSizeInBytes += j;
        this.totalDataEmittedInBytes += j;
    }

    /**
     * Appends an ack to the open control tuple set. A new set is started first when none is open,
     * when the open one already carries fails, or when its ack count has reached the capacity.
     *
     * @param builder the ack to append
     * @param j byte size attributed to the ack; added to the total counter
     */
    public void addAckTuple(HeronTuples.AckTuple.Builder builder, long j) {
        final HeronTuples.HeronControlTupleSet.Builder open = this.currentControlTuple;
        final boolean reusable = open != null
                && open.getFailsCount() <= 0
                && open.getAcksCount() < this.controlTupleSetCapacity;
        if (!reusable) {
            initNewControlTuple();
        }
        this.currentControlTuple.addAcks(builder);
        this.totalDataEmittedInBytes += j;
    }

    /**
     * Appends a fail to the open control tuple set. A new set is started first when none is open,
     * when the open one already carries acks, or when its fail count has reached the capacity.
     *
     * @param builder the fail to append
     * @param j byte size attributed to the fail; added to the total counter
     */
    public void addFailTuple(HeronTuples.AckTuple.Builder builder, long j) {
        final HeronTuples.HeronControlTupleSet.Builder open = this.currentControlTuple;
        final boolean reusable = open != null
                && open.getAcksCount() <= 0
                && open.getFailsCount() < this.controlTupleSetCapacity;
        if (!reusable) {
            initNewControlTuple();
        }
        this.currentControlTuple.addFails(builder);
        this.totalDataEmittedInBytes += j;
    }

    /**
     * Decides whether the open data tuple set can take another tuple for {@code streamId}.
     * The limits are checked before the tuple is added, so a set may end up at or slightly
     * above the byte limit.
     */
    private boolean mustStartNewDataTupleSet(String streamId) {
        final HeronTuples.HeronDataTupleSet.Builder open = this.currentDataTuple;
        if (open == null) {
            return true;
        }
        if (!open.getStream().getId().equals(streamId)) {
            return true;
        }
        if (open.getTuplesCount() >= this.dataTupleSetCapacity) {
            return true;
        }
        return this.currentDataTupleSizeInBytes >= this.maxDataTupleSizeInBytes;
    }

    /** Flushes everything pending, then opens an empty data tuple set for {@code streamName}. */
    private void initNewDataTuple(String streamName) {
        flushRemaining();
        this.currentDataTupleSizeInBytes = 0L;

        final TopologyAPI.StreamId.Builder streamId = TopologyAPI.StreamId.newBuilder()
                .setId(streamName)
                .setComponentName(this.helper.getMyComponent());

        this.currentDataTuple = HeronTuples.HeronDataTupleSet.newBuilder();
        this.currentDataTuple.setStream(streamId);
    }

    /** Flushes everything pending, then opens an empty control tuple set. */
    private void initNewControlTuple() {
        flushRemaining();
        this.currentControlTuple = HeronTuples.HeronControlTupleSet.newBuilder();
    }

    /** Sends the open data tuple set (if any) and then the open control tuple set (if any). */
    private void flushRemaining() {
        flushOpenDataTupleSet();
        flushOpenControlTupleSet();
    }

    /** Wraps the open data tuple set in a tuple set, queues it, and marks it closed. */
    private void flushOpenDataTupleSet() {
        if (this.currentDataTuple == null) {
            return;
        }
        final HeronTuples.HeronTupleSet.Builder envelope = HeronTuples.HeronTupleSet.newBuilder();
        envelope.setData(this.currentDataTuple);
        pushTupleToQueue(envelope, this.outQueue);
        this.currentDataTuple = null;
    }

    /** Wraps the open control tuple set in a tuple set, queues it, and marks it closed. */
    private void flushOpenControlTupleSet() {
        if (this.currentControlTuple == null) {
            return;
        }
        final HeronTuples.HeronTupleSet.Builder envelope = HeronTuples.HeronTupleSet.newBuilder();
        envelope.setControl(this.currentControlTuple);
        pushTupleToQueue(envelope, this.outQueue);
        this.currentControlTuple = null;
    }

    /** Builds the tuple set and offers it to the given queue. */
    private void pushTupleToQueue(HeronTuples.HeronTupleSet.Builder tupleSet, Communicator<HeronTuples.HeronTupleSet> target) {
        final HeronTuples.HeronTupleSet built = tupleSet.build();
        target.offer(built);
    }

    /**
     * Reports whether the output queue is below its expected available capacity.
     *
     * @return {@code true} when the queue size is strictly less than the expected available capacity
     */
    public boolean isOutQueuesAvailable() {
        final int queued = this.outQueue.size();
        final int expectedCapacity = this.outQueue.getExpectedAvailableCapacity();
        return queued < expectedCapacity;
    }

    /**
     * Returns the running total of byte sizes reported through the add methods.
     *
     * @return total bytes attributed to all data, ack and fail tuples added so far
     */
    public long getTotalDataEmittedInBytes() {
        return this.totalDataEmittedInBytes;
    }

    /** Discards the open tuple sets without sending them and empties the output queue. */
    public void clear() {
        this.currentDataTuple = null;
        this.currentControlTuple = null;
        this.outQueue.clear();
    }
}
