/*
 * Decompiled with CFR 0.152.
 */
package dk.cloudcreate.essentials.components.foundation.postgresql;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import dk.cloudcreate.essentials.components.foundation.postgresql.ListenNotify;
import dk.cloudcreate.essentials.components.foundation.postgresql.TableChangeNotification;
import dk.cloudcreate.essentials.reactive.EventBus;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import dk.cloudcreate.essentials.shared.concurrent.ThreadFactoryBuilder;
import java.io.Closeable;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.jdbi.v3.core.ConnectionException;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;
import org.postgresql.PGConnection;
import org.postgresql.PGNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Polls a single, long-lived PostgreSQL connection for {@code NOTIFY} messages and publishes each
 * one, deserialized to a {@link TableChangeNotification} subtype, on the supplied {@link EventBus}.
 * <p>
 * A table is registered through {@link #listenToNotificationsFor(String, Class)}, which issues a
 * {@code LISTEN} on the channel resolved by {@link ListenNotify#resolveTableChangeChannelName(String)}.
 * A single daemon thread polls the connection at the configured interval.
 * <p>
 * Threading: the listener {@link Handle} is shared between the polling thread and the threads calling
 * the public methods, so every open/close/LISTEN/UNLISTEN on it is guarded by one private lock.
 * Events are published <em>outside</em> that lock.
 * <p>
 * NOTE(review): incoming notifications are matched against the registered table names using
 * {@code PGNotification.getName()}; this presumes the resolved channel name equals the table name
 * used as map key - confirm against {@code ListenNotify}.
 *
 * @param <T> the common base type of the notification payloads
 */
public class MultiTableChangeListener<T extends TableChangeNotification>
implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(MultiTableChangeListener.class);
    private final Jdbi jdbi;
    private final Duration pollingInterval;
    private final ObjectMapper objectMapper;
    private final EventBus eventBus;
    /** Table name -> concrete payload type used when deserializing a notification for that table. */
    private final ConcurrentMap<String, Class<? extends T>> listenForNotificationsRelatedToTables;
    /** The currently open listener handle, or {@code null} when none is open. */
    private final AtomicReference<Handle> handleReference;
    /** Guards opening, closing and statement execution on the shared listener handle. */
    private final Object handleLock = new Object();
    private final ScheduledExecutorService executorService;
    private final ScheduledFuture<?> scheduledFuture;
    /** Set by {@link #close()}; prevents an in-flight poll or a late listen from re-opening a handle. */
    private volatile boolean closed;

    /**
     * Creates the listener and immediately schedules the polling task.
     *
     * @param jdbi            used to open the dedicated listener connection
     * @param pollingInterval delay before the first poll and between subsequent polls; must be at least 1 ms
     * @param objectMapper    used to deserialize notification payloads
     * @param eventBus        receives every successfully deserialized notification
     * @throws IllegalArgumentException if any argument is missing or {@code pollingInterval} is below 1 ms
     */
    public MultiTableChangeListener(Jdbi jdbi, Duration pollingInterval, ObjectMapper objectMapper, EventBus eventBus) {
        this.jdbi = FailFast.requireNonNull(jdbi, "No jdbi provided");
        this.pollingInterval = FailFast.requireNonNull(pollingInterval, "No pollingInterval provided");
        this.objectMapper = FailFast.requireNonNull(objectMapper, "No objectMapper provided");
        this.eventBus = FailFast.requireNonNull(eventBus, "No localEventBus instance provided");
        long pollingIntervalMillis = pollingInterval.toMillis();
        // scheduleAtFixedRate rejects a non-positive period with the same exception type; checking
        // up front gives a clearer message and avoids creating an executor that is never shut down.
        if (pollingIntervalMillis <= 0) {
            throw new IllegalArgumentException("pollingInterval must be at least 1 millisecond, but was " + pollingInterval);
        }
        this.listenForNotificationsRelatedToTables = new ConcurrentHashMap<>();
        this.handleReference = new AtomicReference<>();
        this.executorService = Executors.newSingleThreadScheduledExecutor(ThreadFactoryBuilder.builder().nameFormat("MultiTableChangeListener").daemon(true).build());
        this.scheduledFuture = this.executorService.scheduleAtFixedRate(this::pollForNotifications, pollingIntervalMillis, pollingIntervalMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops polling, shuts down the polling thread, issues a best-effort {@code UNLISTEN} for every
     * registered table on the open handle (if any) and closes that handle. Failures are logged and
     * never thrown. Calling this method more than once is harmless.
     */
    @Override
    public void close() {
        log.info("Closing");
        this.closed = true;
        this.scheduledFuture.cancel(true);
        this.executorService.shutdownNow();
        synchronized (this.handleLock) {
            Handle handle = this.handleReference.getAndSet(null);
            if (handle != null) {
                // UNLISTEN explicitly before closing: if the connection is returned to a pool rather
                // than physically closed, its LISTEN registrations would otherwise stay active.
                for (String tableName : this.listenForNotificationsRelatedToTables.keySet()) {
                    try {
                        log.info("Removing table change LISTENER for '{}'", tableName);
                        handle.execute("UNLISTEN " + ListenNotify.resolveTableChangeChannelName(tableName));
                    }
                    catch (RuntimeException e) {
                        // Best effort: one failing table must not prevent the remaining cleanup
                        log.warn("Failed to remove table change LISTENER for '{}' while closing", tableName, e);
                    }
                }
                this.closeQuietly(handle);
            }
        }
        log.info("Closed");
    }

    /**
     * @return the event bus that notifications are published on
     */
    public EventBus getEventBus() {
        return this.eventBus;
    }

    /**
     * Registers (or replaces) the payload type for a table. A {@code LISTEN} is only issued when the
     * table was not registered before.
     *
     * @param tableName             the table to receive change notifications for
     * @param tableNotificationType the concrete type notification payloads are deserialized to
     * @return this listener, to allow chaining
     */
    public MultiTableChangeListener<T> listenToNotificationsFor(String tableName, Class<? extends T> tableNotificationType) {
        FailFast.requireNonBlank(tableName, "No tableName was provided");
        FailFast.requireNonNull(tableNotificationType, "No tableNotificationType was provided");
        if (this.listenForNotificationsRelatedToTables.put(tableName, tableNotificationType) == null) {
            this.listen(tableName);
        }
        return this;
    }

    /**
     * Removes the registration for a table and issues an {@code UNLISTEN} when it was registered.
     *
     * @param tableName the table to stop receiving change notifications for
     * @return this listener, to allow chaining
     */
    public MultiTableChangeListener<T> unlistenToNotificationsFor(String tableName) {
        FailFast.requireNonBlank(tableName, "No tableName was provided");
        if (this.listenForNotificationsRelatedToTables.remove(tableName) != null) {
            this.unlisten(tableName);
        }
        return this;
    }

    /**
     * Issues {@code LISTEN} for the table's channel, opening the listener handle first if required.
     * Does nothing (apart from logging a warning) once the listener has been closed.
     */
    private void listen(String tableName) {
        FailFast.requireNonBlank(tableName, "No tableName provided");
        synchronized (this.handleLock) {
            if (this.closed) {
                log.warn("Ignoring table change LISTENER setup for '{}' since this listener is closed", tableName);
                return;
            }
            log.info("Setting up Table change LISTENER for '{}'", tableName);
            // A freshly opened handle has already registered every known table; PostgreSQL treats a
            // repeated LISTEN on the same channel as a no-op, so issuing it again is harmless.
            this.getOrOpenHandle().execute("LISTEN " + ListenNotify.resolveTableChangeChannelName(tableName));
        }
    }

    /**
     * Issues {@code UNLISTEN} for the table's channel on the open handle. When no handle is open
     * there is no active registration to remove, so no connection is opened just for this.
     */
    private void unlisten(String tableName) {
        FailFast.requireNonBlank(tableName, "No tableName provided");
        synchronized (this.handleLock) {
            Handle handle = this.handleReference.get();
            if (handle == null) {
                log.debug("No open listener Handle - nothing to UNLISTEN for '{}'", tableName);
                return;
            }
            log.info("Removing table change LISTENER for '{}'", tableName);
            handle.execute("UNLISTEN " + ListenNotify.resolveTableChangeChannelName(tableName));
        }
    }

    /**
     * The scheduled polling task. It must never let an exception escape: a task scheduled with
     * {@code scheduleAtFixedRate} that throws has all its subsequent executions suppressed, which
     * would silently stop notification delivery.
     */
    private void pollForNotifications() {
        try {
            log.trace("Polling for notifications related to {} tables: {}", this.listenForNotificationsRelatedToTables.size(), this.listenForNotificationsRelatedToTables.keySet());
            if (this.closed || this.listenForNotificationsRelatedToTables.isEmpty()) {
                return;
            }
            PGNotification[] notifications = this.fetchNotifications();
            if (notifications == null || notifications.length == 0) {
                log.trace("Didn't receive any Notifications");
                return;
            }
            log.debug("Received {} Notification(s)", notifications.length);
            for (PGNotification notification : notifications) {
                this.publish(notification);
            }
        }
        catch (RuntimeException e) {
            log.error(MessageFormatter.msg("Unexpected failure while polling for notifications"), e);
        }
    }

    /**
     * Reads the pending notifications from the listener connection. On any failure the handle is
     * discarded so that the next poll opens a fresh one.
     *
     * @return the pending notifications; {@code null} when there are none, the listener is closed,
     *         or the connection failed (some driver versions return {@code null} for "none" as well)
     */
    private PGNotification[] fetchNotifications() {
        synchronized (this.handleLock) {
            if (this.closed) {
                return null;
            }
            try {
                Handle handle = this.getOrOpenHandle();
                PGConnection connection = handle.getConnection().unwrap(PGConnection.class);
                return connection.getNotifications();
            }
            catch (SQLException | RuntimeException e) {
                log.error(MessageFormatter.msg("Failed to listen for notifications"), e);
                Handle broken = this.handleReference.getAndSet(null);
                if (broken != null) {
                    this.closeQuietly(broken);
                }
                return null;
            }
        }
    }

    /**
     * Deserializes one notification and publishes it; failures are logged so that the remaining
     * notifications of the same batch are still delivered.
     */
    private void publish(PGNotification notification) {
        T event = this.deserialize(notification);
        if (event == null) {
            return;
        }
        try {
            this.eventBus.publish(event);
        }
        catch (RuntimeException e) {
            log.error(MessageFormatter.msg("Failed to publish {} related to table '{}'", event.getClass().getSimpleName(), notification.getName()), e);
        }
    }

    /**
     * Converts the notification's JSON parameter to the payload type registered for its channel.
     *
     * @return the deserialized payload, or {@code null} (after logging) when no type is registered
     *         for the channel or the payload cannot be deserialized
     */
    private T deserialize(PGNotification notification) {
        Class<? extends T> payloadType = this.listenForNotificationsRelatedToTables.get(notification.getName());
        if (payloadType == null) {
            log.error(MessageFormatter.msg("Couldn't find a concrete {} type for notifications related to table '{}'", TableChangeNotification.class.getSimpleName(), notification.getName()));
            return null;
        }
        try {
            return this.objectMapper.readValue(notification.getParameter(), payloadType);
        }
        catch (JsonProcessingException e) {
            log.error(MessageFormatter.msg("Failed to deserialize notification payload '{}' to concrete {} related to table '{}'", notification.getParameter(), payloadType.getName(), notification.getName()), e);
            return null;
        }
    }

    /**
     * Returns the open listener handle, opening and configuring a new one when none exists. A new
     * handle registers {@code LISTEN} for every known table before it is made visible, so a
     * re-opened connection never silently misses tables. A handle that fails to configure or
     * register is closed and not cached.
     *
     * @throws RuntimeException if the handle cannot be opened, configured or registered
     */
    private Handle getOrOpenHandle() {
        synchronized (this.handleLock) {
            Handle existing = this.handleReference.get();
            if (existing != null) {
                return existing;
            }
            Handle handle = this.jdbi.open();
            try {
                handle.getConnection().setAutoCommit(true);
                handle.getConnection().setReadOnly(true);
                handle.getConnection().setTransactionIsolation(java.sql.Connection.TRANSACTION_READ_COMMITTED);
                for (String tableName : this.listenForNotificationsRelatedToTables.keySet()) {
                    log.debug("Registering table change LISTENER for '{}' on newly opened Handle", tableName);
                    handle.execute("LISTEN " + ListenNotify.resolveTableChangeChannelName(tableName));
                }
                this.handleReference.set(handle);
                return handle;
            }
            catch (SQLException e) {
                this.closeQuietly(handle);
                throw new RuntimeException("Failed to acquire Handle", e);
            }
            catch (RuntimeException e) {
                this.closeQuietly(handle);
                throw e;
            }
        }
    }

    /** Closes the handle, logging (instead of propagating) any failure. */
    private void closeQuietly(Handle handle) {
        try {
            handle.close();
        }
        catch (Exception ex) {
            log.error(MessageFormatter.msg("Failed to close the listener Handle"), ex);
        }
    }
}

