/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsub.v1;

import com.google.api.core.ApiClock;
import com.google.api.core.ApiService;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.TransportChannel;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.AckReplyConsumerWithResponse;
import com.google.cloud.pubsub.v1.FakeScheduledExecutorService;
import com.google.cloud.pubsub.v1.FakeSubscriberServiceImpl;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.MessageReceiverWithAckResponse;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.BindableService;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

public class SubscriberTest {
    private static final ProjectSubscriptionName TEST_SUBSCRIPTION = ProjectSubscriptionName.of((String)"test-project", (String)"test-subscription");
    private ManagedChannel testChannel;
    private FakeScheduledExecutorService fakeExecutor;
    private FakeSubscriberServiceImpl fakeSubscriberServiceImpl;
    private Server testServer;
    private LinkedBlockingQueue<AckReplyConsumerWithResponse> consumersWithResponse;
    private MessageReceiverWithAckResponse messageReceiverWithAckResponse;
    private final MessageReceiver testReceiver = new MessageReceiver(){

        public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
            consumer.ack();
        }
    };
    @Rule
    public TestName testName = new TestName();

    @Before
    public void setUp() throws Exception {
        this.consumersWithResponse = new LinkedBlockingQueue();
        InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName((String)this.testName.getMethodName());
        this.fakeSubscriberServiceImpl = new FakeSubscriberServiceImpl();
        this.fakeExecutor = new FakeScheduledExecutorService();
        this.testChannel = InProcessChannelBuilder.forName((String)this.testName.getMethodName()).build();
        serverBuilder.addService((BindableService)this.fakeSubscriberServiceImpl);
        this.testServer = serverBuilder.build();
        this.testServer.start();
        this.messageReceiverWithAckResponse = new MessageReceiverWithAckResponse(){

            public void receiveMessage(PubsubMessage message, AckReplyConsumerWithResponse consumerWithResponse) {
                SubscriberTest.this.consumersWithResponse.add(consumerWithResponse);
            }
        };
    }

    @After
    public void tearDown() throws Exception {
        this.testServer.shutdownNow().awaitTermination();
        this.testChannel.shutdown();
    }

    @Test
    public void testDeliveryAttemptHelper() {
        Integer deliveryAttempt = 3;
        PubsubMessage message = PubsubMessage.newBuilder().putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt)).build();
        Assert.assertEquals((Object)Subscriber.getDeliveryAttempt((PubsubMessage)message), (Object)deliveryAttempt);
        PubsubMessage emptyMessage = PubsubMessage.newBuilder().build();
        Assert.assertEquals((Object)Subscriber.getDeliveryAttempt((PubsubMessage)emptyMessage), null);
    }

    @Test
    public void testOpenedChannels() throws Exception {
        int expectedChannelCount = 1;
        Subscriber subscriber = this.startSubscriber(this.getTestSubscriberBuilder(this.testReceiver));
        Assert.assertEquals((long)expectedChannelCount, (long)this.fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount));
        subscriber.stopAsync().awaitTerminated();
    }

    @Test
    public void testFailedChannel_recoverableError_channelReopened() throws Exception {
        int expectedChannelCount = 1;
        Subscriber subscriber = this.startSubscriber(this.getTestSubscriberBuilder(this.testReceiver).setSystemExecutorProvider((ExecutorProvider)InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build()));
        this.fakeSubscriberServiceImpl.sendError((Throwable)new StatusException(Status.INTERNAL));
        Assert.assertEquals((long)1L, (long)this.fakeSubscriberServiceImpl.waitForClosedStreams(1));
        Assert.assertEquals((long)expectedChannelCount, (long)this.fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount));
        subscriber.stopAsync().awaitTerminated();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testFailedChannel_testTerminated() throws Exception {
        final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
        ExecutorProvider provider = new ExecutorProvider(){

            public boolean shouldAutoClose() {
                return true;
            }

            public ScheduledExecutorService getExecutor() {
                return scheduledExecutorService;
            }
        };
        try {
            Subscriber subscriber = this.startSubscriber(this.getTestSubscriberBuilder(this.testReceiver).setSystemExecutorProvider(provider));
            Thread.sleep(100L);
            subscriber.stopAsync().awaitTerminated();
            Assert.assertTrue((boolean)scheduledExecutorService.awaitTermination(10L, TimeUnit.SECONDS));
        }
        finally {
            scheduledExecutorService.shutdownNow();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(expected=IllegalStateException.class)
    public void testFailedChannel_fatalError_subscriberFails() throws Exception {
        Subscriber subscriber = this.startSubscriber(this.getTestSubscriberBuilder(this.testReceiver).setSystemExecutorProvider((ExecutorProvider)InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(10).build()));
        this.fakeSubscriberServiceImpl.sendError((Throwable)new StatusException(Status.INVALID_ARGUMENT));
        try {
            subscriber.awaitTerminated();
        }
        finally {
            Assert.assertEquals((Object)ApiService.State.FAILED, (Object)subscriber.state());
            Throwable t = subscriber.failureCause();
            Assert.assertTrue((boolean)(t instanceof ApiException));
            ApiException ex = (ApiException)t;
            Assert.assertTrue((boolean)(ex.getStatusCode() instanceof GrpcStatusCode));
            GrpcStatusCode grpcCode = (GrpcStatusCode)ex.getStatusCode();
            Assert.assertEquals((Object)StatusCode.Code.INVALID_ARGUMENT, (Object)grpcCode.getCode());
        }
    }

    @Test(expected=IllegalStateException.class)
    public void testFailedChannel_shutdownBackgroundResources() throws Exception {
        ExecutorProvider provider = new ExecutorProvider(){

            public boolean shouldAutoClose() {
                return true;
            }

            public ScheduledExecutorService getExecutor() {
                return SubscriberTest.this.fakeExecutor;
            }
        };
        Subscriber subscriber = this.startSubscriber(this.getTestSubscriberBuilder(this.testReceiver).setExecutorProvider(provider));
        this.fakeSubscriberServiceImpl.sendError((Throwable)new StatusException(Status.INVALID_ARGUMENT));
        try {
            subscriber.awaitTerminated();
        }
        finally {
            Assert.assertEquals((Object)ApiService.State.FAILED, (Object)subscriber.state());
            Assert.assertTrue((boolean)this.fakeExecutor.isShutdown());
        }
    }

    @Test
    public void testStreamAckDeadlineIsSetCorrectly() throws Exception {
        int expectedChannelCount = 1;
        int maxDurationPerAckExtension = 5;
        Subscriber subscriber = this.startSubscriber(this.getTestSubscriberBuilder(this.testReceiver).setMaxDurationPerAckExtensionDuration(Duration.ofSeconds(maxDurationPerAckExtension)));
        Assert.assertEquals((long)expectedChannelCount, (long)this.fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount));
        Assert.assertEquals((long)Math.toIntExact(Subscriber.MIN_STREAM_ACK_DEADLINE.getSeconds()), (long)this.fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds());
        subscriber.stopAsync().awaitTerminated();
        maxDurationPerAckExtension = 700;
        subscriber = this.startSubscriber(this.getTestSubscriberBuilder(this.testReceiver).setMaxDurationPerAckExtensionDuration(Duration.ofSeconds(maxDurationPerAckExtension)));
        Assert.assertEquals((long)expectedChannelCount, (long)this.fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount));
        Assert.assertEquals((long)Math.toIntExact(Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds()), (long)this.fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds());
        subscriber.stopAsync().awaitTerminated();
        maxDurationPerAckExtension = 100;
        subscriber = this.startSubscriber(this.getTestSubscriberBuilder(this.testReceiver).setMaxDurationPerAckExtensionDuration(Duration.ofSeconds(maxDurationPerAckExtension)));
        Assert.assertEquals((long)expectedChannelCount, (long)this.fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount));
        Assert.assertEquals((long)maxDurationPerAckExtension, (long)this.fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds());
        subscriber.stopAsync().awaitTerminated();
        subscriber = this.startSubscriber(this.getTestSubscriberBuilder(this.testReceiver));
        Assert.assertEquals((long)expectedChannelCount, (long)this.fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount));
        Assert.assertEquals((long)Math.toIntExact(Subscriber.STREAM_ACK_DEADLINE_DEFAULT.getSeconds()), (long)this.fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds());
        subscriber.stopAsync().awaitTerminated();
    }

    @Test
    public void testPartialFlowControlSettings() throws Exception {
        Subscriber subscriber = this.getTestSubscriberBuilder(this.testReceiver).setFlowControlSettings(Subscriber.Builder.DEFAULT_FLOW_CONTROL_SETTINGS.toBuilder().setMaxOutstandingElementCount(Long.valueOf(500L)).build()).build();
        Assert.assertEquals((long)subscriber.getFlowControlSettings().getMaxOutstandingElementCount(), (long)500L);
        Assert.assertEquals((Object)subscriber.getFlowControlSettings().getMaxOutstandingRequestBytes(), (Object)Subscriber.Builder.DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingRequestBytes());
        subscriber = this.getTestSubscriberBuilder(this.testReceiver).setFlowControlSettings(Subscriber.Builder.DEFAULT_FLOW_CONTROL_SETTINGS.toBuilder().setMaxOutstandingRequestBytes(Long.valueOf(5000000000L)).build()).build();
        Assert.assertEquals((long)subscriber.getFlowControlSettings().getMaxOutstandingRequestBytes(), (long)5000000000L);
        Assert.assertEquals((Object)subscriber.getFlowControlSettings().getMaxOutstandingElementCount(), (Object)Subscriber.Builder.DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingElementCount());
    }

    private Subscriber startSubscriber(Subscriber.Builder testSubscriberBuilder) {
        Subscriber subscriber = testSubscriberBuilder.build();
        subscriber.startAsync().awaitRunning();
        return subscriber;
    }

    private Subscriber.Builder getTestSubscriberBuilder(MessageReceiver messageReceiver) {
        return Subscriber.newBuilder((ProjectSubscriptionName)TEST_SUBSCRIPTION, (MessageReceiver)messageReceiver).setExecutorProvider((ExecutorProvider)FixedExecutorProvider.create((ScheduledExecutorService)this.fakeExecutor)).setSystemExecutorProvider((ExecutorProvider)FixedExecutorProvider.create((ScheduledExecutorService)this.fakeExecutor)).setChannelProvider((TransportChannelProvider)FixedTransportChannelProvider.create((TransportChannel)GrpcTransportChannel.create((ManagedChannel)this.testChannel))).setCredentialsProvider((CredentialsProvider)NoCredentialsProvider.create()).setClock((ApiClock)this.fakeExecutor.getClock()).setParallelPullCount(1).setFlowControlSettings(FlowControlSettings.newBuilder().setMaxOutstandingElementCount(Long.valueOf(1000L)).build());
    }

    private Subscriber.Builder getTestSubscriberBuilder(MessageReceiverWithAckResponse messageReceiverWithAckResponse) {
        return Subscriber.newBuilder((ProjectSubscriptionName)TEST_SUBSCRIPTION, (MessageReceiverWithAckResponse)messageReceiverWithAckResponse).setExecutorProvider((ExecutorProvider)FixedExecutorProvider.create((ScheduledExecutorService)this.fakeExecutor)).setSystemExecutorProvider((ExecutorProvider)FixedExecutorProvider.create((ScheduledExecutorService)this.fakeExecutor)).setChannelProvider((TransportChannelProvider)FixedTransportChannelProvider.create((TransportChannel)GrpcTransportChannel.create((ManagedChannel)this.testChannel))).setCredentialsProvider((CredentialsProvider)NoCredentialsProvider.create()).setClock((ApiClock)this.fakeExecutor.getClock()).setParallelPullCount(1).setFlowControlSettings(FlowControlSettings.newBuilder().setMaxOutstandingElementCount(Long.valueOf(1000L)).build());
    }
}

