package com.twitter.heron.simulator.instance;

import com.twitter.heron.api.Config;
import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.api.metric.GlobalMetrics;
import com.twitter.heron.api.serializer.IPluggableSerializer;
import com.twitter.heron.api.spout.ISpout;
import com.twitter.heron.api.spout.SpoutOutputCollector;
import com.twitter.heron.api.utils.Utils;
import com.twitter.heron.common.basics.Communicator;
import com.twitter.heron.common.basics.Constants;
import com.twitter.heron.common.basics.SingletonRegistry;
import com.twitter.heron.common.basics.SlaveLooper;
import com.twitter.heron.common.basics.TypeUtils;
import com.twitter.heron.common.config.SystemConfig;
import com.twitter.heron.common.utils.metrics.SpoutMetrics;
import com.twitter.heron.common.utils.misc.PhysicalPlanHelper;
import com.twitter.heron.common.utils.misc.SerializeDeSerializeHelper;
import com.twitter.heron.common.utils.topology.TopologyContextImpl;
import com.twitter.heron.proto.system.HeronTuples;
import java.util.Iterator;
import java.util.Map;
import java.util.logging.Logger;

/* loaded from: input_file:com/twitter/heron/simulator/instance/SpoutInstance.class */
/**
 * Simulator-side runtime wrapper around a single {@link ISpout}.
 *
 * <p>The constructor loads the spout described by the physical plan. {@link #start()} opens the
 * spout and registers a wake-up task on the {@link SlaveLooper}; each run of that task asks the
 * spout for new tuples, flushes them through the output collector, and then delivers acks/fails
 * back to the spout (read from the inbound queue when acking is enabled, taken from the
 * collector's immediate-ack queue otherwise).
 */
public class SpoutInstance implements IInstance {
    private static final Logger LOG = Logger.getLogger(SpoutInstance.class.getName());

    /** Batch time limits are read in milliseconds but compared against {@link System#nanoTime()}. */
    private static final long NANOS_PER_MILLI = 1000000L;

    private final ISpout spout;
    private final SpoutOutputCollectorImpl collector;
    private final IPluggableSerializer serializer;
    private final SpoutMetrics spoutMetrics = new SpoutMetrics();
    private final Communicator<HeronTuples.HeronTupleSet> streamInQueue;
    private final boolean ackEnabled;
    private final boolean enableMessageTimeouts;
    private final SlaveLooper looper;
    private final SystemConfig systemConfig;
    private final Map<String, Object> config;
    private final PhysicalPlanHelper helper;
    private TopologyAPI.TopologyState topologyState;

