package com.twitter.heron.simulator.instance;

import com.google.protobuf.ByteString;
import com.twitter.heron.api.bolt.IBolt;
import com.twitter.heron.api.bolt.OutputCollector;
import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.api.metric.GlobalMetrics;
import com.twitter.heron.api.serializer.IPluggableSerializer;
import com.twitter.heron.api.utils.Utils;
import com.twitter.heron.common.basics.Communicator;
import com.twitter.heron.common.basics.SingletonRegistry;
import com.twitter.heron.common.basics.SlaveLooper;
import com.twitter.heron.common.basics.TypeUtils;
import com.twitter.heron.common.config.SystemConfig;
import com.twitter.heron.common.utils.metrics.BoltMetrics;
import com.twitter.heron.common.utils.misc.PhysicalPlanHelper;
import com.twitter.heron.common.utils.misc.SerializeDeSerializeHelper;
import com.twitter.heron.common.utils.topology.TopologyContextImpl;
import com.twitter.heron.common.utils.tuple.TickTuple;
import com.twitter.heron.common.utils.tuple.TupleImpl;
import com.twitter.heron.proto.system.HeronTuples;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.logging.Logger;

/* loaded from: input_file:com/twitter/heron/simulator/instance/BoltInstance.class */
public class BoltInstance implements IInstance {
    private static final Logger LOG = Logger.getLogger(BoltInstance.class.getName());
    private final PhysicalPlanHelper helper;
    private final IBolt bolt;
    private final BoltOutputCollectorImpl collector;
    private final IPluggableSerializer serializer;
    private final BoltMetrics boltMetrics = new BoltMetrics();
    private final Communicator<HeronTuples.HeronTupleSet> streamInQueue;
    private final SlaveLooper looper;
    private final SystemConfig systemConfig;

