/*
 * Decompiled with CFR 0.152.
 */
package org.irenical.dumpy.impl;

import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.irenical.dumpy.DumpyThreadFactory;
import org.irenical.dumpy.api.IExtractor;
import org.irenical.dumpy.api.IJob;
import org.irenical.dumpy.api.ILoader;
import org.irenical.dumpy.api.IStream;
import org.irenical.dumpy.api.IStreamProcessor;
import org.irenical.dumpy.impl.LoaderResponseHandler;
import org.irenical.dumpy.impl.db.DumpyDB;
import org.irenical.dumpy.impl.model.PaginatedResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ErrorStreamProcessor
implements IStreamProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(ErrorStreamProcessor.class);
    private final DumpyDB dumpyDB;
    private boolean isRunning = false;
    private final ExecutorService loaderResponseExecutor = Executors.newCachedThreadPool(new DumpyThreadFactory());

    public ErrorStreamProcessor(DumpyDB dumpyDB) {
        this.dumpyDB = dumpyDB;
    }

    @Override
    public <ERROR extends Exception> void start() throws ERROR {
        this.isRunning = true;
    }

    @Override
    public <ERROR extends Exception> void stop() throws ERROR {
        this.isRunning = false;
        this.loaderResponseExecutor.shutdown();
        try {
            this.loaderResponseExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        }
        catch (InterruptedException e) {
            LOGGER.error(e.getLocalizedMessage(), e);
        }
    }

    @Override
    public <ERROR extends Exception> boolean isRunning() throws ERROR {
        return this.isRunning && this.dumpyDB.isRunning() && !this.loaderResponseExecutor.isTerminated();
    }

    @Override
    public <TYPE, ERROR extends Exception> void process(IJob iJob, IStream<TYPE, ERROR> iStream) throws Exception {
        LOGGER.debug("[ processor( " + iStream.getCode() + " ) ] stream start");
        IExtractor<TYPE, ERROR> iExtractor = iStream.getExtractor();
        ILoader iLoader = iStream.getLoader();
        ExecutorService executorService = Executors.newCachedThreadPool(new DumpyThreadFactory());
        String cursor = null;
        boolean hasNext = true;
        while (this.isRunning && hasNext) {
            PaginatedResponse<String> response = this.dumpyDB.get(iJob.getCode(), iStream.getCode(), cursor);
            if (response.getValues() != null && !response.getValues().isEmpty()) {
                String extractorCursor = null;
                boolean extractorHasNext = true;
                while (extractorHasNext) {
                    IExtractor.Response typeResponse = iExtractor.get(response.values, extractorCursor);
                    if (typeResponse.getValues() != null && !typeResponse.getValues().isEmpty()) {
                        Future<ILoader.Status> loaderTask = executorService.submit(() -> iLoader.load(typeResponse.getValues()));
                        this.loaderResponseExecutor.execute(new LoaderResponseHandler<TYPE, ERROR>(this.dumpyDB, iJob, iStream, loaderTask, new LinkedList(typeResponse.getValues())));
                    }
                    extractorCursor = typeResponse.getCursor();
                    extractorHasNext = typeResponse.hasNext();
                }
            }
            cursor = response.cursor;
            hasNext = response.hasNext;
        }
        executorService.shutdown();
        try {
            boolean awaitTermination = false;
            while (!awaitTermination) {
                awaitTermination = executorService.awaitTermination(10L, TimeUnit.SECONDS);
            }
        }
        catch (InterruptedException e) {
            LOGGER.error(e.getLocalizedMessage(), e);
            executorService.shutdownNow();
            executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        }
        LOGGER.debug("[ processor( " + iStream.getCode() + " ) ] stream done");
    }
}

