package io.datakernel.stream.processor;

import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.bytebuf.ByteBufQueue;
import io.datakernel.exception.ParseException;
import io.datakernel.exception.TruncatedDataException;
import io.datakernel.serializer.BufferSerializer;
import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.AbstractStreamProducer;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.StreamStatus;

/* loaded from: input_file:io/datakernel/stream/processor/StreamBinaryDeserializer.class */
/**
 * Stream transformer that consumes {@link ByteBuf} chunks and produces items of type {@code T}
 * decoded with the supplied {@link BufferSerializer}.
 *
 * <p>Incoming chunks are appended to an internal {@link ByteBufQueue}. Each item is expected to be
 * framed as a size header of one to three bytes followed by that many bytes of serialized data.
 * The header stores the data size seven bits per byte, least significant group first; a set high
 * bit on a header byte means another header byte follows.
 *
 * @param <T> type of the deserialized items
 */
public final class StreamBinaryDeserializer<T> implements StreamTransformer<ByteBuf, T> {
    /** Reported when the third header byte still has its continuation bit set. */
    public static final ParseException HEADER_SIZE_EXCEPTION = new ParseException("Header size is too large");

    /** Reported when the serializer did not consume exactly the number of bytes announced by the header. */
    public static final ParseException DESERIALIZED_SIZE_EXCEPTION = new ParseException("Deserialized size != parsed data size");

    private final Input input;
    private final Output output;

    /**
     * Consumer side of the transformer. It only forwards end-of-stream and error notifications
     * to the {@link Output}.
     */
    public final class Input extends AbstractStreamConsumer<ByteBuf> {
        private Input() {
        }

        /** Lets the output drain whatever is still queued and finish the stream. */
        @Override
        protected void onEndOfStream() {
            StreamBinaryDeserializer.this.output.produce();
        }

        /** Propagates an upstream failure to the output side. */
        @Override
        protected void onError(Throwable th) {
            StreamBinaryDeserializer.this.output.closeWithError(th);
        }
    }

    /**
     * Producer side of the transformer. It also acts as the receiver of the raw byte chunks,
     * buffering them and emitting every completely buffered item downstream.
     */
    private final class Output extends AbstractStreamProducer<T> implements StreamDataReceiver<ByteBuf> {
        private final ByteBufQueue queue;
        private final BufferSerializer<T> valueSerializer;

        private Output(BufferSerializer<T> bufferSerializer) {
            this.queue = ByteBufQueue.create();
            this.valueSerializer = bufferSerializer;
        }

        /** Buffers the incoming chunk and immediately tries to decode items from the queue. */
        @Override
        public void onData(ByteBuf byteBuf) {
            this.queue.add(byteBuf);
            produce();
        }

        @Override
        public void onWired() {
            super.onWired();
        }

        /** Asks the upstream producer to suspend when this producer is suspended. */
        @Override
        protected void onSuspended() {
            StreamBinaryDeserializer.this.input.getProducer().suspend();
        }

