/*
 * Decompiled with CFR 0.152.
 */
package alluxio.client.file.dora.netty;

import alluxio.client.file.FileSystemContext;
import alluxio.client.file.dora.netty.DebugLoggingTracer;
import alluxio.client.file.dora.netty.PartialReadException;
import alluxio.client.file.dora.netty.event.ChannelErrorResponseEvent;
import alluxio.client.file.dora.netty.event.ResponseEvent;
import alluxio.client.file.dora.netty.event.ResponseEventContext;
import alluxio.client.file.dora.netty.event.ResponseEventFactory;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.exception.status.UnavailableException;
import alluxio.file.ReadTargetBuffer;
import alluxio.network.protocol.RPCProtoMessage;
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.proto.dataserver.Protocol;
import alluxio.shaded.client.com.github.oxo42.stateless4j.StateMachine;
import alluxio.shaded.client.com.github.oxo42.stateless4j.StateMachineConfig;
import alluxio.shaded.client.com.github.oxo42.stateless4j.transitions.Transition;
import alluxio.shaded.client.com.github.oxo42.stateless4j.triggers.TriggerWithParameters1;
import alluxio.shaded.client.com.google.common.annotations.VisibleForTesting;
import alluxio.shaded.client.com.google.common.base.Preconditions;
import alluxio.shaded.client.io.grpc.Status;
import alluxio.shaded.client.io.netty.buffer.ByteBuf;
import alluxio.shaded.client.io.netty.channel.Channel;
import alluxio.shaded.client.io.netty.channel.ChannelHandlerContext;
import alluxio.shaded.client.io.netty.channel.ChannelInboundHandlerAdapter;
import alluxio.shaded.client.javax.annotation.Nullable;
import alluxio.shaded.client.javax.annotation.concurrent.NotThreadSafe;
import alluxio.util.CommonUtils;
import alluxio.util.network.NettyUtils;
import alluxio.util.proto.ProtoMessage;
import alluxio.util.proto.ProtoUtils;
import alluxio.wire.WorkerNetAddress;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public class NettyDataReaderStateMachine {
    private static final Logger LOG = LoggerFactory.getLogger(NettyDataReaderStateMachine.class);
    private final StateMachine<State, TriggerEvent> mStateMachine;
    private final TriggerEventsWithParam mTriggerEventsWithParam;
    private final AtomicReference<Runnable> mNextTriggerEvent = new AtomicReference();
    private final FileSystemContext mContext;
    private final long mReadTimeoutMs;
    private final int mMaxPacketsInFlight;
    private final WorkerNetAddress mAddress;
    private final Supplier<Protocol.ReadRequest.Builder> mRequestBuilder;
    private final int mLength;
    private final ReadTargetBuffer mOutputBuffer;
    private final BlockingQueue<ResponseEvent> mResponseEventQueue = new LinkedBlockingQueue<ResponseEvent>();
    @Nullable
    private Channel mChannel;
    private int mBytesRead;
    @Nullable
    private Throwable mLastException;
    @Nullable
    private TriggerEvent mLastExceptionTrigger;

    public NettyDataReaderStateMachine(FileSystemContext context, WorkerNetAddress address, Protocol.ReadRequest.Builder requestBuilder, ReadTargetBuffer buffer) {
        this.mContext = context;
        AlluxioConfiguration conf = context.getClusterConf();
        this.mReadTimeoutMs = conf.getMs(PropertyKey.USER_NETWORK_NETTY_TIMEOUT_MS);
        this.mMaxPacketsInFlight = conf.getInt(PropertyKey.USER_NETWORK_NETTY_READER_BUFFER_SIZE_PACKETS);
        this.mAddress = address;
        this.mRequestBuilder = requestBuilder::clone;
        this.mLength = (int)requestBuilder.getLength();
        this.mOutputBuffer = buffer;
        StateMachineConfig<State, TriggerEvent> config = new StateMachineConfig<State, TriggerEvent>();
        this.mTriggerEventsWithParam = new TriggerEventsWithParam(config);
        config.configure(State.CREATED).permit(TriggerEvent.START, State.ACQUIRING_CHANNEL);
        config.configure(State.ACQUIRING_CHANNEL).onEntry(this::acquireNettyChannel).permit(TriggerEvent.CHANNEL_AVAILABLE, State.CHANNEL_ACTIVE, this::sendRequest).permit(TriggerEvent.CHANNEL_UNAVAILABLE, State.TERMINATED_EXCEPTIONALLY);
        config.configure(State.CHANNEL_ACTIVE).onEntry(this::pollResponseFromQueue).permit(TriggerEvent.DATA_AVAILABLE, State.RECEIVED_DATA).permitReentry(TriggerEvent.HEART_BEAT).permit(TriggerEvent.EOF, State.RECEIVED_EOF).permit(TriggerEvent.TIMEOUT, State.CLIENT_CANCEL, this::sendClientCancel).permit(TriggerEvent.INTERRUPTED, State.CLIENT_CANCEL, this::sendClientCancel).permit(TriggerEvent.SERVER_ERROR, State.CLIENT_CANCEL, this::sendClientCancel).permit(TriggerEvent.CHANNEL_ERROR, State.CLIENT_CANCEL, this::sendClientCancel);
        config.configure(State.RECEIVED_DATA).onEntryFrom(this.mTriggerEventsWithParam.mDataAvailableEvent, this::onReceivedData).permit(TriggerEvent.OUTPUT_ERROR, State.CLIENT_CANCEL, this::sendClientCancel).permit(TriggerEvent.OUTPUT_LENGTH_FULFILLED, State.EXPECTING_EOF).permit(TriggerEvent.OUTPUT_LENGTH_NOT_FULFILLED, State.CHANNEL_ACTIVE);
        config.configure(State.EXPECTING_EOF).onEntry(this::pollResponseFromQueue).permit(TriggerEvent.EOF, State.TERMINATED_NORMALLY).permit(TriggerEvent.TIMEOUT, State.TERMINATED_NORMALLY, this::syncCloseChannel).permit(TriggerEvent.INTERRUPTED, State.TERMINATED_NORMALLY, this::syncCloseChannel).permit(TriggerEvent.SERVER_ERROR, State.TERMINATED_NORMALLY, this::syncCloseChannel).permit(TriggerEvent.CHANNEL_ERROR, State.TERMINATED_NORMALLY, this::syncCloseChannel);
        config.configure(State.RECEIVED_EOF).onEntry(this::onReceivedEof).permit(TriggerEvent.OUTPUT_LENGTH_FULFILLED, State.TERMINATED_NORMALLY).permit(TriggerEvent.OUTPUT_LENGTH_NOT_FULFILLED, State.TERMINATED_NORMALLY);
        config.configure(State.CLIENT_CANCEL).onEntryFrom(this.mTriggerEventsWithParam.mInterruptedEvent, this::setException).onEntryFrom(this.mTriggerEventsWithParam.mTimeoutEvent, this::setException).onEntryFrom(this.mTriggerEventsWithParam.mServerErrorEvent, this::setException).onEntryFrom(this.mTriggerEventsWithParam.mChannelErrorEvent, this::setException).onEntryFrom(this.mTriggerEventsWithParam.mOutputErrorEvent, this::setException).onEntry(this::pollResponseFromQueue).permit(TriggerEvent.DATA_AVAILABLE, State.CLIENT_CANCEL_DATA_RECEIVED).permitReentry(TriggerEvent.EOF).permitReentry(TriggerEvent.HEART_BEAT).permit(TriggerEvent.SERVER_CANCEL, State.TERMINATED_NORMALLY).permit(TriggerEvent.INTERRUPTED, State.TERMINATED_EXCEPTIONALLY).permit(TriggerEvent.TIMEOUT, State.TERMINATED_EXCEPTIONALLY).permit(TriggerEvent.SERVER_ERROR, State.TERMINATED_EXCEPTIONALLY).permit(TriggerEvent.CHANNEL_ERROR, State.TERMINATED_EXCEPTIONALLY);
        config.configure(State.TERMINATED_EXCEPTIONALLY).substateOf(State.TERMINATED).onEntryFrom(this.mTriggerEventsWithParam.mChannelUnavailableEvent, this::setException).onEntryFrom(this.mTriggerEventsWithParam.mInterruptedEvent, this::addExceptionAsSuppressed).onEntryFrom(this.mTriggerEventsWithParam.mTimeoutEvent, this::addExceptionAsSuppressed).onEntryFrom(this.mTriggerEventsWithParam.mServerErrorEvent, this::addExceptionAsSuppressed).onEntryFrom(this.mTriggerEventsWithParam.mChannelErrorEvent, this::addExceptionAsSuppressed).onEntry(this::onTerminatedExceptionally);
        config.configure(State.CLIENT_CANCEL_DATA_RECEIVED).onEntryFrom(this.mTriggerEventsWithParam.mDataAvailableEvent, this::onClientCancelDataReceived).permit(TriggerEvent.DATA_DISCARDED, State.CLIENT_CANCEL);
        config.configure(State.TERMINATED_NORMALLY).substateOf(State.TERMINATED).onEntry(this::onTerminatedNormally);
        this.mStateMachine = new StateMachine<State, TriggerEvent>(State.CREATED, config);
        this.mStateMachine.setTrace(new DebugLoggingTracer(LOG));
        this.mStateMachine.fireInitialTransition();
    }

    public void fireNext(TriggerEvent triggerEvent) {
        this.mNextTriggerEvent.set(() -> this.mStateMachine.fire(triggerEvent));
    }

    public <Arg0T> void fireNext(TriggerWithParameters1<Arg0T, TriggerEvent> triggerEvent, Arg0T arg0) {
        this.mNextTriggerEvent.set(() -> this.mStateMachine.fire(triggerEvent, arg0));
    }

    public TriggerEventsWithParam getTriggerEventsWithParam() {
        return this.mTriggerEventsWithParam;
    }

    public void run() {
        Preconditions.checkState(this.mStateMachine.isInState(State.CREATED), "state machine cannot be restarted: expected initial state %s, encountered %s", (Object)State.CREATED, (Object)this.mStateMachine.getState());
        this.fireNext(TriggerEvent.START);
        try {
            Runnable trigger = this.mNextTriggerEvent.getAndSet(null);
            while (trigger != null) {
                trigger.run();
                trigger = this.mNextTriggerEvent.getAndSet(null);
            }
        }
        catch (RuntimeException e) {
            LOG.error("Unexpected exception during execution, state: {}", (Object)this.mStateMachine.getState(), (Object)e);
            if (this.mChannel != null) {
                CommonUtils.closeChannel(this.mChannel);
                this.mContext.releaseNettyChannel(this.mAddress, this.mChannel);
            }
            throw e;
        }
        Preconditions.checkState(this.mStateMachine.isInState(State.TERMINATED), "execution of state machine has stopped but it is not in a terminated state");
    }

    public int getBytesRead() {
        return this.mBytesRead;
    }

    @VisibleForTesting
    public void generateStateDiagram(Path outputFile) throws IOException {
        try (OutputStream outFile = Files.newOutputStream(outputFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE);){
            this.mStateMachine.configuration().generateDotFileInto(outFile, true);
        }
    }

    @Nullable
    public PartialReadException getException() {
        if (this.mLastExceptionTrigger == null) {
            return null;
        }
        switch (this.mLastExceptionTrigger) {
            case INTERRUPTED: {
                return new PartialReadException(this.mLength, this.mBytesRead, PartialReadException.CauseType.INTERRUPT, this.mLastException);
            }
            case OUTPUT_ERROR: {
                return new PartialReadException(this.mLength, this.mBytesRead, PartialReadException.CauseType.OUTPUT, this.mLastException);
            }
            case TIMEOUT: {
                return new PartialReadException(this.mLength, this.mBytesRead, PartialReadException.CauseType.TIMEOUT, this.mLastException);
            }
            case SERVER_ERROR: {
                return new PartialReadException(this.mLength, this.mBytesRead, PartialReadException.CauseType.SERVER_ERROR, this.mLastException);
            }
            case CHANNEL_ERROR: 
            case CHANNEL_UNAVAILABLE: {
                return new PartialReadException(this.mLength, this.mBytesRead, PartialReadException.CauseType.TRANSPORT_ERROR, this.mLastException);
            }
        }
        throw new IllegalStateException("unexpected trigger type: " + this.mLastExceptionTrigger);
    }

    void acquireNettyChannel() {
        try {
            this.mChannel = this.mContext.acquireNettyChannel(this.mAddress);
            this.mChannel.pipeline().addLast(new PacketReadHandler(this.mResponseEventQueue, this.mMaxPacketsInFlight));
        }
        catch (IOException ioe) {
            this.fireNext(this.mTriggerEventsWithParam.mChannelUnavailableEvent, ioe);
            return;
        }
        this.fireNext(TriggerEvent.CHANNEL_AVAILABLE);
    }

    void sendRequest() {
        Preconditions.checkNotNull(this.mChannel, "channel has not been acquired");
        Protocol.ReadRequest readRequest = this.mRequestBuilder.get().clearCancel().build();
        this.mChannel.writeAndFlush(new RPCProtoMessage(new ProtoMessage(readRequest))).addListener(future -> {
            if (!future.isSuccess()) {
                this.mResponseEventQueue.offer(ResponseEventFactory.getResponseEventFactory().createChannelErrorResponseEvent(future.cause()));
            }
        });
    }

    void pollResponseFromQueue() {
        ResponseEvent responseEvent;
        if (!NettyDataReaderStateMachine.tooManyResponseEventsPending(this.mResponseEventQueue, this.mMaxPacketsInFlight)) {
            NettyUtils.enableAutoRead(this.mChannel);
        }
        ResponseEventContext responseEventContext = new ResponseEventContext(this);
        try {
            responseEvent = this.mResponseEventQueue.poll(this.mReadTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException interruptedException) {
            Thread.currentThread().interrupt();
            this.fireNext(this.mTriggerEventsWithParam.mInterruptedEvent, interruptedException);
            return;
        }
        if (responseEvent == null) {
            this.fireNext(this.mTriggerEventsWithParam.mTimeoutEvent, new TimeoutException("Timed out when waiting for server response for " + this.mReadTimeoutMs + " ms"));
        } else {
            responseEvent.postProcess(responseEventContext);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void onReceivedData(ByteBuf buf, Transition<State, TriggerEvent> transition) {
        Preconditions.checkState(TriggerEvent.DATA_AVAILABLE == transition.getTrigger());
        int bytesToWrite = buf.readableBytes();
        try {
            this.mOutputBuffer.writeBytes(buf);
        }
        catch (RuntimeException e) {
            this.fireNext(this.mTriggerEventsWithParam.mOutputErrorEvent, e);
            return;
        }
        finally {
            buf.release();
        }
        this.mBytesRead += bytesToWrite;
        if (this.mBytesRead < this.mLength) {
            this.fireNext(TriggerEvent.OUTPUT_LENGTH_NOT_FULFILLED);
        } else {
            this.fireNext(TriggerEvent.OUTPUT_LENGTH_FULFILLED);
        }
    }

    void onReceivedEof(Transition<State, TriggerEvent> transition) {
        if (this.mBytesRead < this.mLength) {
            this.fireNext(TriggerEvent.OUTPUT_LENGTH_NOT_FULFILLED);
        } else {
            this.fireNext(TriggerEvent.OUTPUT_LENGTH_FULFILLED);
        }
    }

    <T extends Throwable> void setException(T e, Transition<State, TriggerEvent> transition) {
        this.mLastException = e;
        this.mLastExceptionTrigger = transition.getTrigger();
    }

    <T extends Throwable> void addExceptionAsSuppressed(T suppressed, Transition<State, TriggerEvent> transition) {
        if (this.mLastException != null) {
            this.mLastException.addSuppressed(suppressed);
        }
    }

    void sendClientCancel() {
        Preconditions.checkNotNull(this.mChannel, "cannot cancel when channel has not been acquired");
        Protocol.ReadRequest cancelRequest = this.mRequestBuilder.get().setCancel(true).build();
        this.mChannel.writeAndFlush(new RPCProtoMessage(new ProtoMessage(cancelRequest))).addListener(future -> {
            if (!future.isSuccess()) {
                this.mResponseEventQueue.offer(ResponseEventFactory.getResponseEventFactory().createChannelErrorResponseEvent(future.cause()));
            }
        });
    }

    void syncCloseChannel() {
        Preconditions.checkNotNull(this.mChannel, "cannot close channel when channel has not been acquired");
        CommonUtils.closeChannel(this.mChannel);
    }

    void onClientCancelDataReceived(ByteBuf byteBuf, Transition<State, TriggerEvent> transition) {
        Preconditions.checkState(transition.getTrigger() == TriggerEvent.DATA_AVAILABLE);
        Preconditions.checkState(transition.getSource() == State.CLIENT_CANCEL);
        byteBuf.release();
        this.fireNext(TriggerEvent.DATA_DISCARDED);
    }

    void onTerminatedExceptionally(Transition<State, TriggerEvent> transition) {
        if (this.mChannel != null) {
            if (this.mChannel.isOpen()) {
                CommonUtils.closeChannel(this.mChannel);
            }
            this.mContext.releaseNettyChannel(this.mAddress, this.mChannel);
        }
    }

    void onTerminatedNormally(Transition<State, TriggerEvent> transition) {
        Preconditions.checkNotNull(this.mChannel, "terminated normally but channel is null");
        if (this.mChannel.isOpen()) {
            this.mChannel.pipeline().removeLast();
            NettyUtils.enableAutoRead(this.mChannel);
        }
        this.mContext.releaseNettyChannel(this.mAddress, this.mChannel);
    }

    private static boolean tooManyResponseEventsPending(BlockingQueue<ResponseEvent> queue, int maxPacketsInFlight) {
        return queue.size() >= maxPacketsInFlight;
    }

    private static class PacketReadHandler
    extends ChannelInboundHandlerAdapter {
        private final BlockingQueue<ResponseEvent> mResponseEventQueue;
        private final int mMaxPacketsInFlight;
        private final ResponseEventFactory mResponseEventFactory = ResponseEventFactory.getResponseEventFactory();

        PacketReadHandler(BlockingQueue<ResponseEvent> responseEventQueue, int maxPacketsInFlight) {
            this.mResponseEventQueue = responseEventQueue;
            this.mMaxPacketsInFlight = maxPacketsInFlight;
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
            ResponseEvent responseEvent;
            if (!(msg instanceof RPCProtoMessage)) {
                throw new IllegalStateException(String.format("Incorrect response type %s, %s.", msg.getClass().getCanonicalName(), msg));
            }
            RPCProtoMessage rpcProtoMessage = (RPCProtoMessage)msg;
            ProtoMessage message = rpcProtoMessage.getMessage();
            if (message.isReadResponse()) {
                Preconditions.checkState(message.asReadResponse().getType() == Protocol.ReadResponse.Type.UFS_READ_HEARTBEAT);
                responseEvent = this.mResponseEventFactory.createUfsReadHeartBeatResponseEvent();
            } else if (message.isResponse()) {
                Protocol.Response response = message.asResponse();
                switch (response.getStatus()) {
                    case CANCELLED: {
                        responseEvent = this.mResponseEventFactory.createCancelResponseEvent();
                        break;
                    }
                    case OK: {
                        DataBuffer dataBuffer = rpcProtoMessage.getPayloadDataBuffer();
                        if (dataBuffer != null) {
                            Preconditions.checkState(dataBuffer.getNettyOutput() instanceof ByteBuf, "dataBuffer.getNettyOutput is not of type ByteBuf");
                            responseEvent = this.mResponseEventFactory.createDataResponseEvent(dataBuffer);
                            break;
                        }
                        responseEvent = this.mResponseEventFactory.createEofResponseEvent();
                        break;
                    }
                    default: {
                        Status status = ProtoUtils.fromProto(response.getStatus());
                        AlluxioStatusException error = AlluxioStatusException.from(status.withDescription(String.format("Error from server %s: %s", ctx.channel().remoteAddress(), response.getMessage())));
                        responseEvent = this.mResponseEventFactory.createServerErrorResponseEvent(error);
                        break;
                    }
                }
            } else {
                throw new IllegalStateException(String.format("Incorrect response type %s.", message));
            }
            if (NettyDataReaderStateMachine.tooManyResponseEventsPending(this.mResponseEventQueue, this.mMaxPacketsInFlight)) {
                NettyUtils.disableAutoRead(ctx.channel());
            }
            this.mResponseEventQueue.offer(responseEvent);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            LOG.error("Exception is caught while reading data from channel {}:", (Object)ctx.channel(), (Object)cause);
            ChannelErrorResponseEvent responseEvent = this.mResponseEventFactory.createChannelErrorResponseEvent(cause);
            this.mResponseEventQueue.offer(responseEvent);
            ctx.fireExceptionCaught(cause);
        }

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) {
            LOG.warn("Channel is closed while reading data from channel {}.", (Object)ctx.channel());
            ChannelErrorResponseEvent responseEvent = this.mResponseEventFactory.createChannelErrorResponseEvent(new UnavailableException(String.format("Channel %s is closed.", ctx.channel())));
            this.mResponseEventQueue.offer(responseEvent);
            ctx.fireChannelUnregistered();
        }
    }

    public static class TriggerEventsWithParam {
        public final TriggerWithParameters1<IOException, TriggerEvent> mChannelUnavailableEvent;
        public final TriggerWithParameters1<ByteBuf, TriggerEvent> mDataAvailableEvent;
        public final TriggerWithParameters1<AlluxioStatusException, TriggerEvent> mServerErrorEvent;
        public final TriggerWithParameters1<Throwable, TriggerEvent> mChannelErrorEvent;
        public final TriggerWithParameters1<Throwable, TriggerEvent> mOutputErrorEvent;
        public final TriggerWithParameters1<TimeoutException, TriggerEvent> mTimeoutEvent;
        public final TriggerWithParameters1<InterruptedException, TriggerEvent> mInterruptedEvent;

        public TriggerEventsWithParam(StateMachineConfig<State, TriggerEvent> config) {
            this.mChannelUnavailableEvent = config.setTriggerParameters(TriggerEvent.CHANNEL_UNAVAILABLE, IOException.class);
            this.mDataAvailableEvent = config.setTriggerParameters(TriggerEvent.DATA_AVAILABLE, ByteBuf.class);
            this.mServerErrorEvent = config.setTriggerParameters(TriggerEvent.SERVER_ERROR, AlluxioStatusException.class);
            this.mChannelErrorEvent = config.setTriggerParameters(TriggerEvent.CHANNEL_ERROR, Throwable.class);
            this.mOutputErrorEvent = config.setTriggerParameters(TriggerEvent.OUTPUT_ERROR, Throwable.class);
            this.mTimeoutEvent = config.setTriggerParameters(TriggerEvent.TIMEOUT, TimeoutException.class);
            this.mInterruptedEvent = config.setTriggerParameters(TriggerEvent.INTERRUPTED, InterruptedException.class);
        }
    }

    public static enum TriggerEvent {
        START,
        CHANNEL_AVAILABLE,
        CHANNEL_UNAVAILABLE,
        DATA_AVAILABLE,
        HEART_BEAT,
        TIMEOUT,
        INTERRUPTED,
        EOF,
        OUTPUT_LENGTH_FULFILLED,
        OUTPUT_LENGTH_NOT_FULFILLED,
        OUTPUT_ERROR,
        SERVER_ERROR,
        SERVER_CANCEL,
        CHANNEL_ERROR,
        UNKNOWN_PAYLOAD,
        DATA_DISCARDED;

    }

    static enum State {
        CREATED,
        TERMINATED,
        TERMINATED_NORMALLY,
        TERMINATED_EXCEPTIONALLY,
        CLIENT_CANCEL,
        CLIENT_CANCEL_DATA_RECEIVED,
        ACQUIRING_CHANNEL,
        CHANNEL_ACTIVE,
        RECEIVED_DATA,
        EXPECTING_EOF,
        RECEIVED_EOF;

    }
}