    /**
     * Builds the instance and loads (but does not open) the user spout.
     *
     * @param physicalPlanHelper source of the topology context, topology config and spout definition
     * @param communicator inbound queue; drained by {@link #readTuplesAndExecute(Communicator)}
     *     when acking is enabled
     * @param communicator2 queue handed to the output collector
     * @param slaveLooper looper on which the spout's wake-up and timer tasks are registered
     * @throws RuntimeException if the physical plan has no spout, or the spout can be neither
     *     deserialized nor instantiated from its class name
     */
    public SpoutInstance(PhysicalPlanHelper physicalPlanHelper, Communicator<HeronTuples.HeronTupleSet> communicator, Communicator<HeronTuples.HeronTupleSet> communicator2, SlaveLooper slaveLooper) {
        this.helper = physicalPlanHelper;
        this.looper = slaveLooper;
        this.streamInQueue = communicator;
        this.spoutMetrics.initMultiCountMetrics(physicalPlanHelper);
        this.config = physicalPlanHelper.getTopologyContext().getTopologyConfig();
        this.systemConfig = (SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
        this.ackEnabled = readBooleanConfig(this.config, Config.TOPOLOGY_ENABLE_ACKING);
        this.enableMessageTimeouts = readBooleanConfig(this.config, "topology.enable.message.timeouts");
        LOG.info("Enable Ack: " + this.ackEnabled);
        LOG.info("EnableMessageTimeouts: " + this.enableMessageTimeouts);
        if (physicalPlanHelper.getMySpout() == null) {
            throw new RuntimeException("HeronSpoutInstance has no spout in physical plan");
        }
        this.serializer = SerializeDeSerializeHelper.getSerializer(this.config);
        this.spout = loadSpout(physicalPlanHelper);
        this.collector = new SpoutOutputCollectorImpl(this.serializer, physicalPlanHelper, communicator2, this.spoutMetrics);
    }

    /**
     * Reads a boolean flag from the topology config.
     *
     * <p>The value is converted with {@link String#valueOf(Object)} before parsing, so a flag
     * stored as a {@link Boolean} is accepted as well as one stored as a {@link String}; a missing
     * key yields {@code false}.
     */
    private static boolean readBooleanConfig(Map<String, Object> config, String key) {
        return Boolean.parseBoolean(String.valueOf(config.get(key)));
    }

    /**
     * Produces the spout object described by the physical plan: the serialized object when one is
     * present, otherwise a new instance of the configured class name.
     *
     * @throws RuntimeException if neither is set, or the class cannot be loaded/instantiated; the
     *     underlying reflection exception is attached as the cause
     */
    private static ISpout loadSpout(PhysicalPlanHelper physicalPlanHelper) {
        if (physicalPlanHelper.getMySpout().getComp().hasSerializedObject()) {
            return (ISpout) Utils.deserialize(physicalPlanHelper.getMySpout().getComp().getSerializedObject().toByteArray());
        }
        if (!physicalPlanHelper.getMySpout().getComp().hasClassName()) {
            throw new RuntimeException("Neither java_object nor java_class_name set for spout");
        }
        try {
            return (ISpout) Class.forName(physicalPlanHelper.getMySpout().getComp().getClassName()).newInstance();
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e + " Spout class must be in class path.", e);
        } catch (IllegalAccessException e) {
            throw new RuntimeException(e + " Spout class must have a no-arg constructor.", e);
        } catch (InstantiationException e) {
            throw new RuntimeException(e + " Spout class must be concrete.", e);
        }
    }

    /**
     * Initializes metrics, opens the spout, invokes the prepare hooks, registers the looper tasks
     * and marks the topology state as RUNNING.
     */
    @Override
    public void start() {
        TopologyContextImpl topologyContext = this.helper.getTopologyContext();
        GlobalMetrics.init(topologyContext, this.systemConfig.getHeronMetricsExportIntervalSec());
        this.spoutMetrics.registerMetrics(topologyContext);
        this.spout.open(topologyContext.getTopologyConfig(), topologyContext, new SpoutOutputCollector(this.collector));
        topologyContext.invokeHookPrepare();
        this.helper.prepareForCustomStreamGrouping(topologyContext);
        addSpoutsTasks();
        this.topologyState = TopologyAPI.TopologyState.RUNNING;
    }

    /**
     * Invokes the cleanup hooks, closes the spout, asks the looper to exit and clears both the
     * inbound queue and the collector.
     */
    @Override
    public void stop() {
        this.helper.getTopologyContext().invokeHookCleanup();
        this.spout.close();
        this.looper.exitLoop();
        this.streamInQueue.clear();
        this.collector.clear();
    }

    /** Activates the spout and marks the topology state as RUNNING. */
    @Override
    public void activate() {
        LOG.info("Spout is activated");
        this.spout.activate();
        this.topologyState = TopologyAPI.TopologyState.RUNNING;
    }

    /** Deactivates the spout and marks the topology state as PAUSED. */
    @Override
    public void deactivate() {
        LOG.info("Spout is deactivated");
        this.spout.deactivate();
        this.topologyState = TopologyAPI.TopologyState.PAUSED;
    }

    /**
     * Registers the per-wake-up work cycle on the looper and, when message timeouts are enabled,
     * starts the recurring timeout scan.
     */
    private void addSpoutsTasks() {
        this.looper.addTasksOnWakeup(new Runnable() {
            @Override
            public void run() {
                if (isProduceTuple()) {
                    produceTuple();
                    SpoutInstance.this.collector.sendOutTuples();
                }
                if (!SpoutInstance.this.collector.isOutQueuesAvailable()) {
                    SpoutInstance.this.spoutMetrics.updateOutQueueFullCount();
                }
                if (SpoutInstance.this.ackEnabled) {
                    readTuplesAndExecute(SpoutInstance.this.streamInQueue);
                    SpoutInstance.this.spoutMetrics.updatePendingTuplesCount(SpoutInstance.this.collector.numInFlight());
                } else {
                    doImmediateAcks();
                }
                // Re-arm the looper right away while there is still work that can make progress.
                if (isContinueWork()) {
                    SpoutInstance.this.looper.wakeUp();
                }
            }
        });
        if (this.enableMessageTimeouts) {
            lookForTimeouts();
        }
    }

