/*
 * Decompiled with CFR 0.152.
 */
package technology.dice.dicewhere.lineprocessing;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Queues;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import technology.dice.dicewhere.api.exceptions.LineParsingException;
import technology.dice.dicewhere.lineprocessing.LineprocessorListenerForProvider;
import technology.dice.dicewhere.lineprocessing.SerializedLine;
import technology.dice.dicewhere.parsing.LineParser;
import technology.dice.dicewhere.parsing.ParsedLine;
import technology.dice.dicewhere.reading.RawLine;

public class LineProcessor
implements Runnable {
    private static final int WORKER_BATCH_SIZE = 10000;
    private final ExecutorService executorService;
    private final ArrayBlockingQueue<RawLine> lines;
    private final LineParser parser;
    private final boolean retainOriginalLine;
    private final BlockingQueue<SerializedLine> destination;
    private final LineprocessorListenerForProvider progressListener;
    private final AtomicBoolean expectingMore = new AtomicBoolean(true);
    private final int workersCount;

    public LineProcessor(ExecutorService executorService, BlockingQueue<SerializedLine> destination, LineParser parser, boolean retainOriginalLine, LineprocessorListenerForProvider progressListener, int workersCount) {
        this.lines = new ArrayBlockingQueue((workersCount + 1) * 10000);
        this.destination = destination;
        this.executorService = executorService;
        this.parser = parser;
        this.retainOriginalLine = retainOriginalLine;
        this.progressListener = progressListener;
        this.workersCount = workersCount;
    }

    public void markDataComplete() {
        this.expectingMore.set(false);
    }

    public void addLine(RawLine rawLine) {
        try {
            this.lines.put(rawLine);
        }
        catch (InterruptedException e) {
            this.progressListener.enqueueError(rawLine, e);
        }
    }

    @Override
    public void run() {
        long started = System.currentTimeMillis();
        AtomicLong totalLines = new AtomicLong();
        CompletableFuture[] workerList = new CompletableFuture[this.workersCount];
        while (this.expectingMore.get() || this.lines.size() > 0) {
            try {
                for (int i = 0; i < this.workersCount; ++i) {
                    ArrayList batch = new ArrayList(10000);
                    Queues.drain(this.lines, batch, (int)10000, (long)1L, (TimeUnit)TimeUnit.NANOSECONDS);
                    workerList[i] = CompletableFuture.supplyAsync(() -> this.buildSerializedLineBatch(started, batch), this.executorService);
                }
                CompletableFuture.allOf(workerList);
                for (CompletableFuture worker : workerList) {
                    ((List)worker.join()).forEach(serializedLine -> {
                        try {
                            this.destination.put((SerializedLine)serializedLine);
                            totalLines.getAndIncrement();
                            this.progressListener.lineProcessed((SerializedLine)serializedLine, System.currentTimeMillis() - started);
                        }
                        catch (InterruptedException e) {
                            this.progressListener.dequeueError((SerializedLine)serializedLine, e);
                        }
                    });
                }
            }
            catch (InterruptedException e) {
                this.progressListener.processorInterrupted(e);
                throw new RuntimeException("Line processor interrupted", e);
            }
        }
        this.progressListener.finished(totalLines.get(), System.currentTimeMillis() - started);
    }

    private ImmutableList<SerializedLine> buildSerializedLineBatch(long started, Collection<RawLine> batch) {
        return (ImmutableList)batch.stream().flatMap(rawLine -> this.attemptParse((RawLine)rawLine, started)).collect(ImmutableList.toImmutableList());
    }

    private Stream<SerializedLine> attemptParse(RawLine rawLine, long started) {
        try {
            Stream<ParsedLine> parsed = this.parser.parse(rawLine, this.retainOriginalLine);
            long now = System.currentTimeMillis();
            return parsed.flatMap(l -> {
                this.progressListener.lineParsed((ParsedLine)l, now - started);
                return this.attemptSerialize((ParsedLine)l);
            });
        }
        catch (LineParsingException e) {
            this.progressListener.parseError(rawLine, e);
            return Stream.empty();
        }
        catch (Exception e) {
            this.progressListener.parseError(rawLine, new LineParsingException(e, rawLine));
            return Stream.empty();
        }
    }

    private Stream<SerializedLine> attemptSerialize(ParsedLine parsedLine) {
        try {
            return Stream.of(new SerializedLine(parsedLine.getStartIp(), parsedLine));
        }
        catch (Exception e) {
            this.progressListener.serializeError(parsedLine, e);
            return Stream.empty();
        }
    }
}

