/*
 * Decompiled with CFR 0.152.
 */
package org.reveno.atp.clustering.core;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import org.reveno.atp.clustering.api.ClusterBuffer;
import org.reveno.atp.core.api.FailoverManager;
import org.reveno.atp.core.api.channel.Buffer;
import org.reveno.atp.core.api.serialization.TransactionInfoSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link FailoverManager} implementation backed by a {@link ClusterBuffer}.
 *
 * <p>Incoming replication messages are handed to the registered failover handler
 * when this manager is not blocked and a handler is present; otherwise they are
 * kept in {@link #notProcessedTransactions} until they can be delivered by
 * {@link #processPendingMessages()}.
 *
 * <p>{@link #block()}, {@link #unblock()} and {@link #processPendingMessages()}
 * are synchronized on this instance, as is the buffering of undelivered messages
 * in {@link #newMessage(List)}.
 */
public class ClusterFailoverManager implements FailoverManager {
    protected ClusterBuffer buffer;
    protected List<Runnable> onBlockedListeners = new CopyOnWriteArrayList<>();
    protected List<Runnable> onUnblockedListeners = new CopyOnWriteArrayList<>();
    protected volatile Consumer<List<Object>> failoverHandler;
    protected volatile boolean isMaster;
    protected volatile boolean isBlocked;
    /** Messages received while blocked or while no failover handler was registered. */
    protected List<List<Object>> notProcessedTransactions = new CopyOnWriteArrayList<>();
    protected static final Logger LOG = LoggerFactory.getLogger(ClusterFailoverManager.class);

    /**
     * @return the cluster buffer this manager was constructed with
     */
    public ClusterBuffer buffer() {
        return this.buffer;
    }

    /**
     * Accepts a message coming from the cluster buffer.
     *
     * <p>When the manager is not blocked and a failover handler is registered, any
     * previously buffered messages are delivered first and then {@code cmds} is
     * passed to the handler. Otherwise {@code cmds} is appended to the pending list.
     *
     * @param cmds the received message payload
     * @return {@code true} if the message was passed to the handler immediately,
     *         {@code false} if it was buffered for later processing
     */
    public boolean newMessage(List<Object> cmds) {
        // Read the volatile field once so the null check and the call below
        // operate on the same handler even if it is replaced concurrently.
        final Consumer<List<Object>> handler = this.failoverHandler;
        if (!this.isBlocked && handler != null) {
            if (this.unprocessedCount() != 0L) {
                this.processPendingMessages();
            }
            handler.accept(cmds);
            return true;
        }
        synchronized (this) {
            this.notProcessedTransactions.add(cmds);
        }
        return false;
    }

    /**
     * Marks this manager as blocked and then runs every registered on-blocked listener.
     *
     * @throws IllegalArgumentException if the manager is already blocked
     */
    public synchronized void block() {
        if (this.isBlocked) {
            throw new IllegalArgumentException("Failover manager is already blocked.");
        }
        this.isBlocked = true;
        this.onBlockedListeners.forEach(Runnable::run);
    }

    /**
     * Runs every registered on-unblocked listener, clears the blocked flag and
     * then delivers the messages that were buffered in the meantime.
     *
     * @throws IllegalArgumentException if the manager is not blocked
     */
    public synchronized void unblock() {
        if (!this.isBlocked) {
            throw new IllegalArgumentException("Failover manager is not blocked.");
        }
        // Listeners run while the manager still reports itself as blocked.
        this.onUnblockedListeners.forEach(Runnable::run);
        this.isBlocked = false;
        this.processPendingMessages();
    }

    /**
     * @return the value last passed to {@link #setMaster(boolean)}
     */
    public boolean isMaster() {
        return this.isMaster;
    }

    /**
     * @return {@code true} between a call to {@link #block()} and the matching {@link #unblock()}
     */
    public boolean isBlocked() {
        return this.isBlocked;
    }

    /**
     * Registers the handler that receives replication messages, replacing any
     * previously registered one.
     *
     * @param failoverHandler consumer of received message payloads
     */
    public void onReplicationMessage(Consumer<List<Object>> failoverHandler) {
        this.failoverHandler = failoverHandler;
    }

    /**
     * Adds a listener that is run by {@link #block()}.
     *
     * @param handler the listener to add
     */
    public void addOnBlocked(Runnable handler) {
        this.onBlockedListeners.add(handler);
    }

    /**
     * Adds a listener that is run by {@link #unblock()}.
     *
     * @param handler the listener to add
     */
    public void addOnUnblocked(Runnable handler) {
        this.onUnblockedListeners.add(handler);
    }

    /**
     * Prepares the cluster buffer, lets {@code bufferWriter} fill it and asks the
     * buffer to replicate its content.
     *
     * @param bufferWriter callback that writes the data to be replicated into the buffer
     * @return the result of {@link ClusterBuffer#replicate()}, or {@code false} if
     *         any step threw
     */
    public boolean replicate(Consumer<Buffer> bufferWriter) {
        try {
            this.buffer.prepare();
            bufferWriter.accept(this.buffer);
            return this.buffer.replicate();
        }
        catch (Throwable t) {
            // Callers only see a boolean, so record the cause here instead of
            // discarding it silently.
            LOG.error("Replication failed.", t);
            return false;
        }
    }

    /**
     * @return the number of buffered messages not yet passed to the failover handler
     */
    public long unprocessedCount() {
        return this.notProcessedTransactions.size();
    }

    /**
     * Passes every buffered message to the failover handler, in insertion order,
     * and then empties the buffer.
     *
     * <p>If no handler is registered the buffered messages are left untouched so
     * they can be delivered once a handler is available. If the handler throws,
     * the exception propagates and the buffer is not cleared.
     */
    public synchronized void processPendingMessages() {
        final Consumer<List<Object>> handler = this.failoverHandler;
        if (handler == null) {
            // Nothing can consume the messages yet; keep them pending instead of
            // failing with a NullPointerException.
            return;
        }
        if (!this.notProcessedTransactions.isEmpty()) {
            this.notProcessedTransactions.forEach(handler);
            this.notProcessedTransactions.clear();
        }
    }

    /**
     * Sets the master flag reported by {@link #isMaster()}.
     *
     * @param isMaster the new value of the flag
     */
    public void setMaster(boolean isMaster) {
        this.isMaster = isMaster;
    }

    /**
     * Creates a manager bound to {@code buffer} and registers
     * {@link #newMessage(List)} as the buffer's message notifier.
     *
     * @param serializer serializer handed to {@link ClusterBuffer#messageNotifier};
     *                   presumably used to decode incoming messages (not visible here)
     * @param buffer     the cluster buffer used for replication and message delivery
     */
    public ClusterFailoverManager(TransactionInfoSerializer serializer, ClusterBuffer buffer) {
        this.buffer = buffer;
        buffer.messageNotifier(serializer, this::newMessage);
    }
}

