/*
 * Decompiled with CFR 0.152.
 */
package org.wikidata.query.rdf.tool;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Multimap;
import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Locale;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.openrdf.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wikidata.query.rdf.common.uri.WikibaseUris;
import org.wikidata.query.rdf.tool.change.Change;
import org.wikidata.query.rdf.tool.exception.ContainedException;
import org.wikidata.query.rdf.tool.exception.RetryableException;
import org.wikidata.query.rdf.tool.rdf.Munger;
import org.wikidata.query.rdf.tool.rdf.RdfRepository;
import org.wikidata.query.rdf.tool.wikibase.WikibaseRepository;

public class Updater<B extends Change.Batch>
implements Runnable,
Closeable {
    private static final Logger log = LoggerFactory.getLogger(Updater.class);
    private final Meter updatesMeter;
    private final Meter batchAdvanced;
    private final Change.Source<B> changeSource;
    private final WikibaseRepository wikibase;
    private final RdfRepository rdfRepository;
    private final Munger munger;
    private final ExecutorService executor;
    private final int pollDelay;
    private final WikibaseUris uris;
    private final DelayQueue<Change.DelayedChange> deferralQueue;
    private ImmutableSetMultimap<String, String> repoValues;
    private ImmutableSetMultimap<String, String> repoRefs;
    private final boolean verify;

    Updater(Change.Source<B> changeSource, WikibaseRepository wikibase, RdfRepository rdfRepository, Munger munger, ExecutorService executor, int pollDelay, WikibaseUris uris, boolean verify, MetricRegistry metricRegistry) {
        this.changeSource = changeSource;
        this.wikibase = wikibase;
        this.rdfRepository = rdfRepository;
        this.munger = munger;
        this.executor = executor;
        this.pollDelay = pollDelay;
        this.uris = uris;
        this.verify = verify;
        this.updatesMeter = metricRegistry.meter("updates");
        this.batchAdvanced = metricRegistry.meter("batch-progress");
        this.deferralQueue = new DelayQueue();
    }

    @Override
    public void run() {
        Change.Batch batch = null;
        do {
            try {
                batch = (Change.Batch)this.changeSource.firstBatch();
            }
            catch (RetryableException e) {
                log.warn("Retryable error fetching first batch.  Retrying.", (Throwable)e);
            }
        } while (batch == null);
        log.debug("{} changes in batch", (Object)batch.changes().size());
        Instant oldDate = null;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                this.handleChanges(Updater.addDeferredChanges(this.deferralQueue, batch.changes()));
                Instant leftOffDate = batch.leftOffDate();
                if (leftOffDate != null) {
                    leftOffDate = leftOffDate.minusSeconds(1L);
                    if (oldDate == null || !oldDate.equals(leftOffDate)) {
                        this.syncDate(leftOffDate);
                        oldDate = leftOffDate;
                    }
                }
                this.batchAdvanced.mark(batch.advanced());
                log.info("Polled up to {} at {} updates per second and {} {} per second", new Object[]{batch.leftOffHuman(), this.meterReport(this.updatesMeter), this.meterReport(this.batchAdvanced), batch.advancedUnits()});
                if (batch.last()) {
                    return;
                }
                this.wikibase.batchDone();
                batch = this.nextBatch(batch);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @VisibleForTesting
    static Collection<Change> addDeferredChanges(DelayQueue<Change.DelayedChange> deferralQueue, Collection<Change> newChanges) {
        if (deferralQueue.isEmpty()) {
            return newChanges;
        }
        LinkedList<Change> allChanges = new LinkedList<Change>(newChanges);
        int deferrals = 0;
        Set changeIds = (Set)newChanges.stream().map(Change::entityId).collect(ImmutableSet.toImmutableSet());
        Change.DelayedChange deferred = (Change.DelayedChange)deferralQueue.poll();
        while (deferred != null) {
            if (!changeIds.contains(deferred.getChange().entityId())) {
                allChanges.add(deferred.getChange());
                ++deferrals;
            }
            deferred = (Change.DelayedChange)deferralQueue.poll();
        }
        log.info("Added {} deferred changes, {} still in the queue", (Object)deferrals, (Object)deferralQueue.size());
        return allChanges;
    }

    protected void syncDate(Instant newDate) {
        this.rdfRepository.updateLeftOffTime(newDate);
    }

    @Override
    public void close() throws IOException {
        this.executor.shutdown();
        this.changeSource.close();
    }

    protected void handleChanges(Iterable<Change> changes) throws InterruptedException {
        Set<Change> trueChanges = this.getRevisionUpdates(changes);
        long start = System.currentTimeMillis();
        ArrayList<Future<Change>> futureChanges = new ArrayList<Future<Change>>();
        for (Change change : trueChanges) {
            futureChanges.add(this.executor.submit(() -> {
                while (true) {
                    try {
                        this.handleChange(change);
                        return change;
                    }
                    catch (RetryableException e) {
                        log.warn("Retryable error syncing.  Retrying.", (Throwable)e);
                        continue;
                    }
                    catch (ContainedException e) {
                        log.warn("Contained error syncing.  Giving up on {}", (Object)change.entityId(), (Object)e);
                        throw e;
                    }
                    break;
                }
            }));
        }
        ArrayList<Change> processedChanges = new ArrayList<Change>();
        for (Future future : futureChanges) {
            try {
                processedChanges.add((Change)future.get());
            }
            catch (ExecutionException executionException) {}
        }
        log.debug("Preparing update data took {} ms, have {} changes", (Object)(System.currentTimeMillis() - start), (Object)processedChanges.size());
        this.rdfRepository.syncFromChanges(processedChanges, this.verify);
        this.updatesMeter.mark((long)processedChanges.size());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Set<Change> getRevisionUpdates(Iterable<Change> changes) {
        HashSet<Change> trueChanges = new HashSet<Change>();
        HashSet<String> changeIds = new HashSet<String>();
        HashMap<String, Change> candidateChanges = new HashMap<String, Change>();
        for (Change change : changes) {
            if (change.revision() > -1L) {
                Change c = (Change)candidateChanges.get(change.entityId());
                if (c != null && c.revision() >= change.revision()) continue;
                candidateChanges.put(change.entityId(), change);
                continue;
            }
            trueChanges.add(change);
            changeIds.add(this.uris.entity() + change.entityId());
        }
        if (candidateChanges.size() > 0) {
            for (String entityId : this.rdfRepository.hasRevisions(candidateChanges.values())) {
                changeIds.add(entityId);
                trueChanges.add((Change)candidateChanges.get(entityId.substring(this.uris.entity().length())));
            }
        }
        log.debug("Filtered batch contains {} changes", (Object)trueChanges.size());
        if (!trueChanges.isEmpty()) {
            this.setValuesAndRefs(this.rdfRepository.getValues(changeIds), this.rdfRepository.getRefs(changeIds));
            if (log.isDebugEnabled()) {
                Updater updater = this;
                synchronized (updater) {
                    log.debug("Fetched {} values", (Object)this.repoValues.size());
                    log.debug("Fetched {} refs", (Object)this.repoRefs.size());
                }
            }
        } else {
            this.setValuesAndRefs(null, null);
        }
        return trueChanges;
    }

    private synchronized void setValuesAndRefs(ImmutableSetMultimap<String, String> values, ImmutableSetMultimap<String, String> refs) {
        this.repoValues = values;
        this.repoRefs = refs;
    }

    private B nextBatch(B prevBatch) throws InterruptedException {
        B batch;
        while (true) {
            try {
                batch = this.changeSource.nextBatch(prevBatch);
            }
            catch (RetryableException e) {
                log.warn("Retryable error fetching next batch.  Retrying.", (Throwable)e);
                continue;
            }
            if (!batch.hasAnyChanges()) {
                log.info("Sleeping for {} secs", (Object)this.pollDelay);
                Thread.sleep(this.pollDelay * 1000);
                continue;
            }
            if (!batch.changes().isEmpty()) break;
            prevBatch = batch;
        }
        log.debug("{} changes in batch", (Object)batch.changes().size());
        return batch;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleChange(Change change) throws RetryableException {
        ImmutableSetMultimap<String, String> repoRefs;
        ImmutableSetMultimap<String, String> repoValues;
        log.debug("Processing data for {}", (Object)change);
        Collection<Statement> statements = this.wikibase.fetchRdfForEntity(change.entityId());
        HashSet<String> values = new HashSet<String>();
        HashSet<String> refs = new HashSet<String>();
        Updater updater = this;
        synchronized (updater) {
            repoValues = this.repoValues;
            repoRefs = this.repoRefs;
        }
        this.munger.mungeWithValues(change.entityId(), statements, (Multimap<String, String>)repoValues, (Multimap<String, String>)repoRefs, (Collection<String>)values, (Collection<String>)refs, change, (Queue<Change.DelayedChange>)this.deferralQueue);
        change.setRefCleanupList(refs);
        change.setValueCleanupList(values);
        change.setStatements(statements);
    }

    private String meterReport(Meter meter) {
        return String.format(Locale.ROOT, "(%.1f, %.1f, %.1f)", meter.getOneMinuteRate(), meter.getFiveMinuteRate(), meter.getFifteenMinuteRate());
    }
}