    /**
     * Decides whether the looper should be woken up again immediately.
     *
     * @return {@code true} only while RUNNING and either: acking is off and the out queues are
     *     available; or acking is on and (the out queues are available with fewer in-flight tuples
     *     than "topology.max.spout.pending", or the inbound queue is not empty)
     */
    public boolean isContinueWork() {
        // Enum identity comparison: also yields false (instead of throwing) while the state is unset.
        if (this.topologyState != TopologyAPI.TopologyState.RUNNING) {
            return false;
        }
        boolean outQueuesAvailable = this.collector.isOutQueuesAvailable();
        if (!this.ackEnabled) {
            return outQueuesAvailable;
        }
        if (outQueuesAvailable
                && ((long) this.collector.numInFlight()) < TypeUtils.getLong(this.config.get("topology.max.spout.pending")).longValue()) {
            return true;
        }
        return !this.streamInQueue.isEmpty();
    }

    /**
     * @return {@code true} when the out queues are available and the topology state is RUNNING
     */
    public boolean isProduceTuple() {
        return this.collector.isOutQueuesAvailable() && this.topologyState == TopologyAPI.TopologyState.RUNNING;
    }

    /**
     * Calls {@link ISpout#nextTuple()} repeatedly to build one emit batch.
     *
     * <p>The loop stops as soon as any of these holds: acking is enabled and the number of
     * in-flight tuples has reached "topology.max.spout.pending"; a {@code nextTuple()} call emitted
     * nothing; the configured emit batch time has elapsed; or the bytes emitted during this call
     * exceed the configured emit batch size.
     */
    public void produceTuple() {
        int maxSpoutPending = TypeUtils.getInteger(this.config.get("topology.max.spout.pending")).intValue();
        long emittedSoFar = this.collector.getTotalTuplesEmitted();
        long bytesAtStart = this.collector.getTotalDataEmittedInBytes();
        long batchTimeNs = this.systemConfig.getInstanceEmitBatchTimeMs() * NANOS_PER_MILLI;
        long batchSizeBytes = this.systemConfig.getInstanceEmitBatchSizeBytes();
        long batchStartNs = System.nanoTime();
        do {
            if (this.ackEnabled && maxSpoutPending <= this.collector.numInFlight()) {
                return;
            }
            long callStartNs = System.nanoTime();
            this.spout.nextTuple();
            this.spoutMetrics.nextTuple(System.nanoTime() - callStartNs);
            long emittedNow = this.collector.getTotalTuplesEmitted();
            if (emittedNow == emittedSoFar) {
                // The spout produced nothing on this call; stop polling it for this cycle.
                return;
            }
            emittedSoFar = emittedNow;
            if ((System.nanoTime() - batchStartNs) - batchTimeNs > 0) {
                return;
            }
        } while (this.collector.getTotalDataEmittedInBytes() - bytesAtStart <= batchSizeBytes);
    }

    /**
     * Delivers an ack or fail to the spout for every root in {@code ackTuple}.
     *
     * <p>A root that the collector no longer tracks is skipped; the remaining roots of the same
     * ack tuple are still processed. Roots whose recorded message id is {@code null} are retired
     * without notifying the spout.
     *
     * @param ackTuple control tuple listing the roots to retire
     * @param isSuccess {@code true} to ack the roots, {@code false} to fail them
     * @throws RuntimeException if a root belongs to a task other than this one
     */
    private void handleAckTuple(HeronTuples.AckTuple ackTuple, boolean isSuccess) {
        for (HeronTuples.RootId rootId : ackTuple.getRootsList()) {
            if (rootId.getTaskid() != this.helper.getMyTaskId()) {
                throw new RuntimeException(String.format("Receiving tuple for task %d in task %d", Integer.valueOf(rootId.getTaskid()), Integer.valueOf(this.helper.getMyTaskId())));
            }
            RootTupleInfo tupleInfo = this.collector.retireInFlight(rootId.getKey());
            if (tupleInfo == null) {
                // Not in flight any more (presumably already retired, e.g. by the timeout scan).
                continue;
            }
            Object messageId = tupleInfo.getMessageId();
            if (messageId == null) {
                continue;
            }
            long latencyNs = System.nanoTime() - tupleInfo.getInsertionTime();
            if (isSuccess) {
                invokeAck(messageId, tupleInfo.getStreamId(), latencyNs);
            } else {
                invokeFail(messageId, tupleInfo.getStreamId(), latencyNs);
            }
        }
    }