        /**
         * Decodes and sends as many complete items as the queue holds while the receiver is ready.
         * When the queue has no complete item left, more data is requested from upstream; if the
         * input has already ended, the stream is finished, or an error is raised when undecodable
         * bytes are left over.
         */
        @Override
        protected void produce() {
            try {
                while (isReceiverReady() && this.queue.hasRemaining()) {
                    int packedSize = StreamBinaryDeserializer.tryPeekSize(this.queue);
                    int headerSize = packedSize >>> 24;
                    if (headerSize == 0) {
                        // Header is not fully buffered yet: stop and wait for more data
                        // instead of re-reading the same bytes forever.
                        break;
                    }
                    int frameSize = headerSize + (packedSize & 0xFFFFFF);
                    if (!this.queue.hasRemainingBytes(frameSize)) {
                        // Item body is not fully buffered yet: wait for more data.
                        break;
                    }

                    ByteBuf frame = this.queue.takeExactSize(frameSize);
                    frame.moveReadPosition(headerSize);

                    T item;
                    try {
                        item = this.valueSerializer.deserialize(frame);
                    } catch (Exception e) {
                        // Only serializer failures are wrapped; size mismatches and downstream
                        // failures must not be reported as deserialization errors.
                        throw new ParseException("Deserialization error", e);
                    }
                    if (frame.canRead()) {
                        throw StreamBinaryDeserializer.DESERIALIZED_SIZE_EXCEPTION;
                    }
                    frame.recycle();
                    send(item);
                }

                if (isReceiverReady()) {
                    StreamBinaryDeserializer.this.input.getProducer().produce(this);
                    if (StreamBinaryDeserializer.this.input.getStatus() == StreamStatus.END_OF_STREAM) {
                        if (!this.queue.isEmpty()) {
                            // NOTE(review): assumes TruncatedDataException is a ParseException so that it
                            // is routed to closeWithError below - confirm against its declaration.
                            throw new TruncatedDataException(String.format("Truncated serialized data stream, %s : %s", this, this.queue));
                        }
                        sendEndOfStream();
                    }
                }
            } catch (ParseException e) {
                closeWithError(e);
            }
        }

        /** Propagates a downstream failure to the input side. */
        @Override
        protected void onError(Throwable th) {
            StreamBinaryDeserializer.this.input.closeWithError(th);
        }

        /** Drops every buffered chunk. */
        @Override
        protected void cleanup() {
            this.queue.clear();
        }
    }

    private StreamBinaryDeserializer(BufferSerializer<T> bufferSerializer) {
        this.input = new Input();
        this.output = new Output(bufferSerializer);
    }

    /**
     * Creates a deserializer that decodes items with the given serializer.
     *
     * @param bufferSerializer serializer used to decode each item
     * @param <T> type of the deserialized items
     * @return a new transformer instance
     */
    public static <T> StreamBinaryDeserializer<T> create(BufferSerializer<T> bufferSerializer) {
        return new StreamBinaryDeserializer<>(bufferSerializer);
    }

    /** Returns the consumer that accepts the raw byte chunks. */
    @Override
    public StreamConsumer<ByteBuf> getInput() {
        return this.input;
    }

    /** Returns the producer that emits the deserialized items. */
    @Override
    public StreamProducer<T> getOutput() {
        return this.output;
    }

    /**
     * Peeks at the size header at the head of the queue without consuming any bytes.
     *
     * <p>The result packs two values: the header length in bytes in the top 8 bits and the decoded
     * data size in the low 24 bits. A header length of {@code 0} means the header is not completely
     * buffered yet; in that case the low bits may hold a partially decoded size and must be ignored.
     *
     * @param byteBufQueue queue to inspect; must have at least one remaining byte
     * @return {@code (headerLength << 24) + dataSize}
     * @throws ParseException {@link #HEADER_SIZE_EXCEPTION} if the third header byte announces a fourth one
     */
    public static int tryPeekSize(ByteBufQueue byteBufQueue) throws ParseException {
        assert byteBufQueue.hasRemaining();
        int dataSize = 0;
        int headerSize = 0;
        byte b = byteBufQueue.peekByte();
        if (b >= 0) {
            // High bit clear: single-byte header.
            dataSize = b;
            headerSize = 1;
        } else if (byteBufQueue.hasRemainingBytes(2)) {
            dataSize = b & 0x7F;
            b = byteBufQueue.peekByte(1);
            if (b >= 0) {
                dataSize |= b << 7;
                headerSize = 2;
            } else if (byteBufQueue.hasRemainingBytes(3)) {
                dataSize |= (b & 0x7F) << 7;
                b = byteBufQueue.peekByte(2);
                if (b < 0) {
                    // A fourth header byte is not supported.
                    throw HEADER_SIZE_EXCEPTION;
                }
                dataSize |= b << 14;
                headerSize = 3;
            }
        }
        // dataSize uses at most 21 bits, so it never collides with the header length bits.
        return (headerSize << 24) + dataSize;
    }
}
