/*
 * Decompiled with CFR 0.152.
 */
package org.wikidata.query.rdf.tool;

import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lexicalscope.jewel.cli.Option;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.openrdf.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wikidata.query.rdf.common.uri.WikibaseUris;
import org.wikidata.query.rdf.tool.OptionsUtils;
import org.wikidata.query.rdf.tool.change.Change;
import org.wikidata.query.rdf.tool.change.IdListChangeSource;
import org.wikidata.query.rdf.tool.change.IdRangeChangeSource;
import org.wikidata.query.rdf.tool.change.RecentChangesPoller;
import org.wikidata.query.rdf.tool.exception.ContainedException;
import org.wikidata.query.rdf.tool.exception.RetryableException;
import org.wikidata.query.rdf.tool.rdf.Munger;
import org.wikidata.query.rdf.tool.rdf.RdfRepository;
import org.wikidata.query.rdf.tool.wikibase.WikibaseRepository;

public class Update<B extends Change.Batch>
implements Runnable {
    // Class-wide logger.
    private static final Logger log = LoggerFactory.getLogger(Update.class);
    // Registry that owns the two meters below.
    private final MetricRegistry metrics = new MetricRegistry();
    // Marked in handleChanges with the number of changes synced per batch.
    private final Meter updateMeter = this.metrics.meter("updates");
    // Marked in run() with batch.advanced() after each batch is handled.
    private final Meter batchAdvanced = this.metrics.meter("batch-progress");
    // JMX reporter over the registry above; started in the constructor.
    private final JmxReporter reporter = JmxReporter.forRegistry(this.metrics).build();
    // Supplies the first batch and every following batch of changes.
    private final Change.Source<B> changeSource;
    // Used by handleChange to fetch the RDF statements of an entity.
    private final WikibaseRepository wikibase;
    // Queried for revisions, values and refs, and target of syncFromChanges / updateLeftOffTime.
    private final RdfRepository rdfRepository;
    // Applied to the fetched statements in handleChange.
    private final Munger munger;
    // Runs one handleChange task per change; handleChanges waits for all of them.
    private final ExecutorService executor;
    // Seconds to sleep in nextBatch when the source returns an empty batch.
    private final int pollDelay;
    // uris.entity() is the prefix prepended to entity ids in getRevisionUpdates.
    private final WikibaseUris uris;
    // Refreshed for every batch by getRevisionUpdates and read by handleChange;
    // both are null while the current batch has no changes to apply.
    private Multimap<String, String> repoValues;
    private Multimap<String, String> repoRefs;
    // Passed through to rdfRepository.syncFromChanges.
    private final boolean verify;

    /**
     * Command line entry point.  Parses the options, builds the Wikibase and
     * RDF repositories, the change source, the worker pool and the munger, and
     * then runs the update loop on the calling thread.
     *
     * <p>Returns without running anything when the options are invalid; the
     * reason is logged where it is detected.
     *
     * @param args raw command line arguments, parsed into {@link Options}
     */
    public static void main(String[] args) {
        Options options = OptionsUtils.handleOptions(Options.class, args);
        WikibaseRepository wikibaseRepository = Update.buildWikibaseRepository(options);
        if (wikibaseRepository == null) {
            return;
        }
        URI sparqlUri;
        try {
            sparqlUri = new URI(options.sparqlUrl());
        }
        catch (URISyntaxException e) {
            // Pass the exception along so the cause is not reduced to its message.
            log.error("Invalid url:  " + options.sparqlUrl() + " caused by " + e.getMessage(), e);
            return;
        }
        WikibaseUris uris = new WikibaseUris(options.wikibaseHost());
        RdfRepository rdfRepository = new RdfRepository(sparqlUri, uris);
        Change.Source<? extends Change.Batch> changeSource = Update.buildChangeSource(options, rdfRepository, wikibaseRepository);
        if (changeSource == null) {
            return;
        }
        int threads = options.threadCount();
        // Daemon workers, so a finished main thread is not kept alive by the pool.
        ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("update %s");
        ThreadPoolExecutor executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory.build());
        Munger munger = OptionsUtils.mungerFromOptions(options);
        Update.runUpdate(changeSource, wikibaseRepository, rdfRepository, munger, executor, options.pollDelay(), uris, options.verify());
    }

    /**
     * Captures the wildcard batch type of the change source so that a properly
     * typed {@link Update} can be created and run.
     */
    private static <T extends Change.Batch> void runUpdate(Change.Source<T> changeSource, WikibaseRepository wikibaseRepository, RdfRepository rdfRepository, Munger munger, ExecutorService executor, int pollDelay, WikibaseUris uris, boolean verify) {
        new Update<T>(changeSource, wikibaseRepository, rdfRepository, munger, executor, pollDelay, uris, verify).run();
    }

    /**
     * Builds the change source selected by the options.  Precedence is
     * {@code --idrange}, then {@code --ids}, then {@code --start}; when none is
     * given the start time is taken from the RDF store.
     *
     * @param options parsed command line options
     * @param rdfRepository queried for the left off time when no start is given
     * @param wikibaseRepository handed to the recent changes poller
     * @return the change source, or null if the options are invalid (the
     *         reason has been logged)
     */
    private static Change.Source<? extends Change.Batch> buildChangeSource(Options options, RdfRepository rdfRepository, WikibaseRepository wikibaseRepository) {
        if (options.idrange() != null) {
            return Update.buildIdRangeSource(options.idrange(), options.batchSize());
        }
        if (options.ids() != null) {
            return Update.buildIdListSource(options.ids(), options.batchSize());
        }
        Date startTime;
        if (options.start() != null) {
            startTime = Update.parseStartTime(options.start());
        } else {
            startTime = Update.findLeftOffStartTime(rdfRepository);
        }
        if (startTime == null) {
            return null;
        }
        return new RecentChangesPoller(wikibaseRepository, startTime, options.batchSize());
    }

    /**
     * Builds the source for {@code --idrange}: a single non-numeric id, a
     * single number, or {@code <start>-<stop>}.  Returns null after logging
     * when the value is malformed.
     */
    private static Change.Source<? extends Change.Batch> buildIdRangeSource(String idrange, int batchSize) {
        String[] ids = idrange.split("-");
        if (ids.length == 1 && !ids[0].isEmpty() && !Character.isDigit(ids[0].charAt(0))) {
            // A lone value that does not start with a digit is treated as an entity id.
            return new IdListChangeSource(ids, batchSize);
        }
        if (ids.length < 1 || ids.length > 2) {
            log.error("Invalid format for --idrange.  Need <start>-<stop>.");
            return null;
        }
        long start;
        long end;
        try {
            start = Long.parseLong(ids[0]);
            end = ids.length == 2 ? Long.parseLong(ids[1]) : start;
        }
        catch (NumberFormatException e) {
            // Covers empty and non-numeric bounds such as "", "-5" or "1-x".
            log.error("Invalid format for --idrange.  Need <start>-<stop>.", e);
            return null;
        }
        return IdRangeChangeSource.forItems(start, end, batchSize);
    }

    /**
     * Builds the source for {@code --ids}, splitting every option value on
     * commas.
     */
    private static Change.Source<? extends Change.Batch> buildIdListSource(List<String> idOptions, int batchSize) {
        List<String> parsedIds = new ArrayList<String>();
        for (String idOpt : idOptions) {
            // Splitting a value without a comma yields just that value.
            parsedIds.addAll(Arrays.asList(idOpt.split(",")));
        }
        return new IdListChangeSource(parsedIds.toArray(new String[parsedIds.size()]), batchSize);
    }

    /**
     * Parses {@code --start}, trying the output date format first and the
     * input date format second.  Returns null after logging when neither
     * format matches.
     */
    private static Date parseStartTime(String start) {
        try {
            return new Date(WikibaseRepository.outputDateFormat().parse(start).getTime());
        }
        catch (ParseException e) {
            try {
                return new Date(WikibaseRepository.inputDateFormat().parse(start).getTime());
            }
            catch (ParseException e2) {
                log.error("Invalid date:  {}", start);
                return null;
            }
        }
    }

    /**
     * Determines the start time from the RDF store: the stored left off time,
     * or 30 days ago when the store has none.  Returns null after logging when
     * the stored time is older than 30 days.
     */
    private static Date findLeftOffStartTime(RdfRepository rdfRepository) {
        log.info("Checking where we left off");
        Date leftOff = rdfRepository.fetchLeftOffTime();
        long minStartTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(30L);
        if (leftOff == null) {
            Date defaultStart = new Date(minStartTime);
            log.info("Defaulting start time to 30 days ago:  {}", WikibaseRepository.inputDateFormat().format(defaultStart));
            return defaultStart;
        }
        if (leftOff.getTime() < minStartTime) {
            log.error("RDF store reports the last update time is before the minimum safe poll time.  You will have to reload from scratch or you might have missing data.");
            return null;
        }
        log.info("Found start time in the RDF store: {}", WikibaseRepository.inputDateFormat().format(leftOff));
        return new Date(leftOff.getTime());
    }

    /**
     * Creates the Wikibase repository client described by the options.
     *
     * @param options parsed command line options
     * @return the repository, or null when {@code --entityNamespaces} holds a
     *         value that is not a number (the error has been logged)
     */
    private static WikibaseRepository buildWikibaseRepository(Options options) {
        String namespaceOption = options.entityNamespaces();
        if (namespaceOption != null) {
            String[] rawNamespaces = namespaceOption.split(",");
            long[] namespaces = new long[rawNamespaces.length];
            int next = 0;
            try {
                for (String rawNamespace : rawNamespaces) {
                    namespaces[next] = Long.parseLong(rawNamespace);
                    next++;
                }
            }
            catch (NumberFormatException e) {
                log.error("Invalid value for --entityNamespaces. Namespace index should be an integer.", e);
                return null;
            }
            return new WikibaseRepository(options.wikibaseScheme(), options.wikibaseHost(), 0, namespaces);
        }
        // No explicit namespaces requested: use the two argument constructor.
        return new WikibaseRepository(options.wikibaseScheme(), options.wikibaseHost());
    }

    /**
     * Wires up an updater and starts its JMX metrics reporter.
     *
     * @param changeSource source of the change batches to apply
     * @param wikibase used to fetch the RDF of changed entities
     * @param rdfRepository store that is queried and updated
     * @param munger applied to the statements of every fetched entity
     * @param executor runs the per change fetch and munge tasks
     * @param pollDelay seconds to sleep when a batch comes back empty
     * @param uris provides the entity uri prefix
     * @param verify passed through to the repository when syncing changes
     */
    public Update(
            Change.Source<B> changeSource,
            WikibaseRepository wikibase,
            RdfRepository rdfRepository,
            Munger munger,
            ExecutorService executor,
            int pollDelay,
            WikibaseUris uris,
            boolean verify) {
        // Collaborators.
        this.wikibase = wikibase;
        this.rdfRepository = rdfRepository;
        this.changeSource = changeSource;
        this.executor = executor;
        this.munger = munger;
        this.uris = uris;
        // Plain settings.
        this.verify = verify;
        this.pollDelay = pollDelay;
        // Every field is assigned by now; publish the metrics over JMX.
        this.reporter.start();
    }

    /**
     * Runs the update loop: fetches the first batch (retrying on retryable
     * errors), then repeatedly applies the current batch, records the left off
     * time, updates the progress meter and fetches the next batch.
     *
     * <p>The loop ends when the source reports its last batch, when syncing
     * fails with an {@link ExecutionException}, or when the thread is
     * interrupted.  On interruption the interrupt flag is restored for the
     * caller.
     */
    @Override
    public void run() {
        B batch = null;
        do {
            try {
                batch = this.changeSource.firstBatch();
            }
            catch (RetryableException e) {
                log.warn("Retryable error fetching first batch.  Retrying.", e);
            }
        } while (batch == null);
        log.debug("{} changes in batch", batch.changes().size());
        try {
            while (true) {
                this.handleChanges(batch);
                Date leftOffDate = batch.leftOffDate();
                if (leftOffDate != null) {
                    // Store a time one second earlier than the batch reports.
                    // NOTE(review): presumably a safety margin for changes sharing a timestamp - confirm.
                    Date storedLeftOff = new Date(leftOffDate.getTime() - TimeUnit.SECONDS.toMillis(1L));
                    this.rdfRepository.updateLeftOffTime(storedLeftOff);
                }
                this.batchAdvanced.mark(batch.advanced());
                log.info("Polled up to {} at {} updates per second and {} {} per second", batch.leftOffHuman(), this.meterReport(this.updateMeter), this.meterReport(this.batchAdvanced), batch.advancedUnits());
                if (batch.last()) {
                    return;
                }
                batch = this.nextBatch(batch);
            }
        }
        catch (InterruptedException e) {
            // Stop instead of looping: with the flag restored every further
            // blocking call would fail again at once and the loop would spin.
            Thread.currentThread().interrupt();
            log.info("Interrupted, stopping updates");
        }
        catch (ExecutionException e) {
            log.error("Syncing encountered a fatal exception", e);
        }
    }

    /**
     * Applies one batch: filters it down to the changes that need syncing,
     * prepares each of them on the executor, waits for all preparation tasks
     * and then syncs the prepared changes into the RDF repository.
     *
     * <p>A task retries its change for as long as it fails with a
     * {@link RetryableException} and gives up on it after a
     * {@link ContainedException}.
     *
     * @param batch the batch to apply
     * @throws InterruptedException if interrupted while waiting for a task
     * @throws ExecutionException if a task failed with any other exception
     */
    private void handleChanges(Change.Batch batch) throws InterruptedException, ExecutionException {
        // Also refreshes repoValues / repoRefs, which the tasks below read.
        Set<Change> trueChanges = this.getRevisionUpdates(batch);
        long start = System.currentTimeMillis();
        List<Future<?>> tasks = new ArrayList<Future<?>>(trueChanges.size());
        for (final Change change : trueChanges) {
            tasks.add(this.executor.submit(new Runnable(){

                @Override
                public void run() {
                    while (true) {
                        try {
                            Update.this.handleChange(change);
                            return;
                        }
                        catch (RetryableException e) {
                            // Fall through to the next loop iteration and try again.
                            log.warn("Retryable error syncing.  Retrying.", e);
                        }
                        catch (ContainedException e) {
                            log.warn("Contained error syncing.  Giving up on " + change.entityId(), e);
                            return;
                        }
                    }
                }
            }));
        }
        for (Future<?> task : tasks) {
            task.get();
        }
        log.debug("Preparing update data took {} ms, have {} changes", System.currentTimeMillis() - start, trueChanges.size());
        this.rdfRepository.syncFromChanges(trueChanges, this.verify);
        this.updateMeter.mark(trueChanges.size());
    }

    /**
     * Reduces a batch to the changes that have to be applied and loads the
     * values and refs the repository holds for them.
     *
     * <p>Changes with a negative revision are always kept.  For the others only
     * the highest revision per entity is a candidate, and a candidate is kept
     * when {@code rdfRepository.hasRevisions} returns its entity uri.
     *
     * <p>Side effect: {@code repoValues} and {@code repoRefs} are replaced with
     * the data fetched for the kept changes, or set to null when nothing is
     * kept.
     *
     * @param batch the batch to filter
     * @return the changes to apply
     */
    private Set<Change> getRevisionUpdates(Change.Batch batch) {
        Set<Change> accepted = new HashSet<Change>();
        HashSet<String> acceptedUris = new HashSet<String>();
        HashMap<String, Change> newestByEntity = new HashMap<String, Change>();

        for (Change change : batch.changes()) {
            if (change.revision() < 0L) {
                // No usable revision: accept without asking the repository.
                accepted.add(change);
                acceptedUris.add(this.uris.entity() + change.entityId());
            } else {
                Change known = newestByEntity.get(change.entityId());
                if (known == null || known.revision() < change.revision()) {
                    newestByEntity.put(change.entityId(), change);
                }
            }
        }

        if (!newestByEntity.isEmpty()) {
            for (String entityUri : this.rdfRepository.hasRevisions(newestByEntity.values())) {
                acceptedUris.add(entityUri);
                // Strip the entity prefix to get back to the map key.
                String entityId = entityUri.substring(this.uris.entity().length());
                accepted.add(newestByEntity.get(entityId));
            }
        }

        log.debug("Filtered batch contains {} changes", accepted.size());
        if (accepted.isEmpty()) {
            this.repoValues = null;
            this.repoRefs = null;
            return accepted;
        }
        this.repoValues = this.rdfRepository.getValues(acceptedUris);
        log.debug("Fetched {} values", this.repoValues.size());
        this.repoRefs = this.rdfRepository.getRefs(acceptedUris);
        log.debug("Fetched {} refs", this.repoRefs.size());
        return accepted;
    }

    /**
     * Fetches the batch following the given one, polling until a batch with at
     * least one change arrives.  Retryable errors are logged and the fetch is
     * repeated at once; an empty batch is followed by a sleep of
     * {@code pollDelay} seconds before asking again from that empty batch.
     *
     * @param batch the batch that was just handled
     * @return the next batch containing changes
     * @throws InterruptedException if interrupted while sleeping
     */
    private B nextBatch(B batch) throws InterruptedException {
        B current = batch;
        while (true) {
            try {
                current = this.changeSource.nextBatch(current);
            }
            catch (RetryableException e) {
                log.warn("Retryable error fetching next batch.  Retrying.", e);
                continue;
            }
            if (!current.changes().isEmpty()) {
                break;
            }
            log.debug("Sleeping for {} secs", this.pollDelay);
            // Convert in long arithmetic: pollDelay * 1000 overflows int for large delays.
            Thread.sleep(TimeUnit.SECONDS.toMillis(this.pollDelay));
        }
        log.debug("{} changes in batch", current.changes().size());
        return current;
    }

    /**
     * Prepares a single change for syncing: fetches the entity's RDF from
     * Wikibase, munges it together with the values and refs currently recorded
     * for the entity, and stores the statements and the cleanup list on the
     * change.
     *
     * @param change the change to prepare; updated in place
     * @throws RetryableException if the work failed in a way worth retrying
     */
    private void handleChange(Change change) throws RetryableException {
        log.debug("Processing data for {}", change);
        Collection<Statement> fetched = this.wikibase.fetchRdfForEntity(change.entityId());

        // Private copies of what the repository holds for this entity.
        HashSet<String> knownValues = new HashSet<String>(this.repoValues.get(change.entityId()));
        HashSet<String> knownRefs = new HashSet<String>(this.repoRefs.get(change.entityId()));
        this.munger.munge(change.entityId(), fetched, knownValues, knownRefs, change);

        // The cleanup list is whatever is in both sets after munging, values first.
        ArrayList<String> toCleanUp = new ArrayList<String>(knownValues);
        toCleanUp.addAll(knownRefs);

        change.setStatements(fetched);
        change.setCleanupList(toCleanUp);
    }

    /**
     * Formats the one, five and fifteen minute rates of a meter as
     * {@code (x.x, x.x, x.x)} using the root locale.
     *
     * @param meter the meter to report on
     * @return the formatted rates
     */
    private String meterReport(Meter meter) {
        double oneMinute = meter.getOneMinuteRate();
        double fiveMinutes = meter.getFiveMinuteRate();
        double fifteenMinutes = meter.getFifteenMinuteRate();
        return String.format(Locale.ROOT, "(%.1f, %.1f, %.1f)", oneMinute, fiveMinutes, fifteenMinutes);
    }

    /**
     * Command line options of the updater, on top of the basic, munger and
     * Wikibase option groups it inherits.
     */
    public interface Options
            extends OptionsUtils.BasicOptions, OptionsUtils.MungerOptions, OptionsUtils.WikibaseOptions {

        /** Url scheme used to reach Wikibase; defaults to https. */
        @Option(defaultValue={"https"}, description="Wikidata url scheme")
        String wikibaseScheme();

        /** Explicit start time, or null when not given. */
        @Option(shortName={"s"}, defaultToNull=true, description="Start time in 2015-02-11T17:11:08Z or 20150211170100 format.")
        String start();

        /** Explicit entity ids to update, or null when not given. */
        @Option(defaultToNull=true, description="If specified must be <id> or list of <id>, comma or space separated.")
        List<String> ids();

        /** Inclusive id range to iterate, or null when not given. */
        @Option(defaultToNull=true, description="If specified must be <start>-<end>. Ids are iterated instead of recent changes. Start and end are inclusive.")
        String idrange();

        /** Url of the SPARQL endpoint. */
        @Option(shortName={"u"}, description="URL to post updates and queries.")
        String sparqlUrl();

        /** Delay applied when no updates are found; defaults to 10. */
        @Option(shortName={"d"}, defaultValue={"10"}, description="Poll delay when no updates found")
        int pollDelay();

        /** Number of worker threads; defaults to 10. */
        @Option(shortName={"t"}, defaultValue={"10"}, description="Thread count")
        int threadCount();

        /** Number of changes fetched at a time; defaults to 100. */
        @Option(shortName={"b"}, defaultValue={"100"}, description="Number of recent changes fetched at a time.")
        int batchSize();

        /** Whether updates are verified. */
        @Option(shortName={"V"}, longName={"verify"}, description="Verify updates (may have performance impact)")
        boolean verify();

        /** Comma separated entity namespace indexes, or null when not given. */
        @Option(defaultToNull=true, description="If specified must be numerical indexes of Item and Property namespaces that defined in Wikibase repository, comma separated.")
        String entityNamespaces();
    }
}

