/*
 * Decompiled with CFR 0.152.
 */
package org.streampipes.wrapper.standalone.routing;

import org.streampipes.commons.exceptions.SpRuntimeException;
import org.streampipes.messaging.InternalEventProcessor;
import org.streampipes.model.grounding.TransportFormat;
import org.streampipes.model.grounding.TransportProtocol;
import org.streampipes.wrapper.routing.RawDataProcessor;
import org.streampipes.wrapper.routing.SpInputCollector;
import org.streampipes.wrapper.standalone.manager.ProtocolManager;
import org.streampipes.wrapper.standalone.routing.StandaloneSpCollector;

/**
 * Input collector for the standalone wrapper: receives raw {@code byte[]} events from the
 * protocol consumer, converts them with the inherited data format definition and hands the
 * result to the registered {@link RawDataProcessor} consumers.
 *
 * <p>In singleton-engine mode each event is delivered to a single registered consumer only
 * (the first one returned by the consumer map's iteration); otherwise every registered
 * consumer receives every event.
 *
 * @param <T> the transport protocol type this collector is bound to
 */
public class StandaloneSpInputCollector<T extends TransportProtocol>
extends StandaloneSpCollector<T, RawDataProcessor>
implements InternalEventProcessor<byte[]>,
SpInputCollector {
    /** Delivery mode flag; a {@code null} value is treated like {@code false} (broadcast). */
    private final Boolean singletonEngine;

    /**
     * Creates a collector for the given protocol and format.
     *
     * @param protocol        transport protocol, passed to the superclass
     * @param format          transport format, passed to the superclass
     * @param singletonEngine {@code true} to deliver each event to one consumer only,
     *                        {@code false} (or {@code null}) to deliver to all consumers
     * @throws SpRuntimeException if the superclass constructor fails
     */
    public StandaloneSpInputCollector(T protocol, TransportFormat format, Boolean singletonEngine) throws SpRuntimeException {
        super(protocol, format);
        this.singletonEngine = singletonEngine;
    }

    /**
     * Dispatches one raw event to the registered consumers.
     *
     * <p>If no consumer is registered the event is dropped. Previously singleton mode
     * indexed into an empty key array in that situation and threw
     * {@code ArrayIndexOutOfBoundsException}.
     *
     * @param event the raw event payload
     */
    public void onEvent(byte[] event) {
        // Null-safe unboxing: a missing flag must not crash the receive path.
        final boolean firstConsumerOnly = Boolean.TRUE.equals(this.singletonEngine);
        // Iterating values() visits consumers in the same order as the key set, so the
        // first element here is the consumer the former keySet().toArray()[0] lookup chose.
        for (Object consumer : this.consumers.values()) {
            this.send((RawDataProcessor)consumer, event);
            if (firstConsumerOnly) {
                break;
            }
        }
    }

    /**
     * Converts the raw event via {@code dataFormatDefinition.toMap} and passes the result,
     * together with this collector's topic, to the given processor.
     *
     * @param rawDataProcessor the consumer that should process the event
     * @param event            the raw event payload
     */
    private void send(RawDataProcessor rawDataProcessor, byte[] event) {
        try {
            rawDataProcessor.process(this.dataFormatDefinition.toMap(event), this.getTopic());
        }
        catch (SpRuntimeException e) {
            // Reported but not rethrown so that a failure for one consumer (or one event)
            // does not stop delivery to the remaining consumers.
            e.printStackTrace();
        }
    }

    /**
     * Connects the protocol consumer to the transport protocol, registering this collector
     * as the event processor. Does nothing if the consumer reports it is already connected.
     *
     * @throws SpRuntimeException if the underlying consumer fails to connect
     */
    public void connect() throws SpRuntimeException {
        if (!this.protocolDefinition.getConsumer().isConnected().booleanValue()) {
            this.protocolDefinition.getConsumer().connect(this.transportProtocol, (InternalEventProcessor)this);
        }
    }

    /**
     * Disconnects the protocol consumer and removes this collector's protocol from the
     * {@link ProtocolManager}, but only when the consumer is connected and no consumers
     * remain registered; otherwise this is a no-op.
     *
     * @throws SpRuntimeException if the underlying consumer fails to disconnect
     */
    public void disconnect() throws SpRuntimeException {
        if (this.protocolDefinition.getConsumer().isConnected().booleanValue() && this.consumers.isEmpty()) {
            this.protocolDefinition.getConsumer().disconnect();
            ProtocolManager.removeInputCollector(this.transportProtocol);
        }
    }
}

