package com.twitter.heron.simulator.instance;

import com.google.protobuf.ByteString;
import com.twitter.heron.api.Config;
import com.twitter.heron.api.bolt.IOutputCollector;
import com.twitter.heron.api.serializer.IPluggableSerializer;
import com.twitter.heron.api.tuple.Tuple;
import com.twitter.heron.common.basics.Communicator;
import com.twitter.heron.common.utils.metrics.BoltMetrics;
import com.twitter.heron.common.utils.misc.PhysicalPlanHelper;
import com.twitter.heron.common.utils.tuple.TupleImpl;
import com.twitter.heron.proto.system.HeronTuples;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/twitter/heron/simulator/instance/BoltOutputCollectorImpl.class */
public class BoltOutputCollectorImpl implements IOutputCollector {
    private static final Logger LOG = Logger.getLogger(BoltOutputCollectorImpl.class.getName());
    private final IPluggableSerializer serializer;
    private final OutgoingTupleCollection outputter;
    private final BoltMetrics boltMetrics;
    private final PhysicalPlanHelper helper;
    private final boolean ackEnabled;

    public BoltOutputCollectorImpl(IPluggableSerializer iPluggableSerializer, PhysicalPlanHelper physicalPlanHelper, Communicator<HeronTuples.HeronTupleSet> communicator, BoltMetrics boltMetrics) {
        if (physicalPlanHelper.getMyBolt() == null) {
            throw new RuntimeException(physicalPlanHelper.getMyTaskId() + " is not a bolt ");
        }
        this.serializer = iPluggableSerializer;
        this.helper = physicalPlanHelper;
        this.boltMetrics = boltMetrics;
        Map<String, Object> topologyConfig = physicalPlanHelper.getTopologyContext().getTopologyConfig();
        if (!topologyConfig.containsKey(Config.TOPOLOGY_ENABLE_ACKING) || topologyConfig.get(Config.TOPOLOGY_ENABLE_ACKING) == null) {
            this.ackEnabled = false;
        } else {
            this.ackEnabled = Boolean.parseBoolean(topologyConfig.get(Config.TOPOLOGY_ENABLE_ACKING).toString());
        }
        this.outputter = new OutgoingTupleCollection(physicalPlanHelper, communicator);
    }

    @Override // com.twitter.heron.api.bolt.IOutputCollector
    public List<Integer> emit(String str, Collection<Tuple> collection, List<Object> list) {
        return admitBoltTuple(str, collection, list);
    }

    @Override // com.twitter.heron.api.bolt.IOutputCollector
    public void emitDirect(int i, String str, Collection<Tuple> collection, List<Object> list) {
        admitBoltTuple(i, str, collection, list);
    }

    @Override // com.twitter.heron.api.bolt.IErrorReporter
    public void reportError(Throwable th) {
        LOG.log(Level.SEVERE, "Error stack trace ", (Throwable) new Exception("Reporting an error in topology code", th));
    }

    @Override // com.twitter.heron.api.bolt.IOutputCollector
    public void ack(Tuple tuple) {
        admitAckTuple(tuple);
    }

    @Override // com.twitter.heron.api.bolt.IOutputCollector
    public void fail(Tuple tuple) {
        admitFailTuple(tuple);
    }

    public boolean isOutQueuesAvailable() {
        return this.outputter.isOutQueuesAvailable();
    }

    public long getTotalDataEmittedInBytes() {
        return this.outputter.getTotalDataEmittedInBytes();
    }

    public void sendOutTuples() {
        this.outputter.sendOutTuples();
    }

    public void clear() {
        this.outputter.clear();
    }

    private List<Integer> admitBoltTuple(String str, Collection<Tuple> collection, List<Object> list) {
        this.helper.checkOutputSchema(str, list);
        List<Integer> chooseTasksForCustomStreamGrouping = this.helper.chooseTasksForCustomStreamGrouping(str, list);
        this.helper.getTopologyContext().invokeHookEmit(list, str, chooseTasksForCustomStreamGrouping);
        HeronTuples.HeronDataTuple.Builder newBuilder = HeronTuples.HeronDataTuple.newBuilder();
        newBuilder.setKey(0L);
        if (chooseTasksForCustomStreamGrouping != null) {
            Iterator<Integer> it = chooseTasksForCustomStreamGrouping.iterator();
            while (it.hasNext()) {
                newBuilder.addDestTaskIds(it.next().intValue());
            }
        }
        if (collection != null) {
            HashSet hashSet = new HashSet();
            for (Tuple tuple : collection) {
                if (tuple instanceof TupleImpl) {
                    hashSet.addAll(((TupleImpl) tuple).getRoots());
                }
            }
            Iterator it2 = hashSet.iterator();
            while (it2.hasNext()) {
                newBuilder.addRoots((HeronTuples.RootId) it2.next());
            }
        }
        long j = 0;
        long nanoTime = System.nanoTime();
        Iterator<Object> it3 = list.iterator();
        while (it3.hasNext()) {
            newBuilder.addValues(ByteString.copyFrom(this.serializer.serialize(it3.next())));
            j += r0.length;
        }
        this.boltMetrics.serializeDataTuple(str, System.nanoTime() - nanoTime);
        this.outputter.addDataTuple(str, newBuilder, j);
        this.boltMetrics.emittedTuple(str);
        return null;
    }

    private void admitBoltTuple(int i, String str, Collection<Tuple> collection, List<Object> list) {
        throw new RuntimeException("emitDirect not supported");
    }

    private void admitAckTuple(Tuple tuple) {
        if (tuple instanceof TupleImpl) {
            TupleImpl tupleImpl = (TupleImpl) tuple;
            if (this.ackEnabled) {
                HeronTuples.AckTuple.Builder newBuilder = HeronTuples.AckTuple.newBuilder();
                newBuilder.setAckedtuple(tupleImpl.getTupleKey());
                long j = 0;
                Iterator<HeronTuples.RootId> it = tupleImpl.getRoots().iterator();
                while (it.hasNext()) {
                    newBuilder.addRoots(it.next());
                    j += r0.getSerializedSize();
                }
                this.outputter.addAckTuple(newBuilder, j);
            }
            long nanoTime = System.nanoTime() - tupleImpl.getCreationTime();
            this.helper.getTopologyContext().invokeHookBoltAck(tuple, nanoTime);
            this.boltMetrics.ackedTuple(tuple.getSourceStreamId(), tuple.getSourceComponent(), nanoTime);
        }
    }

    private void admitFailTuple(Tuple tuple) {
        if (tuple instanceof TupleImpl) {
            TupleImpl tupleImpl = (TupleImpl) tuple;
            if (this.ackEnabled) {
                HeronTuples.AckTuple.Builder newBuilder = HeronTuples.AckTuple.newBuilder();
                newBuilder.setAckedtuple(tupleImpl.getTupleKey());
                long j = 0;
                Iterator<HeronTuples.RootId> it = tupleImpl.getRoots().iterator();
                while (it.hasNext()) {
                    newBuilder.addRoots(it.next());
                    j += r0.getSerializedSize();
                }
                this.outputter.addFailTuple(newBuilder, j);
            }
            long nanoTime = System.nanoTime() - tupleImpl.getCreationTime();
            this.helper.getTopologyContext().invokeHookBoltFail(tuple, nanoTime);
            this.boltMetrics.failedTuple(tuple.getSourceStreamId(), tuple.getSourceComponent(), nanoTime);
        }
    }
}
