/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.index.messaging;

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.MessageConsumer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.kie.kogito.index.event.DomainModelRegisteredEvent;
import org.kie.kogito.index.event.KogitoJobCloudEvent;
import org.kie.kogito.index.event.KogitoProcessCloudEvent;
import org.kie.kogito.index.event.KogitoUserTaskCloudEvent;
import org.kie.kogito.index.json.ProcessInstanceMetaMapper;
import org.kie.kogito.index.json.UserTaskInstanceMetaMapper;
import org.kie.kogito.index.model.Job;
import org.kie.kogito.index.model.ProcessInstance;
import org.kie.kogito.index.model.UserTaskInstance;
import org.kie.kogito.index.service.IndexingService;
import org.kie.kogito.index.vertx.ObjectNodeMessageCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class ReactiveMessagingEventConsumer {
    protected static final String KOGITO_DOMAIN_EVENTS = "kogito-domain-events-%s";
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveMessagingEventConsumer.class);
    private static final String KOGITO_PROCESSINSTANCES_EVENTS = "kogito-processinstances-events";
    private static final String KOGITO_PROCESSDOMAIN_EVENTS = "kogito-processdomain-events";
    private static final String KOGITO_USERTASKDOMAIN_EVENTS = "kogito-usertaskdomain-events";
    private static final String KOGITO_USERTASKINSTANCES_EVENTS = "kogito-usertaskinstances-events";
    private static final String KOGITO_JOBS_EVENTS = "kogito-jobs-events";
    @Inject
    IndexingService indexingService;
    @Inject
    EventBus eventBus;
    @Inject
    ObjectNodeMessageCodec codec;
    private Map<String, MessageConsumer<ObjectNode>> consumers = new HashMap<String, MessageConsumer<ObjectNode>>();

    @PostConstruct
    public void setup() {
        this.eventBus.registerDefaultCodec(ObjectNode.class, (MessageCodec)this.codec);
    }

    public void onDomainModelRegisteredEvent(@Observes DomainModelRegisteredEvent event) {
        LOGGER.info("New domain model registered for Process Id: {}", (Object)event.getProcessId());
        this.consumers.computeIfAbsent(event.getProcessId(), f -> {
            MessageConsumer consumer = this.eventBus.consumer(String.format(KOGITO_DOMAIN_EVENTS, event.getProcessId()), e -> this.onDomainEvent((Message<ObjectNode>)e));
            LOGGER.info("Consumer registered for address: {}", (Object)consumer.address());
            return consumer;
        });
    }

    @Incoming(value="kogito-processinstances-events")
    @Acknowledgment(value=Acknowledgment.Strategy.MANUAL)
    public CompletionStage<Void> onProcessInstanceEvent(org.eclipse.microprofile.reactive.messaging.Message<KogitoProcessCloudEvent> event) {
        LOGGER.debug("Process instance consumer received KogitoCloudEvent: \n{}", event);
        return ((CompletableFuture)CompletableFuture.runAsync(() -> this.indexingService.indexProcessInstance((ProcessInstance)((KogitoProcessCloudEvent)event.getPayload()).getData())).thenRun(() -> event.ack())).exceptionally(t -> {
            LOGGER.error("Error processing process instance KogitoCloudEvent: {}", (Object)t.getMessage(), t);
            return null;
        });
    }

    @Incoming(value="kogito-processdomain-events")
    @Acknowledgment(value=Acknowledgment.Strategy.MANUAL)
    public CompletionStage<Void> onProcessInstanceDomainEvent(org.eclipse.microprofile.reactive.messaging.Message<KogitoProcessCloudEvent> event) {
        LOGGER.debug("Process domain consumer received KogitoCloudEvent: \n{}", event);
        return ((CompletableFuture)((CompletableFuture)CompletableFuture.supplyAsync(() -> new ProcessInstanceMetaMapper().apply((KogitoProcessCloudEvent)event.getPayload())).thenCompose(json -> this.sendMessage((ObjectNode)json))).thenRun(() -> event.ack())).exceptionally(t -> {
            LOGGER.error("Error processing process domain KogitoCloudEvent: {}", (Object)t.getMessage(), t);
            return null;
        });
    }

    @Incoming(value="kogito-usertaskinstances-events")
    @Acknowledgment(value=Acknowledgment.Strategy.MANUAL)
    public CompletionStage<Void> onUserTaskInstanceEvent(org.eclipse.microprofile.reactive.messaging.Message<KogitoUserTaskCloudEvent> event) {
        LOGGER.debug("Task instance received KogitoUserTaskCloudEvent \n{}", event);
        return ((CompletableFuture)CompletableFuture.runAsync(() -> this.indexingService.indexUserTaskInstance((UserTaskInstance)((KogitoUserTaskCloudEvent)event.getPayload()).getData())).thenRun(() -> event.ack())).exceptionally(t -> {
            LOGGER.error("Error processing task instance KogitoUserTaskCloudEvent: {}", (Object)t.getMessage(), t);
            return null;
        });
    }

    @Incoming(value="kogito-usertaskdomain-events")
    @Acknowledgment(value=Acknowledgment.Strategy.MANUAL)
    public CompletionStage<Void> onUserTaskInstanceDomainEvent(org.eclipse.microprofile.reactive.messaging.Message<KogitoUserTaskCloudEvent> event) {
        LOGGER.debug("Task domain received KogitoUserTaskCloudEvent \n{}", event);
        return ((CompletableFuture)((CompletableFuture)CompletableFuture.supplyAsync(() -> new UserTaskInstanceMetaMapper().apply((KogitoUserTaskCloudEvent)event.getPayload())).thenCompose(json -> this.sendMessage((ObjectNode)json))).thenRun(() -> event.ack())).exceptionally(t -> {
            LOGGER.error("Error processing task domain KogitoUserTaskCloudEvent: {}", (Object)t.getMessage(), t);
            return null;
        });
    }

    @Incoming(value="kogito-jobs-events")
    @Acknowledgment(value=Acknowledgment.Strategy.MANUAL)
    public CompletionStage<Void> onJobEvent(org.eclipse.microprofile.reactive.messaging.Message<KogitoJobCloudEvent> event) {
        LOGGER.debug("Job received KogitoJobCloudEvent \n{}", event);
        return ((CompletableFuture)CompletableFuture.runAsync(() -> this.indexingService.indexJob((Job)((KogitoJobCloudEvent)event.getPayload()).getData())).thenRun(() -> event.ack())).exceptionally(t -> {
            LOGGER.error("Error processing job KogitoJobCloudEvent: {}", (Object)t.getMessage(), t);
            return null;
        });
    }

    protected CompletableFuture<Void> sendMessage(ObjectNode json) {
        CompletableFuture<Void> cf = new CompletableFuture<Void>();
        String processId = json.get("processId").asText();
        this.eventBus.request(String.format(KOGITO_DOMAIN_EVENTS, processId), (Object)json, async -> {
            if (async.succeeded()) {
                cf.complete(null);
            } else {
                cf.completeExceptionally(async.cause());
            }
        });
        return cf;
    }

    private void onDomainEvent(Message<ObjectNode> message) {
        try {
            LOGGER.debug("Processing domain message: {}", message);
            this.indexingService.indexModel((ObjectNode)message.body());
            message.reply(null);
        }
        catch (Exception ex) {
            LOGGER.error("Error processing domain event: {}", (Object)ex.getMessage(), (Object)ex);
            message.fail(0, ex.getMessage());
        }
    }

    @PreDestroy
    public void destroy() {
        this.consumers.values().forEach(c -> c.unregister());
        this.consumers.clear();
    }
}

