/*
 * Decompiled with CFR 0.152.
 */
package org.hansken.plugin.extraction.runtime.grpc.server.proxy;

import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.hansken.extraction.plugin.grpc.RpcBatchUpdate;
import org.hansken.extraction.plugin.grpc.RpcBeginChild;
import org.hansken.extraction.plugin.grpc.RpcBeginDataStream;
import org.hansken.extraction.plugin.grpc.RpcFinish;
import org.hansken.extraction.plugin.grpc.RpcFinishChild;
import org.hansken.extraction.plugin.grpc.RpcFinishDataStream;
import org.hansken.extraction.plugin.grpc.RpcPartialFinishWithError;
import org.hansken.extraction.plugin.grpc.RpcProfile;
import org.hansken.extraction.plugin.grpc.RpcSearchResult;
import org.hansken.extraction.plugin.grpc.RpcWriteDataStream;
import org.hansken.plugin.extraction.api.BatchSearchResult;
import org.hansken.plugin.extraction.api.SearchResult;
import org.hansken.plugin.extraction.api.SearchTrace;
import org.hansken.plugin.extraction.api.Trace;
import org.hansken.plugin.extraction.api.TraceSearcher;
import org.hansken.plugin.extraction.api.transformations.DataTransformation;
import org.hansken.plugin.extraction.runtime.grpc.common.Pack;
import org.hansken.plugin.extraction.runtime.grpc.common.Unpack;
import org.hansken.plugin.extraction.runtime.grpc.server.proxy.SearchTraceProxy;
import org.hansken.plugin.extraction.util.ArgChecks;

public class GrpcFacade {
    private static final int MESSAGE_OVERHEAD_SIZE = 1024;
    private static final int PER_ACTION_OVERHEAD_SIZE = 8;
    private static final int MINIMUM_MESSAGE_SIZE = 0x100000;
    private static final int SEARCH_REQUEST_COUNT_LIMIT = 50;
    private final int _bufferedMessageSize;
    private final int _maximumReadChunkSize;
    private final int _maximumWriteChunkSize;
    private final BlockingQueue<Any> _incomingMessages;
    private final StreamObserver<Any> _outgoingMessages;
    private final AtomicBoolean _expectingResponse;
    private final List<Any> _bufferedUpdates;
    private long _currentlyBufferedSize;
    private long _start;

    public GrpcFacade(BlockingQueue<Any> incomingMessages, StreamObserver<Any> outgoingMessages, int maximumMessageSize) {
        this._incomingMessages = (BlockingQueue)ArgChecks.argNotNull((String)"incomingMessages", incomingMessages);
        this._outgoingMessages = (StreamObserver)ArgChecks.argNotNull((String)"outgoingMessages", outgoingMessages);
        if (maximumMessageSize < 0x100000) {
            throw new IllegalArgumentException("maximum message size is too small: " + maximumMessageSize + ", should be at least1048576");
        }
        this._bufferedMessageSize = maximumMessageSize;
        this._maximumReadChunkSize = maximumMessageSize - 1024;
        this._maximumWriteChunkSize = this._maximumReadChunkSize - 8;
        this._expectingResponse = new AtomicBoolean(false);
        this._bufferedUpdates = new ArrayList<Any>(64);
    }

    public byte[] readFromTraceData(long position, int count, String traceUid, String type) {
        int toRead;
        byte[] buffer = new byte[count];
        for (int remaining = count; remaining > 0; remaining -= toRead) {
            long currentPosition = position + (long)(count - remaining);
            toRead = Integer.min(remaining, this._maximumReadChunkSize);
            byte[] read = Unpack.bytes((Any)this.call((Message)Pack.readParameters((long)currentPosition, (int)toRead, (String)traceUid, (String)type)));
            System.arraycopy(read, 0, buffer, count - remaining, read.length);
        }
        return buffer;
    }

    public void enrichTrace(String id, Set<String> types, Map<String, Object> properties, List<Trace.Tracelet> tracelets, Map<String, List<DataTransformation>> transformations) {
        this.buffer(Any.pack((Message)Pack.traceEnrichment((String)id, types, properties, tracelets, transformations)));
    }

    public void beginChild(String id, String name) {
        this.buffer(Any.pack((Message)RpcBeginChild.newBuilder().setId(id).setName(name).build()));
    }

    public void finishChild(String id) {
        this.buffer(Any.pack((Message)RpcFinishChild.newBuilder().setId(id).build()));
    }

