/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.jobs.service.repository.postgresql;

import io.smallrye.mutiny.Multi;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.pgclient.PgPool;
import io.vertx.mutiny.sqlclient.Row;
import io.vertx.mutiny.sqlclient.RowSet;
import io.vertx.mutiny.sqlclient.Tuple;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import mutiny.zero.flow.adapters.AdaptersToReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.repository.impl.BaseReactiveJobRepository;
import org.kie.kogito.jobs.service.repository.marshaller.RecipientMarshaller;
import org.kie.kogito.jobs.service.repository.marshaller.TriggerMarshaller;
import org.kie.kogito.jobs.service.stream.JobEventPublisher;
import org.kie.kogito.jobs.service.utils.DateUtil;
import org.kie.kogito.timer.Trigger;
import org.reactivestreams.Publisher;

@ApplicationScoped
public class PostgreSqlJobRepository
extends BaseReactiveJobRepository
implements ReactiveJobRepository {
    private static final String JOB_DETAILS_TABLE = "job_details";
    private static final String JOB_DETAILS_COLUMNS = "id, correlation_id, status, last_update, retries, execution_counter, scheduled_id, priority, recipient, trigger, fire_time, execution_timeout, execution_timeout_unit, created";
    private PgPool client;
    private final TriggerMarshaller triggerMarshaller;
    private final RecipientMarshaller recipientMarshaller;

    PostgreSqlJobRepository() {
        this(null, null, null, null, null);
    }

    @Inject
    public PostgreSqlJobRepository(Vertx vertx, JobEventPublisher jobEventPublisher, PgPool client, TriggerMarshaller triggerMarshaller, RecipientMarshaller recipientMarshaller) {
        super(vertx, jobEventPublisher);
        this.client = client;
        this.triggerMarshaller = triggerMarshaller;
        this.recipientMarshaller = recipientMarshaller;
    }

    public CompletionStage<JobDetails> doSave(JobDetails job) {
        return this.client.preparedQuery("INSERT INTO job_details (id, correlation_id, status, last_update, retries, execution_counter, scheduled_id, priority, recipient, trigger, fire_time, execution_timeout, execution_timeout_unit, created) VALUES ($1, $2, $3, now(), $4, $5, $6, $7, $8, $9, $10, $11, $12, now()) ON CONFLICT (id) DO UPDATE SET correlation_id = $2, status = $3, last_update = now(), retries = $4, execution_counter = $5, scheduled_id = $6, priority = $7, recipient = $8, trigger = $9, fire_time = $10, execution_timeout = $11, execution_timeout_unit = $12 RETURNING id, correlation_id, status, last_update, retries, execution_counter, scheduled_id, priority, recipient, trigger, fire_time, execution_timeout, execution_timeout_unit, created").execute(Tuple.tuple(Stream.of(job.getId(), job.getCorrelationId(), Optional.ofNullable(job.getStatus()).map(Enum::name).orElse(null), job.getRetries(), job.getExecutionCounter(), job.getScheduledId(), job.getPriority(), this.recipientMarshaller.marshall(job.getRecipient()), this.triggerMarshaller.marshall(job.getTrigger()), Optional.ofNullable(job.getTrigger()).map(Trigger::hasNextFireTime).map(DateUtil::dateToOffsetDateTime).orElse(null), job.getExecutionTimeout(), Optional.ofNullable(job.getExecutionTimeoutUnit()).map(Enum::name).orElse(null)).collect(Collectors.toList()))).onItem().transform(RowSet::iterator).onItem().transform(iterator -> iterator.hasNext() ? this.from((Row)iterator.next()) : null).convert().toCompletableFuture();
    }

    public CompletionStage<JobDetails> get(String id) {
        return this.client.preparedQuery("SELECT id, correlation_id, status, last_update, retries, execution_counter, scheduled_id, priority, recipient, trigger, fire_time, execution_timeout, execution_timeout_unit, created FROM job_details WHERE id = $1").execute(Tuple.of((Object)id)).onItem().transform(RowSet::iterator).onItem().transform(iterator -> iterator.hasNext() ? this.from((Row)iterator.next()) : null).convert().toCompletableFuture();
    }

    public CompletionStage<Boolean> exists(String id) {
        return this.client.preparedQuery("SELECT id FROM job_details WHERE id = $1").execute(Tuple.of((Object)id)).onItem().transform(rowSet -> rowSet.rowCount() > 0).convert().toCompletableFuture();
    }

    public CompletionStage<JobDetails> delete(String id) {
        return this.client.preparedQuery("DELETE FROM job_details WHERE id = $1 RETURNING id, correlation_id, status, last_update, retries, execution_counter, scheduled_id, priority, recipient, trigger, fire_time, execution_timeout, execution_timeout_unit, created").execute(Tuple.of((Object)id)).onItem().transform(RowSet::iterator).onItem().transform(iterator -> iterator.hasNext() ? this.from((Row)iterator.next()) : null).convert().toCompletableFuture();
    }

    public PublisherBuilder<JobDetails> findByStatusBetweenDates(ZonedDateTime fromFireTime, ZonedDateTime toFireTime, JobStatus[] status, ReactiveJobRepository.SortTerm[] orderBy) {
        String statusFilter = status != null && status.length > 0 ? PostgreSqlJobRepository.createStatusFilter(status) : null;
        String fireTimeFilter = PostgreSqlJobRepository.createFireTimeFilter("$1", "$2");
        String orderByCriteria = orderBy != null && orderBy.length > 0 ? PostgreSqlJobRepository.createOrderBy(orderBy) : "";
        StringBuilder queryFilter = new StringBuilder();
        if (statusFilter != null) {
            queryFilter.append(statusFilter);
            queryFilter.append(" AND ");
        }
        queryFilter.append(fireTimeFilter);
        String findQuery = "SELECT id, correlation_id, status, last_update, retries, execution_counter, scheduled_id, priority, recipient, trigger, fire_time, execution_timeout, execution_timeout_unit, created FROM job_details WHERE " + queryFilter + " " + orderByCriteria;
        Tuple params = Tuple.of((Object)fromFireTime.toOffsetDateTime(), (Object)toFireTime.toOffsetDateTime());
        return ReactiveStreams.fromPublisher((Publisher)AdaptersToReactiveStreams.publisher((Flow.Publisher)this.client.preparedQuery(findQuery).execute(params).onItem().transformToMulti(rowSet -> Multi.createFrom().iterable((Iterable)rowSet)).onItem().transform(this::from)));
    }

    static String createStatusFilter(JobStatus ... status) {
        return Arrays.stream(status).map(Enum::name).collect(Collectors.joining("', '", "status IN ('", "')"));
    }

    static String createFireTimeFilter(String indexFrom, String indexTo) {
        return String.format("fire_time BETWEEN %s AND %s", indexFrom, indexTo);
    }

    static String createOrderBy(ReactiveJobRepository.SortTerm[] sortTerms) {
        return Stream.of(sortTerms).map(PostgreSqlJobRepository::createOrderByTerm).collect(Collectors.joining(", ", "ORDER BY ", ""));
    }

    static String createOrderByTerm(ReactiveJobRepository.SortTerm sortTerm) {
        return PostgreSqlJobRepository.toColumName(sortTerm.getField()) + (sortTerm.isAsc() ? " ASC" : " DESC");
    }

    static String toColumName(ReactiveJobRepository.SortTermField field) {
        return switch (field) {
            case ReactiveJobRepository.SortTermField.FIRE_TIME -> "fire_time";
            case ReactiveJobRepository.SortTermField.CREATED -> "created";
            case ReactiveJobRepository.SortTermField.ID -> "id";
            default -> throw new IllegalArgumentException("No colum name is defined for field: " + field);
        };
    }

    JobDetails from(Row row) {
        return JobDetails.builder().id(row.getString("id")).correlationId(row.getString("correlation_id")).status((JobStatus)Optional.ofNullable(row.getString("status")).map(JobStatus::valueOf).orElse(null)).lastUpdate((ZonedDateTime)Optional.ofNullable(row.getOffsetDateTime("last_update")).map(t -> t.atZoneSameInstant(DateUtil.DEFAULT_ZONE)).orElse(null)).retries(row.getInteger("retries")).executionCounter(row.getInteger("execution_counter")).scheduledId(row.getString("scheduled_id")).priority(row.getInteger("priority")).recipient(this.recipientMarshaller.unmarshall((JsonObject)row.get(JsonObject.class, "recipient"))).trigger(this.triggerMarshaller.unmarshall((JsonObject)row.get(JsonObject.class, "trigger"))).executionTimeout(row.getLong("execution_timeout")).executionTimeoutUnit((ChronoUnit)Optional.ofNullable(row.getString("execution_timeout_unit")).map(ChronoUnit::valueOf).orElse(null)).created((ZonedDateTime)Optional.ofNullable(row.getOffsetDateTime("created")).map(t -> t.atZoneSameInstant(DateUtil.DEFAULT_ZONE)).orElse(null)).build();
    }
}

