/*
 * Decompiled with CFR 0.152.
 */
package ru.kiryam.storm.rabbitmq;

import backtype.storm.spout.Scheme;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.kiryam.storm.rabbitmq.Declarator;
import ru.kiryam.storm.rabbitmq.ErrorReporter;
import ru.kiryam.storm.rabbitmq.Message;
import ru.kiryam.storm.rabbitmq.MessageScheme;
import ru.kiryam.storm.rabbitmq.RabbitMQConsumer;
import ru.kiryam.storm.rabbitmq.config.ConsumerConfig;

public class RabbitMQSpout
extends BaseRichSpout {
    private final MessageScheme scheme;
    private final Declarator declarator;
    private transient Logger logger;
    private transient RabbitMQConsumer consumer;
    private transient SpoutOutputCollector collector;
    private transient int prefetchCount;
    private boolean active;
    private String streamId;

    public RabbitMQSpout(Scheme scheme) {
        this(MessageScheme.Builder.from(scheme), (Declarator)new Declarator.NoOp(), (String)null);
    }

    public RabbitMQSpout(Scheme scheme, String streamId) {
        this(MessageScheme.Builder.from(scheme), (Declarator)new Declarator.NoOp(), streamId);
    }

    public RabbitMQSpout(Scheme scheme, Declarator declarator) {
        this(MessageScheme.Builder.from(scheme), declarator, (String)null);
    }

    public RabbitMQSpout(MessageScheme scheme, Declarator declarator) {
        this(scheme, declarator, (String)null);
    }

    public RabbitMQSpout(Scheme scheme, Declarator declarator, String streamId) {
        this(MessageScheme.Builder.from(scheme), declarator, streamId);
    }

    public RabbitMQSpout(MessageScheme scheme, Declarator declarator, String streamId) {
        this.scheme = scheme;
        this.declarator = declarator;
        this.streamId = streamId;
    }

    public void open(Map config, TopologyContext context, final SpoutOutputCollector spoutOutputCollector) {
        ConsumerConfig consumerConfig = ConsumerConfig.getFromStormConfig(config);
        ErrorReporter reporter = new ErrorReporter(){

            @Override
            public void reportError(Throwable error) {
                spoutOutputCollector.reportError(error);
            }
        };
        this.consumer = this.loadConsumer(this.declarator, reporter, consumerConfig);
        this.scheme.open(config, context);
        this.consumer.open();
        this.prefetchCount = consumerConfig.getPrefetchCount();
        this.logger = LoggerFactory.getLogger(RabbitMQSpout.class);
        this.collector = spoutOutputCollector;
        this.active = true;
    }

    protected RabbitMQConsumer loadConsumer(Declarator declarator, ErrorReporter reporter, ConsumerConfig config) {
        return new RabbitMQConsumer(config.getConnectionConfig(), config.getPrefetchCount(), config.getQueueName(), config.isRequeueOnFail(), declarator, reporter);
    }

    public void close() {
        this.consumer.close();
        this.scheme.close();
        super.close();
    }

    public void nextTuple() {
        Message message;
        if (!this.active) {
            return;
        }
        int emitted = 0;
        while (emitted < this.prefetchCount && (message = this.consumer.nextMessage()) != Message.NONE) {
            List<Object> tuple = this.extractTuple(message);
            if (tuple.isEmpty()) continue;
            this.emit(tuple, message, this.collector);
            ++emitted;
        }
    }

    protected List<Integer> emit(List<Object> tuple, Message message, SpoutOutputCollector spoutOutputCollector) {
        return this.streamId == null ? spoutOutputCollector.emit(tuple, (Object)this.getDeliveryTag(message)) : spoutOutputCollector.emit(this.streamId, tuple, (Object)this.getDeliveryTag(message));
    }

    private List<Object> extractTuple(Message message) {
        long deliveryTag = this.getDeliveryTag(message);
        try {
            List<Object> tuple = this.scheme.deserialize(message);
            if (tuple != null && !tuple.isEmpty()) {
                return tuple;
            }
            String errorMsg = "Deserialization error for msgId " + deliveryTag;
            this.logger.warn(errorMsg);
            this.collector.reportError((Throwable)new Exception(errorMsg));
        }
        catch (Exception e) {
            this.logger.warn("Deserialization error for msgId " + deliveryTag, (Throwable)e);
            this.collector.reportError((Throwable)e);
        }
        this.consumer.deadLetter(deliveryTag);
        return Collections.emptyList();
    }

    public void ack(Object msgId) {
        if (msgId instanceof Long) {
            this.consumer.ack((Long)msgId);
        }
    }

    public void fail(Object msgId) {
        if (msgId instanceof Long) {
            this.consumer.fail((Long)msgId);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        if (this.streamId == null) {
            outputFieldsDeclarer.declare(this.scheme.getOutputFields());
        } else {
            outputFieldsDeclarer.declareStream(this.streamId, this.scheme.getOutputFields());
        }
    }

    public void deactivate() {
        super.deactivate();
        this.active = false;
    }

    public void activate() {
        super.activate();
        this.active = true;
    }

    protected long getDeliveryTag(Message message) {
        return ((Message.DeliveredMessage)message).getDeliveryTag();
    }
}