    public void beginWritingData(String id, String dataType) {
        this.buffer(Any.pack((Message)RpcBeginDataStream.newBuilder().setTraceId(id).setDataType(dataType).build()));
    }

    public void writeData(String id, String dataType, byte[] data, int offset, int length) {
        int toWrite;
        for (int remaining = length; remaining > 0; remaining -= toWrite) {
            long currentPosition = (long)offset + (long)length - (long)remaining;
            int from = Math.toIntExact(currentPosition);
            toWrite = Integer.min(remaining, this._maximumWriteChunkSize);
            this.buffer(Any.pack((Message)this.writeDataMessage(id, dataType, data, from, toWrite)));
        }
    }

    private RpcWriteDataStream writeDataMessage(String id, String dataType, byte[] buffer, int offset, int length) {
        return RpcWriteDataStream.newBuilder().setTraceId(id).setDataType(dataType).setData(ByteString.copyFrom((byte[])buffer, (int)offset, (int)length)).build();
    }

    public void finishWritingData(String id, String dataType) {
        this.buffer(Any.pack((Message)RpcFinishDataStream.newBuilder().setTraceId(id).setDataType(dataType).build()));
    }

    public SearchResult searchTraces(String query, int count, TraceSearcher.SearchScope scope) {
        if (count > 50) {
            throw new IllegalArgumentException("search request count must not exceed the limit of 50");
        }
        Any anyResult = this.call((Message)Pack.searchRequest((String)query, (int)count, (TraceSearcher.SearchScope)scope));
        RpcSearchResult rpcResult = (RpcSearchResult)Unpack.any((Any)anyResult, RpcSearchResult.class);
        BatchSearchResult result = new BatchSearchResult(rpcResult.getTotalResults());
        result.setTraces((SearchTrace[])rpcResult.getTracesList().stream().map(trace -> SearchTraceProxy.fromRpc(trace, this)).toArray(SearchTraceProxy[]::new));
        return result;
    }

    public void handleResponse(Any any) {
        if (!this._expectingResponse.get()) {
            throw new IllegalStateException("unexpected message received: " + any);
        }
        if (!this._incomingMessages.offer(any)) {
            throw new IllegalStateException("could not handle incoming message: " + any);
        }
    }

    public void onError(Status.Code statusCode, Throwable t) {
        this._outgoingMessages.onError((Throwable)Pack.asStatusRuntimeException((Status.Code)statusCode, (Throwable)t));
    }

    public void onCompleted() {
        this._outgoingMessages.onCompleted();
    }

    public void finishProcessing(double duration) {
        RpcFinish update = RpcFinish.newBuilder().setUpdate(RpcBatchUpdate.newBuilder().addAllActions(this._bufferedUpdates).build()).setProfile(this.profile(duration)).build();
        this._outgoingMessages.onNext((Object)Any.pack((Message)update));
    }

    public void processPartialResultOrError(Throwable t, double duration) {
        Status.Code statusCode = this._currentlyBufferedSize == 0L ? Status.Code.CANCELLED : Status.Code.DATA_LOSS;
        this._outgoingMessages.onNext((Object)Any.pack((Message)RpcPartialFinishWithError.newBuilder().addAllActions(this._bufferedUpdates).setStatusCode(statusCode.name()).setErrorDescription(ExceptionUtils.getStackTrace((Throwable)t)).setProfile(this.profile(duration)).build()));
    }

    private RpcProfile profile(double duration) {
        return RpcProfile.newBuilder().putProfileDoubles("duration", duration).build();
    }

    private void buffer(Any action) {
        int actionSize = action.getSerializedSize();
        long estimatedUpdateMessageSize = this._currentlyBufferedSize + (long)actionSize + 8L + 1024L;
        if (estimatedUpdateMessageSize > (long)this._bufferedMessageSize) {
            RpcBatchUpdate update = RpcBatchUpdate.newBuilder().addAllActions(this._bufferedUpdates).build();
            this._bufferedUpdates.clear();
            this._currentlyBufferedSize = 0L;
            this.call(Any.pack((Message)update));
        }
        this._currentlyBufferedSize += (long)(actionSize + 8);
        this._bufferedUpdates.add(action);
    }

    private Any call(Message message) {
        return this.call(Any.pack((Message)message));
    }

    private Any call(Any message) {
        try {
            if (this._expectingResponse.getAndSet(true)) {
                throw new IllegalStateException("expecting response for previous request, can not send one in parallel");
            }
            this._outgoingMessages.onNext((Object)message);
            Any any = this._incomingMessages.take();
            return any;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        finally {
            this._expectingResponse.set(false);
        }
    }
}

