package com.github.diceproject.qt.spout;

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
import org.bson.Document;
import org.bson.conversions.Bson;

/* loaded from: input_file:com/github/diceproject/qt/spout/MongoDbSpout.class */
/**
 * Storm spout that polls a MongoDB collection and emits every matching document as a
 * single-field tuple containing {@code Document.toString()}.
 *
 * <p>Lifecycle, as implemented here:
 * <ul>
 *   <li>{@link #open} first resets every document whose {@code status} is {@code "injected"} back
 *       to {@code "new"}, then connects to MongoDB and starts a background {@link CursorThread}
 *       that fills a bounded queue.</li>
 *   <li>{@link #nextTuple} takes one document from the queue, emits it, and marks it
 *       {@code "injected"} in the collection.</li>
 *   <li>{@link #close} stops the background thread and closes the MongoDB client.</li>
 * </ul>
 *
 * <p>Tuples are emitted without a message id, so {@link #ack} and {@link #fail} are no-ops.
 */
public class MongoDbSpout extends BaseRichSpout {
    private static final long serialVersionUID = -7261151120193254079L;

    /** Name of the document field used to track injection state. */
    private static final String STATUS_FIELD = "status";
    /** Status value written once a document has been emitted. */
    private static final String STATUS_INJECTED = "injected";
    /** Status value restored by {@link #reset}. */
    private static final String STATUS_NEW = "new";
    /** Maximum number of documents buffered between the poller and {@link #nextTuple}. */
    private static final int QUEUE_CAPACITY = 10000;
    /** Maximum number of documents fetched per poll. */
    private static final int POLL_BATCH_SIZE = 100;
    /** Pause between two polls of the collection, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 10000L;
    /** Pause in {@link #nextTuple} when the queue is empty, in milliseconds. */
    private static final long IDLE_SLEEP_MS = 100L;
    /** How long {@link #close} waits for the poller to finish, in milliseconds. */
    private static final long SHUTDOWN_JOIN_MS = 1000L;

    private String _mongoHost;
    private String _mongoDbName;
    private String _mongoCollectionName;
    private SpoutOutputCollector _collector;
    private Bson _query;
    private LinkedBlockingQueue<Document> _queue;
    private MongoCollection<Document> _collection;
    private MongoClient _mongo = null;
    private MongoDatabase _database = null;
    private CursorThread _listener = null;

    /**
     * Background poller: repeatedly runs the configured query (newest {@code _id} first, at most
     * {@value #POLL_BATCH_SIZE} documents) and pushes the results onto the shared queue, blocking
     * while the queue is full.
     */
    class CursorThread extends Thread {
        LinkedBlockingQueue<Document> queue;
        String mongoCollectionName;
        MongoDatabase mongoDB;
        Bson query;

        /** Set by {@link #shutdown()} so failures caused by tearing down the client are not rethrown. */
        private volatile boolean stopRequested = false;

        /**
         * @param linkedBlockingQueue destination queue for fetched documents
         * @param mongoDatabase       database to read from
         * @param str                 name of the collection to poll
         * @param bson                filter applied to every poll
         */
        public CursorThread(LinkedBlockingQueue<Document> linkedBlockingQueue, MongoDatabase mongoDatabase, String str, Bson bson) {
            this.queue = linkedBlockingQueue;
            this.mongoDB = mongoDatabase;
            this.mongoCollectionName = str;
            this.query = bson;
            // A forgotten poller must never keep the JVM alive on its own.
            setDaemon(true);
        }

        /** Asks the poller to stop and wakes it up if it is sleeping or blocked on the queue. */
        void shutdown() {
            this.stopRequested = true;
            interrupt();
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                while (!this.stopRequested && !Thread.currentThread().isInterrupted()) {
                    pollOnce();
                    System.out.println("Injector: " + this.queue.size());
                    Thread.sleep(POLL_INTERVAL_MS);
                }
            } catch (InterruptedException e) {
                // Interruption is the stop signal: keep the flag set and leave quietly.
                Thread.currentThread().interrupt();
            } catch (RuntimeException e) {
                // Driver errors during shutdown (client closed / interrupted I/O) are expected;
                // anything else is a genuine failure and must stay visible.
                if (!this.stopRequested) {
                    throw e;
                }
            }
        }

