/*
 * Decompiled with CFR 0.152.
 */
package org.revenj;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Type;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.sql.DataSource;
import org.postgresql.Driver;
import org.postgresql.PGNotification;
import org.postgresql.core.BaseConnection;
import org.postgresql.core.Notification;
import org.postgresql.core.PGStream;
import org.postgresql.util.HostSpec;
import org.revenj.Utils;
import org.revenj.database.postgres.ConnectionFactory;
import org.revenj.database.postgres.PostgresReader;
import org.revenj.database.postgres.converters.StringConverter;
import org.revenj.extensibility.SystemState;
import org.revenj.patterns.DataChangeNotification;
import org.revenj.patterns.DomainModel;
import org.revenj.patterns.EagerNotification;
import org.revenj.patterns.Repository;
import org.revenj.patterns.ServiceLocator;
import rx.Observable;
import rx.subjects.PublishSubject;

final class PostgresDatabaseNotification
implements EagerNotification,
Closeable {
    private final DataSource dataSource;
    private final Optional<DomainModel> domainModel;
    private final SystemState systemState;
    private final ServiceLocator locator;
    private final Properties properties;
    private final PublishSubject<DataChangeNotification.NotifyInfo> subject = PublishSubject.create();
    private final Observable<DataChangeNotification.NotifyInfo> notifications;
    private final ConcurrentMap<Class<?>, Repository> repositories = new ConcurrentHashMap();
    private final ConcurrentMap<String, HashSet<Class<?>>> targets = new ConcurrentHashMap();
    private int retryCount;
    private final int maxTimeout;
    private boolean isClosed;
    private PGStream currentStream;

    public PostgresDatabaseNotification(DataSource dataSource, Optional<DomainModel> domainModel, Properties properties, SystemState systemState, ServiceLocator locator) {
        this.dataSource = dataSource;
        this.domainModel = domainModel;
        this.properties = properties;
        this.systemState = systemState;
        this.locator = locator;
        this.notifications = this.subject.asObservable();
        String timeoutValue = properties.getProperty("revenj.notifications.timeout");
        if (timeoutValue != null) {
            try {
                this.maxTimeout = Integer.parseInt(timeoutValue);
            }
            catch (NumberFormatException e) {
                throw new RuntimeException("Error parsing notificationTimeout setting");
            }
        } else {
            this.maxTimeout = 1000;
        }
        if ("disabled".equals(properties.getProperty("revenj.notifications.status"))) {
            this.isClosed = true;
        } else if ("pooling".equals(properties.getProperty("revenj.notifications.type"))) {
            this.setupPooling();
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                this.isClosed = true;
            }));
        } else {
            this.setupListening();
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                this.isClosed = true;
            }));
        }
    }

    private void setupPooling() {
        if (this.dataSource == null) {
            return;
        }
        ++this.retryCount;
        if (this.retryCount > 60) {
            this.retryCount = 30;
        }
        try {
            Connection connection = this.dataSource.getConnection();
            BaseConnection bc = null;
            if (connection instanceof BaseConnection) {
                bc = (BaseConnection)connection;
            } else {
                try {
                    if (connection != null && connection.isWrapperFor(BaseConnection.class)) {
                        bc = connection.unwrap(BaseConnection.class);
                    }
                }
                catch (AbstractMethodError abstractMethodError) {
                    // empty catch block
                }
                if (bc == null && this.properties.containsKey("revenj.jdbcUrl")) {
                    String user = this.properties.getProperty("revenj.user");
                    String pass = this.properties.getProperty("revenj.password");
                    Driver driver = new Driver();
                    Properties connProps = new Properties(this.properties);
                    if (user != null && pass != null) {
                        connProps.setProperty("user", user);
                        connProps.setProperty("password", pass);
                    }
                    this.cleanupConnection(connection);
                    connection = driver.connect(this.properties.getProperty("revenj.jdbcUrl"), connProps);
                    if (connection instanceof BaseConnection) {
                        bc = (BaseConnection)connection;
                    }
                }
            }
            if (bc != null) {
                Statement stmt = bc.createStatement();
                stmt.execute("LISTEN events; LISTEN aggregate_roots; LISTEN migration; LISTEN revenj");
                this.retryCount = 0;
                Pooling pooling = new Pooling(bc, stmt);
                Thread thread = new Thread(pooling);
                thread.setDaemon(true);
                thread.start();
            } else {
                this.cleanupConnection(connection);
            }
        }
        catch (Exception ex) {
            try {
                this.systemState.notify(new SystemState.SystemEvent("notification", "issue: " + ex.getMessage()));
                Thread.sleep(1000 * this.retryCount);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private void setupListening() {
        Properties parsed;
        String jdbcUrl;
        ++this.retryCount;
        if (this.retryCount > 60) {
            this.retryCount = 30;
        }
        if ((jdbcUrl = this.properties.getProperty("revenj.jdbcUrl")) == null || jdbcUrl.isEmpty()) {
            throw new RuntimeException("Unable to read revenj.jdbcUrl from properties. Listening notification is not supported without it.\nEither disable notifications (revenj.notifications.status=disabled), change it to pooling (revenj.notifications.type=pooling) or provide revenj.jdbcUrl to properties.");
        }
        if (!jdbcUrl.startsWith("jdbc:postgresql:") && jdbcUrl.contains("://")) {
            jdbcUrl = "jdbc:postgresql" + jdbcUrl.substring(jdbcUrl.indexOf("://"));
        }
        if ((parsed = Driver.parseURL((String)jdbcUrl, (Properties)this.properties)) == null) {
            throw new RuntimeException("Unable to parse revenj.jdbcUrl");
        }
        try {
            PGStream pgStream;
            String user = this.properties.containsKey("revenj.user") ? this.properties.getProperty("revenj.user") : parsed.getProperty("user", "");
            String password = this.properties.containsKey("revenj.password") ? this.properties.getProperty("revenj.password") : parsed.getProperty("password", "");
            String db = parsed.getProperty("PGDBNAME");
            HostSpec host = new HostSpec(parsed.getProperty("PGHOST").split(",")[0], Integer.parseInt(parsed.getProperty("PGPORT").split(",")[0]));
            this.currentStream = pgStream = ConnectionFactory.openConnection(host, user, password, db, this.properties);
            this.retryCount = 0;
            Listening listening = new Listening(pgStream);
            Thread thread = new Thread(listening);
            thread.setDaemon(true);
            thread.start();
        }
        catch (Exception ex) {
            try {
                this.systemState.notify(new SystemState.SystemEvent("notification", "issue: " + ex.getMessage()));
                Thread.sleep(1000 * this.retryCount);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private void processNotification(PostgresReader reader, PGNotification n) throws IOException {
        if ("events".equals(n.getName()) || "aggregate_roots".equals(n.getName())) {
            String param = n.getParameter();
            String ident = param.substring(0, param.indexOf(58));
            String op = param.substring(ident.length() + 1, param.indexOf(58, ident.length() + 1));
            String values = param.substring(ident.length() + op.length() + 2);
            reader.process(values);
            List<String> ids = StringConverter.parseCollection(reader, 0, false);
            if (ids != null && ids.size() > 0) {
                String[] uris = ids.toArray(new String[ids.size()]);
                switch (op) {
                    case "Update": {
                        this.subject.onNext((Object)new DataChangeNotification.NotifyInfo(ident, DataChangeNotification.Operation.Update, DataChangeNotification.Source.Database, uris));
                        break;
                    }
                    case "Change": {
                        this.subject.onNext((Object)new DataChangeNotification.NotifyInfo(ident, DataChangeNotification.Operation.Change, DataChangeNotification.Source.Database, uris));
                        break;
                    }
                    case "Delete": {
                        this.subject.onNext((Object)new DataChangeNotification.NotifyInfo(ident, DataChangeNotification.Operation.Delete, DataChangeNotification.Source.Database, uris));
                        break;
                    }
                    default: {
                        this.subject.onNext((Object)new DataChangeNotification.NotifyInfo(ident, DataChangeNotification.Operation.Insert, DataChangeNotification.Source.Database, uris));
                    }
                }
            }
        } else {
            this.systemState.notify(new SystemState.SystemEvent(n.getName(), n.getParameter()));
        }
    }

    private Repository getRepository(Class<?> manifest) {
        return this.repositories.computeIfAbsent(manifest, clazz -> {
            try {
                return (Repository)this.locator.resolve(Utils.makeGenericType(Repository.class, manifest, new Type[0]));
            }
            catch (ReflectiveOperationException ex) {
                throw new RuntimeException("Repository is not registered for: " + manifest, ex);
            }
        });
    }

    @Override
    public void notify(DataChangeNotification.NotifyInfo info) {
        this.subject.onNext((Object)info);
    }

    @Override
    public Observable<DataChangeNotification.NotifyInfo> getNotifications() {
        return this.notifications;
    }

    @Override
    public <T> Observable<DataChangeNotification.TrackInfo<T>> track(Class<T> manifest) {
        return this.notifications.filter(it -> {
            HashSet set = (HashSet)this.targets.get(it.name);
            if (set == null) {
                set = new HashSet();
                Optional<Class<?>> domainType = this.domainModel.get().find(it.name);
                if (domainType.isPresent()) {
                    set.add(domainType.get());
                    Collections.addAll(set, domainType.get().getInterfaces());
                }
                this.targets.put(it.name, set);
            }
            return set.contains(manifest);
        }).map(it -> new DataChangeNotification.TrackInfo(it.uris, new LazyResult(manifest, it.uris)));
    }

    private synchronized void cleanupConnection(Connection connection) {
        try {
            if (connection != null && !connection.isClosed()) {
                connection.close();
            }
        }
        catch (SQLException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() {
        this.isClosed = true;
        try {
            if (this.currentStream != null) {
                this.currentStream.close();
            }
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    private class LazyResult<T>
    implements Callable<List<T>> {
        private final Class<T> manifest;
        private final String[] uris;
        private List<T> result;

        LazyResult(Class<T> manifest, String[] uris) {
            this.manifest = manifest;
            this.uris = uris;
        }

        @Override
        public List<T> call() throws Exception {
            if (this.result == null) {
                Repository repository = PostgresDatabaseNotification.this.getRepository(this.manifest);
                this.result = repository.find(this.uris);
            }
            return this.result;
        }
    }

    private class Listening
    implements Runnable {
        private final PGStream stream;

        Listening(PGStream stream) throws IOException {
            this.stream = stream;
            byte[] command = "LISTEN events; LISTEN aggregate_roots; LISTEN migration; LISTEN revenj".getBytes("UTF-8");
            stream.SendChar(81);
            stream.SendInteger4(command.length + 5);
            stream.Send(command);
            stream.SendChar(0);
            stream.flush();
            this.receiveCommand(stream);
            this.receiveCommand(stream);
            this.receiveCommand(stream);
            this.receiveCommand(stream);
            if (stream.ReceiveChar() != 90) {
                throw new IOException("Unable to setup Postgres listener");
            }
            int num = stream.ReceiveInteger4();
            if (num != 5) {
                throw new IOException("unexpected length of ReadyForQuery packet");
            }
            stream.ReceiveChar();
        }

        private void receiveCommand(PGStream pgStream) throws IOException {
            pgStream.ReceiveChar();
            int len = pgStream.ReceiveInteger4();
            pgStream.Skip(len - 4);
        }

        @Override
        public void run() {
            PostgresReader reader = new PostgresReader();
            PGStream pgStream = this.stream;
            PostgresDatabaseNotification.this.systemState.notify(new SystemState.SystemEvent("notification", "started"));
            block10: while (!PostgresDatabaseNotification.this.isClosed) {
                try {
                    switch (pgStream.ReceiveChar()) {
                        case 65: {
                            pgStream.ReceiveInteger4();
                            int pidA = pgStream.ReceiveInteger4();
                            String msgA = pgStream.ReceiveString();
                            String paramA = pgStream.ReceiveString();
                            PostgresDatabaseNotification.this.processNotification(reader, (PGNotification)new Notification(msgA, pidA, paramA));
                            continue block10;
                        }
                        case 69: {
                            if (PostgresDatabaseNotification.this.isClosed) continue block10;
                            int e_len = pgStream.ReceiveInteger4();
                            String err = pgStream.ReceiveString(e_len - 4);
                            throw new IOException(err);
                        }
                    }
                    if (PostgresDatabaseNotification.this.isClosed) continue;
                    throw new IOException("Unexpected packet type");
                }
                catch (Exception ex) {
                    try {
                        PostgresDatabaseNotification.this.currentStream = null;
                        PostgresDatabaseNotification.this.systemState.notify(new SystemState.SystemEvent("notification", "error: " + ex.getMessage()));
                        pgStream.close();
                        Thread.sleep(1000L);
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                    if (!PostgresDatabaseNotification.this.isClosed) {
                        PostgresDatabaseNotification.this.setupListening();
                    }
                    return;
                }
            }
            try {
                PostgresDatabaseNotification.this.currentStream = null;
                pgStream.close();
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
    }

    private class Pooling
    implements Runnable {
        private final BaseConnection connection;
        private final Statement ping;

        Pooling(BaseConnection connection, Statement ping) {
            this.connection = connection;
            this.ping = ping;
        }

        @Override
        public void run() {
            PostgresReader reader = new PostgresReader();
            int timeout = PostgresDatabaseNotification.this.maxTimeout;
            PostgresDatabaseNotification.this.systemState.notify(new SystemState.SystemEvent("notification", "started"));
            while (!PostgresDatabaseNotification.this.isClosed) {
                try {
                    this.ping.execute("");
                    PGNotification[] notifications = this.connection.getNotifications();
                    if (notifications == null || notifications.length == 0) {
                        try {
                            Thread.sleep(timeout);
                        }
                        catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        if (timeout >= PostgresDatabaseNotification.this.maxTimeout) continue;
                        ++timeout;
                        continue;
                    }
                    timeout = 0;
                    for (PGNotification n : notifications) {
                        PostgresDatabaseNotification.this.processNotification(reader, n);
                    }
                }
                catch (IOException | SQLException ex) {
                    try {
                        PostgresDatabaseNotification.this.systemState.notify(new SystemState.SystemEvent("notification", "error: " + ex.getMessage()));
                        Thread.sleep(1000L);
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    PostgresDatabaseNotification.this.cleanupConnection((Connection)this.connection);
                    if (!PostgresDatabaseNotification.this.isClosed) {
                        PostgresDatabaseNotification.this.setupPooling();
                    }
                    return;
                }
            }
            PostgresDatabaseNotification.this.cleanupConnection((Connection)this.connection);
        }
    }
}

