/*
 * Decompiled with CFR 0.152.
 */
package org.streampipes.wrapper.flink.consumer;

import java.io.Serializable;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.streampipes.commons.exceptions.SpRuntimeException;
import org.streampipes.dataformat.SpDataFormatDefinition;
import org.streampipes.messaging.InternalEventProcessor;
import org.streampipes.messaging.jms.ActiveMQConsumer;
import org.streampipes.model.grounding.JmsTransportProtocol;

public class JmsConsumer
implements SourceFunction<Map<String, Object>>,
Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(JmsConsumer.class);
    private JmsTransportProtocol protocol;
    private ActiveMQConsumer activeMQConsumer;
    private SpDataFormatDefinition spDataFormatDefinition;
    private Boolean isRunning;
    private Queue<byte[]> queue;

    public JmsConsumer(JmsTransportProtocol protocol, SpDataFormatDefinition spDataFormatDefinition) {
        this.protocol = protocol;
        this.activeMQConsumer = new ActiveMQConsumer();
        this.spDataFormatDefinition = spDataFormatDefinition;
        this.queue = new LinkedBlockingQueue<byte[]>();
    }

    public void run(SourceFunction.SourceContext<Map<String, Object>> sourceContext) throws Exception {
        this.isRunning = true;
        this.activeMQConsumer.connect(this.protocol, (InternalEventProcessor)new InternalEventProcessor<byte[]>(){

            public void onEvent(byte[] event) {
                JmsConsumer.this.queue.add(event);
            }
        });
        while (this.isRunning.booleanValue()) {
            if (!this.queue.isEmpty()) {
                sourceContext.collect((Object)this.spDataFormatDefinition.toMap(this.queue.poll()));
                continue;
            }
            Thread.sleep(100L);
        }
    }

    public void cancel() {
        try {
            this.activeMQConsumer.disconnect();
            this.isRunning = false;
        }
        catch (SpRuntimeException e) {
            LOG.error(e.getMessage());
        }
    }
}

