/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.procedure;

import ch.cern.hbase.thirdparty.com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
import org.apache.hadoop.hbase.errorhandling.TimeoutException;
import org.apache.hadoop.hbase.procedure.Procedure;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
import org.apache.hadoop.hbase.procedure.ProcedureMember;
import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
import org.apache.hadoop.hbase.procedure.Subprocedure;
import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator;
import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.ArgumentMatcher;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.internal.matchers.ArrayEquals;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises {@link ProcedureCoordinator} and {@link ProcedureMember} wired together through the
 * ZooKeeper-based RPC implementations ({@link ZKProcedureCoordinator} and
 * {@link ZKProcedureMemberRpcs}) against a mini ZooKeeper cluster started once for the class.
 *
 * <p>The coordinator-side {@link Procedure} and the member-side {@link Subprocedure}s are Mockito
 * spies so that each test can verify which barrier phases were invoked.
 */
@Category(value={MasterTests.class, MediumTests.class})
public class TestZKProcedure {
    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestZKProcedure.class);
    private static final Logger LOG = LoggerFactory.getLogger(TestZKProcedure.class);
    private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
    private static final String COORDINATOR_NODE_NAME = "coordinator";
    /** Keep-alive value handed to the coordinator and member default thread pools. */
    private static final long KEEP_ALIVE = 100L;
    /** Thread count handed to the coordinator and member default thread pools. */
    private static final int POOL_SIZE = 1;
    /** Timeout value handed to every procedure and subprocedure built by these tests. */
    private static final long TIMEOUT = 10000L;
    /** Wake frequency handed to every procedure, subprocedure and latch wait in these tests. */
    private static final long WAKE_FREQUENCY = 500L;
    private static final String opName = "op";
    private static final byte[] data = new byte[]{1, 2};
    private static final VerificationMode once = Mockito.times(1);

    /** Starts the mini ZooKeeper cluster shared by every test in this class. */
    @BeforeClass
    public static void setupTest() throws Exception {
        UTIL.startMiniZKCluster();
    }

    /** Shuts the shared mini ZooKeeper cluster down. */
    @AfterClass
    public static void cleanupTest() throws Exception {
        UTIL.shutdownMiniZKCluster();
    }

    /**
     * Creates a watcher on the test cluster whose {@link Abortable} turns any abort request into a
     * {@link RuntimeException}, so an unexpected abort fails the test instead of passing silently.
     *
     * @return a new watcher built from the testing utility's configuration
     * @throws IOException if the watcher cannot be created
     */
    private static ZKWatcher newZooKeeperWatcher() throws IOException {
        return new ZKWatcher(UTIL.getConfiguration(), "testing utility", new Abortable() {
            @Override
            public void abort(String why, Throwable e) {
                throw new RuntimeException("Unexpected abort in distributed three phase commit test:" + why, e);
            }

            @Override
            public boolean isAborted() {
                return false;
            }
        });
    }

    @Test
    public void testEmptyMemberSet() throws Exception {
        runCommit();
    }

    @Test
    public void testSingleMember() throws Exception {
        runCommit("one");
    }

    @Test
    public void testMultipleMembers() throws Exception {
        runCommit("one", "two", "three", "four");
    }

    /**
     * Runs one error-free procedure across the given cohort and verifies that the coordinator and
     * every member went through the start, reached and complete phases exactly once.
     *
     * @param members names of the cohort members; {@code null} is treated as an empty cohort
     * @throws Exception if setup, the procedure itself, or any verification fails
     */
    private void runCommit(String ... members) throws Exception {
        String[] cohort = members == null ? new String[0] : members;
        List<String> expected = Arrays.asList(cohort);
        String opDescription = "coordination test - " + cohort.length + " cohort members";

        ZKWatcher coordZkw = newZooKeeperWatcher();
        ZKProcedureCoordinator coordinatorComms = new ZKProcedureCoordinator(coordZkw, opDescription, COORDINATOR_NODE_NAME);
        ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
        // Hand out a spy of the real procedure so its barrier calls can be verified afterwards.
        ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool) {
            @Override
            public Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs, List<String> expectedMembers) {
                return Mockito.spy(super.createProcedure(fed, procName, procArgs, expectedMembers));
            }
        };
        List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> procMembers =
            new ArrayList<Pair<ProcedureMember, ZKProcedureMemberRpcs>>(cohort.length);

        // Everything below runs under try/finally so a failed verification still releases the
        // coordinator and whichever members were started.
        try {
            SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
            for (String member : cohort) {
                ZKWatcher watcher = newZooKeeperWatcher();
                ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription);
                ThreadPoolExecutor memberPool = ProcedureMember.defaultPool(member, POOL_SIZE, KEEP_ALIVE);
                ProcedureMember procMember = new ProcedureMember(comms, memberPool, subprocFactory);
                procMembers.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(procMember, comms));
                comms.start(member, procMember);
            }

            // One spied subprocedure per member.
            final List<Subprocedure> subprocs = new ArrayList<Subprocedure>();
            for (Pair<ProcedureMember, ZKProcedureMemberRpcs> procMember : procMembers) {
                ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
                Subprocedure commit = Mockito.spy(new Subprocedure.SubprocedureImpl(
                    procMember.getFirst(), opName, cohortMonitor, WAKE_FREQUENCY, TIMEOUT));
                subprocs.add(commit);
            }

            // The mocked factory returns the prepared spies in order, one per build request.
            final AtomicInteger nextSubproc = new AtomicInteger(0);
            Mockito.when(subprocFactory.buildSubprocedure(Mockito.eq(opName),
                (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(new Answer<Subprocedure>() {
                    @Override
                    public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
                        int index = nextSubproc.getAndIncrement();
                        LOG.debug("Task size:{}, getting:{}", subprocs.size(), index);
                        return subprocs.get(index);
                    }
                });

            Procedure task = coordinator.startProcedure(new ForeignExceptionDispatcher(), opName, data, expected);
            waitAndVerifyProc(task, once, once, Mockito.never(), once, false);
            verifyCohortSuccessful(expected, subprocFactory, subprocs, once, once, Mockito.never(), once, false);
        } finally {
            closeAll(coordinator, coordinatorComms, procMembers);
        }
    }

    /**
     * Runs a three-member procedure in which the third {@code acquireBarrier()} call to arrive
     * injects a timeout error, then verifies that the coordinator and all members end up in the
     * error state without ever reaching the in-barrier phase.
     */
    @Test
    public void testMultiCohortWithMemberTimeoutDuringPrepare() throws Exception {
        String opDescription = "error injection coordination";
        String[] cohortMembers = new String[]{"one", "two", "three"};
        List<String> expected = Lists.newArrayList(cohortMembers);
        // Zero-based arrival order of the acquireBarrier() call that injects the failure.
        final int memberErrorIndex = 2;
        final CountDownLatch coordinatorReceivedErrorLatch = new CountDownLatch(1);

        ZKWatcher coordinatorWatcher = newZooKeeperWatcher();
        ZKProcedureCoordinator coordinatorController = new ZKProcedureCoordinator(coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME);
        ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
        ProcedureCoordinator coordinator = Mockito.spy(new ProcedureCoordinator(coordinatorController, pool));
        List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> members =
            new ArrayList<Pair<ProcedureMember, ZKProcedureMemberRpcs>>(expected.size());

        // try/finally so a failed assertion still releases the coordinator and started members.
        try {
            SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
            for (String member : expected) {
                ZKWatcher watcher = newZooKeeperWatcher();
                ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription);
                ThreadPoolExecutor memberPool = ProcedureMember.defaultPool(member, POOL_SIZE, KEEP_ALIVE);
                ProcedureMember mem = new ProcedureMember(controller, memberPool, subprocFactory);
                members.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(mem, controller));
                controller.start(member, mem);
            }

            final List<Subprocedure> cohortTasks = new ArrayList<Subprocedure>();
            // Counts acquireBarrier() calls across all members. Each member has its own pool, so
            // the counter must be atomic for exactly one call to observe memberErrorIndex.
            final AtomicInteger prepareCalls = new AtomicInteger(0);
            for (Pair<ProcedureMember, ZKProcedureMemberRpcs> member : members) {
                ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
                final ProcedureMember comms = member.getFirst();
                Subprocedure commit = Mockito.spy(new Subprocedure.SubprocedureImpl(
                    comms, opName, cohortMonitor, WAKE_FREQUENCY, TIMEOUT));
                Mockito.doAnswer(new Answer<Void>() {
                    @Override
                    public Void answer(InvocationOnMock invocation) throws Throwable {
                        int index = prepareCalls.getAndIncrement();
                        if (index == memberErrorIndex) {
                            LOG.debug("Sending error to coordinator");
                            ForeignException remoteCause = new ForeignException("TIMER",
                                new TimeoutException("subprocTimeout", 1L, 2L, 0L));
                            Subprocedure r = (Subprocedure) invocation.getMock();
                            // Concatenation is intentional: passing the exception as the only
                            // SLF4J argument would log it as a throwable and leave "{}" unfilled.
                            LOG.error("Remote commit failure, not propagating error:" + remoteCause);
                            comms.receiveAbortProcedure(r.getName(), remoteCause);
                            Assert.assertTrue(r.isComplete());
                            // Hold this member until the coordinator has seen the error.
                            try {
                                Procedure.waitForLatch(coordinatorReceivedErrorLatch, new ForeignExceptionDispatcher(),
                                    WAKE_FREQUENCY, "coordinator received error");
                            } catch (InterruptedException e) {
                                LOG.debug("Wait for latch interrupted, done:{}", coordinatorReceivedErrorLatch.getCount() == 0L);
                                Thread.currentThread().interrupt();
                            }
                        }
                        return null;
                    }
                }).when(commit).acquireBarrier();
                cohortTasks.add(commit);
            }

            // The mocked factory returns the prepared spies in order, one per build request.
            final AtomicInteger taskIndex = new AtomicInteger();
            Mockito.when(subprocFactory.buildSubprocedure(Mockito.eq(opName),
                (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(new Answer<Subprocedure>() {
                    @Override
                    public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
                        return cohortTasks.get(taskIndex.getAndIncrement());
                    }
                });

            // Make the spied coordinator hand back our spied procedure, and release the latch once
            // that procedure has processed a received error.
            ForeignExceptionDispatcher coordinatorTaskErrorMonitor = Mockito.spy(new ForeignExceptionDispatcher());
            Procedure coordinatorTask = Mockito.spy(new Procedure(coordinator, coordinatorTaskErrorMonitor,
                WAKE_FREQUENCY, TIMEOUT, opName, data, expected));
            Mockito.when(coordinator.createProcedure((ForeignExceptionDispatcher) Matchers.any(), Matchers.eq(opName),
                Matchers.eq(data), Matchers.anyListOf(String.class))).thenReturn(coordinatorTask);
            Mockito.doAnswer(new Answer<Void>() {
                @Override
                public Void answer(InvocationOnMock invocation) throws Throwable {
                    invocation.callRealMethod();
                    coordinatorReceivedErrorLatch.countDown();
                    return null;
                }
            }).when(coordinatorTask).receive((ForeignException) Mockito.any());

            Procedure task = coordinator.startProcedure(coordinatorTaskErrorMonitor, opName, data, expected);
            Assert.assertEquals("Didn't mock coordinator task", coordinatorTask, task);
            try {
                task.waitForCompleted();
            } catch (ForeignException ignored) {
                // Only waiting for completion here; waitAndVerifyProc below asserts the error state.
            }

            waitAndVerifyProc(coordinatorTask, once, Mockito.never(), once, Mockito.atMost(1), true);
            verifyCohortSuccessful(expected, subprocFactory, cohortTasks, once, Mockito.never(), once, once, true);
        } finally {
            closeAll(coordinator, coordinatorController, members);
        }
    }

    /**
     * Waits for the coordinator-side procedure to complete and verifies its barrier calls and
     * error state.
     *
     * @param proc spied procedure to check
     * @param prepare expected invocations of {@code sendGlobalBarrierStart()}
     * @param commit expected invocations of {@code sendGlobalBarrierReached()}
     * @param cleanup accepted for call-site symmetry; currently not verified
     * @param finish expected invocations of {@code sendGlobalBarrierComplete()}
     * @param opHasError whether the procedure is expected to have ended with an error
     */
    private void waitAndVerifyProc(Procedure proc, VerificationMode prepare, VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError) throws Exception {
        boolean caughtError = false;
        try {
            proc.waitForCompleted();
        } catch (ForeignException fe) {
            caughtError = true;
        }
        Mockito.verify(proc, prepare).sendGlobalBarrierStart();
        Mockito.verify(proc, commit).sendGlobalBarrierReached();
        Mockito.verify(proc, finish).sendGlobalBarrierComplete();
        Assert.assertEquals("Operation error state was unexpected", opHasError, proc.getErrorMonitor().hasException());
        Assert.assertEquals("Operation error state was unexpected", opHasError, caughtError);
    }

    /**
     * Waits for a member-side subprocedure to complete locally and verifies its barrier calls and
     * error state.
     *
     * @param op spied subprocedure to check
     * @param prepare expected invocations of {@code acquireBarrier()}
     * @param commit expected invocations of {@code insideBarrier()}
     * @param cleanup accepted for call-site symmetry; currently not verified
     * @param finish accepted for call-site symmetry; currently not verified
     * @param opHasError whether the subprocedure is expected to have ended with an error
     */
    private void waitAndVerifySubproc(Subprocedure op, VerificationMode prepare, VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError) throws Exception {
        boolean caughtError = false;
        try {
            op.waitForLocallyCompleted();
        } catch (ForeignException fe) {
            caughtError = true;
        }
        Mockito.verify(op, prepare).acquireBarrier();
        Mockito.verify(op, commit).insideBarrier();
        Assert.assertEquals("Operation error state was unexpected", opHasError, op.getErrorCheckable().hasException());
        Assert.assertEquals("Operation error state was unexpected", opHasError, caughtError);
    }

    /**
     * Verifies that the factory was asked for exactly one subprocedure per cohort member, then
     * runs {@link #waitAndVerifySubproc} on every subprocedure with the given expectations.
     * {@code opHasError} decides whether the cohort is expected to have succeeded or failed.
     */
    private void verifyCohortSuccessful(List<String> cohortNames, SubprocedureFactory subprocFactory, Iterable<Subprocedure> cohortTasks, VerificationMode prepare, VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError) throws Exception {
        Mockito.verify(subprocFactory, Mockito.times(cohortNames.size()))
            .buildSubprocedure(Mockito.eq(opName), (byte[]) Mockito.argThat(new ArrayEquals(data)));
        int j = 0;
        for (Subprocedure op : cohortTasks) {
            LOG.debug("Checking mock:{}", j++);
            waitAndVerifySubproc(op, prepare, commit, cleanup, finish, opHasError);
        }
    }

    /**
     * Closes every member and its RPC controller, then the coordinator and its RPC controller.
     *
     * @throws IOException if any close call fails; later components are then left unclosed
     */
    private void closeAll(ProcedureCoordinator coordinator, ZKProcedureCoordinator coordinatorController, List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> cohort) throws IOException {
        for (Pair<ProcedureMember, ZKProcedureMemberRpcs> member : cohort) {
            member.getFirst().close();
            member.getSecond().close();
        }
        coordinator.close();
        coordinatorController.close();
    }
}

