/*
 * Decompiled with CFR 0.152.
 */
package dk.cloudcreate.essentials.components.foundation.postgresql;

import dk.cloudcreate.essentials.components.foundation.IOExceptionUtil;
import dk.cloudcreate.essentials.components.foundation.postgresql.PostgresqlUtil;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.jdbi.v3.core.ConnectionException;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.core.statement.Update;
import org.postgresql.PGConnection;
import org.postgresql.PGNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public final class ListenNotify {
    private static final Logger log = LoggerFactory.getLogger(ListenNotify.class);
    public static final String TABLE_NAME = "table_name";
    public static final String SQL_OPERATION = "sql_operation";

    public static String resolveTableChangeChannelName(String tableName) {
        String resolvedChannelName = (String)FailFast.requireNonNull((Object)tableName, (String)"No tableName provided");
        PostgresqlUtil.checkIsValidTableOrColumnName(resolvedChannelName);
        return resolvedChannelName;
    }

    public static void addChangeNotificationTriggerToTable(Handle handle, String tableName, List<SqlOperation> triggerOnSqlOperations, String ... includeAdditionalTableColumnsInNotificationPayload) {
        boolean isReplaceTriggerSupported;
        FailFast.requireNonNull((Object)handle, (String)"No handle provided");
        FailFast.requireNonBlank((CharSequence)tableName, (String)"No tableName provided");
        FailFast.requireNonEmpty(triggerOnSqlOperations, (String)"No triggerOnSqlOperations entries provided");
        PostgresqlUtil.checkIsValidTableOrColumnName(tableName);
        Object additionalColumnsPayLoadStatement = "table_name, sql_operation";
        Object additionalColumnsSelectStatement = "TG_TABLE_NAME, TG_OP";
        if (includeAdditionalTableColumnsInNotificationPayload != null && includeAdditionalTableColumnsInNotificationPayload.length > 0) {
            Arrays.stream(includeAdditionalTableColumnsInNotificationPayload).forEach(PostgresqlUtil::checkIsValidTableOrColumnName);
            additionalColumnsPayLoadStatement = (String)additionalColumnsPayLoadStatement + ", " + Arrays.stream(includeAdditionalTableColumnsInNotificationPayload).reduce((result, column) -> result + ", " + column).get();
            additionalColumnsSelectStatement = (String)additionalColumnsSelectStatement + ", " + Arrays.stream(includeAdditionalTableColumnsInNotificationPayload).map(column -> "NEW." + column).reduce((result, column) -> result + ", " + column).get() + "\n";
        }
        String notifyFunctionSql = MessageFormatter.bind((String)"CREATE OR REPLACE FUNCTION notify_{:tableName}_change()\n       RETURNS trigger AS $$\n       BEGIN\n         PERFORM (\n            WITH payload({:additionalColumnsPayLoadStatement}) AS (\n              SELECT {:additionalColumnsSelectStatement}            )\n            SELECT pg_notify('{:channelName}', row_to_json(payload)::text) FROM payload);\n         RETURN NULL;\n       END;\n       $$ LANGUAGE PLPGSQL;", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)tableName), MessageFormatter.NamedArgumentBinding.arg((String)"channelName", (Object)ListenNotify.resolveTableChangeChannelName(tableName)), MessageFormatter.NamedArgumentBinding.arg((String)"additionalColumnsPayLoadStatement", (Object)additionalColumnsPayLoadStatement), MessageFormatter.NamedArgumentBinding.arg((String)"additionalColumnsSelectStatement", (Object)additionalColumnsSelectStatement)});
        log.debug("Notify Function for changes on '{}' SQL:\n{}", (Object)tableName, (Object)notifyFunctionSql);
        Update update = handle.createUpdate(notifyFunctionSql);
        update.execute();
        log.info("Added Notification FUNCTION 'notify_{}_change' for table '{}'", (Object)tableName, (Object)tableName);
        boolean bl = isReplaceTriggerSupported = PostgresqlUtil.getServiceMajorVersion(handle) >= 14;
        if (!isReplaceTriggerSupported) {
            String dropTriggerSql = MessageFormatter.bind((String)"DROP TRIGGER IF EXISTS notify_on_{:tableName}_changes ON {:tableName} CASCADE", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)tableName)});
            log.debug("Trigger for '{}' changes SQL:\n{}", (Object)tableName, (Object)dropTriggerSql);
            update = handle.createUpdate(dropTriggerSql);
            update.execute();
            log.info("Dropped Notification TRIGGER 'notify_on_{}_changes' for table '{}'", (Object)tableName, (Object)tableName);
        }
        String triggerSql = MessageFormatter.bind((String)"CREATE {:optionalReplace}TRIGGER notify_on_{:tableName}_changes\n      {:when} {:on} ON {:tableName}\n      FOR EACH ROW\n         EXECUTE FUNCTION notify_{:tableName}_change();", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)tableName), MessageFormatter.NamedArgumentBinding.arg((String)"optionalReplace", (Object)(isReplaceTriggerSupported ? "OR REPLACE " : "")), MessageFormatter.NamedArgumentBinding.arg((String)"when", (Object)"AFTER"), MessageFormatter.NamedArgumentBinding.arg((String)"on", (Object)triggerOnSqlOperations.stream().map(Enum::name).reduce((result, on) -> result + " OR " + on).get())});
        log.debug("Trigger for '{}' changes SQL:\n{}", (Object)tableName, (Object)triggerSql);
        update = handle.createUpdate(triggerSql);
        update.execute();
        log.info("Added Notification TRIGGER 'notify_on_{}_changes' for table '{}'", (Object)tableName, (Object)tableName);
    }

    public static void removeChangeNotificationTriggerFromTable(Handle handle, String tableName) {
        FailFast.requireNonNull((Object)handle, (String)"No handle provided");
        FailFast.requireNonBlank((CharSequence)tableName, (String)"No tableName provided");
        PostgresqlUtil.checkIsValidTableOrColumnName(tableName);
        String notifyFunctionSql = MessageFormatter.bind((String)"DROP FUNCTION IF EXISTS notify_{:tableName}_change() CASCADE", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)tableName)});
        Update update = handle.createUpdate(notifyFunctionSql);
        update.execute();
        log.info("Removed Notification FUNCTION 'notify_{}_change' for table '{}'", (Object)tableName, (Object)tableName);
        String triggerSql = MessageFormatter.bind((String)"DROP TRIGGER IF EXISTS notify_on_{:tableName}_changes ON {:tableName} CASCADE", (MessageFormatter.NamedArgumentBinding[])new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg((String)"tableName", (Object)tableName)});
        update = handle.createUpdate(triggerSql);
        update.execute();
        log.info("Removed Notification TRIGGER 'notify_on_{}_changes' for table '{}'", (Object)tableName, (Object)tableName);
    }

    public static Flux<String> listen(Jdbi jdbi, String tableName, Duration pollingInterval) {
        FailFast.requireNonNull((Object)jdbi, (String)"No jdbi provided");
        FailFast.requireNonBlank((CharSequence)tableName, (String)"No tableName provided");
        FailFast.requireNonNull((Object)pollingInterval, (String)"No pollingInterval provided");
        PostgresqlUtil.checkIsValidTableOrColumnName(tableName);
        AtomicReference handleReference = new AtomicReference();
        return Flux.defer(() -> {
            Handle handle = (Handle)handleReference.get();
            try {
                PGConnection connection;
                PGNotification[] notifications;
                if (handle == null) {
                    handle = jdbi.open();
                    handleReference.set(handle);
                    handle.getConnection().setAutoCommit(true);
                    handle.getConnection().setReadOnly(true);
                    handle.getConnection().setTransactionIsolation(2);
                    log.info("Setting up LISTENER for table '{}'", (Object)tableName);
                    handle.execute("LISTEN " + ListenNotify.resolveTableChangeChannelName(tableName), new Object[0]);
                }
                if ((notifications = (connection = handle.getConnection().unwrap(PGConnection.class)).getNotifications()).length > 0) {
                    log.debug("Received {} Notification(s) for table '{}'", (Object)notifications.length, (Object)tableName);
                    return Flux.fromStream(Arrays.stream(notifications).map(PGNotification::getParameter));
                }
                log.trace("Didn't receive any Notifications for table '{}'", (Object)tableName);
                return Flux.empty();
            }
            catch (SQLException | ConnectionException e) {
                if (IOExceptionUtil.isIOException(e)) {
                    log.debug(MessageFormatter.msg((String)"Failed to listen for notification for table '{}'", (Object[])new Object[]{tableName}), e);
                } else {
                    log.error(MessageFormatter.msg((String)"Failed to listen for notification for table '{}'", (Object[])new Object[]{tableName}), e);
                }
                try {
                    if (handle != null) {
                        handle.close();
                    }
                }
                catch (Exception ex) {
                    if (IOExceptionUtil.isIOException(ex)) {
                        log.debug(MessageFormatter.msg((String)"Failed to close the listener Handle for table '{}'", (Object[])new Object[]{tableName}), e);
                        return Flux.empty();
                    }
                    log.error(MessageFormatter.msg((String)"Failed to close the listener Handle for table '{}'", (Object[])new Object[]{tableName}), e);
                    return Flux.empty();
                }
                handleReference.set(null);
                return Flux.empty();
            }
        }).doOnCancel(() -> {
            Handle handle = (Handle)handleReference.get();
            if (handle != null) {
                log.info("Removing LISTENER for table '{}'", (Object)tableName);
                try {
                    handle.execute("UNLISTEN " + ListenNotify.resolveTableChangeChannelName(tableName), new Object[0]);
                }
                catch (Exception exception) {
                    // empty catch block
                }
                try {
                    handle.close();
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        }).repeatWhen(longFlux -> Flux.interval((Duration)pollingInterval).onBackpressureDrop().publishOn(Schedulers.newSingle((String)("Postgresql-Listener-" + tableName), (boolean)true)));
    }

    public static enum SqlOperation {
        INSERT,
        UPDATE,
        DELETE,
        TRUNCATE;

    }
}

