/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.index.graphql;

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
import io.restassured.response.Response;
import io.restassured.response.ValidatableResponse;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.WebSocket;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.handler.graphql.ApolloWSMessageType;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.inject.Inject;
import net.javacrumbs.jsonunit.assertj.JsonAssertion;
import net.javacrumbs.jsonunit.assertj.JsonAssertions;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.Test;
import org.kie.kogito.index.InfinispanServerTestResource;
import org.kie.kogito.index.TestUtils;
import org.kie.kogito.index.event.KogitoProcessCloudEvent;
import org.kie.kogito.index.event.KogitoUserTaskCloudEvent;
import org.kie.kogito.index.graphql.MockGraphQLInstrumentation;
import org.kie.kogito.index.infinispan.protostream.ProtobufService;
import org.kie.kogito.index.messaging.ReactiveMessagingEventConsumer;
import org.kie.kogito.index.model.ProcessInstanceState;

@QuarkusTest
@QuarkusTestResource(value=InfinispanServerTestResource.class)
public class WebSocketSubscriptionTest {
    @Inject
    ReactiveMessagingEventConsumer consumer;
    @Inject
    ProtobufService protobufService;
    @Inject
    MockGraphQLInstrumentation instrumentation;
    @Inject
    Vertx vertx;
    private AtomicInteger counter = new AtomicInteger(0);

    @Test
    public void testProcessInstanceSubscription() throws Exception {
        String processId = "travels";
        String processInstanceId = UUID.randomUUID().toString();
        this.protobufService.registerProtoBufferType(TestUtils.getTravelsProtoBufferFile());
        this.assertProcessInstanceSubscription(processId, processInstanceId, ProcessInstanceState.ACTIVE, "subscription { ProcessInstanceAdded { id, processId, state } }", "ProcessInstanceAdded");
        this.assertProcessInstanceSubscription(processId, processInstanceId, ProcessInstanceState.COMPLETED, "subscription { ProcessInstanceUpdated { id, processId, state } }", "ProcessInstanceUpdated");
    }

    @Test
    public void testUserTaskInstanceSubscription() throws Exception {
        String taskId = UUID.randomUUID().toString();
        String processId = "deals";
        String processInstanceId = UUID.randomUUID().toString();
        this.protobufService.registerProtoBufferType(TestUtils.getDealsProtoBufferFile());
        this.assertUserTaskInstanceSubscription(taskId, processId, processInstanceId, "InProgress", "subscription { UserTaskInstanceAdded { id, processInstanceId, processId, state } }", "UserTaskInstanceAdded");
        this.assertUserTaskInstanceSubscription(taskId, processId, processInstanceId, "Completed", "subscription { UserTaskInstanceUpdated { id, processInstanceId, processId, state } }", "UserTaskInstanceUpdated");
    }

    @Test
    public void testDomainSubscription() throws Exception {
        String processId = "travels";
        String processInstanceId = UUID.randomUUID().toString();
        this.protobufService.registerProtoBufferType(TestUtils.getTravelsProtoBufferFile());
        this.assertDomainSubscription(processId, processInstanceId, ProcessInstanceState.ACTIVE, "subscription { TravelsAdded { id, traveller { firstName }, processInstances { state } } }", "TravelsAdded");
        this.assertDomainSubscription(processId, processInstanceId, ProcessInstanceState.COMPLETED, "subscription { TravelsUpdated { id, traveller { firstName }, processInstances { state } } }", "TravelsUpdated");
    }

    private void assertDomainSubscription(String processId, String processInstanceId, ProcessInstanceState state, String subscription, String subscriptionName) throws Exception {
        CompletableFuture<JsonObject> cf = this.subscribe(subscription);
        ((ValidatableResponse)((ValidatableResponse)((ValidatableResponse)((Response)RestAssured.given().contentType(ContentType.JSON).body("{ \"query\" : \"{ Travels{ id } }\" }").when().post("/graphql", new Object[0])).then()).log().ifValidationFails()).statusCode(200)).body("data.Travels", CoreMatchers.isA(Collection.class), new Object[0]);
        KogitoProcessCloudEvent event = TestUtils.getProcessCloudEvent(processId, processInstanceId, state, null, null, null);
        this.consumer.onProcessInstanceDomainEvent(event);
        JsonObject json = cf.get(1L, TimeUnit.MINUTES);
        JsonAssertions.assertThatJson((Object)json.toString()).and(new JsonAssertion[]{a -> a.node("type").isEqualTo((Object)"data"), a -> a.node("payload.data." + subscriptionName + ".id").isEqualTo((Object)processInstanceId), a -> a.node("payload.data." + subscriptionName + ".processInstances[0].state").isEqualTo((Object)state.name()), a -> a.node("payload.data." + subscriptionName + ".traveller.firstName").isEqualTo((Object)"Maciej")});
    }

