package com.twitter.heron.simulator.instance;

import com.google.protobuf.ByteString;
import com.twitter.heron.api.Config;
import com.twitter.heron.api.serializer.IPluggableSerializer;
import com.twitter.heron.api.spout.ISpoutOutputCollector;
import com.twitter.heron.common.basics.Communicator;
import com.twitter.heron.common.utils.metrics.SpoutMetrics;
import com.twitter.heron.common.utils.misc.PhysicalPlanHelper;
import com.twitter.heron.common.utils.misc.TupleKeyGenerator;
import com.twitter.heron.proto.system.HeronTuples;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/twitter/heron/simulator/instance/SpoutOutputCollectorImpl.class */
/**
 * {@link ISpoutOutputCollector} used by the simulator's spout instance.
 *
 * <p>Each emitted tuple is serialized with the configured {@link IPluggableSerializer}, wrapped
 * in a {@code HeronDataTuple} builder and handed to an {@link OutgoingTupleCollection}.
 * When acking is enabled (topology config key {@code Config.TOPOLOGY_ENABLE_ACKING}), tuples
 * emitted with a non-null message id are tracked in {@link #inFlightTuples} until they are
 * retired via {@link #retireInFlight(long)} or {@link #retireExpired(long)}. When acking is
 * disabled, such tuples are queued in {@link #immediateAcks} instead.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to its state; it presumably
 * expects to be driven from a single thread — confirm against the caller.
 */