    public BoltInstance(PhysicalPlanHelper physicalPlanHelper, Communicator<HeronTuples.HeronTupleSet> communicator, Communicator<HeronTuples.HeronTupleSet> communicator2, SlaveLooper slaveLooper) {
        this.helper = physicalPlanHelper;
        this.looper = slaveLooper;
        this.streamInQueue = communicator;
        this.boltMetrics.initMultiCountMetrics(physicalPlanHelper);
        if (physicalPlanHelper.getMyBolt() == null) {
            throw new RuntimeException("HeronBoltInstance has no bolt in physical plan.");
        }
        this.serializer = SerializeDeSerializeHelper.getSerializer(physicalPlanHelper.getTopologyContext().getTopologyConfig());
        this.systemConfig = (SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
        if (physicalPlanHelper.getMyBolt().getComp().hasSerializedObject()) {
            this.bolt = (IBolt) Utils.deserialize(physicalPlanHelper.getMyBolt().getComp().getSerializedObject().toByteArray());
        } else {
            if (!physicalPlanHelper.getMyBolt().getComp().hasClassName()) {
                throw new RuntimeException("Neither java_object nor java_class_name set for bolt");
            }
            try {
                this.bolt = (IBolt) Class.forName(physicalPlanHelper.getMyBolt().getComp().getClassName()).newInstance();
            } catch (ClassNotFoundException e) {
                throw new RuntimeException(e + " Bolt class must be in class path.");
            } catch (IllegalAccessException e2) {
                throw new RuntimeException(e2 + " Bolt class must have a no-arg constructor.");
            } catch (InstantiationException e3) {
                throw new RuntimeException(e3 + " Bolt class must be concrete.");
            }
        }
        this.collector = new BoltOutputCollectorImpl(this.serializer, physicalPlanHelper, communicator2, this.boltMetrics);
    }

    @Override // com.twitter.heron.simulator.instance.IInstance
    public void start() {
        TopologyContextImpl topologyContext = this.helper.getTopologyContext();
        GlobalMetrics.init(topologyContext, this.systemConfig.getHeronMetricsExportIntervalSec());
        this.boltMetrics.registerMetrics(topologyContext);
        this.bolt.prepare(topologyContext.getTopologyConfig(), topologyContext, new OutputCollector(this.collector));
        topologyContext.invokeHookPrepare();
        this.helper.prepareForCustomStreamGrouping(topologyContext);
        addBoltTasks();
    }

    @Override // com.twitter.heron.simulator.instance.IInstance
    public void stop() {
        this.helper.getTopologyContext().invokeHookCleanup();
        this.bolt.cleanup();
        this.looper.exitLoop();
        this.streamInQueue.clear();
        this.collector.clear();
    }

    private void addBoltTasks() {
        this.looper.addTasksOnWakeup(new Runnable() { // from class: com.twitter.heron.simulator.instance.BoltInstance.1
            @Override // java.lang.Runnable
            public void run() {
                if (BoltInstance.this.collector.isOutQueuesAvailable()) {
                    BoltInstance.this.readTuplesAndExecute(BoltInstance.this.streamInQueue);
                    BoltInstance.this.collector.sendOutTuples();
                } else {
                    BoltInstance.this.boltMetrics.updateOutQueueFullCount();
                }
                if (!BoltInstance.this.collector.isOutQueuesAvailable() || BoltInstance.this.streamInQueue.isEmpty()) {
                    return;
                }
                BoltInstance.this.looper.wakeUp();
            }
        });
        PrepareTickTupleTimer();
    }

    private void handleDataTuple(HeronTuples.HeronDataTuple heronDataTuple, TopologyContextImpl topologyContextImpl, TopologyAPI.StreamId streamId) {
        long nanoTime = System.nanoTime();
        ArrayList arrayList = new ArrayList();
        Iterator<ByteString> it = heronDataTuple.getValuesList().iterator();
        while (it.hasNext()) {
            arrayList.add(this.serializer.deserialize(it.next().toByteArray()));
        }
        TupleImpl tupleImpl = new TupleImpl(topologyContextImpl, streamId, heronDataTuple.getKey(), heronDataTuple.getRootsList(), arrayList);
        long nanoTime2 = System.nanoTime();
        this.bolt.execute(tupleImpl);
        long nanoTime3 = System.nanoTime() - nanoTime2;
        topologyContextImpl.invokeHookBoltExecute(tupleImpl, nanoTime3);
        this.boltMetrics.deserializeDataTuple(streamId.getId(), streamId.getComponentName(), nanoTime2 - nanoTime);
        this.boltMetrics.executeTuple(streamId.getId(), streamId.getComponentName(), nanoTime3);
    }

    @Override // com.twitter.heron.simulator.instance.IInstance
    public void readTuplesAndExecute(Communicator<HeronTuples.HeronTupleSet> communicator) {
        long nanoTime = System.nanoTime();
        long totalDataEmittedInBytes = this.collector.getTotalDataEmittedInBytes();
        long instanceExecuteBatchTimeMs = this.systemConfig.getInstanceExecuteBatchTimeMs() * 1000000;
        long instanceExecuteBatchSizeBytes = this.systemConfig.getInstanceExecuteBatchSizeBytes();
        while (!communicator.isEmpty()) {
            HeronTuples.HeronTupleSet poll = communicator.poll();
            TopologyContextImpl topologyContext = this.helper.getTopologyContext();
            if (poll.hasControl()) {
                throw new RuntimeException("Bolt cannot get acks/fails from other components");
            }
            TopologyAPI.StreamId stream = poll.getData().getStream();
            Iterator<HeronTuples.HeronDataTuple> it = poll.getData().getTuplesList().iterator();
            while (it.hasNext()) {
                handleDataTuple(it.next(), topologyContext, stream);
            }
            if ((System.nanoTime() - nanoTime) - instanceExecuteBatchTimeMs > 0 || this.collector.getTotalDataEmittedInBytes() - totalDataEmittedInBytes > instanceExecuteBatchSizeBytes) {
                return;
            }
        }
    }

    @Override // com.twitter.heron.simulator.instance.IInstance
    public void activate() {
    }

    @Override // com.twitter.heron.simulator.instance.IInstance
    public void deactivate() {
    }

    private void PrepareTickTupleTimer() {
        Object obj = this.helper.getTopologyContext().getTopologyConfig().get("topology.tick.tuple.freq.secs");
        if (obj != null) {
            this.looper.registerTimerEventInSeconds(TypeUtils.getInteger(obj).intValue(), new Runnable() { // from class: com.twitter.heron.simulator.instance.BoltInstance.2
                @Override // java.lang.Runnable
                public void run() {
                    BoltInstance.this.SendTickTuple();
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void SendTickTuple() {
        TickTuple tickTuple = new TickTuple();
        long nanoTime = System.nanoTime();
        this.bolt.execute(tickTuple);
        this.boltMetrics.executeTuple(tickTuple.getSourceStreamId(), tickTuple.getSourceComponent(), System.nanoTime() - nanoTime);
        this.collector.sendOutTuples();
        PrepareTickTupleTimer();
    }
}
