/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.index.messaging;

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
import io.restassured.response.Response;
import io.restassured.response.ValidatableResponse;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import java.util.Collection;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kie.kogito.index.GraphQLUtils;
import org.kie.kogito.index.InfinispanServerTestResource;
import org.kie.kogito.index.KafkaTestResource;
import org.kie.kogito.index.TestUtils;
import org.kie.kogito.index.infinispan.protostream.ProtobufService;

@QuarkusTest
@QuarkusTestResource.List(value={@QuarkusTestResource(value=InfinispanServerTestResource.class), @QuarkusTestResource(value=KafkaTestResource.class)})
public class ReactiveMessagingEventConsumerKafkaIT {
    @Inject
    ProtobufService protobufService;
    KafkaProducer<String, String> producer;

    @BeforeEach
    public void setup() {
        String kafka = System.getProperty("kafka.bootstrap.servers", "localhost:9092");
        HashMap<String, String> config = new HashMap<String, String>();
        config.put("bootstrap.servers", kafka);
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("acks", "all");
        this.producer = KafkaProducer.create((Vertx)Vertx.vertx(), config);
    }

    @AfterEach
    public void close() {
        if (this.producer != null) {
            this.producer.close();
        }
    }

    @Test
    public void testProcessInstanceEvent() throws Exception {
        this.sendProcessInstanceEvent().get(1L, TimeUnit.MINUTES);
        ((ValidatableResponse)((ValidatableResponse)((ValidatableResponse)((Response)RestAssured.given().contentType(ContentType.JSON).body("{ \"query\" : \"{Travels{ id } }\" }").when().post("/graphql", new Object[0])).then()).log().ifValidationFails()).statusCode(200)).body("errors[0].message", CoreMatchers.is((Object)"Validation error of type FieldUndefined: Field 'Travels' in type 'Query' is undefined @ 'Travels'"), new Object[0]);
        String processInstanceId = "c2fa5c5e-3002-44c7-aef7-bce82297e3fe";
        ((ValidatableResponse)((ValidatableResponse)((ValidatableResponse)((Response)RestAssured.given().contentType(ContentType.JSON).body(GraphQLUtils.getProcessInstanceById(processInstanceId)).when().post("/graphql", new Object[0])).then()).log().ifValidationFails()).statusCode(200)).body("data.ProcessInstances[0].id", CoreMatchers.is((Object)processInstanceId), new Object[0]);
        this.protobufService.registerProtoBufferType(TestUtils.getTravelsProtoBufferFile());
        ((ValidatableResponse)((ValidatableResponse)((ValidatableResponse)((Response)RestAssured.given().contentType(ContentType.JSON).body("{ \"query\" : \"{Travels{ id } }\" }").when().post("/graphql", new Object[0])).then()).log().ifValidationFails()).statusCode(200)).body("data.Travels", Matchers.isA(Collection.class), new Object[0]);
        this.sendProcessInstanceEvent().get(1L, TimeUnit.MINUTES);
        ((ValidatableResponse)((ValidatableResponse)((ValidatableResponse)((ValidatableResponse)((Response)RestAssured.given().contentType(ContentType.JSON).body("{ \"query\" : \"{ProcessInstances{ id } }\" }").when().post("/graphql", new Object[0])).then()).log().ifValidationFails()).statusCode(200)).body("data.ProcessInstances.size()", CoreMatchers.is((Object)1), new Object[0])).body("data.ProcessInstances[0].id", CoreMatchers.is((Object)processInstanceId), new Object[0]);
        ((ValidatableResponse)((ValidatableResponse)((ValidatableResponse)((Response)RestAssured.given().contentType(ContentType.JSON).body("{ \"query\" : \"{Travels{ id } }\" }").when().post("/graphql", new Object[0])).then()).log().ifValidationFails()).statusCode(200)).body("data.Travels[0].id", CoreMatchers.is((Object)"f8868a2e-1bbb-47eb-93cf-fa46ff9dbfee"), new Object[0]);
        ((ValidatableResponse)((ValidatableResponse)((ValidatableResponse)((Response)RestAssured.given().when().get("/metrics", new Object[0])).then()).log().ifValidationFails()).statusCode(200)).body(Matchers.containsString((String)"application_mp_messaging_message_count_total{channel=\"kogito-processdomain-events\"} 2.0"), new Matcher[]{Matchers.containsString((String)"application_mp_messaging_message_count_total{channel=\"kogito-processinstances-events\"} 2.0")});
    }

    private CompletableFuture<Void> sendProcessInstanceEvent() throws Exception {
        String json = TestUtils.readFileContent("process_instance_event.json");
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.producer.write(KafkaProducerRecord.create((String)"kogito-processinstances-events", (Object)json), event -> {
            if (event.succeeded()) {
                future.complete(null);
            } else {
                future.completeExceptionally(event.cause());
            }
        });
        return future;
    }
}