    /**
     * Fails every tuple the collector reports as expired for "topology.message.timeout.secs", then
     * re-registers itself as a timer event after {@code timeout / nbuckets} nanoseconds, where
     * {@code nbuckets} comes from the system config.
     */
    public void lookForTimeouts() {
        long timeoutNs = TypeUtils.getLong(this.config.get("topology.message.timeout.secs")).longValue() * Constants.SECONDS_TO_NANOSECONDS;
        int nBuckets = this.systemConfig.getInstanceAcknowledgementNbuckets();
        for (RootTupleInfo expired : this.collector.retireExpired(timeoutNs)) {
            this.spoutMetrics.timeoutTuple(expired.getStreamId());
            // The full timeout is reported as the latency of a timed-out tuple.
            invokeFail(expired.getMessageId(), expired.getStreamId(), timeoutNs);
        }
        this.looper.registerTimerEventInNanoSeconds(timeoutNs / nBuckets, new Runnable() {
            @Override
            public void run() {
                lookForTimeouts();
            }
        });
    }

    /**
     * Drains control tuples from {@code communicator} and forwards their acks and fails to the
     * spout, returning early once the configured ack batch time has elapsed.
     *
     * @param communicator queue to drain
     * @throws RuntimeException if a polled tuple set carries data tuples
     */
    @Override
    public void readTuplesAndExecute(Communicator<HeronTuples.HeronTupleSet> communicator) {
        long startNs = System.nanoTime();
        long batchTimeNs = this.systemConfig.getInstanceAckBatchTimeMs() * NANOS_PER_MILLI;
        while (!communicator.isEmpty()) {
            HeronTuples.HeronTupleSet tupleSet = communicator.poll();
            if (tupleSet.hasData()) {
                throw new RuntimeException("Spout cannot get incoming data tuples from other components");
            }
            if (tupleSet.hasControl()) {
                for (HeronTuples.AckTuple ack : tupleSet.getControl().getAcksList()) {
                    handleAckTuple(ack, true);
                }
                for (HeronTuples.AckTuple fail : tupleSet.getControl().getFailsList()) {
                    handleAckTuple(fail, false);
                }
            }
            if ((System.nanoTime() - startNs) - batchTimeNs > 0) {
                return;
            }
        }
    }

    /**
     * Acks, with a latency of zero, every tuple that was in the collector's immediate-ack queue
     * when this method was entered.
     */
    public void doImmediateAcks() {
        // Snapshot the size first so entries queued during the ack callbacks wait for the next cycle.
        int pending = this.collector.getImmediateAcks().size();
        for (int i = 0; i < pending; i++) {
            RootTupleInfo tupleInfo = this.collector.getImmediateAcks().poll();
            invokeAck(tupleInfo.getMessageId(), tupleInfo.getStreamId(), 0L);
        }
    }

    /** Notifies the spout, the spout-ack hooks and the metrics that a tuple was acked. */
    private void invokeAck(Object messageId, String streamId, long latencyNs) {
        this.spout.ack(messageId);
        this.helper.getTopologyContext().invokeHookSpoutAck(messageId, latencyNs);
        this.spoutMetrics.ackedTuple(streamId, latencyNs);
    }

    /** Notifies the spout, the spout-fail hooks and the metrics that a tuple failed. */
    private void invokeFail(Object messageId, String streamId, long latencyNs) {
        this.spout.fail(messageId);
        this.helper.getTopologyContext().invokeHookSpoutFail(messageId, latencyNs);
        this.spoutMetrics.failedTuple(streamId, latencyNs);
    }
}
