/*
 * Decompiled with CFR 0.152.
 */
package org.streampipes.wrapper.standalone.routing;

import org.streampipes.commons.exceptions.SpRuntimeException;
import org.streampipes.logging.impl.EventStatisticLogger;
import org.streampipes.messaging.InternalEventProcessor;
import org.streampipes.model.base.InvocableStreamPipesEntity;
import org.streampipes.model.grounding.TransportFormat;
import org.streampipes.model.grounding.TransportProtocol;
import org.streampipes.wrapper.routing.SpInputCollector;
import org.streampipes.wrapper.runtime.PipelineElement;
import org.streampipes.wrapper.standalone.manager.ProtocolManager;
import org.streampipes.wrapper.standalone.routing.StandaloneSpCollector;

/**
 * Input-side collector for the standalone wrapper: receives raw serialized
 * events from the protocol consumer and hands them to the registered
 * {@link PipelineElement} consumers.
 *
 * <p>Each incoming byte array is decoded with the data format definition held
 * by the superclass and passed to either a single consumer (singleton engine
 * mode) or every registered consumer.
 *
 * @param <T> the transport protocol this collector is bound to
 */
public class StandaloneSpInputCollector<T extends TransportProtocol>
extends StandaloneSpCollector<T, PipelineElement<?>>
implements InternalEventProcessor<byte[]>,
SpInputCollector {
    /** When {@code true}, each event is delivered to one consumer only. */
    private final boolean singletonEngine;

    /**
     * Creates a collector for the given protocol and format.
     *
     * @param protocol        transport protocol, forwarded to the superclass
     * @param format          transport format, forwarded to the superclass
     * @param singletonEngine whether events go to a single consumer only;
     *                        {@code null} is treated as {@code false} so that a
     *                        missing flag does not fail on every incoming event
     * @throws SpRuntimeException if the superclass constructor fails
     */
    public StandaloneSpInputCollector(T protocol, TransportFormat format, Boolean singletonEngine) throws SpRuntimeException {
        super(protocol, format);
        // Unbox once here instead of on every event.
        this.singletonEngine = Boolean.TRUE.equals(singletonEngine);
    }

    /**
     * Dispatches one raw event to the registered consumers.
     *
     * <p>In singleton mode only the first consumer in key-set iteration order
     * receives the event; otherwise every consumer does. If no consumer is
     * registered the event is dropped.
     *
     * @param event the serialized event payload
     */
    public void onEvent(byte[] event) {
        // One loop serves both modes; an empty consumer map simply skips the
        // body instead of indexing into an empty array.
        for (Object key : this.consumers.keySet()) {
            this.send((PipelineElement<?>) this.consumers.get(key), event);
            if (this.singletonEngine) {
                return;
            }
        }
    }

    /**
     * Decodes the event, passes it to one consumer and records an event
     * statistic for that consumer's graph.
     *
     * <p>An {@link SpRuntimeException} is reported and not rethrown, so a
     * failure for one consumer does not stop delivery to the others.
     *
     * @param processor the consumer that receives the event
     * @param event     the serialized event payload
     */
    private void send(PipelineElement<?> processor, byte[] event) {
        try {
            processor.onEvent(this.dataFormatDefinition.toMap(event), this.getTopic());
            InvocableStreamPipesEntity graph = processor.getGraph();
            EventStatisticLogger.log((String)graph.getName(), (String)graph.getCorrespondingPipeline(), (String)graph.getUri());
        }
        catch (SpRuntimeException e) {
            // Best effort: report the failure and keep processing.
            e.printStackTrace();
        }
    }

    /**
     * Connects the protocol consumer to the transport protocol with this
     * collector as the event processor, unless it is already connected.
     *
     * @throws SpRuntimeException if the consumer fails to connect
     */
    public void connect() throws SpRuntimeException {
        if (!this.protocolDefinition.getConsumer().isConnected()) {
            this.protocolDefinition.getConsumer().connect(this.transportProtocol, (InternalEventProcessor)this);
        }
    }

    /**
     * Disconnects the protocol consumer and removes this collector from the
     * {@link ProtocolManager}, but only when the consumer is connected and no
     * consumers remain registered.
     *
     * @throws SpRuntimeException if the consumer fails to disconnect
     */
    public void disconnect() throws SpRuntimeException {
        if (this.protocolDefinition.getConsumer().isConnected() && this.consumers.isEmpty()) {
            this.protocolDefinition.getConsumer().disconnect();
            ProtocolManager.removeInputCollector(this.transportProtocol);
        }
    }
}

