/*
 * Decompiled with CFR 0.152.
 */
package dk.cloudcreate.essentials.components.foundation.postgresql;

import com.fasterxml.jackson.databind.ObjectMapper;
import dk.cloudcreate.essentials.components.foundation.IOExceptionUtil;
import dk.cloudcreate.essentials.components.foundation.json.JSONSerializer;
import dk.cloudcreate.essentials.components.foundation.json.JacksonJSONSerializer;
import dk.cloudcreate.essentials.components.foundation.postgresql.ListenNotify;
import dk.cloudcreate.essentials.components.foundation.postgresql.NotificationDuplicationFilter;
import dk.cloudcreate.essentials.components.foundation.postgresql.NotificationFilterChain;
import dk.cloudcreate.essentials.components.foundation.postgresql.PostgresqlUtil;
import dk.cloudcreate.essentials.components.foundation.postgresql.TableChangeNotification;
import dk.cloudcreate.essentials.reactive.EventBus;
import dk.cloudcreate.essentials.shared.Exceptions;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.Lifecycle;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import dk.cloudcreate.essentials.shared.concurrent.ThreadFactoryBuilder;
import java.io.Closeable;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;
import org.postgresql.PGConnection;
import org.postgresql.PGNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Polls a single, long-lived database {@link Handle} for PostgreSQL notifications and republishes
 * them on an {@link EventBus}.
 * <p>
 * For every table registered through {@link #listenToNotificationsFor(String, Class)} a
 * {@code LISTEN <channel>} statement is executed on the shared handle, where the channel name is
 * resolved by {@link ListenNotify#resolveTableChangeChannelName(String)}. A single daemon thread then
 * polls {@link PGConnection#getNotifications()} at a fixed rate, deserializes each notification
 * payload into the type registered under the notification's name and publishes the result on the
 * event bus.
 * <p>
 * Polling is scheduled by the constructor; {@link #start()} only flips the {@code started} flag.
 * Resources are released either through {@link #stop()} (when started through the lifecycle methods)
 * or through {@link #close()} (when never started).
 *
 * @param <T> the common base type of the notification payloads this listener publishes
 */
public final class MultiTableChangeListener<T extends TableChangeNotification>
        implements Lifecycle, Closeable {
    private static final Logger log = LoggerFactory.getLogger(MultiTableChangeListener.class);

    private final Jdbi jdbi;
    private final Duration pollingInterval;
    private final JSONSerializer jsonSerializer;
    private final EventBus eventBus;
    /** Registered table name -> payload type used to deserialize notifications for that table. */
    private final ConcurrentMap<String, Class<? extends T>> listenForNotificationsRelatedToTables;
    /** The shared handle used for LISTEN/UNLISTEN and polling; {@code null} when none is open. */
    private final AtomicReference<Handle> handleReference;
    private final boolean filterDuplicateNotifications;
    private final NotificationFilterChain notificationFilterChain;
    // Written by the thread calling stop()/close() and read by the polling thread, hence volatile.
    private volatile ScheduledExecutorService executorService;
    private volatile ScheduledFuture<?> scheduledFuture;
    private volatile boolean started;

    /**
     * Creates the listener and immediately schedules polling on a single daemon thread named
     * {@code MultiTableChangeListener}.
     *
     * @param jdbi                         the {@link Jdbi} instance used to open the shared handle
     * @param pollingInterval              both the initial delay and the period between polls
     * @param jsonSerializer               serializer used to deserialize notification payloads; when it is
     *                                     a {@link JacksonJSONSerializer} its {@link ObjectMapper} is reused
     *                                     by the duplication filter chain, otherwise a new
     *                                     {@link ObjectMapper} is created
     * @param eventBus                     the bus that deserialized notifications are published on
     * @param filterDuplicateNotifications whether notifications in a single poll should be run through
     *                                     the registered {@link NotificationDuplicationFilter}s
     */
    public MultiTableChangeListener(Jdbi jdbi, Duration pollingInterval, JSONSerializer jsonSerializer, EventBus eventBus, boolean filterDuplicateNotifications) {
        this.jdbi = FailFast.requireNonNull(jdbi, "No jdbi provided");
        this.pollingInterval = FailFast.requireNonNull(pollingInterval, "No pollingInterval provided");
        this.jsonSerializer = FailFast.requireNonNull(jsonSerializer, "No jsonSerializer provided");
        this.eventBus = FailFast.requireNonNull(eventBus, "No localEventBus instance provided");
        this.filterDuplicateNotifications = filterDuplicateNotifications;
        if (jsonSerializer instanceof JacksonJSONSerializer) {
            JacksonJSONSerializer jacksonJSONSerializer = (JacksonJSONSerializer) jsonSerializer;
            this.notificationFilterChain = new NotificationFilterChain(jacksonJSONSerializer.getObjectMapper());
        } else {
            this.notificationFilterChain = new NotificationFilterChain(new ObjectMapper());
        }
        this.listenForNotificationsRelatedToTables = new ConcurrentHashMap<>();
        this.handleReference = new AtomicReference<>();
        this.executorService = Executors.newSingleThreadScheduledExecutor(
                ThreadFactoryBuilder.builder().nameFormat("MultiTableChangeListener").daemon(true).build());
        this.scheduledFuture = this.executorService.scheduleAtFixedRate(
                this::pollForNotifications, pollingInterval.toMillis(), pollingInterval.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * @return whether duplicate filtering was enabled in the constructor
     */
    public boolean isFilterDuplicateNotifications() {
        return this.filterDuplicateNotifications;
    }

    /**
     * @return the event bus that notifications are published on
     */
    public EventBus getEventBus() {
        return this.eventBus;
    }

    /**
     * Removes the given filter from the duplication filter chain.
     *
     * @param filter the filter to remove
     */
    public void removeDuplicationFilter(NotificationDuplicationFilter filter) {
        this.notificationFilterChain.removeFilter(filter);
    }

    /**
     * Adds the given filter at the front of the duplication filter chain.
     *
     * @param filter the filter to add
     * @return the result reported by the filter chain
     */
    public boolean addDuplicationFilterAsFirst(NotificationDuplicationFilter filter) {
        return this.notificationFilterChain.addFilterAsFirst(filter);
    }

    /**
     * Adds the given filter at the end of the duplication filter chain.
     *
     * @param filter the filter to add
     * @return the result reported by the filter chain
     */
    public boolean addDuplicationFilterAsLast(NotificationDuplicationFilter filter) {
        return this.notificationFilterChain.addFilterAsLast(filter);
    }

    /**
     * Registers the payload type for a table and, when the table was not registered before, issues
     * a {@code LISTEN} for it. Re-registering an already known table only replaces the payload type.
     *
     * @param tableName             the table to listen for notifications on; must be non-blank and pass
     *                              {@link PostgresqlUtil#checkIsValidTableOrColumnName(String)}
     * @param tableNotificationType the concrete type notification payloads are deserialized into
     * @return this listener instance
     * @throws RuntimeException if the {@code LISTEN} statement fails
     */
    public MultiTableChangeListener listenToNotificationsFor(String tableName, Class<? extends T> tableNotificationType) {
        FailFast.requireNonBlank(tableName, "No tableName was provided");
        FailFast.requireNonNull(tableNotificationType, "No tableNotificationType was provided");
        PostgresqlUtil.checkIsValidTableOrColumnName(tableName);
        if (this.listenForNotificationsRelatedToTables.put(tableName, tableNotificationType) == null) {
            listen(tableName);
        }
        return this;
    }

    /**
     * Removes the registration for a table and, when the table was registered, issues an
     * {@code UNLISTEN} for it.
     *
     * @param tableName the table to stop listening on; must be non-blank and pass
     *                  {@link PostgresqlUtil#checkIsValidTableOrColumnName(String)}
     * @return this listener instance
     * @throws RuntimeException if the {@code UNLISTEN} statement fails
     */
    public MultiTableChangeListener unlistenToNotificationsFor(String tableName) {
        FailFast.requireNonBlank(tableName, "No tableName was provided");
        PostgresqlUtil.checkIsValidTableOrColumnName(tableName);
        if (this.listenForNotificationsRelatedToTables.remove(tableName) != null) {
            unlisten(tableName);
        }
        return this;
    }

    /**
     * Executes {@code LISTEN} for the table's channel on the shared handle.
     *
     * @throws RuntimeException wrapping any failure; IO related failures are additionally logged at debug
     */
    private void listen(String tableName) {
        FailFast.requireNonBlank(tableName, "No tableName provided");
        log.info("Setting up Table change LISTENER for '{}'", tableName);
        // The statement is built by concatenation, so the name is validated before it is used.
        PostgresqlUtil.checkIsValidTableOrColumnName(tableName);
        try {
            getHandle().execute("LISTEN " + ListenNotify.resolveTableChangeChannelName(tableName));
        } catch (Exception e) {
            if (IOExceptionUtil.isIOException(e)) {
                log.debug("Failed to add change listener for table '{}'", tableName, e);
            }
            throw new RuntimeException(MessageFormatter.msg("Failed to add change listener for table '{}'", tableName), e);
        }
    }

    /**
     * Executes {@code UNLISTEN} for the table's channel on the shared handle.
     *
     * @throws RuntimeException wrapping any failure; IO related failures are additionally logged at debug
     */
    private void unlisten(String tableName) {
        FailFast.requireNonBlank(tableName, "No tableName provided");
        log.info("Removing table change LISTENER for '{}'", tableName);
        // The statement is built by concatenation, so the name is validated before it is used.
        PostgresqlUtil.checkIsValidTableOrColumnName(tableName);
        try {
            getHandle().execute("UNLISTEN " + ListenNotify.resolveTableChangeChannelName(tableName));
        } catch (Exception e) {
            if (IOExceptionUtil.isIOException(e)) {
                log.debug("Failed to unlisten table '{}'", tableName, e);
            }
            throw new RuntimeException(MessageFormatter.msg("Failed to unlisten table '{}'", tableName), e);
        }
    }

    /**
     * Drops notifications whose duplication key was already seen earlier in the same batch.
     * Notifications for which the filter chain yields no duplication key are always kept.
     * Filtering only happens when it is enabled, the batch has more than one notification and
     * the filter chain reports having filters; otherwise the batch is streamed unchanged.
     */
    private Stream<PGNotification> filterDuplicateNotifications(PGNotification[] notifications) {
        boolean shouldFilter = this.filterDuplicateNotifications
                && notifications.length > 1
                && this.notificationFilterChain.hasDuplicationFilters();
        if (!shouldFilter) {
            return Arrays.stream(notifications);
        }

        HashSet<String> duplicationKeys = new HashSet<>();
        ArrayList<PGNotification> filteredNotifications = new ArrayList<>(notifications.length);
        for (PGNotification notification : notifications) {
            Optional<String> duplicationKey = this.notificationFilterChain.extractDuplicationKey(notification.getParameter());
            // Set.add returns false when the key was already present, i.e. the notification is a duplicate
            if (duplicationKey.isPresent() && !duplicationKeys.add(duplicationKey.get())) {
                continue;
            }
            filteredNotifications.add(notification);
        }
        int originalCount = notifications.length;
        int filteredCount = filteredNotifications.size();
        log.debug("Reduced notifications from {} to {} (reduction by {}). UniqueNames: {}",
                  originalCount, filteredCount, originalCount - filteredCount, duplicationKeys);
        return filteredNotifications.stream();
    }

    /**
     * The scheduled polling task: fetches pending notifications from the shared handle, deserializes
     * them and publishes them on the event bus. Any {@link Exception} is logged (IO related ones at
     * debug, others at error) and causes the shared handle to be closed and discarded so that the
     * next poll opens a fresh one.
     */
    private void pollForNotifications() {
        log.trace("Polling for notifications related to {} tables: {}",
                  this.listenForNotificationsRelatedToTables.size(), this.listenForNotificationsRelatedToTables.keySet());
        ScheduledFuture<?> future = this.scheduledFuture;
        if (future == null || future.isCancelled() || future.isDone()) {
            log.trace("Skipping polling since scheduledFuture is missing, cancelled or done");
            return;
        }
        if (this.listenForNotificationsRelatedToTables.isEmpty()) {
            return;
        }
        try {
            PGConnection connection = getHandle().getConnection().unwrap(PGConnection.class);
            PGNotification[] notifications = connection.getNotifications();
            if (notifications == null || notifications.length == 0) {
                log.trace("Didn't receive any Notifications");
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("Received '{}' Notification(s) for tables: {}",
                          notifications.length,
                          Arrays.stream(notifications).map(PGNotification::getName).collect(Collectors.toSet()));
            }
            AtomicLong notificationsPublished = new AtomicLong();
            filterDuplicateNotifications(notifications)
                    .map(this::deserializeNotification)
                    .filter(Objects::nonNull)
                    .forEach(event -> {
                        notificationsPublished.getAndIncrement();
                        this.eventBus.publish(event);
                    });
            log.debug("Finished publishing '{}' Notification(s)", notificationsPublished.get());
        } catch (Exception e) {
            if (IOExceptionUtil.isIOException(e)) {
                log.debug("Failed while polling for notifications", e);
            } else {
                log.error("Failed while polling for notifications", e);
            }
            // Discard the handle so the next poll starts from a fresh connection
            releaseHandle();
        }
    }

    /**
     * Deserializes a notification payload into the type registered under the notification's name.
     *
     * @return the deserialized payload, or {@code null} when no type is registered for the
     *         notification's name or deserialization failed (both cases are logged at error)
     */
    private T deserializeNotification(PGNotification notification) {
        Class<? extends T> payloadType = this.listenForNotificationsRelatedToTables.get(notification.getName());
        if (payloadType == null) {
            log.error(MessageFormatter.msg("Couldn't find a concrete {} type for notifications related to table '{}'",
                                           TableChangeNotification.class.getSimpleName(), notification.getName()));
            return null;
        }
        try {
            return this.jsonSerializer.deserialize(notification.getParameter(), payloadType);
        } catch (Throwable e) {
            Exceptions.rethrowIfCriticalError(e);
            log.error(MessageFormatter.msg("Failed to deserialize notification payload '{}' to concrete {} related to table '{}'",
                                           notification.getParameter(), payloadType.getName(), notification.getName()),
                      e);
            return null;
        }
    }

    /**
     * Returns the shared handle, opening and configuring a new one when none is open.
     * <p>
     * A new handle is only published to {@link #handleReference} once it has been configured
     * (auto-commit, read-only, READ COMMITTED); if configuration fails the handle is closed again.
     * Because a fresh connection carries no {@code LISTEN} registrations, all currently registered
     * tables are re-listened on it regardless of which caller triggered its creation. Failures to
     * re-listen an individual table are logged and do not prevent the handle from being returned.
     *
     * @throws RuntimeException if the connection cannot be configured
     */
    private synchronized Handle getHandle() {
        Handle current = this.handleReference.get();
        if (current != null) {
            return current;
        }

        Handle handle = this.jdbi.open();
        try {
            java.sql.Connection connection = handle.getConnection();
            connection.setAutoCommit(true);
            connection.setReadOnly(true);
            connection.setTransactionIsolation(java.sql.Connection.TRANSACTION_READ_COMMITTED);
        } catch (SQLException e) {
            closeHandle(handle);
            throw new RuntimeException("Failed to acquire a Handle", e);
        } catch (RuntimeException e) {
            closeHandle(handle);
            throw e;
        }

        // Publish before re-listening: listen() re-enters getHandle() and must see this handle
        this.handleReference.set(handle);
        for (String tableName : this.listenForNotificationsRelatedToTables.keySet()) {
            try {
                listen(tableName);
            } catch (Exception e) {
                log.error("Failed to add change listener for table '{}' during creation of new Handle", tableName, e);
            }
        }
        return handle;
    }

    /**
     * Clears {@link #handleReference} and closes the handle it held, if any.
     */
    private synchronized void releaseHandle() {
        closeHandle(this.handleReference.getAndSet(null));
    }

    /**
     * Closes the given handle, logging (rather than propagating) any failure to do so.
     *
     * @param handle the handle to close; {@code null} is ignored
     */
    private void closeHandle(Handle handle) {
        if (handle == null) {
            return;
        }
        try {
            handle.close();
        } catch (Exception e) {
            if (IOExceptionUtil.isIOException(e)) {
                log.debug("Failed to close the polling listener Handle", e);
            } else {
                log.error("Failed to close the polling listener Handle", e);
            }
        }
    }

    /**
     * Marks the listener as started. Polling itself is already scheduled by the constructor.
     */
    public void start() {
        if (!this.started) {
            log.info("Starting MultiTableChangeListener");
            this.started = true;
        }
    }

    /**
     * Stops polling and releases the executor and the shared handle, if the listener was started.
     */
    public void stop() {
        if (this.started) {
            stopListeners();
            this.started = false;
        }
    }

    /**
     * Cancels the polling task, un-listens all registered tables on the open handle (if any),
     * shuts the executor down and closes the shared handle. Safe to call more than once.
     */
    private void stopListeners() {
        log.info("Stopping MultiTableChangeListener");
        ScheduledFuture<?> future = this.scheduledFuture;
        this.scheduledFuture = null;
        if (future != null) {
            try {
                log.debug("Cancelling scheduledFuture");
                future.cancel(true);
                log.debug("Cancelled scheduledFuture");
            } catch (Exception e) {
                log.error("Failed to cancel scheduledFuture", e);
            }
        }

        // Without an open handle there is no session holding LISTEN registrations, so don't open
        // a new connection just to UNLISTEN on it.
        if (this.handleReference.get() != null) {
            for (String tableName : this.listenForNotificationsRelatedToTables.keySet()) {
                try {
                    unlisten(tableName);
                } catch (Exception e) {
                    log.error("Failed to unlisten table '{}'", tableName, e);
                }
            }
        }

        ScheduledExecutorService executor = this.executorService;
        this.executorService = null;
        if (executor != null) {
            log.debug("Shutting down scheduled executor service");
            executor.shutdownNow();
            log.debug("Shutdown Scheduled Executor service");
        }

        releaseHandle();
        log.info("Stopped MultiTableChangeListener");
    }

    /**
     * @return {@code true} between a call to {@link #start()} and the following {@link #stop()}
     */
    public boolean isStarted() {
        return this.started;
    }

    /**
     * Releases all resources unless the listener was started through {@link #start()}, in which
     * case the call is ignored and {@link #stop()} is expected to be used instead.
     */
    @Override
    public void close() {
        if (this.started) {
            log.info("Ignoring call to close as MultiTableChangeListener was started using Lifecycle methods");
        } else {
            stopListeners();
        }
    }
}

