/*
 * Decompiled with CFR 0.152.
 */
package org.fcrepo.persistence.ocfl.impl;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.fcrepo.common.db.DbTransactionExecutor;
import org.fcrepo.config.OcflPropsConfig;
import org.fcrepo.kernel.api.Transaction;
import org.fcrepo.kernel.api.TransactionManager;
import org.fcrepo.persistence.ocfl.impl.ReindexService;
import org.fcrepo.persistence.ocfl.impl.ReindexWorker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Coordinates a reindex run: hands out batches of OCFL ids to a fixed set of
 * {@link ReindexWorker} threads, tracks success/error counts, periodically logs
 * progress, and triggers membership indexing once the workers have finished.
 */
public class ReindexManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReindexManager.class);

    /** Seconds the reporter thread sleeps between progress log lines. */
    private static final long REPORTING_INTERVAL_SECS = 300L;
    private static final long SECONDS_PER_MINUTE = 60L;
    private static final long SECONDS_PER_HOUR = 3600L;

    private final List<ReindexWorker> workers;
    private final Iterator<String> ocflIter;
    private final Stream<String> ocflStream;
    private final AtomicInteger completedCount;
    private final AtomicInteger errorCount;
    private final ReindexService reindexService;
    private final long batchSize;
    private final boolean failOnError;
    private final TransactionManager txManager;
    private final DbTransactionExecutor dbTransactionExecutor;

    /** Lazily created by {@link #transaction()}; reused for membership indexing. */
    private Transaction transaction = null;

    /**
     * Builds the manager and its workers. Workers are created here but not started;
     * call {@link #start()} to run them.
     *
     * @param ids stream of ids to reindex; it is consumed through its iterator and
     *            closed by {@link #shutdown()}
     * @param reindexService service passed to each worker and used for membership indexing
     * @param config supplies the batch size, fail-on-error flag and worker thread count
     * @param manager transaction manager passed to each worker and used to create the
     *                membership-indexing transaction
     * @param dbTransactionExecutor executor passed to each worker
     * @throws IllegalStateException if the configured number of reindexing threads is below 1
     */
    public ReindexManager(Stream<String> ids, ReindexService reindexService, OcflPropsConfig config, TransactionManager manager, DbTransactionExecutor dbTransactionExecutor) {
        this.ocflStream = ids;
        this.ocflIter = this.ocflStream.iterator();
        this.reindexService = reindexService;
        this.batchSize = config.getReindexBatchSize();
        this.failOnError = config.isReindexFailOnError();
        this.txManager = manager;
        this.dbTransactionExecutor = dbTransactionExecutor;
        this.workers = new ArrayList<>();
        this.completedCount = new AtomicInteger(0);
        this.errorCount = new AtomicInteger(0);

        final long workerCount = config.getReindexingThreads();
        if (workerCount < 1L) {
            throw new IllegalStateException(String.format("Reindexing requires at least 1 thread. Found: %s", workerCount));
        }
        for (int i = 0; i < workerCount; i++) {
            this.workers.add(new ReindexWorker("ReindexWorker-" + i, this, this.reindexService, this.txManager, this.dbTransactionExecutor, this.failOnError));
        }
    }

    /**
     * Starts the progress reporter and all workers, then blocks until every worker
     * has been joined. Afterwards membership indexing runs unless fail-on-error is
     * enabled and at least one error was recorded. The reporter thread is always
     * interrupted on the way out.
     *
     * @throws InterruptedException if the calling thread is interrupted while joining a worker
     */
    public void start() throws InterruptedException {
        final Thread reporter = this.startReporter();
        try {
            this.workers.forEach(ReindexWorker::start);
            for (final ReindexWorker worker : this.workers) {
                worker.join();
            }
            if (!this.failOnError || this.errorCount.get() == 0) {
                this.indexMembership();
            } else {
                LOGGER.error("Reindex did not complete successfully");
            }
        } catch (Exception e) {
            LOGGER.error("Error while rebuilding index", e);
            this.stop();
            throw e;
        } finally {
            // Interrupting is the reporter's only exit path (see startReporter).
            reporter.interrupt();
        }
    }

    /**
     * Asks every worker to stop by calling {@code stopThread()} on each of them.
     */
    public void stop() {
        LOGGER.debug("Stop worker threads");
        this.workers.forEach(ReindexWorker::stopThread);
    }

    /**
     * Pulls the next batch of ids from the shared iterator. The method is
     * synchronized so concurrent callers each receive a distinct batch.
     *
     * @return up to {@code batchSize} ids; an empty list once the iterator is exhausted
     */
    public synchronized List<String> getIds() {
        final List<String> ids = new ArrayList<>((int) this.batchSize);
        for (long taken = 0; taken < this.batchSize && this.ocflIter.hasNext(); taken++) {
            ids.add(this.ocflIter.next());
        }
        return ids;
    }

    /**
     * Adds the results of one processed batch to the running totals.
     *
     * @param batchSuccessful number of ids added to the completed count
     * @param batchErrors number of ids added to the error count
     */
    public void updateComplete(int batchSuccessful, int batchErrors) {
        this.completedCount.addAndGet(batchSuccessful);
        this.errorCount.addAndGet(batchErrors);
    }

    /**
     * @return the running total reported as successful via {@link #updateComplete(int, int)}
     */
    public int getCompletedCount() {
        return this.completedCount.get();
    }

    /**
     * @return the running total reported as errors via {@link #updateComplete(int, int)}
     */
    public int getErrorCount() {
        return this.errorCount.get();
    }

    /**
     * Runs membership indexing inside the manager's transaction and commits it.
     */
    private void indexMembership() {
        final Transaction tx = this.transaction();
        LOGGER.info("Starting membership indexing");
        this.reindexService.indexMembership(tx);
        tx.commit();
        LOGGER.debug("Completed membership indexing");
    }

    /**
     * Closes the id stream supplied to the constructor.
     */
    public void shutdown() {
        this.ocflStream.close();
    }

    /**
     * Starts a thread that logs progress every {@link #REPORTING_INTERVAL_SECS}
     * seconds until it is interrupted.
     *
     * @return the started reporter thread
     */
    private Thread startReporter() {
        final Thread reporter = new Thread(() -> {
            final Instant startTime = Instant.now();
            try {
                while (true) {
                    TimeUnit.SECONDS.sleep(REPORTING_INTERVAL_SECS);
                    final int complete = this.completedCount.get();
                    final int errored = this.errorCount.get();
                    final Duration duration = Duration.between(startTime, Instant.now());
                    // Widen before adding so the sum cannot overflow int, and never divide by zero.
                    final long processed = (long) complete + errored;
                    final long rate = processed / Math.max(1L, duration.getSeconds());
                    LOGGER.info("Index rebuild progress: Complete: {}; Errored: {}; Time: {}; Rate: {}/s",
                            complete, errored, this.getDurationMessage(duration), rate);
                }
            } catch (InterruptedException ignored) {
                // Interruption is the signal from start() that reporting should end.
                return;
            }
        });
        reporter.start();
        return reporter;
    }

    /**
     * Formats a duration as {@code "[H hours, ][M mins, ]S secs"}. The minutes part
     * appears from one minute onward and the hours part from one hour onward; hours
     * are the total number of whole hours, not wrapped at 24.
     *
     * @param duration elapsed time to format
     * @return the human-readable message
     */
    private String getDurationMessage(Duration duration) {
        final long totalSeconds = duration.getSeconds();
        String message = String.format("%d secs", duration.toSecondsPart());
        // ">=" so that exactly 60s reads "1 mins, 0 secs" rather than "0 secs".
        if (totalSeconds >= SECONDS_PER_MINUTE) {
            message = String.format("%d mins, ", duration.toMinutesPart()) + message;
        }
        // ">=" so that exactly 3600s reads "1 hours, 0 mins, 0 secs".
        if (totalSeconds >= SECONDS_PER_HOUR) {
            message = String.format("%d hours, ", totalSeconds / SECONDS_PER_HOUR) + message;
        }
        return message;
    }

    /**
     * Returns the manager's transaction, creating it through the transaction manager
     * and marking it short-lived on first use.
     *
     * @return the shared transaction instance
     */
    private Transaction transaction() {
        if (this.transaction == null) {
            this.transaction = this.txManager.create();
            this.transaction.setShortLived(true);
        }
        return this.transaction;
    }
}

