/*
 * Decompiled with CFR 0.152.
 */
package org.revenj;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Type;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.sql.DataSource;
import org.postgresql.PGNotification;
import org.postgresql.core.BaseConnection;
import org.postgresql.ds.PGPooledConnection;
import org.postgresql.ds.PGPoolingDataSource;
import org.revenj.Utils;
import org.revenj.patterns.DataChangeNotification;
import org.revenj.patterns.DomainModel;
import org.revenj.patterns.EagerNotification;
import org.revenj.patterns.Repository;
import org.revenj.patterns.ServiceLocator;
import org.revenj.postgres.PostgresReader;
import org.revenj.postgres.converters.StringConverter;
import rx.Observable;
import rx.subjects.PublishSubject;

final class PostgresDatabaseNotification
implements EagerNotification,
Closeable {
    private final DataSource dataSource;
    private final Optional<DomainModel> domainModel;
    private final ServiceLocator locator;
    private final PublishSubject<DataChangeNotification.NotifyInfo> subject = PublishSubject.create();
    private final Observable<DataChangeNotification.NotifyInfo> notifications;
    private final ConcurrentMap<Class<?>, Repository> repositories = new ConcurrentHashMap();
    private final ConcurrentMap<String, HashSet<Class<?>>> targets = new ConcurrentHashMap();
    private int retryCount;
    private final int timeout;
    private Connection connection;
    private boolean isClosed;

    public PostgresDatabaseNotification(DataSource dataSource, Optional<DomainModel> domainModel, Properties properties, ServiceLocator locator) {
        this.dataSource = dataSource;
        this.domainModel = domainModel;
        this.locator = locator;
        this.notifications = this.subject.asObservable();
        String timeoutValue = properties.getProperty("revenj.notifications.timeout");
        if (timeoutValue != null) {
            try {
                this.timeout = Integer.parseInt(timeoutValue);
            }
            catch (NumberFormatException e) {
                throw new RuntimeException("Error parsing notificationTimeout setting");
            }
        } else {
            this.timeout = 500;
        }
        if ("disabled".equals(properties.getProperty("revenj.notifications.status"))) {
            this.isClosed = true;
        } else {
            this.setupConnection();
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                this.isClosed = true;
            }));
        }
    }

    private void setupConnection() {
        ++this.retryCount;
        if (this.retryCount > 60) {
            this.retryCount = 30;
        }
        try {
            if (this.connection != null) {
                try {
                    this.connection.close();
                }
                catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            this.connection = this.dataSource.getConnection();
            BaseConnection bc = null;
            if (this.connection instanceof BaseConnection) {
                bc = (BaseConnection)this.connection;
            } else if (this.connection instanceof PGPooledConnection) {
                PGPooledConnection pgpc = (PGPooledConnection)this.connection;
                if (pgpc.getConnection() instanceof BaseConnection) {
                    bc = (BaseConnection)pgpc.getConnection();
                }
            } else if (this.dataSource instanceof PGPoolingDataSource) {
                PGPooledConnection pgpc;
                PGPoolingDataSource pgpds = (PGPoolingDataSource)this.dataSource;
                this.connection.close();
                this.connection = DriverManager.getConnection(pgpds.getUrl(), pgpds.getUser(), pgpds.getPassword());
                if (this.connection instanceof BaseConnection) {
                    bc = (BaseConnection)this.connection;
                } else if (this.connection instanceof PGPooledConnection && (pgpc = (PGPooledConnection)this.connection).getConnection() instanceof BaseConnection) {
                    bc = (BaseConnection)pgpc.getConnection();
                }
            }
            if (bc != null) {
                Statement stmt = bc.createStatement();
                stmt.execute("LISTEN events; LISTEN aggregate_roots");
                this.retryCount = 0;
                Pooling pooling = new Pooling(bc, stmt);
                new Thread(pooling).start();
            }
        }
        catch (Exception ex) {
            try {
                Thread.sleep(1000 * this.retryCount);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private Repository getRepository(Class<?> manifest) {
        return this.repositories.computeIfAbsent(manifest, clazz2 -> {
            try {
                return (Repository)this.locator.resolve(Utils.makeGenericType(Repository.class, manifest, new Type[0]));
            }
            catch (ReflectiveOperationException ex) {
                throw new RuntimeException("Repository is not registered for: " + manifest, ex);
            }
        });
    }

    @Override
    public void notify(DataChangeNotification.NotifyInfo info) {
        this.subject.onNext((Object)info);
    }

    @Override
    public Observable<DataChangeNotification.NotifyInfo> getNotifications() {
        return this.notifications;
    }

    @Override
    public <T> Observable<DataChangeNotification.TrackInfo<T>> track(Class<T> manifest) {
        return this.notifications.filter(it -> {
            HashSet set = (HashSet)this.targets.get(it.name);
            if (set == null) {
                set = new HashSet();
                Optional<Class<?>> domainType = this.domainModel.get().find(it.name);
                if (domainType.isPresent()) {
                    set.add(domainType.get());
                    Collections.addAll(set, domainType.get().getInterfaces());
                }
                this.targets.put(it.name, set);
            }
            return set.contains(manifest);
        }).map(it -> new DataChangeNotification.TrackInfo(it.uris, () -> this.getRepository(manifest).find(notifyInfo.uris)));
    }

    @Override
    public void close() {
        if (this.isClosed) {
            return;
        }
        this.isClosed = true;
        try {
            if (this.connection != null && !this.connection.isClosed()) {
                this.connection.close();
            }
        }
        catch (SQLException e) {
            e.printStackTrace();
        }
        this.connection = null;
    }

    class Pooling
    implements Runnable {
        private final BaseConnection connection;
        private final Statement ping;

        public Pooling(BaseConnection connection, Statement ping) {
            this.connection = connection;
            this.ping = ping;
        }

        @Override
        public void run() {
            PostgresReader reader = new PostgresReader();
            while (!PostgresDatabaseNotification.this.isClosed) {
                try {
                    this.ping.execute("");
                    PGNotification[] notifications = this.connection.getNotifications();
                    if (notifications == null || notifications.length == 0) {
                        try {
                            Thread.sleep(PostgresDatabaseNotification.this.timeout);
                        }
                        catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        continue;
                    }
                    block17: for (PGNotification n : notifications) {
                        if (!"events".equals(n.getName()) && !"aggregate_roots".equals(n.getName())) continue;
                        String param = n.getParameter();
                        String ident = param.substring(0, param.indexOf(58));
                        String op = param.substring(ident.length() + 1, param.indexOf(58, ident.length() + 1));
                        String values = param.substring(ident.length() + op.length() + 2);
                        reader.process(values);
                        List<String> ids = StringConverter.parseCollection(reader, 0, false);
                        if (ids == null || ids.size() <= 0) continue;
                        String[] uris = ids.toArray(new String[ids.size()]);
                        switch (op) {
                            case "Update": {
                                PostgresDatabaseNotification.this.subject.onNext((Object)new DataChangeNotification.NotifyInfo(ident, DataChangeNotification.Operation.Update, uris));
                                continue block17;
                            }
                            case "Change": {
                                PostgresDatabaseNotification.this.subject.onNext((Object)new DataChangeNotification.NotifyInfo(ident, DataChangeNotification.Operation.Change, uris));
                                continue block17;
                            }
                            case "Delete": {
                                PostgresDatabaseNotification.this.subject.onNext((Object)new DataChangeNotification.NotifyInfo(ident, DataChangeNotification.Operation.Delete, uris));
                                continue block17;
                            }
                            default: {
                                PostgresDatabaseNotification.this.subject.onNext((Object)new DataChangeNotification.NotifyInfo(ident, DataChangeNotification.Operation.Insert, uris));
                            }
                        }
                    }
                }
                catch (IOException | SQLException ex) {
                    try {
                        Thread.sleep(1000L);
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    PostgresDatabaseNotification.this.setupConnection();
                    return;
                }
            }
        }
    }
}

