/*
 * Decompiled with CFR 0.152.
 */
package de.unibonn.iai.eis.luzzu.io.impl;

import com.hp.hpl.jena.query.Dataset;
import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.ModelFactory;
import com.hp.hpl.jena.rdf.model.NodeIterator;
import com.hp.hpl.jena.rdf.model.Resource;
import de.unibonn.iai.eis.luzzu.annotations.QualityMetadata;
import de.unibonn.iai.eis.luzzu.annotations.QualityReport;
import de.unibonn.iai.eis.luzzu.assessment.ComplexQualityMetric;
import de.unibonn.iai.eis.luzzu.assessment.QualityMetric;
import de.unibonn.iai.eis.luzzu.cache.CacheManager;
import de.unibonn.iai.eis.luzzu.cache.CacheObject;
import de.unibonn.iai.eis.luzzu.datatypes.Object2Quad;
import de.unibonn.iai.eis.luzzu.exceptions.ExternalMetricLoaderException;
import de.unibonn.iai.eis.luzzu.exceptions.MetadataException;
import de.unibonn.iai.eis.luzzu.exceptions.ProcessorNotInitialised;
import de.unibonn.iai.eis.luzzu.io.IOProcessor;
import de.unibonn.iai.eis.luzzu.io.configuration.DeclerativeMetricCompiler;
import de.unibonn.iai.eis.luzzu.io.configuration.ExternalMetricLoader;
import de.unibonn.iai.eis.luzzu.io.impl.StreamMetadataSniffer;
import de.unibonn.iai.eis.luzzu.properties.EnvironmentProperties;
import de.unibonn.iai.eis.luzzu.properties.PropertyManager;
import de.unibonn.iai.eis.luzzu.qml.parser.ParseException;
import de.unibonn.iai.eis.luzzu.semantics.vocabularies.LMI;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFDataMgr;
import org.apache.jena.riot.RDFLanguages;
import org.apache.jena.riot.lang.PipedQuadsStream;
import org.apache.jena.riot.lang.PipedRDFIterator;
import org.apache.jena.riot.lang.PipedRDFStream;
import org.apache.jena.riot.lang.PipedTriplesStream;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link IOProcessor} that parses each configured dataset on a background
 * executor thread, reads the parsed statements from a piped iterator on the
 * calling thread and hands every statement to one worker thread per loaded
 * quality metric.
 *
 * <p>Typical use is a single call to {@link #processorWorkFlow()}, which sets
 * the processor up, streams every dataset in order, prints the quality
 * metadata to {@code System.out} and optionally builds the quality report
 * returned by {@link #retreiveQualityReport()}.
 *
 * <p>Instances are not designed to be shared between threads.
 */
public class StreamProcessor
implements IOProcessor {
    static final Logger logger = LoggerFactory.getLogger(StreamProcessor.class);

    /** Argument passed to the {@code BlockingArrayQueue} constructor of every metric queue. */
    private static final int METRIC_QUEUE_LIMIT = 10000000;
    /** Time an idle metric worker blocks on its queue before re-checking its stop flag. */
    private static final long QUEUE_POLL_TIMEOUT_MS = 10L;

    private final CacheManager cacheMgr = CacheManager.getInstance();
    private final String graphCacheName = PropertyManager.getInstance().getProperties("cache.properties").getProperty("GRAPH_METADATA_CACHE");
    /** Loaded metric instances, keyed by the metric name taken from the configuration model. */
    private ConcurrentMap<String, QualityMetric> metricInstances = new ConcurrentHashMap<String, QualityMetric>();
    private ExternalMetricLoader loader = ExternalMetricLoader.getInstance();
    private DeclerativeMetricCompiler dmc = DeclerativeMetricCompiler.getInstance();
    private List<String> datasetList;
    /** Dataset currently being (or about to be) streamed. */
    private String datasetURI;
    private boolean genQualityReport;
    private Model metricConfiguration;
    private Model qualityReport;
    protected PipedRDFIterator<?> iterator;
    protected PipedRDFStream<?> rdfStream;
    /** Runs the parser that feeds {@link #rdfStream}. */
    private ExecutorService executor;
    private List<MetricProcess> lstMetricConsumers = new ArrayList<MetricProcess>();
    private boolean isInitalised = false;

    /**
     * Creates a processor for a single dataset.
     *
     * @param datasetURI location of the dataset to stream; also registered as
     *        the {@code datasetURI} environment variable
     * @param genQualityReport whether {@link #processorWorkFlow()} should
     *        build a quality report after processing
     * @param configuration model listing the metrics to load
     */
    public StreamProcessor(String datasetURI, boolean genQualityReport, Model configuration) {
        this.datasetList = new ArrayList<String>();
        this.datasetList.add(datasetURI);
        // Previously the field was left null here, so setUpProcess() and
        // startProcessing() could not be used without processorWorkFlow().
        this.datasetURI = datasetURI;
        this.genQualityReport = genQualityReport;
        this.metricConfiguration = configuration;
        this.cacheMgr.createNewCache(this.graphCacheName, 50);
        PropertyManager.getInstance().addToEnvironmentVars("datasetURI", datasetURI);
    }

    /**
     * Creates a processor for several datasets that are streamed one after
     * the other through the same metric instances.
     *
     * @param baseURI value registered as the {@code datasetURI} environment variable
     * @param datasetList locations of the datasets to stream, in processing order
     * @param genQualityReport whether {@link #processorWorkFlow()} should
     *        build a quality report after processing
     * @param configuration model listing the metrics to load
     */
    public StreamProcessor(String baseURI, List<String> datasetList, boolean genQualityReport, Model configuration) {
        this.datasetList = datasetList;
        if (datasetList != null && !datasetList.isEmpty()) {
            this.datasetURI = datasetList.get(0);
        }
        this.genQualityReport = genQualityReport;
        this.metricConfiguration = configuration;
        this.cacheMgr.createNewCache(this.graphCacheName, 50);
        PropertyManager.getInstance().addToEnvironmentVars("datasetURI", baseURI);
    }

    /**
     * Runs the complete assessment: set-up, streaming of every dataset,
     * quality metadata generation and, if requested, quality report
     * generation.
     */
    public void processorWorkFlow() {
        // The stream type is derived from the dataset's file name, so the
        // first dataset must be selected before the streams are created.
        if (!this.datasetList.isEmpty()) {
            this.datasetURI = this.datasetList.get(0);
        }
        this.setUpProcess();
        boolean firstDataset = true;
        for (String dataset : this.datasetList) {
            this.datasetURI = dataset;
            if (!firstDataset) {
                // Fresh streams, executor and metric workers, chosen for the
                // dataset that is about to be processed.
                this.reinitialiseProcessors();
            }
            firstDataset = false;
            try {
                this.startProcessing();
            }
            catch (ProcessorNotInitialised e) {
                logger.warn("Processor was not initialised while streaming " + dataset + "; restarting the workflow", e);
                this.processorWorkFlow();
                // The restarted workflow has handled every dataset and the
                // reporting; continuing here would do all of that twice.
                return;
            }
        }
        this.generateQualityMetadata();
        if (this.genQualityReport) {
            this.generateQualityReport();
        }
    }

    /**
     * Prepares the processor for the next dataset: replaces the parser
     * executor and the piped streams and starts a new worker for every
     * already-loaded metric.
     */
    private void reinitialiseProcessors() {
        this.shutdownExecutor();
        this.initialiseStream();
        this.executor = Executors.newSingleThreadExecutor();
        this.lstMetricConsumers = new ArrayList<MetricProcess>();
        for (QualityMetric metric : this.metricInstances.values()) {
            this.lstMetricConsumers.add(new MetricProcess(metric));
        }
    }

    /**
     * Creates the piped iterator/stream pair for {@link #datasetURI} and
     * marks the processor as initialised. A quads stream is used when the
     * file name maps to {@code Lang.NQ} or {@code Lang.NQUADS}; a triples
     * stream is used otherwise.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void initialiseStream() {
        Lang lang = RDFLanguages.filenameToLang(this.datasetURI);
        // The element type is only known at run time, hence the raw iterator.
        PipedRDFIterator pipe = new PipedRDFIterator();
        if (lang == Lang.NQ || lang == Lang.NQUADS) {
            this.rdfStream = new PipedQuadsStream(pipe);
        } else {
            this.rdfStream = new PipedTriplesStream(pipe);
        }
        this.iterator = pipe;
        this.isInitalised = true;
    }

    /** Shuts the parser executor down if one exists and is still running. */
    private void shutdownExecutor() {
        if (this.executor != null && !this.executor.isShutdown()) {
            this.executor.shutdownNow();
        }
    }

    /**
     * Signals every metric worker to finish and waits until all of them have
     * terminated. All workers are signalled before the first one is awaited
     * so that they drain their queues in parallel.
     */
    private void stopMetricConsumers() {
        if (this.lstMetricConsumers == null) {
            return;
        }
        for (MetricProcess consumer : this.lstMetricConsumers) {
            consumer.requestStop();
        }
        for (MetricProcess consumer : this.lstMetricConsumers) {
            consumer.awaitCompletion();
        }
    }

    /**
     * Creates the streams for the current dataset, loads the configured
     * metrics and creates the parser executor. A metric loading failure is
     * logged and does not abort the set-up.
     */
    @Override
    public void setUpProcess() {
        this.initialiseStream();
        try {
            this.loadMetrics();
        }
        catch (ExternalMetricLoaderException e) {
            logger.error(e.getLocalizedMessage(), e);
        }
        // Do not leak the thread of an executor created by an earlier set-up.
        this.shutdownExecutor();
        this.executor = Executors.newSingleThreadExecutor();
    }

    /**
     * Marks the processor as uninitialised, stops the metric workers,
     * forgets the loaded metrics and shuts the parser executor down.
     *
     * @throws ProcessorNotInitialised declared by the interface; not thrown
     *         by this implementation
     */
    @Override
    public void cleanUp() throws ProcessorNotInitialised {
        this.isInitalised = false;
        // Workers that were never stopped would otherwise keep running.
        this.stopMetricConsumers();
        this.lstMetricConsumers.clear();
        this.metricInstances.clear();
        this.shutdownExecutor();
    }

    /**
     * Streams the current dataset: the parser runs on the executor thread
     * while this thread reads the piped iterator, passes every statement to
     * the metadata sniffer and queues it for each metric worker. Once the
     * stream is exhausted the workers are stopped and awaited, the sniffed
     * metadata (if any) is cached, and every metric is finalised via
     * {@code after} (complex metrics only) and {@code metricValue}.
     *
     * @throws ProcessorNotInitialised if the processor has not been set up
     */
    @Override
    public void startProcessing() throws ProcessorNotInitialised {
        if (!this.isInitalised) {
            throw new ProcessorNotInitialised("Streaming will not start as processor has not been initalised");
        }
        StreamMetadataSniffer sniffer = new StreamMetadataSniffer();
        // Capture the sink and source now so the parser task is unaffected
        // by later re-initialisation of the fields.
        final PipedRDFStream<?> sink = this.rdfStream;
        final String source = this.datasetURI;
        this.executor.submit(new Runnable(){

            @Override
            public void run() {
                try {
                    RDFDataMgr.parse(sink, source);
                }
                catch (RuntimeException e) {
                    // The Future returned by submit() is discarded, so log
                    // here or the failure is never reported.
                    logger.error("Parsing of " + source + " failed", e);
                    throw e;
                }
            }
        });
        try {
            while (this.iterator.hasNext()) {
                Object2Quad stmt = new Object2Quad(this.iterator.next());
                sniffer.sniff(stmt.getStatement());
                if (this.lstMetricConsumers == null) continue;
                for (MetricProcess mConsumer : this.lstMetricConsumers) {
                    mConsumer.notifyNewQuad(stmt);
                }
            }
        }
        finally {
            // Waits for the workers to terminate, so no compute() call is
            // still running when the metrics are finalised below.
            this.stopMetricConsumers();
        }
        CacheObject sniffedMetadata = (CacheObject)sniffer.getCachingObject();
        if (sniffedMetadata != null) {
            this.cacheMgr.addToCache(this.graphCacheName, (Object)this.datasetURI, sniffedMetadata);
        }
        for (QualityMetric metric : this.metricInstances.values()) {
            if (metric instanceof ComplexQualityMetric) {
                ((ComplexQualityMetric)metric).after(new Object[0]);
            }
            metric.metricValue();
        }
    }

    /**
     * Instantiates every metric named by {@code LMI.metric} in the
     * configuration model, calls {@code before} on complex metrics and
     * starts one worker per metric.
     *
     * @throws ExternalMetricLoaderException if a configured metric has no
     *         registered class or its class cannot be instantiated
     */
    private void loadMetrics() throws ExternalMetricLoaderException {
        NodeIterator iter = this.metricConfiguration.listObjectsOfProperty(LMI.metric);
        Map<String, Class<? extends QualityMetric>> map = this.loader.getQualityMetricClasses();
        try {
            map.putAll(this.dmc.compile());
        }
        catch (IOException e) {
            // Best effort: continue with the externally loaded metrics only.
            logger.error("Could not compile declarative metrics: " + e.getLocalizedMessage(), e);
        }
        catch (ParseException e) {
            logger.error("Could not compile declarative metrics: " + e.getLocalizedMessage(), e);
        }
        while (iter.hasNext()) {
            String className = iter.next().toString();
            Class<? extends QualityMetric> clazz = map.get(className);
            if (clazz == null) {
                // Without this check an unknown metric name surfaced as a
                // bare NullPointerException.
                logger.error("Cannot load metric for {}", (Object)className);
                throw new ExternalMetricLoaderException("Cannot create class instance for " + className + ". No metric class is registered under this name.");
            }
            QualityMetric metric = null;
            try {
                metric = clazz.newInstance();
            }
            catch (InstantiationException e) {
                logger.error("Cannot load metric for " + className, e);
                throw new ExternalMetricLoaderException("Cannot create class instance for " + className + ". Exception caused by an Instantiation Exception : " + e.getLocalizedMessage());
            }
            catch (IllegalAccessException e) {
                logger.error("Cannot load metric for " + className, e);
                throw new ExternalMetricLoaderException("Cannot create class instance " + className + ". Exception caused by an Illegal Access Exception : " + e.getLocalizedMessage());
            }
            this.metricInstances.put(className, metric);
        }
        for (QualityMetric metric : this.metricInstances.values()) {
            if (metric instanceof ComplexQualityMetric) {
                ((ComplexQualityMetric)metric).before(new Object[0]);
            }
            this.lstMetricConsumers.add(new MetricProcess(metric));
        }
    }

    /**
     * Creates a resource for the dataset URI held by the environment
     * properties.
     *
     * @return the resource, or {@code null} if it could not be created (the
     *         failure is logged)
     */
    private Resource datasetResource() {
        try {
            return ModelFactory.createDefaultModel().createResource(EnvironmentProperties.getInstance().getDatasetURI());
        }
        catch (Exception e) {
            logger.error("Dataset Exception " + e.getLocalizedMessage(), e);
            return null;
        }
    }

    /**
     * Builds the quality report from the quality problems of every loaded
     * metric and stores it for {@link #retreiveQualityReport()}.
     */
    private void generateQualityReport() {
        QualityReport r = new QualityReport();
        ArrayList<Model> qualityProblems = new ArrayList<Model>();
        for (QualityMetric m : this.metricInstances.values()) {
            qualityProblems.add(r.createQualityProblem(m.getMetricURI(), m.getQualityProblems()));
        }
        this.qualityReport = r.createQualityReport(this.datasetResource(), qualityProblems);
    }

    /**
     * Collects the data of every loaded metric into quality metadata and
     * writes it to {@code System.out} as TriG. A {@link MetadataException}
     * is logged and otherwise ignored.
     */
    private void generateQualityMetadata() {
        QualityMetadata md = new QualityMetadata(this.datasetResource(), false);
        for (QualityMetric m : this.metricInstances.values()) {
            md.addMetricData(m);
        }
        try {
            RDFDataMgr.write((OutputStream)System.out, (Dataset)md.createQualityMetadata(), (Lang)Lang.TRIG);
        }
        catch (MetadataException e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * @return the report built by the last {@link #processorWorkFlow()} run,
     *         or {@code null} if no report has been generated
     */
    @Override
    public Model retreiveQualityReport() {
        return this.qualityReport;
    }

    /**
     * Couples one metric with a queue of pending statements and a dedicated
     * thread that feeds the queued statements to the metric's
     * {@code compute} method. The thread is started by the constructor.
     */
    private final class MetricProcess {
        private final BlockingArrayQueue<Object2Quad> quadsToProcess = new BlockingArrayQueue<Object2Quad>(METRIC_QUEUE_LIMIT);
        private final Thread metricThread;
        private final String metricName;
        /** Written by the worker thread only. */
        private int stmtsProcessed = 0;
        /** Set by the producer, read by the worker; volatile so the worker is guaranteed to see it. */
        private volatile boolean stopSignal = false;

        MetricProcess(final QualityMetric m) {
            this.metricName = m.getClass().getSimpleName();
            this.metricThread = new Thread(){

                @Override
                public void run() {
                    logger.debug("Starting thread for metric {}", (Object)m.getClass().getName());
                    try {
                        // Keep going until a stop was requested AND the queue is drained.
                        while (!MetricProcess.this.stopSignal || !MetricProcess.this.quadsToProcess.isEmpty()) {
                            // Timed poll: blocks while idle instead of spinning,
                            // yet re-checks the stop flag regularly.
                            Object2Quad curQuad = MetricProcess.this.quadsToProcess.poll(QUEUE_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                            if (curQuad == null) continue;
                            logger.trace("Metric {}, new quad (processed: {}, to-process: {})", new Object[]{m.getClass().getName(), Integer.valueOf(MetricProcess.this.stmtsProcessed), Integer.valueOf(MetricProcess.this.quadsToProcess.size())});
                            m.compute(curQuad.getStatement());
                            MetricProcess.this.stmtsProcessed++;
                        }
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        logger.warn("Thread for metric {} interrupted with {} statements still queued", (Object)m.getClass().getName(), (Object)Integer.valueOf(MetricProcess.this.quadsToProcess.size()));
                    }
                    logger.debug("Thread for metric {} completed, total statements processed {}", (Object)m.getClass().getName(), (Object)Integer.valueOf(MetricProcess.this.stmtsProcessed));
                }
            };
            this.metricThread.start();
        }

        /**
         * Queues a statement for this metric.
         *
         * @param newQuad the statement to process
         */
        public void notifyNewQuad(Object2Quad newQuad) {
            this.quadsToProcess.add(newQuad);
            logger.trace("Metric {}, element added to queue (to-process: {})", (Object)this.metricName, (Object)Integer.valueOf(this.quadsToProcess.size()));
        }

        /**
         * Tells the worker to terminate once its queue is empty. Returns
         * immediately; may be called more than once.
         */
        void requestStop() {
            this.stopSignal = true;
        }

        /**
         * Blocks until the worker thread has terminated. Returns at once if
         * the worker is already dead, so a worker killed by an exception
         * cannot hang the caller.
         */
        void awaitCompletion() {
            try {
                this.metricThread.join();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.warn("Interrupted while waiting for metric {} to finish", (Object)this.metricName);
            }
            if (!this.quadsToProcess.isEmpty()) {
                logger.warn("Metric {} stopped with {} unprocessed items on its queue", (Object)this.metricName, (Object)Integer.valueOf(this.quadsToProcess.size()));
            }
        }

        /**
         * Requests termination and waits until every queued statement has
         * been processed and the worker thread has finished.
         */
        public void stop() {
            this.requestStop();
            this.awaitCompletion();
        }
    }
}

