/*
 * Decompiled with CFR 0.152.
 */
package com.googlecode.fascinator.indexer;

import com.googlecode.fascinator.common.FascinatorHome;
import com.googlecode.fascinator.common.JsonSimple;
import com.googlecode.fascinator.common.JsonSimpleConfig;
import com.googlecode.fascinator.common.messaging.GenericListener;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.httpclient.Credentials;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.UsernamePasswordCredentials;
import org.apache.commons.httpclient.auth.AuthScope;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.request.DirectXmlRequest;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.xml.sax.SAXException;

public class SolrWrapperQueueConsumer
implements GenericListener {
    private static final String DEFAULT_SOLR_HOME = FascinatorHome.getPath((String)"solr");
    private static Integer BUFFER_LIMIT_DOCS = 200;
    private static Integer BUFFER_LIMIT_SIZE = 204800;
    private static Integer BUFFER_LIMIT_TIME = 30;
    public static final String QUEUE_ID = "solrwrapper";
    private Logger log = LoggerFactory.getLogger(SolrWrapperQueueConsumer.class);
    private JsonSimpleConfig globalConfig;
    private Connection connection;
    private Session session;
    private MessageConsumer consumer;
    private String name;
    private Thread thread = new Thread((Runnable)((Object)this), "solrwrapper");
    private SolrServer solr;
    private SolrServer commit;
    private CoreContainer coreContainer;
    private boolean autoCommit;
    private String username;
    private String password;
    private Map<String, String> docBuffer;
    private long bufferOldest;
    private long bufferYoungest;
    private int bufferSize;
    private int bufferDocLimit;
    private int bufferSizeLimit;
    private int bufferTimeLimit;
    private Timer timer;
    private String timerMDC;

    public void run() {
        try {
            MDC.put((String)"name", (String)this.name);
            this.log.info("Starting {}...", (Object)this.name);
            String brokerUrl = this.globalConfig.getString("tcp://localhost:61616", new Object[]{"messaging", "url"});
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
            this.connection = connectionFactory.createConnection();
            this.session = this.connection.createSession(false, 1);
            this.consumer = this.session.createConsumer((Destination)this.session.createQueue(QUEUE_ID));
            this.consumer.setMessageListener((MessageListener)this);
            this.connection.start();
            this.solr = this.initCore("solr");
            this.timer = new Timer("SolrWrapper:" + this.toString(), true);
            this.timer.scheduleAtFixedRate(new TimerTask(){

                @Override
                public void run() {
                    SolrWrapperQueueConsumer.this.checkTimeout();
                }
            }, 0L, 10000L);
        }
        catch (JMSException ex) {
            this.log.error("Error starting message thread!", (Throwable)ex);
        }
    }

    public void init(JsonSimpleConfig config) throws Exception {
        this.name = config.getString(null, new Object[]{"config", "name"});
        if (this.name == null) {
            throw new Exception("Name name provided in queue configuration");
        }
        this.thread.setName(this.name);
        try {
            this.globalConfig = new JsonSimpleConfig();
            this.autoCommit = this.globalConfig.getBoolean(Boolean.valueOf(true), new Object[]{"indexer", "solr", "autocommit"});
            this.docBuffer = new LinkedHashMap<String, String>();
            this.bufferSize = 0;
            this.bufferOldest = 0L;
            this.bufferDocLimit = this.globalConfig.getInteger(BUFFER_LIMIT_DOCS, new Object[]{"indexer", "buffer", "docLimit"});
            this.bufferSizeLimit = this.globalConfig.getInteger(BUFFER_LIMIT_SIZE, new Object[]{"indexer", "buffer", "sizeLimit"});
            this.bufferTimeLimit = this.globalConfig.getInteger(BUFFER_LIMIT_TIME, new Object[]{"indexer", "buffer", "timeLimit"});
        }
        catch (IOException ioe) {
            this.log.error("Failed to read configuration: {}", (Object)ioe.getMessage());
            throw ioe;
        }
    }

    private SolrServer initCore(String core) {
        boolean isEmbedded = this.globalConfig.getBoolean(Boolean.valueOf(false), new Object[]{"indexer", core, "embedded"});
        try {
            if (isEmbedded) {
                String coreName;
                String uri = this.globalConfig.getString(null, new Object[]{"indexer", core, "uri"});
                if (uri == null) {
                    this.log.error("No URI provided for core: '{}'", (Object)core);
                    return null;
                }
                URI solrUri = new URI(uri);
                this.commit = new CommonsHttpSolrServer(solrUri.toURL());
                this.username = this.globalConfig.getString(null, new Object[]{"indexer", core, "username"});
                this.password = this.globalConfig.getString(null, new Object[]{"indexer", core, "password"});
                if (this.username != null && this.password != null) {
                    UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(this.username, this.password);
                    HttpClient hc = ((CommonsHttpSolrServer)this.solr).getHttpClient();
                    hc.getParams().setAuthenticationPreemptive(true);
                    hc.getState().setCredentials(AuthScope.ANY, (Credentials)credentials);
                }
                if (this.coreContainer == null) {
                    String home = this.globalConfig.getString(DEFAULT_SOLR_HOME, new Object[]{"indexer", "home"});
                    this.log.info("Embedded Solr Home = {}", (Object)home);
                    File homeDir = new File(home);
                    if (!homeDir.exists()) {
                        this.log.error("Solr directory does not exist!");
                        return null;
                    }
                    System.setProperty("solr.solr.home", homeDir.getAbsolutePath());
                    File coreXmlFile = new File(homeDir, "solr.xml");
                    this.coreContainer = new CoreContainer(homeDir.getAbsolutePath(), coreXmlFile);
                    for (SolrCore aCore : this.coreContainer.getCores()) {
                        this.log.info("Loaded core: {}", (Object)aCore.getName());
                    }
                }
                if ((coreName = this.globalConfig.getString(null, new Object[]{"indexer", core, "coreName"})) == null) {
                    this.log.error("No 'coreName' node for core: '{}'", (Object)core);
                    return null;
                }
                return new EmbeddedSolrServer(this.coreContainer, coreName);
            }
            String uri = this.globalConfig.getString(null, new Object[]{"indexer", core, "uri"});
            if (uri == null) {
                this.log.error("No URI provided for core: '{}'", (Object)core);
                return null;
            }
            URI solrUri = new URI(uri);
            CommonsHttpSolrServer thisCore = new CommonsHttpSolrServer(solrUri.toURL());
            this.username = this.globalConfig.getString(null, new Object[]{"indexer", core, "username"});
            this.password = this.globalConfig.getString(null, new Object[]{"indexer", core, "password"});
            if (this.username != null && this.password != null) {
                UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(this.username, this.password);
                HttpClient hc = thisCore.getHttpClient();
                hc.getParams().setAuthenticationPreemptive(true);
                hc.getState().setCredentials(AuthScope.ANY, (Credentials)credentials);
            }
            return thisCore;
        }
        catch (MalformedURLException mue) {
            this.log.error(core + " : Malformed URL", (Throwable)mue);
        }
        catch (URISyntaxException urise) {
            this.log.error(core + " : Invalid URI", (Throwable)urise);
        }
        catch (IOException ioe) {
            this.log.error(core + " : Failed to read Solr configuration", (Throwable)ioe);
        }
        catch (ParserConfigurationException pce) {
            this.log.error(core + " : Failed to parse Solr configuration", (Throwable)pce);
        }
        catch (SAXException saxe) {
            this.log.error(core + " : Failed to load Solr configuration", (Throwable)saxe);
        }
        return null;
    }

    public String getId() {
        return QUEUE_ID;
    }

    public void start() throws Exception {
        this.thread.start();
    }

    public void stop() throws Exception {
        this.log.info("Stopping {}...", (Object)this.name);
        this.submitBuffer(true);
        if (this.coreContainer != null) {
            this.coreContainer.shutdown();
        }
        if (this.consumer != null) {
            try {
                this.consumer.close();
            }
            catch (JMSException jmse) {
                this.log.warn("Failed to close consumer: {}", (Object)jmse.getMessage());
                throw jmse;
            }
        }
        if (this.session != null) {
            try {
                this.session.close();
            }
            catch (JMSException jmse) {
                this.log.warn("Failed to close consumer session: {}", (Throwable)jmse);
            }
        }
        if (this.connection != null) {
            try {
                this.connection.close();
            }
            catch (JMSException jmse) {
                this.log.warn("Failed to close connection: {}", (Throwable)jmse);
            }
        }
        this.timer.cancel();
    }

    public void onMessage(Message message) {
        MDC.put((String)"name", (String)this.name);
        try {
            String text;
            JsonSimple config;
            String event;
            if (!Thread.currentThread().getName().equals(this.thread.getName())) {
                Thread.currentThread().setName(this.thread.getName());
                Thread.currentThread().setPriority(this.thread.getPriority());
            }
            if ((event = (config = new JsonSimple(text = ((TextMessage)message).getText())).getString(null, new Object[]{"event"})) == null) {
                this.log.error("Invalid message received: '{}'", (Object)text);
                return;
            }
            if (event.equals("commit")) {
                this.log.debug("Commit received");
                this.submitBuffer(true);
            }
            if (event.equals("index")) {
                String index = config.getString(null, new Object[]{"index"});
                String document = config.getString(null, new Object[]{"document"});
                if (index == null || document == null) {
                    this.log.error("Invalid message received: '{}'", (Object)text);
                    return;
                }
                this.addToBuffer(index, document);
            }
        }
        catch (JMSException jmse) {
            this.log.error("Failed to send/receive message: {}", (Object)jmse.getMessage());
        }
        catch (IOException ioe) {
            this.log.error("Failed to parse message: {}", (Object)ioe.getMessage());
        }
    }

    private void addToBuffer(String index, String document) {
        if (this.timerMDC == null) {
            this.timerMDC = MDC.get((String)"name");
        }
        int removedSize = 0;
        if (this.docBuffer.containsKey(index)) {
            this.log.debug("Removing buffer duplicate: '{}'", (Object)index);
            removedSize = this.docBuffer.get(index).length();
            this.docBuffer.remove(index);
        }
        int length = document.length() - removedSize;
        this.bufferYoungest = new Date().getTime();
        if (this.docBuffer.isEmpty()) {
            this.bufferOldest = new Date().getTime();
            this.log.debug("=== New buffer starting: {}", (Object)this.bufferOldest);
        }
        this.docBuffer.put(index, document);
        this.bufferSize += length;
        this.checkBuffer();
    }

    private void checkTimeout() {
        if (this.timerMDC != null) {
            MDC.put((String)"name", (String)this.timerMDC);
        }
        if (this.docBuffer.isEmpty()) {
            return;
        }
        long wait = (new Date().getTime() - this.bufferYoungest) / 1000L;
        if (wait < 20L) {
            return;
        }
        this.log.debug("=== Flushing old buffer: {}s", (Object)wait);
        this.submitBuffer(true);
    }

    private void checkBuffer() {
        if (this.docBuffer.size() >= this.bufferDocLimit) {
            this.log.debug("=== Buffer check: Doc limit reached '{}'", (Object)this.docBuffer.size());
            this.submitBuffer(false);
            return;
        }
        if (this.bufferSize > this.bufferSizeLimit) {
            this.log.debug("=== Buffer check: Size exceeded '{}'", (Object)this.bufferSize);
            this.submitBuffer(false);
            return;
        }
        long age = (new Date().getTime() - this.bufferOldest) / 1000L;
        if (age > (long)this.bufferTimeLimit) {
            this.log.debug("=== Buffer check: Age exceeded '{}s'", (Object)age);
            this.submitBuffer(false);
            return;
        }
    }

    private void submitBuffer(boolean forceCommit) {
        int size = this.docBuffer.size();
        if (size > 0) {
            this.log.debug("=== Submitting buffer: " + size + " documents");
            StringBuffer submissionBuffer = new StringBuffer();
            for (String doc : this.docBuffer.keySet()) {
                submissionBuffer.append(this.docBuffer.get(doc));
            }
            if (submissionBuffer.length() > 0) {
                String submission = submissionBuffer.insert(0, "<add>").append("</add>").toString();
                try {
                    this.solr.request((SolrRequest)new DirectXmlRequest("/update", submission));
                }
                catch (Exception ex) {
                    this.log.error("Error submitting documents to Solr!", (Throwable)ex);
                }
                if (this.autoCommit || forceCommit) {
                    this.log.info("Running forced commit!");
                    try {
                        if (this.commit != null) {
                            this.solr.commit();
                            this.commit.commit();
                        } else {
                            this.solr.commit();
                        }
                    }
                    catch (Exception e) {
                        this.log.warn("Solr forced commit failed. Document will not be visible until Solr autocommit fires. Error message: {}", (Throwable)e);
                    }
                }
            }
        }
        this.purgeBuffer();
    }

    private void purgeBuffer() {
        this.docBuffer.clear();
        this.bufferSize = 0;
        this.bufferOldest = 0L;
        this.bufferYoungest = 0L;
    }

    public void setPriority(int newPriority) {
        if (newPriority >= 1 && newPriority <= 10) {
            this.thread.setPriority(newPriority);
        }
    }
}