    private void assertProcessInstanceSubscription(String processId, String processInstanceId, ProcessInstanceState state, String subscription, String subscriptionName) throws Exception {
        CompletableFuture<JsonObject> cf = this.subscribe(subscription);
        ((ValidatableResponse)((ValidatableResponse)((ValidatableResponse)((Response)RestAssured.given().contentType(ContentType.JSON).body("{ \"query\" : \"{ Travels{ id } }\" }").when().post("/graphql", new Object[0])).then()).log().ifValidationFails()).statusCode(200)).body("data.Travels", CoreMatchers.isA(Collection.class), new Object[0]);
        KogitoProcessCloudEvent event = TestUtils.getProcessCloudEvent(processId, processInstanceId, state, null, null, null);
        this.consumer.onProcessInstanceEvent(event);
        JsonObject json = cf.get(1L, TimeUnit.MINUTES);
        JsonAssertions.assertThatJson((Object)json.toString()).and(new JsonAssertion[]{a -> a.node("type").isEqualTo((Object)"data"), a -> a.node("payload.data." + subscriptionName + ".id").isEqualTo((Object)processInstanceId), a -> a.node("payload.data." + subscriptionName + ".processId").isEqualTo((Object)processId), a -> a.node("payload.data." + subscriptionName + ".state").isEqualTo((Object)state.name())});
    }

    private void assertUserTaskInstanceSubscription(String taskId, String processId, String processInstanceId, String state, String subscription, String subscriptionName) throws Exception {
        CompletableFuture<JsonObject> cf = this.subscribe(subscription);
        ((ValidatableResponse)((ValidatableResponse)((ValidatableResponse)((Response)RestAssured.given().contentType(ContentType.JSON).body("{ \"query\" : \"{ Deals{ id } }\" }").when().post("/graphql", new Object[0])).then()).log().ifValidationFails()).statusCode(200)).body("data.Deals", CoreMatchers.isA(Collection.class), new Object[0]);
        KogitoUserTaskCloudEvent event = TestUtils.getUserTaskCloudEvent(taskId, processId, processInstanceId, null, null, state);
        this.consumer.onUserTaskInstanceEvent(event);
        JsonObject json = cf.get(1L, TimeUnit.MINUTES);
        JsonAssertions.assertThatJson((Object)json.toString()).and(new JsonAssertion[]{a -> a.node("type").isEqualTo((Object)"data"), a -> a.node("payload.data." + subscriptionName + ".id").isEqualTo((Object)taskId), a -> a.node("payload.data." + subscriptionName + ".processInstanceId").isEqualTo((Object)processInstanceId), a -> a.node("payload.data." + subscriptionName + ".processId").isEqualTo((Object)processId), a -> a.node("payload.data." + subscriptionName + ".state").isEqualTo((Object)state)});
    }

    private CompletableFuture<JsonObject> subscribe(String subscription) throws Exception {
        HttpClient httpClient = this.vertx.createHttpClient(new HttpClientOptions().setDefaultPort(TestUtils.getPortFromConfig()));
        CompletableFuture<JsonObject> cf = new CompletableFuture<JsonObject>();
        CompletableFuture<Void> wsFuture = new CompletableFuture<Void>();
        this.instrumentation.setFuture(wsFuture);
        httpClient.webSocket("/graphql", websocketRes -> {
            if (websocketRes.succeeded()) {
                WebSocket webSocket = (WebSocket)websocketRes.result();
                webSocket.handler(message -> {
                    JsonObject json = message.toJsonObject();
                    String type = json.getString("type");
                    if (ApolloWSMessageType.COMPLETE.getText().equals(type)) {
                        cf.complete(null);
                    } else if (ApolloWSMessageType.DATA.getText().equals(type)) {
                        cf.complete(message.toJsonObject());
                    } else {
                        cf.completeExceptionally(new RuntimeException(String.format("Unexpected message type: %s\nMessage: %s", type, message.toString())));
                    }
                });
                JsonObject init = new JsonObject().put("id", String.valueOf(this.counter.getAndIncrement())).put("type", ApolloWSMessageType.START.getText()).put("payload", new JsonObject().put("query", subscription));
                webSocket.write(init.toBuffer());
            } else {
                websocketRes.cause().printStackTrace();
                wsFuture.completeExceptionally(websocketRes.cause());
            }
        });
        cf.whenComplete((r, t) -> httpClient.close());
        wsFuture.get(1L, TimeUnit.MINUTES);
        return cf;
    }
}

