/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network;

import io.netty.channel.Channel;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.spark.network.ExtendedChannelPromise;
import org.apache.spark.network.TestManagedBuffer;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.protocol.RequestMessage;
import org.apache.spark.network.protocol.StreamFailure;
import org.apache.spark.network.protocol.StreamRequest;
import org.apache.spark.network.protocol.StreamResponse;
import org.apache.spark.network.server.NoOpRpcHandler;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.TransportRequestHandler;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

/**
 * Exercises how {@link TransportRequestHandler} answers a sequence of
 * {@link StreamRequest}s against a stream registered with a
 * {@link OneForOneStreamManager}, using a mocked {@link Channel} that records
 * every message written to it.
 */
public class TransportRequestHandlerSuite {
    /**
     * Registers a five-entry stream whose third entry is {@code null}, requests each
     * entry in order and checks what the handler writes to the channel:
     * <ul>
     *   <li>requests 0, 1 and 3 produce a {@link StreamResponse} carrying the matching buffer;</li>
     *   <li>request 2 (the {@code null} entry) produces a {@link StreamFailure};</li>
     *   <li>request 4 produces no response and the channel is closed exactly once.</li>
     * </ul>
     * Finally verifies that terminating the connection leaves the stream manager
     * with no stream states.
     */
    @Test
    public void handleStreamRequest() throws Exception {
        NoOpRpcHandler rpcHandler = new NoOpRpcHandler();
        OneForOneStreamManager streamManager = (OneForOneStreamManager) rpcHandler.getStreamManager();
        Channel channel = Mockito.mock(Channel.class);

        // Every writeAndFlush() on the mocked channel is captured together with the
        // promise handed back to the caller, so the test can inspect the message and
        // complete the promise later.
        List<Pair<Object, ExtendedChannelPromise>> responseAndPromisePairs = new ArrayList<>();
        Mockito.when(channel.writeAndFlush(Mockito.any())).thenAnswer(invocation -> {
            Object response = invocation.getArguments()[0];
            ExtendedChannelPromise channelFuture = new ExtendedChannelPromise(channel);
            responseAndPromisePairs.add(ImmutablePair.of(response, channelFuture));
            return channelFuture;
        });

        // Prepare the stream; index 2 is deliberately null.
        List<TestManagedBuffer> managedBuffers = new ArrayList<>();
        managedBuffers.add(new TestManagedBuffer(10));
        managedBuffers.add(new TestManagedBuffer(20));
        managedBuffers.add(null);
        managedBuffers.add(new TestManagedBuffer(30));
        managedBuffers.add(new TestManagedBuffer(40));
        long streamId = streamManager.registerStream("test-app", managedBuffers.iterator(), channel);
        Assert.assertEquals(1, streamManager.numStreamStates());

        TransportClient reverseClient = Mockito.mock(TransportClient.class);
        // NOTE(review): the fourth argument (2) is presumably the cap on chunks/streams
        // in flight at once; its meaning is defined by TransportRequestHandler - confirm there.
        TransportRequestHandler requestHandler =
            new TransportRequestHandler(channel, reverseClient, rpcHandler, 2L, null);

        requestHandler.handle(streamRequest(streamId, 0));
        assertStreamResponse(responseAndPromisePairs, 0, managedBuffers.get(0));

        requestHandler.handle(streamRequest(streamId, 1));
        assertStreamResponse(responseAndPromisePairs, 1, managedBuffers.get(1));

        // Complete the promise of the first response before issuing further requests
        // (presumably signalling that its write succeeded and freeing one in-flight slot).
        responseAndPromisePairs.get(0).getRight().finish(true);

        // The null entry is expected to be answered with a failure, not a response.
        StreamRequest request2 = streamRequest(streamId, 2);
        requestHandler.handle(request2);
        Assert.assertEquals(3, responseAndPromisePairs.size());
        Object failure = responseAndPromisePairs.get(2).getLeft();
        Assert.assertTrue(failure instanceof StreamFailure);
        Assert.assertEquals(
            String.format("Stream '%s' was not found.", request2.streamId),
            ((StreamFailure) failure).error);

        requestHandler.handle(streamRequest(streamId, 3));
        assertStreamResponse(responseAndPromisePairs, 3, managedBuffers.get(3));

        // The fifth request must not be answered: the handler is expected to close the
        // channel instead, so the number of recorded responses stays at four.
        requestHandler.handle(streamRequest(streamId, 4));
        Mockito.verify(channel, Mockito.times(1)).close();
        Assert.assertEquals(4, responseAndPromisePairs.size());

        streamManager.connectionTerminated(channel);
        Assert.assertEquals(0, streamManager.numStreamStates());
    }

    /** Builds a request whose stream id is formatted as {@code "<streamId>_<index>"}. */
    private static StreamRequest streamRequest(long streamId, int index) {
        return new StreamRequest(String.format("%d_%d", streamId, index));
    }

    /**
     * Asserts that exactly {@code index + 1} messages have been recorded so far and that
     * the message at {@code index} is a {@link StreamResponse} whose body equals
     * {@code expectedBody}.
     */
    private static void assertStreamResponse(
            List<Pair<Object, ExtendedChannelPromise>> recorded, int index, Object expectedBody) {
        Assert.assertEquals(index + 1, recorded.size());
        Object response = recorded.get(index).getLeft();
        Assert.assertTrue(response instanceof StreamResponse);
        Assert.assertEquals(expectedBody, ((StreamResponse) response).body());
    }
}