        /**
         * Runs the query once and enqueues every non-null result.
         *
         * @throws InterruptedException if interrupted while waiting for free space in the queue
         */
        private void pollOnce() throws InterruptedException {
            MongoCursor<Document> cursor = this.mongoDB.getCollection(this.mongoCollectionName)
                    .find(this.query)
                    .sort(new BasicDBObject(DBCollection.ID_FIELD_NAME, -1))
                    .limit(POLL_BATCH_SIZE)
                    .iterator();
            try {
                while (cursor.hasNext()) {
                    Document next = cursor.next();
                    if (next != null) {
                        this.queue.put(next);
                    }
                }
            } finally {
                // Always release the cursor, including when the batch is abandoned midway.
                cursor.close();
            }
        }
    }

    /**
     * @param str  MongoDB host to connect to
     * @param str2 database name
     * @param str3 collection name; also used as the name of the single output field
     * @param bson filter selecting the documents to inject
     */
    public MongoDbSpout(String str, String str2, String str3, Bson bson) {
        this._mongoHost = str;
        this._mongoDbName = str2;
        this._mongoCollectionName = str3;
        this._query = bson;
    }

    /**
     * Resets previously injected documents, connects to MongoDB and starts the background poller.
     *
     * @param map                  topology configuration (unused)
     * @param topologyContext      topology context (unused)
     * @param spoutOutputCollector collector used by {@link #nextTuple} to emit tuples
     */
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        try {
            reset(this._mongoHost, this._mongoDbName, this._mongoCollectionName);
        } catch (UnknownHostException e) {
            // Best effort: the reset is skipped and the spout still starts.
            e.printStackTrace();
        }
        this._collector = spoutOutputCollector;
        this._queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
        this._mongo = new MongoClient(this._mongoHost);
        this._database = this._mongo.getDatabase(this._mongoDbName);
        this._collection = this._database.getCollection(this._mongoCollectionName);
        this._listener = new CursorThread(this._queue, this._database, this._mongoCollectionName, this._query);
        this._listener.start();
    }

    /**
     * Stops the background poller and closes the MongoDB client created in {@link #open}.
     * Safe to call when {@link #open} was never called or did not complete.
     */
    public void close() {
        CursorThread listener = this._listener;
        if (listener != null) {
            listener.shutdown();
            try {
                // Give the poller a moment to release its cursor before the client goes away.
                listener.join(SHUTDOWN_JOIN_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            this._listener = null;
        }
        MongoClient mongo = this._mongo;
        if (mongo != null) {
            mongo.close();
            this._mongo = null;
        }
    }

    /**
     * Emits the next queued document, if any, and marks it as {@code "injected"} in MongoDB.
     * When the queue is empty, sleeps briefly instead of spinning.
     */
    public void nextTuple() {
        Document poll = this._queue.poll();
        if (poll == null) {
            Utils.sleep(IDLE_SLEEP_MS);
            return;
        }
        synchronized (this._collector) {
            this._collector.emit(Utils.tuple(new Object[]{poll.toString()}));
        }
        this._collection.updateOne(
                new BasicDBObject(DBCollection.ID_FIELD_NAME, poll.get(DBCollection.ID_FIELD_NAME)),
                new BasicDBObject("$set", new BasicDBObject(STATUS_FIELD, STATUS_INJECTED)));
    }

    /** Declares a single output field named after the polled collection. */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(new String[]{this._mongoCollectionName}));
    }

    /** No-op: tuples are emitted without a message id. */
    public void ack(Object obj) {
    }

    /** No-op: tuples are emitted without a message id. */
    public void fail(Object obj) {
    }

    /**
     * Sets {@code status} back to {@code "new"} on every document currently marked
     * {@code "injected"}, using a short-lived client that is always closed afterwards.
     *
     * @param str  MongoDB host
     * @param str2 database name
     * @param str3 collection name
     * @throws UnknownHostException declared for callers; kept for interface compatibility
     */
    public void reset(String str, String str2, String str3) throws UnknownHostException {
        MongoClient mongoClient = new MongoClient(str);
        try {
            mongoClient.getDatabase(str2).getCollection(str3).updateMany(
                    new BasicDBObject(STATUS_FIELD, STATUS_INJECTED),
                    new BasicDBObject("$set", new BasicDBObject(STATUS_FIELD, STATUS_NEW)));
        } finally {
            // Close even when the update fails so the connection pool is not leaked.
            mongoClient.close();
        }
    }
}
