/*
 * Decompiled with CFR 0.152.
 */
package org.irenical.dumpy.impl.stream;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.irenical.dumpy.DumpyThreadFactory;
import org.irenical.dumpy.api.IExtractor;
import org.irenical.dumpy.api.IJob;
import org.irenical.dumpy.api.ILoader;
import org.irenical.dumpy.api.IStream;
import org.irenical.dumpy.api.IStreamProcessor;
import org.irenical.dumpy.impl.ExecutorTerminator;
import org.irenical.dumpy.impl.LoaderResponseHandler;
import org.irenical.dumpy.impl.db.DumpyDB;
import org.irenical.dumpy.impl.model.DumpyBlockingQueue;
import org.irenical.dumpy.impl.model.PaginatedResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ErrorStreamProcessor
implements IStreamProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(ErrorStreamProcessor.class);
    private final DumpyDB dumpyDB;
    private boolean isRunning = false;
    private final ExecutorService loaderResponseExecutor = Executors.newCachedThreadPool(new DumpyThreadFactory());

    public ErrorStreamProcessor(DumpyDB dumpyDB) {
        this.dumpyDB = dumpyDB;
    }

    public <ERROR extends Exception> void start() throws ERROR {
        this.isRunning = true;
    }

    public void stop() throws Exception {
        this.isRunning = false;
        ExecutorTerminator.terminate(10L, Long.MAX_VALUE, this.loaderResponseExecutor);
    }

    public <ERROR extends Exception> boolean isRunning() throws ERROR {
        return this.isRunning && this.dumpyDB.isRunning() && !this.loaderResponseExecutor.isTerminated();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public <TYPE, ERROR extends Exception> void process(IJob iJob, IStream<TYPE, ERROR> iStream) throws Exception {
        LOGGER.debug("[ processor( " + iStream.getCode() + " ) ] stream start");
        IExtractor<TYPE, ERROR> iExtractor = iStream.getExtractor();
        ILoader iLoader = iStream.getLoader();
        int nThreads = 5;
        int maxQueue = 1000;
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, (BlockingQueue<Runnable>)new DumpyBlockingQueue(1000), new DumpyThreadFactory());
        try {
            String cursor = null;
            boolean hasNext = true;
            while (this.isRunning() && hasNext) {
                PaginatedResponse<String> response = this.dumpyDB.get(iJob.getCode(), iStream.getCode(), cursor);
                List<String> values = response.getValues();
                if (values != null && !values.isEmpty()) {
                    String extractorCursor = null;
                    boolean extractorHasNext = true;
                    while (this.isRunning() && extractorHasNext) {
                        IExtractor.Response<TYPE> typeResponse = iExtractor.get(response.values, extractorCursor);
                        List entities = typeResponse.getValues();
                        if (this.isRunning() && entities != null && !entities.isEmpty()) {
                            Future loaderTask = executorService.submit(() -> iLoader.load(entities));
                            this.loaderResponseExecutor.execute(new LoaderResponseHandler<TYPE, ERROR>(this.dumpyDB, iJob, iStream, loaderTask));
                        }
                        extractorCursor = typeResponse.getCursor();
                        extractorHasNext = typeResponse.hasNext();
                    }
                }
                cursor = response.cursor;
                hasNext = response.hasNext;
                if (values != null && !values.isEmpty()) continue;
                Thread.sleep(1000L);
            }
        }
        finally {
            ExecutorTerminator.terminate(10L, Long.MAX_VALUE, executorService);
            LOGGER.debug("[ processor( " + iStream.getCode() + " ) ] stream done");
        }
    }
}

