/*
 * Decompiled with CFR 0.152.
 */
package org.irenical.dumpy.impl;

import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.irenical.dumpy.DumpyThreadFactory;
import org.irenical.dumpy.api.IExtractor;
import org.irenical.dumpy.api.IJob;
import org.irenical.dumpy.api.ILoader;
import org.irenical.dumpy.api.IStream;
import org.irenical.dumpy.api.IStreamProcessor;
import org.irenical.dumpy.impl.LoaderResponseHandler;
import org.irenical.dumpy.impl.db.DumpyDB;
import org.irenical.dumpy.impl.model.DumpyBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stream processor that walks a stream forward from the cursor stored in
 * {@link DumpyDB}.
 *
 * <p>For each extractor response that carries values, the values are handed to
 * the stream's loader on a worker pool created per {@link #process} call, and a
 * {@link LoaderResponseHandler} for the resulting future is queued on a pool
 * shared by all calls on this instance.
 *
 * <p>The shared pool is shut down by {@link #stop()} and is not recreated by
 * {@link #start()}.
 */
public class LatestStreamProcessor implements IStreamProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(LatestStreamProcessor.class);

    /** Fixed size of the shared pool that runs {@link LoaderResponseHandler}s. */
    private static final int RESPONSE_HANDLER_THREADS = 10;
    /** Fixed size of the per-stream pool that runs loader calls. */
    private static final int LOADER_THREADS = 5;
    /** Capacity passed to every {@link DumpyBlockingQueue} backing a pool. */
    private static final int MAX_QUEUE = 1000;
    /** Seconds to wait between "waiting for loaders" log lines during shutdown. */
    private static final long AWAIT_POLL_SECONDS = 10L;

    private final ExecutorService loaderResponseExecutor = new ThreadPoolExecutor(RESPONSE_HANDLER_THREADS, RESPONSE_HANDLER_THREADS, 0L, TimeUnit.MILLISECONDS, (BlockingQueue<Runnable>)new DumpyBlockingQueue(MAX_QUEUE), new DumpyThreadFactory());
    private final DumpyDB dumpyDB;
    // volatile: stop() flips this flag while process() polls it in its loop,
    // presumably from another thread; without volatile the write may never be seen.
    private volatile boolean isRunning = false;

    /**
     * @param dumpyDB store used to read and persist stream cursors
     */
    public LatestStreamProcessor(DumpyDB dumpyDB) {
        this.dumpyDB = dumpyDB;
    }

    /**
     * Marks this processor as running so that {@link #process} loops may proceed.
     */
    @Override
    public <ERROR extends Exception> void start() throws ERROR {
        this.isRunning = true;
    }

    /**
     * Clears the running flag, then shuts down the shared response-handler pool
     * and blocks until it has terminated.
     *
     * @throws Exception if the thread is interrupted a second time while waiting
     *                   for the forced shutdown to complete
     */
    @Override
    public void stop() throws Exception {
        this.isRunning = false;
        LatestStreamProcessor.shutdownAndAwait(this.loaderResponseExecutor, "onStop()");
    }

    /**
     * @return {@code true} while this processor has been started and not stopped,
     *         the backing {@link DumpyDB} reports running, and the shared
     *         response-handler pool has not terminated
     */
    @Override
    public <ERROR extends Exception> boolean isRunning() throws ERROR {
        return this.isRunning && this.dumpyDB.isRunning() && !this.loaderResponseExecutor.isTerminated();
    }

    /**
     * Extracts and loads the given stream until the extractor reports no further
     * data or this processor is stopped.
     *
     * <p>Always waits for the per-call loader pool to drain before returning,
     * including when the loop exits with an exception.
     *
     * @param iJob    job owning the stream; its code keys the stored cursor
     * @param iStream stream supplying the extractor and loader
     * @throws Exception anything thrown by the extractor, the cursor store or the
     *                   executors, or an {@link InterruptedException} raised while
     *                   waiting for a forced shutdown
     */
    @Override
    public <TYPE, ERROR extends Exception> void process(IJob iJob, IStream<TYPE, ERROR> iStream) throws Exception {
        LOGGER.debug("[ processor( {} ) ] stream start", iStream.getCode());
        IExtractor<TYPE, ERROR> iExtractor = iStream.getExtractor();
        ILoader iLoader = iStream.getLoader();
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(LOADER_THREADS, LOADER_THREADS, 0L, TimeUnit.MILLISECONDS, (BlockingQueue<Runnable>)new DumpyBlockingQueue(MAX_QUEUE), new DumpyThreadFactory());
        try {
            String cursor = this.dumpyDB.getCursor(iJob.getCode(), iStream.getCode());
            boolean hasNext = true;
            while (this.isRunning && hasNext) {
                IExtractor.Response extractorResponse = iExtractor.get(cursor);
                if (extractorResponse.getValues() != null && !extractorResponse.getValues().isEmpty()) {
                    Future<ILoader.Status> loaderTask = executorService.submit(() -> iLoader.load(extractorResponse.getValues()));
                    // The handler receives its own copy of the extracted values.
                    this.loaderResponseExecutor.execute(new LoaderResponseHandler<TYPE, ERROR>(this.dumpyDB, iJob, iStream, loaderTask, new LinkedList(extractorResponse.getValues())));
                }
                // NOTE(review): when the extractor returns a null cursor, neither the
                // cursor nor hasNext is updated, so the next iteration asks the
                // extractor for the same cursor again - confirm this is intended.
                String nextCursor = extractorResponse.getCursor();
                if (nextCursor != null) {
                    cursor = nextCursor;
                    hasNext = extractorResponse.hasNext();
                    this.dumpyDB.setCursor(iJob.getCode(), iStream.getCode(), cursor);
                }
            }
        }
        finally {
            LatestStreamProcessor.shutdownAndAwait(executorService, iStream.getCode());
            LOGGER.debug("[ processor( {} ) ] stream done", iStream.getCode());
        }
    }

    /**
     * Shuts {@code executor} down and blocks until it has terminated, logging a
     * debug line before each wait of {@value #AWAIT_POLL_SECONDS} seconds.
     *
     * <p>If the wait is interrupted, the executor is shut down forcibly, awaited
     * without a practical time limit, and the calling thread's interrupt status is
     * restored afterwards so that callers can still observe the interruption.
     *
     * @param executor executor to shut down
     * @param label    value placed inside {@code processor( ... )} in the log line
     * @throws InterruptedException if interrupted again during the forced shutdown wait
     */
    private static void shutdownAndAwait(ExecutorService executor, Object label) throws InterruptedException {
        executor.shutdown();
        try {
            boolean terminated = false;
            while (!terminated) {
                LOGGER.debug("[ processor( {} ) ] waiting for loaders to finish", label);
                terminated = executor.awaitTermination(AWAIT_POLL_SECONDS, TimeUnit.SECONDS);
            }
        }
        catch (InterruptedException e) {
            LOGGER.error(e.getLocalizedMessage(), e);
            executor.shutdownNow();
            try {
                executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
            }
            finally {
                // Re-assert the interrupt only after the final wait; setting it
                // earlier would make awaitTermination throw immediately.
                Thread.currentThread().interrupt();
            }
        }
    }
}

