/*
 * Decompiled with CFR 0.152.
 */
package org.wikidata.query.rdf.tool;

import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableSetMultimap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.commons.lang3.time.DateUtils;
import org.openrdf.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wikidata.query.rdf.common.uri.WikibaseUris;
import org.wikidata.query.rdf.tool.change.Change;
import org.wikidata.query.rdf.tool.exception.ContainedException;
import org.wikidata.query.rdf.tool.exception.RetryableException;
import org.wikidata.query.rdf.tool.rdf.Munger;
import org.wikidata.query.rdf.tool.rdf.RdfRepository;
import org.wikidata.query.rdf.tool.wikibase.WikibaseRepository;

/**
 * Pulls batches of changes from a {@link Change.Source}, fetches and munges the RDF for every
 * changed entity on an {@link ExecutorService}, and writes the result to an
 * {@link RdfRepository}.
 *
 * <p>Two meters ("updates" and "batch-progress") are exposed through a {@link JmxReporter} that
 * is started in the constructor and stopped in {@link #close()}.
 *
 * @param <B> the batch type produced by the change source
 */
public class Updater<B extends Change.Batch> implements Runnable, AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(Updater.class);
    private final MetricRegistry metrics = new MetricRegistry();
    /** Counts changes handed to the RDF repository. */
    private final Meter updateMeter = this.metrics.meter("updates");
    /** Counts how far each batch advanced, in the units reported by the batch. */
    private final Meter batchAdvanced = this.metrics.meter("batch-progress");
    private final JmxReporter reporter = JmxReporter.forRegistry(this.metrics).build();
    private final Change.Source<B> changeSource;
    private final WikibaseRepository wikibase;
    private final RdfRepository rdfRepository;
    private final Munger munger;
    private final ExecutorService executor;
    /** Seconds to sleep when a poll returns a batch without any changes. */
    private final int pollDelay;
    private final WikibaseUris uris;
    // Written by getRevisionUpdates on the calling thread and read by handleChange on executor
    // threads, hence volatile. Both are null while there is nothing to process.
    private volatile ImmutableSetMultimap<String, String> repoValues;
    private volatile ImmutableSetMultimap<String, String> repoRefs;
    /** Passed through unchanged to {@link RdfRepository#syncFromChanges}. */
    private final boolean verify;

    /**
     * Creates the updater and starts its JMX metrics reporter.
     *
     * @param changeSource source of change batches
     * @param wikibase repository the entity RDF is fetched from
     * @param rdfRepository repository the changes are synced into
     * @param munger munger applied to the fetched statements
     * @param executor executor on which individual changes are processed
     * @param pollDelay seconds to wait before polling again after a batch without changes
     * @param uris URI scheme used to build entity URIs from entity ids
     * @param verify flag forwarded to the RDF repository when syncing
     */
    public Updater(Change.Source<B> changeSource, WikibaseRepository wikibase, RdfRepository rdfRepository, Munger munger, ExecutorService executor, int pollDelay, WikibaseUris uris, boolean verify) {
        this.changeSource = changeSource;
        this.wikibase = wikibase;
        this.rdfRepository = rdfRepository;
        this.munger = munger;
        this.executor = executor;
        this.pollDelay = pollDelay;
        this.uris = uris;
        this.verify = verify;
        this.reporter.start();
    }

    /**
     * Fetches the first batch (retrying on {@link RetryableException}) and then processes batches
     * until the current thread is interrupted, a batch reports itself as the last one, or an
     * {@link ExecutionException} escapes batch handling.
     */
    @Override
    public void run() {
        B batch = null;
        do {
            try {
                batch = this.changeSource.firstBatch();
            } catch (RetryableException e) {
                log.warn("Retryable error fetching first batch.  Retrying.", e);
            }
            // Honour interruption here too, otherwise a permanently failing source would make
            // this loop impossible to stop.
        } while (batch == null && !Thread.currentThread().isInterrupted());
        if (batch == null) {
            return;
        }
        log.debug("{} changes in batch", batch.changes().size());

        Date oldDate = null;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                this.handleChanges(batch.changes());
                Date leftOffDate = batch.leftOffDate();
                if (leftOffDate != null) {
                    // Record one second earlier than the batch's left-off date.
                    leftOffDate = DateUtils.addSeconds(leftOffDate, -1);
                    // Skip the repository write when the value has not moved.
                    if (oldDate == null || !oldDate.equals(leftOffDate)) {
                        this.rdfRepository.updateLeftOffTime(leftOffDate);
                        oldDate = leftOffDate;
                    }
                }
                this.batchAdvanced.mark(batch.advanced());
                log.info("Polled up to {} at {} updates per second and {} {} per second", batch.leftOffHuman(), meterReport(this.updateMeter), meterReport(this.batchAdvanced), batch.advancedUnits());
                if (batch.last()) {
                    return;
                }
                batch = this.nextBatch(batch);
            } catch (InterruptedException e) {
                // Restore the flag; the loop condition then ends the run.
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                log.error("Syncing encountered a fatal exception", e);
                break;
            }
        }
    }

    /**
     * Shuts down the executor and stops the JMX reporter started by the constructor.
     */
    @Override
    public void close() {
        try {
            this.executor.shutdown();
        } finally {
            this.reporter.stop();
        }
    }

    /**
     * Processes one batch: filters it down to the changes that need syncing, prepares each of
     * them on the executor, and syncs the successfully prepared ones into the RDF repository.
     *
     * @param changes the changes of the current batch
     * @throws InterruptedException if interrupted while waiting for a change to be prepared
     * @throws ExecutionException declared for callers; failures of individual changes are
     *     logged and the change is skipped rather than propagated
     */
    private void handleChanges(Iterable<Change> changes) throws InterruptedException, ExecutionException {
        Set<Change> trueChanges = this.getRevisionUpdates(changes);
        long start = System.currentTimeMillis();

        List<Future<Change>> futureChanges = new ArrayList<>();
        for (Change change : trueChanges) {
            futureChanges.add(this.executor.submit(() -> {
                // Retry until the change is handled or fails with a non-retryable error.
                while (true) {
                    try {
                        this.handleChange(change);
                        return change;
                    } catch (RetryableException e) {
                        log.warn("Retryable error syncing.  Retrying.", e);
                    } catch (ContainedException e) {
                        log.warn("Contained error syncing.  Giving up on {}", change.entityId(), e);
                        throw e;
                    }
                }
            }));
        }

        List<Change> processedChanges = new ArrayList<>();
        for (Future<Change> future : futureChanges) {
            try {
                processedChanges.add(future.get());
            } catch (ExecutionException e) {
                // A failed change is dropped from this batch. ContainedException was already
                // logged by the task itself; anything else would otherwise vanish silently.
                if (!(e.getCause() instanceof ContainedException)) {
                    log.warn("Unexpected error syncing a change.  Skipping it.", e);
                }
            }
        }
        log.debug("Preparing update data took {} ms, have {} changes", System.currentTimeMillis() - start, processedChanges.size());
        this.rdfRepository.syncFromChanges(processedChanges, this.verify);
        this.updateMeter.mark(processedChanges.size());
    }

    /**
     * Reduces a batch to the changes that have to be synced and loads the values and references
     * currently stored for them into {@link #repoValues} and {@link #repoRefs}.
     *
     * <p>Changes with a negative revision are always kept. Changes with a non-negative revision
     * are first reduced to one candidate per entity id (the highest revision; the first seen wins
     * a tie) and then kept only if {@link RdfRepository#hasRevisions} returns them.
     *
     * @param changes the changes of the current batch
     * @return the changes that need to be synced
     */
    private Set<Change> getRevisionUpdates(Iterable<Change> changes) {
        Set<Change> trueChanges = new HashSet<>();
        Set<String> changeIds = new HashSet<>();
        Map<String, Change> candidateChanges = new HashMap<>();
        for (Change change : changes) {
            if (change.revision() >= 0L) {
                Change known = candidateChanges.get(change.entityId());
                if (known == null || known.revision() < change.revision()) {
                    candidateChanges.put(change.entityId(), change);
                }
                continue;
            }
            trueChanges.add(change);
            changeIds.add(this.uris.entity() + change.entityId());
        }
        if (!candidateChanges.isEmpty()) {
            for (String entityId : this.rdfRepository.hasRevisions(candidateChanges.values())) {
                changeIds.add(entityId);
                // Returned ids are treated as entity URIs: stripping the entity prefix length
                // yields the key used in candidateChanges.
                // NOTE(review): assumes every returned id maps back to a candidate - confirm.
                trueChanges.add(candidateChanges.get(entityId.substring(this.uris.entity().length())));
            }
        }
        log.debug("Filtered batch contains {} changes", trueChanges.size());
        if (!trueChanges.isEmpty()) {
            this.repoValues = this.rdfRepository.getValues(changeIds);
            log.debug("Fetched {} values", this.repoValues.size());
            this.repoRefs = this.rdfRepository.getRefs(changeIds);
            log.debug("Fetched {} refs", this.repoRefs.size());
        } else {
            this.repoValues = null;
            this.repoRefs = null;
        }
        return trueChanges;
    }

    /**
     * Polls the change source until it yields a batch with a non-empty change list.
     *
     * <p>Retryable errors are logged and retried immediately. A batch without any changes causes
     * a sleep of {@link #pollDelay} seconds before the same position is polled again. A batch
     * that has changes but an empty change list becomes the new starting point and polling
     * continues without sleeping.
     *
     * @param prevBatch the batch to continue from
     * @return the next batch whose change list is not empty
     * @throws InterruptedException if interrupted while sleeping between polls
     */
    private B nextBatch(B prevBatch) throws InterruptedException {
        B batch;
        while (true) {
            try {
                batch = this.changeSource.nextBatch(prevBatch);
            } catch (RetryableException e) {
                log.warn("Retryable error fetching next batch.  Retrying.", e);
                continue;
            }
            if (!batch.hasAnyChanges()) {
                log.info("Sleeping for {} secs", this.pollDelay);
                // Multiply as long so large poll delays cannot overflow int.
                Thread.sleep(this.pollDelay * 1000L);
                continue;
            }
            if (!batch.changes().isEmpty()) {
                break;
            }
            prevBatch = batch;
        }
        log.debug("{} changes in batch", batch.changes().size());
        return batch;
    }

    /**
     * Prepares a single change: fetches the entity's RDF, munges it against the values and
     * references preloaded in {@link #repoValues} and {@link #repoRefs}, and stores the
     * resulting statements and cleanup list on the change.
     *
     * @param change the change to prepare
     * @throws RetryableException if the attempt failed in a way that may succeed on retry
     */
    private void handleChange(Change change) throws RetryableException {
        log.debug("Processing data for {}", change);
        Collection<Statement> statements = this.wikibase.fetchRdfForEntity(change.entityId());
        // Mutable copies: the munger receives them and whatever they contain afterwards is
        // queued for cleanup.
        Set<String> values = new HashSet<>(this.repoValues.get(change.entityId()));
        Set<String> refs = new HashSet<>(this.repoRefs.get(change.entityId()));
        this.munger.munge(change.entityId(), statements, values, refs, change);
        List<String> cleanupList = new ArrayList<>(values);
        cleanupList.addAll(refs);
        change.setStatements(statements);
        change.setCleanupList(cleanupList);
    }

    /**
     * Formats a meter's one, five and fifteen minute rates as {@code (x.x, x.x, x.x)}.
     *
     * @param meter the meter to report
     * @return the formatted rates
     */
    private static String meterReport(Meter meter) {
        return String.format(Locale.ROOT, "(%.1f, %.1f, %.1f)", meter.getOneMinuteRate(), meter.getFiveMinuteRate(), meter.getFifteenMinuteRate());
    }
}

