/*
 * Decompiled with CFR 0.152.
 */
package org.hansken.plugin.extraction.runtime.grpc.server;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.hansken.ep.shade.com.google.protobuf.Any;
import org.hansken.ep.shade.com.google.protobuf.Empty;
import org.hansken.ep.shade.io.grpc.Status;
import org.hansken.ep.shade.io.grpc.stub.StreamObserver;
import org.hansken.extraction.plugin.grpc.ExtractionPluginServiceGrpc;
import org.hansken.extraction.plugin.grpc.RpcDataContext;
import org.hansken.extraction.plugin.grpc.RpcPluginInfo;
import org.hansken.extraction.plugin.grpc.RpcStart;
import org.hansken.extraction.plugin.grpc.RpcTrace;
import org.hansken.plugin.extraction.api.BaseExtractionPlugin;
import org.hansken.plugin.extraction.api.DataContext;
import org.hansken.plugin.extraction.api.DeferredExtractionPlugin;
import org.hansken.plugin.extraction.api.ExtractionPlugin;
import org.hansken.plugin.extraction.api.PluginInfo;
import org.hansken.plugin.extraction.api.Trace;
import org.hansken.plugin.extraction.api.TraceSearcher;
import org.hansken.plugin.extraction.runtime.grpc.common.Checks;
import org.hansken.plugin.extraction.runtime.grpc.common.Pack;
import org.hansken.plugin.extraction.runtime.grpc.common.Unpack;
import org.hansken.plugin.extraction.runtime.grpc.server.proxy.ExtractionContextProxy;
import org.hansken.plugin.extraction.runtime.grpc.server.proxy.GrpcFacade;
import org.hansken.plugin.extraction.runtime.grpc.server.proxy.TraceProxy;
import org.hansken.plugin.extraction.runtime.grpc.server.proxy.TraceSearcherProxy;
import org.hansken.plugin.extraction.util.ArgChecks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExtractionPluginServerService
extends ExtractionPluginServiceGrpc.ExtractionPluginServiceImplBase {
    private static final Logger LOG = LoggerFactory.getLogger(ExtractionPluginServerService.class);
    private final ExecutorService _workers = Executors.newFixedThreadPool(8);
    private final Supplier<BaseExtractionPlugin> _plugin;
    private final int _maximumMessageSize;

    protected ExtractionPluginServerService(Supplier<BaseExtractionPlugin> plugin, int maximumMessageSize) {
        this._plugin = (Supplier)ArgChecks.argNotNull((String)"plugin", plugin);
        this._maximumMessageSize = ArgChecks.argNotNegative((String)"maximumMessageSize", (int)maximumMessageSize);
    }

    @Override
    public void pluginInfo(Empty request, StreamObserver<RpcPluginInfo> responseObserver) {
        responseObserver.onNext(Pack.pluginInfo((PluginInfo)this._plugin.get().pluginInfo()));
        responseObserver.onCompleted();
    }

    @Override
    public StreamObserver<Any> process(StreamObserver<Any> outgoingMessages) {
        ArrayBlockingQueue<Any> incomingMessages = new ArrayBlockingQueue<Any>(1);
        final GrpcFacade facade = new GrpcFacade(incomingMessages, outgoingMessages, this._maximumMessageSize);
        return new StreamObserver<Any>(){
            private final AtomicBoolean _started = new AtomicBoolean();
            private final AtomicReference<Thread> _myThreadReference = new AtomicReference();

            @Override
            public void onNext(Any any) {
                try {
                    if (any.is(RpcStart.class)) {
                        if (this._started.getAndSet(true)) {
                            facade.onError(Status.Code.FAILED_PRECONDITION, new IllegalStateException("processing of the trace has already been started"));
                            return;
                        }
                        ExtractionPluginServerService.this._workers.execute(() -> {
                            this._myThreadReference.set(Thread.currentThread());
                            RpcStart start = Unpack.start((Any)any);
                            this.process(start.getTrace(), start.getDataContext(), facade);
                        });
                        return;
                    }
                    facade.handleResponse(any);
                }
                catch (Throwable t) {
                    facade.onError(Status.Code.CANCELLED, t);
                }
            }

            private void process(RpcTrace rpcTrace, RpcDataContext rpcDataContext, GrpcFacade facade2) {
                long start = System.nanoTime();
                String id = null;
                try {
                    BaseExtractionPlugin plugin = ExtractionPluginServerService.this._plugin.get();
                    try (TraceProxy trace = TraceProxy.fromRpc(rpcTrace, facade2);
                         ExtractionContextProxy context = ExtractionContextProxy.fromRpc(rpcDataContext, trace.traceId(), facade2);){
                        id = (String)trace.get("id");
                        this.logStartProcess(context, id);
                        this.process(plugin, rpcTrace, rpcDataContext, facade2);
                    }
                    long duration = System.nanoTime() - start;
                    facade2.finishProcessing((double)duration / 1.0E9);
                    LOG.info("Finished processed trace with id: {}", (Object)id);
                }
                catch (Throwable t) {
                    LOG.error("Error during processing trace with id: {}", id, (Object)t);
                    long duration = System.nanoTime() - start;
                    facade2.processPartialResultOrError(t, (double)duration / 1.0E9);
                }
            }

            private void logStartProcess(DataContext context, String id) {
                String dataType = context.dataType();
                if (Checks.isMetaContext((DataContext)context)) {
                    LOG.info("Started processing trace with id: {}, data type: {}", (Object)id, (Object)dataType);
                } else {
                    long size = context.data().size();
                    LOG.info("Started processing trace with id: {}, data type: {}, size: {}", new Object[]{id, dataType, size});
                }
            }

            private void process(BaseExtractionPlugin plugin, RpcTrace rpcTrace, RpcDataContext rpcDataContext, GrpcFacade facade2) throws Exception {
                block14: {
                    try (TraceProxy trace = TraceProxy.fromRpc(rpcTrace, facade2);
                         ExtractionContextProxy context = ExtractionContextProxy.fromRpc(rpcDataContext, trace.traceId(), facade2);){
                        if (plugin instanceof ExtractionPlugin) {
                            ((ExtractionPlugin)plugin).process((Trace)trace, (DataContext)context);
                            break block14;
                        }
                        if (plugin instanceof DeferredExtractionPlugin) {
                            TraceSearcherProxy searcher = new TraceSearcherProxy(facade2);
                            ((DeferredExtractionPlugin)plugin).process((Trace)trace, (DataContext)context, (TraceSearcher)searcher);
                            break block14;
                        }
                        throw new IllegalArgumentException("Provided plugin is not a known implementation of ExtractionPlugin or DeferredExtractionPlugin");
                    }
                }
            }

            @Override
            public void onError(Throwable t) {
                LOG.error("Error received from stream: {}", (Object)t.getMessage(), (Object)t);
                if (this._myThreadReference.get() != null) {
                    this._myThreadReference.get().interrupt();
                }
            }

            @Override
            public void onCompleted() {
                facade.onCompleted();
            }
        };
    }
}

