/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.kernel.ha.transaction;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.neo4j.cluster.ClusterSettings;
import org.neo4j.cluster.InstanceId;
import org.neo4j.com.ComException;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.helpers.Predicate;
import org.neo4j.helpers.collection.FilteringIterator;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.ha.HaSettings;
import org.neo4j.kernel.ha.com.master.Slave;
import org.neo4j.kernel.ha.com.master.SlavePriorities;
import org.neo4j.kernel.ha.com.master.SlavePriority;
import org.neo4j.kernel.ha.com.master.Slaves;
import org.neo4j.kernel.ha.transaction.CommitPusher;
import org.neo4j.kernel.impl.util.CappedOperation;
import org.neo4j.kernel.impl.util.StringLogger;
import org.neo4j.kernel.lifecycle.Lifecycle;

public class TransactionPropagator
implements Lifecycle {
    private int desiredReplicationFactor;
    private SlavePriority replicationStrategy;
    private ExecutorService slaveCommitters;
    private final StringLogger log;
    private final Configuration config;
    private final Slaves slaves;
    private final CommitPusher pusher;
    private final CappedOperation<ReplicationContext> slaveCommitFailureLogger = new CappedOperation<ReplicationContext>(new CappedOperation.Switch[]{CappedOperation.time((long)5L, (TimeUnit)TimeUnit.SECONDS), CappedOperation.differentItemClasses()}){

        protected void triggered(ReplicationContext context) {
            TransactionPropagator.this.log.error("Slave " + context.slave.getServerId() + ": Replication commit threw" + (context.throwable instanceof ComException ? " communication" : "") + " exception:", context.throwable);
        }
    };

    public static Configuration from(final Config config) {
        return new Configuration(){

            @Override
            public int getTxPushFactor() {
                return (Integer)config.get(HaSettings.tx_push_factor);
            }

            @Override
            public InstanceId getServerId() {
                return (InstanceId)config.get(ClusterSettings.server_id);
            }

            @Override
            public SlavePriority getReplicationStrategy() {
                switch ((HaSettings.TxPushStrategy)((Object)config.get(HaSettings.tx_push_strategy))) {
                    case fixed: {
                        return SlavePriorities.fixed();
                    }
                    case round_robin: {
                        return SlavePriorities.roundRobin();
                    }
                }
                throw new RuntimeException("Unknown replication strategy ");
            }
        };
    }

    public static Configuration from(final Config config, final SlavePriority slavePriority) {
        return new Configuration(){

            @Override
            public int getTxPushFactor() {
                return (Integer)config.get(HaSettings.tx_push_factor);
            }

            @Override
            public InstanceId getServerId() {
                return (InstanceId)config.get(ClusterSettings.server_id);
            }

            @Override
            public SlavePriority getReplicationStrategy() {
                return slavePriority;
            }
        };
    }

    public TransactionPropagator(Configuration config, StringLogger log, Slaves slaves, CommitPusher pusher) {
        this.config = config;
        this.log = log;
        this.slaves = slaves;
        this.pusher = pusher;
    }

    public void init() throws Throwable {
    }

    public void start() throws Throwable {
        this.slaveCommitters = Executors.newCachedThreadPool((ThreadFactory)new NamedThreadFactory("slave-committer"));
        this.desiredReplicationFactor = this.config.getTxPushFactor();
        this.replicationStrategy = this.config.getReplicationStrategy();
    }

    public void stop() throws Throwable {
        this.slaveCommitters.shutdown();
    }

    public void shutdown() throws Throwable {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void committed(long txId, int authorId) {
        boolean isAuthoredBySlave;
        int replicationFactor = this.desiredReplicationFactor;
        boolean bl = isAuthoredBySlave = this.config.getServerId().toIntegerIndex() != authorId;
        if (isAuthoredBySlave) {
            --replicationFactor;
        }
        if (replicationFactor == 0) {
            return;
        }
        HashSet<ReplicationContext> committers = new HashSet<ReplicationContext>();
        try {
            int successfulReplications = 0;
            Iterator<Slave> slaveList = this.filter(this.replicationStrategy.prioritize(this.slaves.getSlaves()).iterator(), authorId);
            CompletionNotifier notifier = new CompletionNotifier();
            for (int i = 0; i < replicationFactor && slaveList.hasNext(); ++i) {
                Slave slave = slaveList.next();
                Callable<Void> slaveCommitter = this.slaveCommitter(slave, txId, notifier);
                committers.add(new ReplicationContext(this.slaveCommitters.submit(slaveCommitter), slave));
            }
            ArrayList<ReplicationContext> toAdd = new ArrayList<ReplicationContext>();
            ArrayList<ReplicationContext> toRemove = new ArrayList<ReplicationContext>();
            while (!committers.isEmpty() && successfulReplications < replicationFactor) {
                toAdd.clear();
                toRemove.clear();
                for (ReplicationContext context : committers) {
                    if (!context.future.isDone()) continue;
                    if (this.isSuccessful(context)) {
                        ++successfulReplications;
                    } else if (slaveList.hasNext()) {
                        Slave newSlave = slaveList.next();
                        Callable<Void> slaveCommitter = this.slaveCommitter(newSlave, txId, notifier);
                        toAdd.add(new ReplicationContext(this.slaveCommitters.submit(slaveCommitter), newSlave));
                    }
                    toRemove.add(context);
                }
                if (!toAdd.isEmpty()) {
                    committers.addAll(toAdd);
                }
                if (!toRemove.isEmpty()) {
                    committers.removeAll(toRemove);
                }
                if (committers.isEmpty()) continue;
                notifier.waitForAnyCompletion();
            }
            if (successfulReplications < replicationFactor) {
                this.log.info("Transaction " + txId + " couldn't commit on enough slaves, desired " + replicationFactor + ", but could only commit at " + successfulReplications);
            }
        }
        catch (Throwable t) {
            try {
                this.log.error("Unknown error commit master transaction at slave", t);
            }
            catch (Throwable throwable) {
                for (ReplicationContext context : committers) {
                    context.future.cancel(false);
                }
                throw throwable;
            }
            for (ReplicationContext context : committers) {
                context.future.cancel(false);
            }
        }
        for (ReplicationContext context : committers) {
            context.future.cancel(false);
        }
    }

    private Iterator<Slave> filter(Iterator<Slave> slaves, final Integer externalAuthorServerId) {
        return externalAuthorServerId == null ? slaves : new FilteringIterator((Iterator)slaves, (Predicate)new Predicate<Slave>(){

            public boolean accept(Slave item) {
                return item.getServerId() != externalAuthorServerId.intValue();
            }
        });
    }

    private boolean isSuccessful(ReplicationContext context) {
        try {
            context.future.get();
            return true;
        }
        catch (InterruptedException e) {
            return false;
        }
        catch (ExecutionException e) {
            context.throwable = e.getCause();
            this.slaveCommitFailureLogger.event((Object)context);
            return false;
        }
        catch (CancellationException e) {
            return false;
        }
    }

    private Callable<Void> slaveCommitter(final Slave slave, final long txId, final CompletionNotifier notifier) {
        return new Callable<Void>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public Void call() {
                try {
                    TransactionPropagator.this.pusher.queuePush(slave, txId);
                    Void void_ = null;
                    return void_;
                }
                finally {
                    notifier.completed();
                }
            }
        };
    }

    private static class CompletionNotifier {
        private boolean notified;

        private CompletionNotifier() {
        }

        synchronized void completed() {
            this.notified = true;
            this.notifyAll();
        }

        synchronized void waitForAnyCompletion() {
            if (!this.notified) {
                this.notified = false;
                try {
                    this.wait(2000L);
                }
                catch (InterruptedException e) {
                    Thread.interrupted();
                }
            } else {
                this.notified = false;
            }
        }

        public String toString() {
            return "CompletionNotifier{id=" + System.identityHashCode(this) + ",notified=" + this.notified + "}";
        }
    }

    private static class ReplicationContext {
        final Future<Void> future;
        final Slave slave;
        Throwable throwable;

        ReplicationContext(Future<Void> future, Slave slave) {
            this.future = future;
            this.slave = slave;
        }
    }

    public static interface Configuration {
        public int getTxPushFactor();

        public InstanceId getServerId();

        public SlavePriority getReplicationStrategy();
    }
}

