/*
 * Decompiled with CFR 0.152.
 */
package org.n52.youngs.control.impl;

import com.google.common.base.MoreObjects;
import com.google.common.base.Stopwatch;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.n52.youngs.api.Report;
import org.n52.youngs.control.Runner;
import org.n52.youngs.exception.MappingError;
import org.n52.youngs.exception.SinkError;
import org.n52.youngs.harvest.Source;
import org.n52.youngs.harvest.SourceRecord;
import org.n52.youngs.impl.ReportImpl;
import org.n52.youngs.load.Sink;
import org.n52.youngs.load.SinkRecord;
import org.n52.youngs.transform.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SingleThreadBulkRunner
implements Runner {
    private static final Logger log = LoggerFactory.getLogger(SingleThreadBulkRunner.class);
    private static final long DEFAULT_BULK_SIZE = 10L;
    private long bulkSize = 10L;
    private Source source;
    private Mapper mapper;
    private long recordsLimit = Long.MAX_VALUE;
    private Optional<Double> completedPercentage = Optional.empty();
    private Sink sink;
    private final boolean testRun = false;
    private long startPosition = 1L;

    public SingleThreadBulkRunner setBulkSize(long bulkSize) {
        this.bulkSize = bulkSize;
        return this;
    }

    public SingleThreadBulkRunner setStartPosition(long startPosition) {
        this.startPosition = startPosition;
        return this;
    }

    public SingleThreadBulkRunner setRecordsLimit(long recordsLimit) {
        this.recordsLimit = recordsLimit;
        return this;
    }

    @Override
    public SingleThreadBulkRunner harvest(Source source) {
        this.source = source;
        log.debug("Saved source, waiting for load() to be called...", (Object)source);
        return this;
    }

    @Override
    public SingleThreadBulkRunner transform(Mapper mapper) {
        this.mapper = mapper;
        log.debug("Saved mapper, waiting for load() to be called...", (Object)this.source);
        return this;
    }

    @Override
    public Report load(Sink sink) {
        this.sink = sink;
        Objects.nonNull(this.source);
        Objects.nonNull(this.mapper);
        Objects.nonNull(this.sink);
        log.info("Starting harvest from {} to {} with {}", new Object[]{this.source, this.sink, this.mapper});
        ReportImpl report = new ReportImpl();
        try {
            boolean prepareSink = sink.prepare(this.mapper.getMapper());
            if (!prepareSink) {
                String msg = "The sink could not be prepared. Stopping load, please check the logs.";
                log.error(msg);
                report.addMessage(msg);
                return report;
            }
        }
        catch (SinkError e) {
            log.error("Problem preparing sink", (Throwable)e);
            report.addMessage(String.format("Problem preparing sink: %s", e.getMessage()));
            return report;
        }
        Stopwatch timer = Stopwatch.createStarted();
        long pageStart = this.startPosition;
        long count = this.source.getRecordCount();
        long limit = Math.min(this.recordsLimit + this.startPosition, count);
        Stopwatch sourceTimer = Stopwatch.createUnstarted();
        Stopwatch mappingTimer = Stopwatch.createUnstarted();
        Stopwatch sinkTimer = Stopwatch.createUnstarted();
        Stopwatch currentBulkTimer = Stopwatch.createUnstarted();
        double bulkTimeAvg = 0.0;
        long runNumber = 0L;
        while (pageStart <= limit) {
            currentBulkTimer.start();
            long recordsLeft = limit - pageStart + 1L;
            long size = Math.min(recordsLeft, this.bulkSize);
            if (size <= 0L) break;
            log.info("### [{}] Requesting {} records from {} starting at {}, last requested record will be {} ###", new Object[]{runNumber, size, this.source.getEndpoint(), pageStart, limit});
            try {
                sourceTimer.start();
                Collection<SourceRecord> records = this.source.getRecords(pageStart, size, report);
                sourceTimer.stop();
                log.debug("Mapping {} retrieved records.", (Object)records.size());
                mappingTimer.start();
                List<SinkRecord> mappedRecords = records.stream().map(record -> {
                    try {
                        return this.mapper.map((SourceRecord)record);
                    }
                    catch (MappingError e) {
                        report.addFailedRecord(record.toString(), "Problem during mapping: " + e.getMessage());
                        return null;
                    }
                }).filter(Objects::nonNull).collect(Collectors.toList());
                mappingTimer.stop();
                log.debug("Storing {} mapped records.", (Object)mappedRecords.size());
                sinkTimer.start();
                mappedRecords.forEach(record -> {
                    try {
                        boolean result = sink.store((SinkRecord)record);
                        if (result) {
                            report.addSuccessfulRecord(record.getId());
                        } else {
                            report.addFailedRecord(record.getId(), "see sink log");
                        }
                    }
                    catch (SinkError e) {
                        report.addFailedRecord(record.toString(), "Problem during mapping: " + e.getMessage());
                    }
                });
                sinkTimer.stop();
            }
            catch (RuntimeException e) {
                if (sourceTimer.isRunning()) {
                    sourceTimer.stop();
                }
                if (mappingTimer.isRunning()) {
                    mappingTimer.stop();
                }
                if (sinkTimer.isRunning()) {
                    sinkTimer.stop();
                }
                String msg = String.format("Problem processing records %s to %s: %s", pageStart, pageStart + size, e.getMessage());
                log.error(msg, (Throwable)e);
                report.addMessage(msg);
            }
            pageStart += this.bulkSize;
            currentBulkTimer.stop();
            bulkTimeAvg = (bulkTimeAvg * (double)runNumber + (double)currentBulkTimer.elapsed(TimeUnit.SECONDS)) / (double)(runNumber + 1L);
            this.updateAndLog(runNumber, (runNumber + 1L) * this.bulkSize, currentBulkTimer.elapsed(TimeUnit.SECONDS), bulkTimeAvg);
            currentBulkTimer.reset();
            ++runNumber;
        }
        timer.stop();
        log.info("Completed harvesting for {} ({} failed) of {} records in {} minutes", new Object[]{report.getNumberOfRecordsAdded(), report.getNumberOfRecordsFailed(), this.source.getRecordCount(), timer.elapsed(TimeUnit.MINUTES)});
        log.info("Time spent (minutes): source={}, mapping={}, sink={}", new Object[]{sourceTimer.elapsed(TimeUnit.MINUTES), mappingTimer.elapsed(TimeUnit.MINUTES), sinkTimer.elapsed(TimeUnit.MINUTES)});
        return report;
    }

    @Override
    public double getCompletedPercentage() {
        return this.completedPercentage.orElse(Double.NEGATIVE_INFINITY);
    }

    public String toString() {
        return MoreObjects.toStringHelper((Object)this).add("source", (Object)this.source).add("mapper", (Object)this.mapper).add("sink", (Object)this.sink).toString();
    }

    private void updateAndLog(long run, long pageStart, long bulkSeconds, double bulkAverageSeconds) {
        double percentageTask = (double)pageStart / (double)this.recordsLimit * 100.0;
        this.completedPercentage = Optional.of(percentageTask);
        log.info("### [{}] Completed {}% of task in {} seconds (avg: {} seconds) ###", new Object[]{run, String.format("%1$,.2f", this.getCompletedPercentage()), bulkSeconds, String.format("%1$,.2f", bulkAverageSeconds)});
    }
}

