/*
 * Decompiled with CFR 0.152.
 */
package org.kiwiproject.beta.base.process;

import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.Generated;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.kiwiproject.base.DefaultEnvironment;
import org.kiwiproject.base.KiwiEnvironment;
import org.kiwiproject.beta.base.process.ProcessOutputHandlerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Beta
public class ProcessOutputHandler
implements Closeable {
    @Generated
    private static final Logger LOG = LoggerFactory.getLogger(ProcessOutputHandler.class);
    private static final int CALLBACK_THREAD_POOL_SIZE = 2;
    private final ListeningExecutorService listeningExecutorService;
    private final ExecutorService executorService;
    private final ExecutorService callbackExecutorService;
    private final KiwiEnvironment environment;
    private final int bufferCapacityBytes;
    private final long sleepTimeMillis;

    public ProcessOutputHandler(ProcessOutputHandlerConfig config) {
        this(config.getThreadPoolSize(), config.bufferCapacityInBytes(), config.sleepTimeInMillis());
    }

    public ProcessOutputHandler(int threadPoolSize, int bufferCapacityBytes, long sleepTimeMillis) {
        this(Executors.newFixedThreadPool(threadPoolSize), Executors.newFixedThreadPool(2), (KiwiEnvironment)new DefaultEnvironment(), bufferCapacityBytes, sleepTimeMillis);
    }

    @VisibleForTesting
    ProcessOutputHandler(ExecutorService executorService, ExecutorService callbackExecutorService, KiwiEnvironment environment, int bufferCapacityBytes, long sleepTimeMillis) {
        this.executorService = executorService;
        this.listeningExecutorService = MoreExecutors.listeningDecorator((ExecutorService)executorService);
        this.callbackExecutorService = callbackExecutorService;
        this.environment = environment;
        this.bufferCapacityBytes = bufferCapacityBytes;
        this.sleepTimeMillis = sleepTimeMillis;
    }

    public Result handleStandardOutput(Process process, Consumer<String> outputConsumer) {
        return this.handle(process, ProcessOutputType.STANDARD, outputConsumer);
    }

    public Result handleErrorOutput(Process process, Consumer<String> errorConsumer) {
        return this.handle(process, ProcessOutputType.ERROR, errorConsumer);
    }

    public Result handle(Process process, ProcessOutputType outputType, Consumer<String> outputConsumer) {
        String pid = ProcessOutputHandler.pidOf(process).orElse("[unknown]");
        String outputTypeDesc = outputType.description;
        if (!process.isAlive()) {
            LOG.warn("Process {} is dead-on-arrival, no {} output to read!", (Object)pid, (Object)outputTypeDesc);
            return Result.IGNORE_DEAD_PROCESS;
        }
        LOG.debug("Submit task to read {} output of process {}", (Object)outputTypeDesc, (Object)pid);
        Callable<Object> task = this.createTask(process, outputType, outputConsumer, pid, outputTypeDesc);
        ListenableFuture listenableFuture = this.listeningExecutorService.submit(task);
        this.addCompletionLoggingCallback(pid, listenableFuture);
        return Result.HANDLING;
    }

    private static Optional<String> pidOf(Process process) {
        try {
            return Optional.of(String.valueOf(process.pid()));
        }
        catch (UnsupportedOperationException e) {
            return Optional.empty();
        }
    }

    private Callable<Object> createTask(Process process, ProcessOutputType streamType, Consumer<String> outputConsumer, String pid, String outputTypeDesc) {
        return () -> {
            try (InputStream inputStream = ProcessOutputHandler.selectInputStream(process, streamType);
                 ReadableByteChannel channel = Channels.newChannel(inputStream);){
                ByteBuffer buffer = ByteBuffer.allocate(this.bufferCapacityBytes);
                while (process.isAlive()) {
                    LOG.trace("Reading up to {} bytes from {} output from process {}", new Object[]{this.bufferCapacityBytes, outputTypeDesc, pid});
                    int bytesRead = channel.read(buffer);
                    LOG.trace("Read {} byte(s) from {} output from process {}", new Object[]{bytesRead, outputTypeDesc, pid});
                    buffer.flip();
                    String outputData = ProcessOutputHandler.readStringAsUTF8(buffer);
                    outputConsumer.accept(outputData);
                    buffer.compact();
                    this.environment.sleepQuietly(this.sleepTimeMillis, TimeUnit.MILLISECONDS);
                }
                LOG.debug("Process {} is dead. No more {} output to read", (Object)pid, (Object)outputTypeDesc);
            }
            return null;
        };
    }

    private static InputStream selectInputStream(Process process, ProcessOutputType streamType) {
        return streamType == ProcessOutputType.STANDARD ? process.getInputStream() : process.getErrorStream();
    }

    private static String readStringAsUTF8(ByteBuffer byteBuffer) {
        byte[] bytes = new byte[byteBuffer.remaining()];
        byteBuffer.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    private void addCompletionLoggingCallback(final String processId, ListenableFuture<?> listenableFuture) {
        FutureCallback<Object> callback = new FutureCallback<Object>(){

            public void onSuccess(@Nullable Object result) {
                LOG.info("Handler for process {} completed without error", (Object)processId);
            }

            public void onFailure(@NonNull Throwable error) {
                LOG.error("Handler for process {} had an error", (Object)processId, (Object)error);
            }
        };
        Futures.addCallback(listenableFuture, (FutureCallback)callback, (Executor)this.callbackExecutorService);
    }

    @Override
    public void close() {
        LOG.info("Shutdown executors NOW");
        this.executorService.shutdownNow();
        this.callbackExecutorService.shutdownNow();
    }

    public static enum Result {
        HANDLING,
        IGNORE_DEAD_PROCESS;

    }

    public static enum ProcessOutputType {
        STANDARD("standard"),
        ERROR("error");

        private final String description;

        private ProcessOutputType(String description) {
            this.description = description;
        }
    }
}

