/*
 * Decompiled with CFR 0.152.
 */
package org.irenical.dumpy.impl.stream;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.irenical.dumpy.DumpyThreadFactory;
import org.irenical.dumpy.api.IExtractor;
import org.irenical.dumpy.api.IJob;
import org.irenical.dumpy.api.ILoader;
import org.irenical.dumpy.api.IStream;
import org.irenical.dumpy.api.IStreamProcessor;
import org.irenical.dumpy.impl.ExecutorTerminator;
import org.irenical.dumpy.impl.LoaderResponseHandler;
import org.irenical.dumpy.impl.db.DumpyDB;
import org.irenical.dumpy.impl.model.DumpyBlockingQueue;
import org.irenical.jindy.ConfigFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamProcessor
implements IStreamProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamProcessor.class);
    private final ExecutorService loaderResponseExecutor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, (BlockingQueue<Runnable>)new DumpyBlockingQueue(1000), new DumpyThreadFactory());
    private final DumpyDB dumpyDB;
    private boolean isRunning = false;

    public StreamProcessor(DumpyDB dumpyDB) {
        this.dumpyDB = dumpyDB;
    }

    public <ERROR extends Exception> void start() throws ERROR {
        this.isRunning = true;
    }

    public void stop() throws Exception {
        this.isRunning = false;
        ExecutorTerminator.terminate(10L, Long.MAX_VALUE, this.loaderResponseExecutor);
    }

    public <ERROR extends Exception> boolean isRunning() throws ERROR {
        return this.isRunning && this.dumpyDB.isRunning() && !this.loaderResponseExecutor.isTerminated();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public <TYPE, ERROR extends Exception> void process(IJob iJob, IStream<TYPE, ERROR> iStream) throws Exception {
        LOGGER.debug("[ processor( " + iStream.getCode() + " ) ] stream start");
        IExtractor<TYPE, ERROR> iExtractor = iStream.getExtractor();
        ILoader iLoader = iStream.getLoader();
        int nThreads = ConfigFactory.getConfig().getInt("dumpy.latest.thread.count", 5);
        int maxQueue = ConfigFactory.getConfig().getInt("dumpy.latest.thread.queue.capacity", 1000);
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, (BlockingQueue<Runnable>)new DumpyBlockingQueue(maxQueue), new DumpyThreadFactory());
        try {
            String cursor = this.dumpyDB.getCursor(iJob.getCode(), iStream.getCode());
            boolean hasNext = true;
            while (this.isRunning() && hasNext) {
                List entities;
                IExtractor.Response<TYPE> extractorResponse;
                try {
                    extractorResponse = iExtractor.get(cursor);
                    entities = extractorResponse.getValues();
                }
                catch (Exception e) {
                    LOGGER.error(e.getLocalizedMessage(), (Throwable)e);
                    continue;
                }
                if (!this.isRunning()) continue;
                if (entities != null && !entities.isEmpty()) {
                    Future loaderTask = executorService.submit(() -> iLoader.load(entities));
                    this.loaderResponseExecutor.execute(new LoaderResponseHandler<TYPE, ERROR>(this.dumpyDB, iJob, iStream, loaderTask));
                }
                cursor = extractorResponse.getCursor();
                hasNext = extractorResponse.hasNext();
                if (extractorResponse.getCursor() != null) {
                    this.dumpyDB.setCursor(iJob.getCode(), iStream.getCode(), cursor);
                }
                String sleepValue = ConfigFactory.getConfig().getString("dumpy.latest.sleep", "1000");
                LOGGER.debug("[ processor( " + iStream.getCode() + " ) ] sleeping " + sleepValue + "ms");
                Thread.sleep(Long.valueOf(sleepValue));
            }
        }
        finally {
            ExecutorTerminator.terminate(10L, Long.MAX_VALUE, executorService);
            LOGGER.debug("[ processor( " + iStream.getCode() + " ) ] stream done");
        }
    }
}

