/*
 * Decompiled with CFR 0.152.
 */
package org.n52.youngs.control.impl;

import com.google.common.base.MoreObjects;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.xml.xpath.XPathExpressionException;
import org.n52.youngs.api.Report;
import org.n52.youngs.control.Runner;
import org.n52.youngs.exception.MappingError;
import org.n52.youngs.exception.SinkError;
import org.n52.youngs.harvest.NodeSourceRecord;
import org.n52.youngs.harvest.Source;
import org.n52.youngs.harvest.SourceException;
import org.n52.youngs.harvest.SourceRecord;
import org.n52.youngs.impl.ReportImpl;
import org.n52.youngs.load.Sink;
import org.n52.youngs.load.SinkRecord;
import org.n52.youngs.postprocess.PostProcessor;
import org.n52.youngs.transform.Mapper;
import org.n52.youngs.transform.MappingEntry;
import org.n52.youngs.validation.XmlSchemaValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Node;
import org.xml.sax.SAXException;

public class SingleThreadBulkRunner
implements Runner {
    private static final Logger log = LoggerFactory.getLogger(SingleThreadBulkRunner.class);
    private static final long DEFAULT_BULK_SIZE = 10L;
    private long bulkSize = 10L;
    private Source source;
    private Mapper mapper;
    private long recordsLimit = Long.MAX_VALUE;
    private Optional<Double> completedPercentage = Optional.empty();
    private Sink sink;
    private final boolean testRun = false;
    private long startPosition = 1L;
    private PostProcessor postProcessor;
    private boolean validateXml;
    private List<XmlSchemaValidator> validators;

    public SingleThreadBulkRunner setBulkSize(long bulkSize) {
        this.bulkSize = bulkSize;
        return this;
    }

    public SingleThreadBulkRunner setStartPosition(long startPosition) {
        this.startPosition = startPosition;
        return this;
    }

    public SingleThreadBulkRunner setRecordsLimit(long recordsLimit) {
        this.recordsLimit = recordsLimit;
        return this;
    }

    @Override
    public SingleThreadBulkRunner harvest(Source source) {
        this.source = source;
        log.debug("Saved source, waiting for load() to be called...", (Object)source);
        return this;
    }

    @Override
    public SingleThreadBulkRunner transform(Mapper mapper) {
        this.mapper = mapper;
        log.debug("Saved mapper, waiting for load() to be called...", (Object)this.source);
        return this;
    }

    @Override
    public Runner postTransformProcess(PostProcessor postProcessor) {
        this.postProcessor = postProcessor;
        log.debug("Saved postProcessor, waiting for load() to be called...", (Object)this.source);
        return this;
    }

    @Override
    public Runner withValidators(List<XmlSchemaValidator> vals) {
        this.validators = vals;
        this.validateXml = true;
        return this;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Report load(Sink sink) {
        this.sink = sink;
        Objects.nonNull(this.source);
        Objects.nonNull(this.mapper);
        Objects.nonNull(this.sink);
        log.info("Starting harvest from {} to {} with {}", new Object[]{this.source, this.sink, this.mapper});
        ReportImpl report = new ReportImpl();
        try {
            boolean prepareSink = sink.prepare(this.mapper.getMapper());
            if (!prepareSink) {
                String msg = "The sink could not be prepared. Stopping load, please check the logs.";
                log.error(msg);
                report.addMessage(msg, Report.Level.ERROR);
                return report;
            }
        }
        catch (SinkError e) {
            log.error("Problem preparing sink", (Throwable)e);
            report.addMessage(String.format("Problem preparing sink: %s", e.getMessage()), Report.Level.ERROR);
            return report;
        }
        Stopwatch timer = Stopwatch.createStarted();
        long pageStart = this.startPosition;
        long count = this.source.getRecordCount();
        long limit = this.recordsLimit == Long.MAX_VALUE ? count : Math.min(this.recordsLimit + this.startPosition, count);
        Stopwatch sourceTimer = Stopwatch.createUnstarted();
        Stopwatch mappingTimer = Stopwatch.createUnstarted();
        Stopwatch sinkTimer = Stopwatch.createUnstarted();
        Stopwatch currentBulkTimer = Stopwatch.createUnstarted();
        double bulkTimeAvg = 0.0;
        long runNumber = 0L;
        while (pageStart <= limit) {
            String msg;
            currentBulkTimer.start();
            long recordsLeft = limit - pageStart + 1L;
            long size = Math.min(recordsLeft, this.bulkSize);
            if (size <= 0L) break;
            log.info("### [{}] Requesting {} records from {} starting at {}, last requested record will be {} ###", new Object[]{runNumber, size, this.source.getEndpoint(), pageStart, limit});
            try {
                sourceTimer.start();
                Collection<SourceRecord> records = this.source.getRecords(pageStart, size, report);
                sourceTimer.stop();
                ArrayList validRecords = Lists.newArrayList();
                if (this.validateXml) {
                    int index = 0;
                    for (SourceRecord record2 : records) {
                        try {
                            List<String> messages = this.validate(record2);
                            if (!messages.isEmpty()) {
                                messages.forEach(m -> report.addMessage((String)m, Report.Level.INFO));
                            }
                            log.debug("File #{} is schema valid", (Object)index++);
                            validRecords.add(record2);
                        }
                        catch (SourceException e) {
                            String msg2 = String.format("Issue while processing record %s: %s", index, e.getMessage());
                            log.info(msg2, (Throwable)e);
                            report.addMessage(msg2, Report.Level.ERROR);
                        }
                    }
                }
                log.debug("Mapping {} retrieved valid records.", (Object)validRecords.size());
                mappingTimer.start();
                List<SinkRecord> mappedRecords = validRecords.stream().map(record -> {
                    try {
                        SinkRecord r = this.mapper.map((SourceRecord)record);
                        if (this.postProcessor != null) {
                            return this.postProcessor.process(r);
                        }
                        return r;
                    }
                    catch (MappingError e) {
                        report.addFailedRecord(record.toString(), "Problem during mapping: " + e.getMessage());
                        return null;
                    }
                }).filter(Objects::nonNull).collect(Collectors.toList());
                mappingTimer.stop();
                log.debug("Storing {} mapped records.", (Object)mappedRecords.size());
                sinkTimer.start();
                mappedRecords.forEach(record -> {
                    try {
                        boolean result = sink.store((SinkRecord)record);
                        if (result) {
                            report.addSuccessfulRecord(record.getId());
                        } else {
                            report.addFailedRecord(record.getId(), "see sink log");
                        }
                    }
                    catch (SinkError e) {
                        log.warn("Problem during mapping: ", (Throwable)e);
                        report.addFailedRecord(record.toString(), "Problem during mapping: " + e.getMessage());
                    }
                });
                sinkTimer.stop();
            }
            catch (SourceException e) {
                msg = String.format("Issue while processing records %s to %s: %s", pageStart, size, e.getMessage());
                log.info(msg, (Throwable)e);
                report.addMessage(msg, Report.Level.ERROR);
            }
            catch (RuntimeException e) {
                msg = String.format("Unexpected error while processing records %s to %s: %s", pageStart, size, e.getMessage());
                log.error(msg, (Throwable)e);
                report.addMessage(msg, Report.Level.ERROR);
            }
            finally {
                if (sourceTimer.isRunning()) {
                    sourceTimer.stop();
                }
                if (mappingTimer.isRunning()) {
                    mappingTimer.stop();
                }
                if (sinkTimer.isRunning()) {
                    sinkTimer.stop();
                }
            }
            pageStart += this.bulkSize;
            currentBulkTimer.stop();
            bulkTimeAvg = (bulkTimeAvg * (double)runNumber + (double)currentBulkTimer.elapsed(TimeUnit.SECONDS)) / (double)(runNumber + 1L);
            this.updateAndLog(runNumber, (runNumber + 1L) * this.bulkSize, currentBulkTimer.elapsed(TimeUnit.SECONDS), bulkTimeAvg);
            currentBulkTimer.reset();
            ++runNumber;
        }
        timer.stop();
        log.info("Completed harvesting for {} ({} failed) of {} records in {} minutes", new Object[]{report.getNumberOfRecordsAdded(), report.getNumberOfRecordsFailed(), this.source.getRecordCount(), timer.elapsed(TimeUnit.MINUTES)});
        log.info("Time spent (minutes): source={}, mapping={}, sink={}", new Object[]{sourceTimer.elapsed(TimeUnit.MINUTES), mappingTimer.elapsed(TimeUnit.MINUTES), sinkTimer.elapsed(TimeUnit.MINUTES)});
        return report;
    }

    @Override
    public double getCompletedPercentage() {
        return this.completedPercentage.orElse(Double.NEGATIVE_INFINITY);
    }

    public String toString() {
        return MoreObjects.toStringHelper((Object)this).add("source", (Object)this.source).add("mapper", (Object)this.mapper).add("sink", (Object)this.sink).toString();
    }

    private void updateAndLog(long run, long pageStart, long bulkSeconds, double bulkAverageSeconds) {
        double percentageTask = (double)pageStart / (double)this.recordsLimit * 100.0;
        this.completedPercentage = Optional.of(percentageTask);
        log.info("### [{}] Completed {}% of task in {} seconds (avg: {} seconds) ###", new Object[]{run, String.format("%1$,.2f", this.getCompletedPercentage()), bulkSeconds, String.format("%1$,.2f", bulkAverageSeconds)});
    }

    private List<String> validate(SourceRecord sourceRecord) throws SourceException {
        if (sourceRecord instanceof NodeSourceRecord) {
            NodeSourceRecord nsr = (NodeSourceRecord)sourceRecord;
            try {
                XmlSchemaValidator val = this.resolveValidator(nsr.getRecord());
                if (val != null) {
                    return val.validate(nsr.getRecord());
                }
                return Collections.singletonList("No schema validator available for namespace: " + nsr.getRecord().getNamespaceURI());
            }
            catch (IOException | SAXException ex) {
                String recordId = this.tryRecordIdExtraction((NodeSourceRecord)sourceRecord);
                throw new SourceException("Validation failed for record with ID: " + recordId, ex);
            }
        }
        log.warn("The SourceRecord class {} is not supported", (Object)sourceRecord.getClass().getName());
        return Collections.emptyList();
    }

    private XmlSchemaValidator resolveValidator(Node record) {
        if (this.validators != null && !this.validators.isEmpty()) {
            for (XmlSchemaValidator validator : this.validators) {
                if (!validator.matchesNamespace(record.getNamespaceURI())) continue;
                return validator;
            }
        }
        return null;
    }

    private String tryRecordIdExtraction(NodeSourceRecord nodeSourceRecord) {
        String idField = this.mapper.getMapper().getIdentifierField();
        MappingEntry idMapping = this.mapper.getMapper().getEntry(idField);
        try {
            String result = idMapping.getXPath().evaluate(nodeSourceRecord.getRecord());
            return String.format("[identifier] %s", result);
        }
        catch (XPathExpressionException ex) {
            log.debug("Could not extract ");
            return String.format("[protocolIdentifier] %s", nodeSourceRecord.getProtocolIdentifier());
        }
    }
}