public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
    private static final Logger LOG = Logger.getLogger(SpoutOutputCollectorImpl.class.getName());

    /** Root tuples awaiting retirement, keyed by generated root key, in insertion order. */
    private final LinkedHashMap<Long, RootTupleInfo> inFlightTuples;
    private final TupleKeyGenerator keyGenerator;
    private final SpoutMetrics spoutMetrics;
    private final PhysicalPlanHelper helper;
    private final boolean ackingEnabled;

    /** Tuples with a message id emitted while acking is disabled; {@code null} when acking is enabled. */
    private final Queue<RootTupleInfo> immediateAcks;
    private final IPluggableSerializer serializer;
    private final OutgoingTupleCollection outputter;
    private long totalTuplesEmitted;

    /**
     * Creates a collector for the spout described by {@code physicalPlanHelper}.
     *
     * @param iPluggableSerializer serializer applied to every value of an emitted tuple
     * @param physicalPlanHelper helper for the current instance; must describe a spout
     * @param communicator passed through to the {@link OutgoingTupleCollection}
     * @param spoutMetrics sink for serialization-latency and emit-count metrics
     * @throws RuntimeException if {@code physicalPlanHelper.getMySpout()} returns {@code null}
     */
    public SpoutOutputCollectorImpl(IPluggableSerializer iPluggableSerializer, PhysicalPlanHelper physicalPlanHelper, Communicator<HeronTuples.HeronTupleSet> communicator, SpoutMetrics spoutMetrics) {
        if (physicalPlanHelper.getMySpout() == null) {
            throw new RuntimeException(physicalPlanHelper.getMyTaskId() + " is not a spout ");
        }
        this.serializer = iPluggableSerializer;
        this.helper = physicalPlanHelper;
        this.spoutMetrics = spoutMetrics;
        this.keyGenerator = new TupleKeyGenerator();
        this.inFlightTuples = new LinkedHashMap<>();

        // A missing key and a key mapped to null both yield null here, i.e. acking disabled.
        Map<String, Object> topologyConfig = physicalPlanHelper.getTopologyContext().getTopologyConfig();
        Object ackingSetting = topologyConfig.get(Config.TOPOLOGY_ENABLE_ACKING);
        this.ackingEnabled = ackingSetting != null && Boolean.parseBoolean(ackingSetting.toString());

        // The immediate-ack queue is only needed when tuples are not tracked in flight.
        if (this.ackingEnabled) {
            this.immediateAcks = null;
        } else {
            this.immediateAcks = new ArrayDeque<>();
        }
        this.outputter = new OutgoingTupleCollection(physicalPlanHelper, communicator);
    }

    /**
     * Emits a tuple on the given stream.
     *
     * @param str stream id
     * @param list tuple values; each one is serialized individually
     * @param obj message id, or {@code null} for a tuple that is neither tracked nor acked
     * @return always {@code null} in this implementation
     */
    @Override // com.twitter.heron.api.spout.ISpoutOutputCollector
    public List<Integer> emit(String str, List<Object> list, Object obj) {
        return admitSpoutTuple(str, list, obj);
    }

    /**
     * Direct emit is not supported by this collector.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // com.twitter.heron.api.spout.ISpoutOutputCollector
    public void emitDirect(int i, String str, List<Object> list, Object obj) {
        admitSpoutTuple(i, str, list, obj);
    }

    /**
     * Logs the given error at {@code SEVERE} level, wrapped in a new {@link Exception} so that
     * the log record also carries the stack trace of the reporting call site.
     *
     * @param th the error reported by topology code
     */
    @Override // com.twitter.heron.api.spout.ISpoutOutputCollector
    public void reportError(Throwable th) {
        LOG.log(Level.SEVERE, "Error stack trace ", new Exception("Reporting an error in topology code", th));
    }

    /** @return the value of {@code isOutQueuesAvailable()} on the underlying outgoing tuple collection */
    public boolean isOutQueuesAvailable() {
        return this.outputter.isOutQueuesAvailable();
    }

    /** @return the total emitted byte count reported by the underlying outgoing tuple collection */
    public long getTotalDataEmittedInBytes() {
        return this.outputter.getTotalDataEmittedInBytes();
    }

    /** Delegates to {@code sendOutTuples()} on the underlying outgoing tuple collection. */
    public void sendOutTuples() {
        this.outputter.sendOutTuples();
    }

    /** @return the number of tuples admitted through {@link #emit} since construction */
    public long getTotalTuplesEmitted() {
        return this.totalTuplesEmitted;
    }

    /** @return the number of root tuples currently tracked as in flight */
    public int numInFlight() {
        return this.inFlightTuples.size();
    }

    /**
     * @return the queue of tuples emitted with a message id while acking is disabled,
     *         or {@code null} when acking is enabled
     */
    public Queue<RootTupleInfo> getImmediateAcks() {
        return this.immediateAcks;
    }

    /**
     * Stops tracking the in-flight tuple with the given root key.
     *
     * @param j root key previously generated for an emitted tuple
     * @return the removed tuple info, or {@code null} if no tuple with that key is in flight
     */
    public RootTupleInfo retireInFlight(long j) {
        return this.inFlightTuples.remove(Long.valueOf(j));
    }

    /**
     * Removes and returns the leading run of expired in-flight tuples.
     *
     * <p>Entries are visited in insertion order and the scan stops at the first entry that
     * {@code RootTupleInfo.isExpired} reports as not expired, so later entries are not examined.
     *
     * @param j timeout passed to {@code RootTupleInfo.isExpired} together with the current
     *          {@link System#nanoTime()} — presumably in nanoseconds; confirm against
     *          {@code RootTupleInfo}
     * @return the removed tuples, in insertion order; empty if none were expired
     */
    public List<RootTupleInfo> retireExpired(long j) {
        List<RootTupleInfo> expired = new ArrayList<>();
        long now = System.nanoTime();
        Iterator<RootTupleInfo> it = this.inFlightTuples.values().iterator();
        while (it.hasNext()) {
            RootTupleInfo candidate = it.next();
            if (!candidate.isExpired(now, j)) {
                break;
            }
            expired.add(candidate);
            // Removing through the iterator avoids a ConcurrentModificationException.
            it.remove();
        }
        return expired;
    }

    /** Delegates to {@code clear()} on the underlying outgoing tuple collection. */
    public void clear() {
        this.outputter.clear();
    }

    /**
     * Validates, serializes and queues one tuple for the given stream.
     *
     * @param str stream id
     * @param list tuple values
     * @param obj message id, or {@code null}
     * @return always {@code null}
     */
    private List<Integer> admitSpoutTuple(String str, List<Object> list, Object obj) {
        this.helper.checkOutputSchema(str, list);
        List<Integer> customGroupingTargets = this.helper.chooseTasksForCustomStreamGrouping(str, list);
        this.helper.getTopologyContext().invokeHookEmit(list, str, customGroupingTargets);

        HeronTuples.HeronDataTuple.Builder tupleBuilder = HeronTuples.HeronDataTuple.newBuilder();
        tupleBuilder.setKey(0L);
        if (customGroupingTargets != null) {
            for (int taskId : customGroupingTargets) {
                tupleBuilder.addDestTaskIds(taskId);
            }
        }

        if (obj != null) {
            RootTupleInfo rootTupleInfo = new RootTupleInfo(str, obj);
            if (this.ackingEnabled) {
                // Track the tuple as in flight and attach its root id to the outgoing tuple.
                tupleBuilder.addRoots(establishRootId(rootTupleInfo));
            } else {
                this.immediateAcks.offer(rootTupleInfo);
            }
        }

        long tupleSizeInBytes = 0;
        long serializeStart = System.nanoTime();
        for (Object value : list) {
            // Keep the serialized bytes in a local so their length can be added to the size.
            byte[] serialized = this.serializer.serialize(value);
            tupleBuilder.addValues(ByteString.copyFrom(serialized));
            tupleSizeInBytes += serialized.length;
        }
        this.spoutMetrics.serializeDataTuple(str, System.nanoTime() - serializeStart);

        this.outputter.addDataTuple(str, tupleBuilder, tupleSizeInBytes);
        this.totalTuplesEmitted++;
        this.spoutMetrics.emittedTuple(str);
        return null;
    }

    /**
     * Placeholder backing {@link #emitDirect}; direct emit is not implemented.
     *
     * @throws UnsupportedOperationException always
     */
    private void admitSpoutTuple(int i, String str, List<Object> list, Object obj) {
        // UnsupportedOperationException is a RuntimeException, so existing catch sites still match.
        throw new UnsupportedOperationException("emitDirect Not implemented");
    }

    /**
     * Generates a fresh root key, registers the tuple as in flight under it and builds the
     * matching {@code RootId} carrying this instance's task id.
     *
     * @param rootTupleInfo info recorded for the tuple being emitted
     * @return builder for the root id to attach to the outgoing data tuple
     */
    private HeronTuples.RootId.Builder establishRootId(RootTupleInfo rootTupleInfo) {
        long rootKey = this.keyGenerator.next();
        HeronTuples.RootId.Builder rootIdBuilder = HeronTuples.RootId.newBuilder();
        rootIdBuilder.setTaskid(this.helper.getMyTaskId());
        rootIdBuilder.setKey(rootKey);
        this.inFlightTuples.put(Long.valueOf(rootKey), rootTupleInfo);
        return rootIdBuilder;
    }
}
