/*
 * Decompiled with CFR 0.152.
 */
package org.threadly.db.aurora;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.threadly.concurrent.ConfigurableThreadFactory;
import org.threadly.concurrent.PriorityScheduler;
import org.threadly.concurrent.PrioritySchedulerService;
import org.threadly.concurrent.ReschedulingOperation;
import org.threadly.concurrent.SubmitterScheduler;
import org.threadly.concurrent.TaskPriority;
import org.threadly.db.aurora.AuroraServer;
import org.threadly.db.aurora.DelegateDriver;

public class AuroraClusterMonitor {
    private static final Logger LOG = Logger.getLogger(AuroraClusterMonitor.class.getSimpleName());
    protected static final int CHECK_FREQUENCY_MILLIS = 500;
    protected static final int MINIMUM_THREAD_POOL_SIZE = 4;
    protected static final int MAXIMUM_THREAD_POOL_SIZE = 32;
    protected static final PrioritySchedulerService MONITOR_SCHEDULER;
    protected static final ConcurrentMap<Set<AuroraServer>, AuroraClusterMonitor> MONITORS;
    protected final ClusterChecker clusterStateChecker;
    private final AtomicLong replicaIndex;

    public static AuroraClusterMonitor getMonitor(Set<AuroraServer> servers) {
        AuroraClusterMonitor result = (AuroraClusterMonitor)MONITORS.get(servers);
        if (result != null) {
            return result;
        }
        return MONITORS.computeIfAbsent(servers, s -> new AuroraClusterMonitor(MONITOR_SCHEDULER, 500L, (Set<AuroraServer>)s));
    }

    protected AuroraClusterMonitor(PrioritySchedulerService scheduler, long checkIntervalMillis, Set<AuroraServer> clusterServers) {
        this.clusterStateChecker = new ClusterChecker(scheduler, checkIntervalMillis, clusterServers);
        this.replicaIndex = new AtomicLong();
    }

    public AuroraServer getRandomReadReplica() {
        int secondaryCount;
        long replicaIndex = this.replicaIndex.getAndIncrement();
        while ((secondaryCount = this.clusterStateChecker.secondaryServers.size()) > 0) {
            try {
                return this.clusterStateChecker.secondaryServers.get((int)(replicaIndex % (long)secondaryCount));
            }
            catch (IndexOutOfBoundsException indexOutOfBoundsException) {
            }
        }
        return null;
    }

    public AuroraServer getCurrentMaster() {
        return this.clusterStateChecker.masterServer.get();
    }

    public void expediteServerCheck(AuroraServer auroraServer) {
        this.clusterStateChecker.expediteServerCheck(auroraServer);
    }

    static {
        PriorityScheduler ps = new PriorityScheduler(4, TaskPriority.High, 1000L, (ThreadFactory)new ConfigurableThreadFactory("auroraMonitor-", false, true, 5, null, null));
        ps.prestartAllThreads();
        ps.setPoolSize(32);
        MONITOR_SCHEDULER = ps;
        MONITORS = new ConcurrentHashMap<Set<AuroraServer>, AuroraClusterMonitor>();
    }

