/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.kernel.ha;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import org.neo4j.com.Response;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.helpers.Predicate;
import org.neo4j.helpers.collection.FilteringIterator;
import org.neo4j.kernel.ha.Broker;
import org.neo4j.kernel.ha.Slave;
import org.neo4j.kernel.ha.SlavePriority;
import org.neo4j.kernel.impl.transaction.xaframework.TxIdGenerator;
import org.neo4j.kernel.impl.transaction.xaframework.XaDataSource;
import org.neo4j.kernel.impl.util.StringLogger;

/**
 * {@link TxIdGenerator} used on the master: transaction ids are generated by
 * {@link TxIdGenerator#DEFAULT}, and after every commit the transaction is
 * pushed to a number of slaves by asking them to pull updates.
 *
 * <p>The push is best effort: failures are logged through the supplied
 * {@link StringLogger} and never propagated to the committing thread.
 */
public class MasterTxIdGenerator
implements TxIdGenerator {
    private final Broker broker;
    private final int desiredReplicationFactor;
    private final SlavePriority replicationStrategy;
    /** Created in {@link #start()}; {@code null} until then. */
    private ExecutorService slaveCommitters;
    private final StringLogger log;

    /**
     * @param broker source of the currently known slaves and of this machine's id
     * @param desiredReplicationFactor number of slaves each committed transaction should be pushed to
     * @param replicationStrategy decides in which order slaves are tried
     * @param log receiver of replication warnings and errors
     */
    public MasterTxIdGenerator(Broker broker, int desiredReplicationFactor, SlavePriority replicationStrategy, StringLogger log) {
        this.broker = broker;
        this.desiredReplicationFactor = desiredReplicationFactor;
        this.replicationStrategy = replicationStrategy;
        this.log = log;
    }

    /**
     * Delegates to {@link TxIdGenerator#DEFAULT}.
     */
    public long generate(XaDataSource dataSource, int identifier) {
        return TxIdGenerator.DEFAULT.generate(dataSource, identifier);
    }

    /**
     * Pushes the committed transaction to slaves until the replication factor is
     * met or no more candidate slaves remain.
     *
     * <p>Slaves are tried in the order given by the replication strategy. When a
     * push fails, the next untried slave (if any) takes its place. Any error is
     * logged rather than thrown. If the calling thread is interrupted while
     * waiting, waiting continues and the interrupt status is restored before this
     * method returns.
     *
     * @param dataSource data source the transaction was committed on
     * @param identifier transaction identifier, passed through to the per-slave commit
     * @param txId id of the committed transaction
     * @param externalAuthorServerId server id of the instance that authored the
     *        transaction, or {@code null}; when set, that server is skipped and the
     *        replication factor is reduced by one (presumably because the author
     *        already holds the transaction)
     */
    public void committed(XaDataSource dataSource, int identifier, long txId, Integer externalAuthorServerId) {
        int replicationFactor = this.desiredReplicationFactor;
        if (externalAuthorServerId != null) {
            --replicationFactor;
        }
        // "<= 0" rather than "== 0": a desired factor of 0 with an external author yields -1.
        if (replicationFactor <= 0) {
            return;
        }
        HashSet<Future<Void>> committers = new HashSet<Future<Void>>();
        boolean interrupted = false;
        try {
            int successfulReplications = 0;
            Iterator<Slave> slaves = this.filter(this.replicationStrategy.prioritize(this.broker.getSlaves()), externalAuthorServerId);
            CompletionNotifier notifier = new CompletionNotifier();

            // Start with as many parallel pushes as the replication factor asks for.
            for (int i = 0; i < replicationFactor && slaves.hasNext(); ++i) {
                committers.add(this.slaveCommitters.submit(this.slaveCommitter(dataSource, identifier, slaves.next(), txId, notifier)));
            }

            // Changes to the set are staged so that it is never modified while being iterated.
            ArrayList<Future<Void>> toAdd = new ArrayList<Future<Void>>();
            ArrayList<Future<Void>> toRemove = new ArrayList<Future<Void>>();
            while (!committers.isEmpty() && successfulReplications < replicationFactor) {
                toAdd.clear();
                toRemove.clear();
                for (Future<Void> committer : committers) {
                    if (!committer.isDone()) {
                        continue;
                    }
                    if (this.isSuccessful(committer)) {
                        ++successfulReplications;
                    } else if (slaves.hasNext()) {
                        // Replace the failed push with one to the next slave in line.
                        toAdd.add(this.slaveCommitters.submit(this.slaveCommitter(dataSource, identifier, slaves.next(), txId, notifier)));
                    }
                    toRemove.add(committer);
                }
                committers.addAll(toAdd);
                committers.removeAll(toRemove);
                if (!committers.isEmpty()) {
                    // Remember an interrupt instead of dropping it; it is restored in finally.
                    interrupted |= notifier.waitForAnyCompletion();
                }
            }
            if (successfulReplications < replicationFactor) {
                this.log.logMessage("Transaction " + txId + " for " + dataSource.getName() + " couldn't commit on enough slaves, desired " + replicationFactor + ", but could only commit at " + successfulReplications);
            }
        }
        catch (Throwable t) {
            // Replication is best effort: report the problem, never fail the commit.
            this.log.logMessage("Unknown error commit master transaction at slave", t);
        }
        finally {
            // Drop pushes that have not started; running ones are left to finish.
            for (Future<Void> committer : committers) {
                committer.cancel(false);
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns {@code slaves} unchanged when there is no external author, otherwise
     * a view of it that skips the slave whose server id equals the author's.
     */
    private Iterator<Slave> filter(Iterator<Slave> slaves, final Integer externalAuthorServerId) {
        if (externalAuthorServerId == null) {
            return slaves;
        }
        return new FilteringIterator<Slave>(slaves, new Predicate<Slave>(){

            public boolean accept(Slave item) {
                return item.getServerId() != externalAuthorServerId.intValue();
            }
        });
    }

    /**
     * Tells whether a finished push completed without exception or cancellation.
     */
    private boolean isSuccessful(Future<Void> committer) {
        try {
            committer.get();
            return true;
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            return false;
        }
        catch (ExecutionException e) {
            return false;
        }
        catch (CancellationException e) {
            return false;
        }
    }

    /**
     * Wraps a push to one slave in a task that signals {@code notifier} when it
     * finishes, whether it succeeded or threw.
     */
    private Callable<Void> slaveCommitter(final XaDataSource dataSource, final int identifier, final Slave slave, final long txId, final CompletionNotifier notifier) {
        return new Callable<Void>(){

            @Override
            public Void call() {
                try {
                    MasterTxIdGenerator.this.commitAtSlave(dataSource, identifier, slave, txId);
                    return null;
                }
                finally {
                    notifier.completed();
                }
            }
        };
    }

    /**
     * Asks {@code slave} to pull updates for the data source up to {@code txId}
     * and closes the response. {@code identifier} is not used here.
     */
    private void commitAtSlave(XaDataSource dataSource, int identifier, Slave slave, long txId) {
        Response<Void> response = slave.pullUpdates(dataSource.getName(), txId);
        response.close();
    }

    /**
     * @return this machine's id as reported by the broker
     */
    public int getCurrentMasterId() {
        return this.broker.getMyMachineId();
    }

    /**
     * @return this machine's id as reported by the broker
     */
    public int getMyId() {
        return this.broker.getMyMachineId();
    }

    /** No-op. */
    public void init() throws Throwable {
    }

    /**
     * Creates the thread pool on which pushes to slaves are executed.
     */
    public void start() throws Throwable {
        this.slaveCommitters = Executors.newCachedThreadPool(new NamedThreadFactory("slave-committer"));
    }

    /**
     * Shuts the thread pool down; does nothing if {@link #start()} was never called.
     */
    public void stop() throws Throwable {
        if (this.slaveCommitters != null) {
            this.slaveCommitters.shutdown();
        }
    }

    /** No-op. */
    public void shutdown() throws Throwable {
    }

    /**
     * Lets the committing thread sleep until some push task reports completion.
     *
     * <p>NOTE(review): {@link #completed()} is called from inside the task body, so
     * the task's {@link Future} may not yet report {@code isDone()} when the waiter
     * wakes up. The wait timeout bounds how long such a completion can go unnoticed.
     */
    private static class CompletionNotifier {
        /** Upper bound, in milliseconds, for a single wait. */
        private static final long MAX_WAIT_MILLIS = 2000L;

        private boolean notified;

        private CompletionNotifier() {
        }

        /**
         * Records a completion and wakes up every waiting thread.
         */
        synchronized void completed() {
            this.notified = true;
            this.notifyAll();
        }

        /**
         * Returns at once, consuming the notification, if a completion has been
         * recorded since the flag was last cleared; otherwise waits for a
         * notification, for at most {@link #MAX_WAIT_MILLIS}.
         *
         * @return {@code true} if the wait was cut short by an interrupt; the
         *         interrupt status is then clear and the caller is responsible for
         *         restoring it
         */
        synchronized boolean waitForAnyCompletion() {
            if (this.notified) {
                this.notified = false;
                return false;
            }
            try {
                this.wait(MAX_WAIT_MILLIS);
                return false;
            }
            catch (InterruptedException e) {
                return true;
            }
        }
    }
}