    protected static class ServerMonitor
    implements Runnable {
        protected final AuroraServer server;
        protected final ReschedulingOperation clusterStateChecker;
        protected final AtomicBoolean running;
        protected Connection serverConnection;
        protected volatile Throwable lastError;
        protected volatile boolean readOnly;

        protected ServerMonitor(AuroraServer server, ReschedulingOperation clusterStateChecker) {
            this.server = server;
            this.clusterStateChecker = clusterStateChecker;
            this.running = new AtomicBoolean(false);
            try {
                this.reconnect();
            }
            catch (SQLException e) {
                throw new RuntimeException("Could not connect to monitor cluster member: " + server + ", error is fatal", e);
            }
            this.lastError = null;
            this.readOnly = false;
        }

        protected void reconnect() throws SQLException {
            Connection newConnection = DelegateDriver.connect(this.server.hostAndPortString() + "/?connectTimeout=10000&socketTimeout=10000&serverTimezone=UTC", this.server.getProperties());
            if (this.serverConnection != null) {
                try {
                    this.serverConnection.close();
                }
                catch (SQLException sQLException) {
                    // empty catch block
                }
            }
            this.serverConnection = newConnection;
        }

        public boolean isHealthy() {
            return this.lastError == null;
        }

        public boolean isReadOnly() {
            return this.readOnly;
        }

        @Override
        public void run() {
            if (this.running.compareAndSet(false, true)) {
                try {
                    this.updateState();
                }
                finally {
                    this.running.set(false);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void updateState() {
            Throwable currentError;
            block46: {
                boolean currentlyReadOnly = true;
                currentError = null;
                try {
                    try (PreparedStatement ps = this.serverConnection.prepareStatement("SHOW GLOBAL VARIABLES LIKE 'innodb_read_only';");
                         ResultSet results = ps.executeQuery();){
                        if (results.next()) {
                            String readOnlyStr = results.getString("Value");
                            if (readOnlyStr.equals("OFF")) {
                                currentlyReadOnly = false;
                            } else if (readOnlyStr.equals("ON")) {
                                currentlyReadOnly = true;
                            } else {
                                LOG.severe("Unknown db state, may require library upgrade: " + readOnlyStr);
                            }
                        } else {
                            LOG.severe("No result looking up db state, likely not connected to Aurora database");
                        }
                    }
                    if (currentlyReadOnly == this.readOnly && this.lastError == null == (currentError == null)) break block46;
                }
                catch (Throwable t) {
                    block47: {
                        try {
                            currentError = t;
                            if (!t.equals(this.lastError)) {
                                LOG.log(Level.WARNING, "Setting aurora server " + this.server + " as unhealthy due to error checking state", t);
                            }
                            if (currentlyReadOnly == this.readOnly && this.lastError == null == (currentError == null)) break block47;
                        }
                        catch (Throwable throwable) {
                            if (currentlyReadOnly != this.readOnly || this.lastError == null != (currentError == null)) {
                                this.lastError = currentError;
                                this.readOnly = currentlyReadOnly;
                                this.clusterStateChecker.signalToRun();
                            }
                            if (currentError != null) {
                                try {
                                    this.reconnect();
                                }
                                catch (SQLException sQLException) {
                                    // empty catch block
                                }
                            }
                            throw throwable;
                        }
                        this.lastError = currentError;
                        this.readOnly = currentlyReadOnly;
                        this.clusterStateChecker.signalToRun();
                    }
                    if (currentError != null) {
                        try {
                            this.reconnect();
                        }
                        catch (SQLException sQLException) {}
                    }
                }
                this.lastError = currentError;
                this.readOnly = currentlyReadOnly;
                this.clusterStateChecker.signalToRun();
            }
            if (currentError != null) {
                try {
                    this.reconnect();
                }
                catch (SQLException ps) {}
            }
        }
    }

    protected static class ClusterChecker
    extends ReschedulingOperation {
        protected final PrioritySchedulerService scheduler;
        protected final Map<AuroraServer, ServerMonitor> allServers;
        protected final List<AuroraServer> secondaryServers;
        protected final AtomicReference<AuroraServer> masterServer;
        protected final List<AuroraServer> serversWaitingExpeditiedCheck;
        private volatile boolean initialized = false;

        protected ClusterChecker(PrioritySchedulerService scheduler, long checkIntervalMillis, Set<AuroraServer> clusterServers) {
            super((SubmitterScheduler)scheduler, 0L);
            this.scheduler = scheduler;
            this.allServers = new HashMap<AuroraServer, ServerMonitor>();
            this.secondaryServers = new CopyOnWriteArrayList<AuroraServer>();
            this.serversWaitingExpeditiedCheck = new CopyOnWriteArrayList<AuroraServer>();
            this.masterServer = new AtomicReference();
            for (AuroraServer server : clusterServers) {
                ServerMonitor monitor = new ServerMonitor(server, this);
                this.allServers.put(server, monitor);
                if (this.masterServer.get() == null) {
                    monitor.run();
                    if (monitor.isHealthy()) {
                        if (!monitor.isReadOnly()) {
                            this.masterServer.set(server);
                        } else {
                            this.secondaryServers.add(server);
                        }
                    }
                } else {
                    scheduler.execute((Runnable)monitor, TaskPriority.Low);
                }
                scheduler.scheduleAtFixedRate((Runnable)monitor, (long)Math.abs(System.identityHashCode(monitor)) % checkIntervalMillis, checkIntervalMillis);
            }
            if (this.masterServer.get() == null) {
                LOG.warning("No master server found!  Will use read only servers till one becomes master");
            }
            this.initialized = true;
            this.signalToRun();
        }

        protected ClusterChecker(PrioritySchedulerService scheduler, long checkIntervalMillis, Map<AuroraServer, ServerMonitor> clusterServers) {
            super((SubmitterScheduler)scheduler, 0L);
            this.scheduler = scheduler;
            this.allServers = clusterServers;
            this.secondaryServers = new CopyOnWriteArrayList<AuroraServer>();
            this.serversWaitingExpeditiedCheck = new CopyOnWriteArrayList<AuroraServer>();
            this.masterServer = new AtomicReference();
            this.initialized = true;
        }

        protected void expediteServerCheck(ServerMonitor serverMonitor) {
            if (!this.serversWaitingExpeditiedCheck.contains(serverMonitor.server)) {
                this.serversWaitingExpeditiedCheck.add(serverMonitor.server);
                this.scheduler.execute(() -> {
                    this.serversWaitingExpeditiedCheck.remove(serverMonitor.server);
                    serverMonitor.run();
                }, TaskPriority.Low);
            }
        }

        public void expediteServerCheck(AuroraServer auroraServer) {
            ServerMonitor monitor = this.allServers.get(auroraServer);
            if (monitor != null) {
                this.expediteServerCheck(monitor);
            }
        }

        protected void checkAllServers() {
            for (ServerMonitor sm : this.allServers.values()) {
                this.expediteServerCheck(sm);
            }
        }

        protected void run() {
            if (!this.initialized) {
                return;
            }
            for (Map.Entry<AuroraServer, ServerMonitor> p : this.allServers.entrySet()) {
                if (p.getValue().isHealthy()) {
                    if (p.getValue().isReadOnly()) {
                        if (p.getKey().equals(this.masterServer.get())) {
                            this.masterServer.compareAndSet(p.getKey(), null);
                            this.checkAllServers();
                        }
                        if (this.secondaryServers.contains(p.getKey())) continue;
                        this.secondaryServers.add(p.getKey());
                        continue;
                    }
                    if (p.getKey().equals(this.masterServer.get())) continue;
                    this.masterServer.set(p.getKey());
                    LOG.info("New master server: " + p.getKey());
                    continue;
                }
                if (this.secondaryServers.remove(p.getKey()) || !p.getKey().equals(this.masterServer.get())) continue;
                this.masterServer.compareAndSet(p.getKey(), null);
                this.checkAllServers();
            }
        }
    }
}

