package org.opendaylight.controller.cluster.datastore;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dispatch.Futures;
import akka.util.Timeout;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.mdsal.common.api.ReadFailedException;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import scala.concurrent.Promise;

/* loaded from: input_file:org/opendaylight/controller/cluster/datastore/TransactionProxyTest.class */
public class TransactionProxyTest extends AbstractTransactionProxyTest {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/opendaylight/controller/cluster/datastore/TransactionProxyTest$Invoker.class */
    public interface Invoker {
        FluentFuture<?> invoke(TransactionProxy transactionProxy);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/opendaylight/controller/cluster/datastore/TransactionProxyTest$TestException.class */
    public static class TestException extends RuntimeException {
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/opendaylight/controller/cluster/datastore/TransactionProxyTest$TransactionProxyOperation.class */
    public interface TransactionProxyOperation {
        void run(TransactionProxy transactionProxy);
    }

    @Test
    public void testRead() throws Exception {
        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), TransactionType.READ_ONLY);
        TransactionProxy transactionProxy = new TransactionProxy(this.mockComponentFactory, TransactionType.READ_ONLY);
        ((ActorUtils) Mockito.doReturn(readDataReply(null)).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), eqReadData(), (Timeout) ArgumentMatchers.any(Timeout.class));
        Assert.assertEquals(Optional.empty(), transactionProxy.read(TestModel.TEST_PATH).get(5L, TimeUnit.SECONDS));
        ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
        ((ActorUtils) Mockito.doReturn(readDataReply(containerNode)).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), eqReadData(), (Timeout) ArgumentMatchers.any(Timeout.class));
        Assert.assertEquals(Optional.of(containerNode), transactionProxy.read(TestModel.TEST_PATH).get(5L, TimeUnit.SECONDS));
    }

    @Test(expected = ReadFailedException.class)
    public void testReadWithInvalidReplyMessageType() throws Throwable {
        ((ActorUtils) Mockito.doReturn(Futures.successful(new Object())).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(setupActorContextWithInitialCreateTransaction(getSystem(), TransactionType.READ_ONLY))), eqReadData(), (Timeout) ArgumentMatchers.any(Timeout.class));
        try {
            new TransactionProxy(this.mockComponentFactory, TransactionType.READ_ONLY).read(TestModel.TEST_PATH).get(5L, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            throw e.getCause();
        }
    }

    @Test(expected = TestException.class)
    public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
        ((ActorUtils) Mockito.doReturn(Futures.failed(new TestException())).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(setupActorContextWithInitialCreateTransaction(getSystem(), TransactionType.READ_ONLY))), eqReadData(), (Timeout) ArgumentMatchers.any(Timeout.class));
        propagateReadFailedExceptionCause(new TransactionProxy(this.mockComponentFactory, TransactionType.READ_ONLY).read(TestModel.TEST_PATH));
    }

    private void testExceptionOnInitialCreateTransaction(Exception exc, Invoker invoker) throws Throwable {
        ActorRef actorOf = getSystem().actorOf(Props.create(DoNothingActor.class, new Object[0]));
        if (exc instanceof PrimaryNotFoundException) {
            ((ActorUtils) Mockito.doReturn(Futures.failed(exc)).when(this.mockActorContext)).findPrimaryShardAsync(ArgumentMatchers.anyString());
        } else {
            ((ActorUtils) Mockito.doReturn(primaryShardInfoReply(getSystem(), actorOf)).when(this.mockActorContext)).findPrimaryShardAsync(ArgumentMatchers.anyString());
        }
        ((ActorUtils) Mockito.doReturn(Futures.failed(exc)).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.any(ActorSelection.class), ArgumentMatchers.any(), (Timeout) ArgumentMatchers.any(Timeout.class));
        propagateReadFailedExceptionCause(invoker.invoke(new TransactionProxy(this.mockComponentFactory, TransactionType.READ_ONLY)));
    }

    private void testReadWithExceptionOnInitialCreateTransaction(Exception exc) throws Throwable {
        testExceptionOnInitialCreateTransaction(exc, transactionProxy -> {
            return transactionProxy.read(TestModel.TEST_PATH);
        });
    }

    @Test(expected = PrimaryNotFoundException.class)
    public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
        testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
    }

    @Test(expected = TestException.class)
    public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
        testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test", new TestException()));
    }

    @Test(expected = TestException.class)
    public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
        testReadWithExceptionOnInitialCreateTransaction(new TestException());
    }

    @Test
    public void testReadWithPriorRecordingOperationSuccessful() throws Exception {
        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), TransactionType.READ_WRITE);
        ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
        expectBatchedModifications(actorRef, 1);
        ((ActorUtils) Mockito.doReturn(readDataReply(containerNode)).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), eqReadData(), (Timeout) ArgumentMatchers.any(Timeout.class));
        TransactionProxy transactionProxy = new TransactionProxy(this.mockComponentFactory, TransactionType.READ_WRITE);
        transactionProxy.write(TestModel.TEST_PATH, containerNode);
        Assert.assertEquals(Optional.of(containerNode), transactionProxy.read(TestModel.TEST_PATH).get(5L, TimeUnit.SECONDS));
        InOrder inOrder = Mockito.inOrder(new Object[]{this.mockActorContext});
        ((ActorUtils) inOrder.verify(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), ArgumentMatchers.isA(BatchedModifications.class), (Timeout) ArgumentMatchers.any(Timeout.class));
        ((ActorUtils) inOrder.verify(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), eqReadData(), (Timeout) ArgumentMatchers.any(Timeout.class));
    }

    @Test
    public void testReadPreConditionCheck() {
        TransactionProxy transactionProxy = new TransactionProxy(this.mockComponentFactory, TransactionType.WRITE_ONLY);
        Assert.assertThrows(IllegalStateException.class, () -> {
            transactionProxy.read(TestModel.TEST_PATH);
        });
    }

    @Test(expected = IllegalArgumentException.class)
    public void testInvalidCreateTransactionReply() throws Throwable {
        ActorRef actorOf = getSystem().actorOf(Props.create(DoNothingActor.class, new Object[0]));
        ((ActorUtils) Mockito.doReturn(getSystem().actorSelection(actorOf.path())).when(this.mockActorContext)).actorSelection(actorOf.path().toString());
        ((ActorUtils) Mockito.doReturn(primaryShardInfoReply(getSystem(), actorOf)).when(this.mockActorContext)).findPrimaryShardAsync((String) ArgumentMatchers.eq("default"));
        ((ActorUtils) Mockito.doReturn(Futures.successful(new Object())).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(getSystem().actorSelection(actorOf.path())), eqCreateTransaction("mock-member", TransactionType.READ_ONLY), (Timeout) ArgumentMatchers.any(Timeout.class));
        propagateReadFailedExceptionCause(new TransactionProxy(this.mockComponentFactory, TransactionType.READ_ONLY).read(TestModel.TEST_PATH));
    }

    @Test
    public void testExists() throws Exception {
        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), TransactionType.READ_ONLY);
        TransactionProxy transactionProxy = new TransactionProxy(this.mockComponentFactory, TransactionType.READ_ONLY);
        ((ActorUtils) Mockito.doReturn(dataExistsReply(false)).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), eqDataExists(), (Timeout) ArgumentMatchers.any(Timeout.class));
        Assert.assertEquals("Exists response", Boolean.FALSE, (Boolean) transactionProxy.exists(TestModel.TEST_PATH).get());
        ((ActorUtils) Mockito.doReturn(dataExistsReply(true)).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), eqDataExists(), (Timeout) ArgumentMatchers.any(Timeout.class));
        Assert.assertEquals("Exists response", Boolean.TRUE, (Boolean) transactionProxy.exists(TestModel.TEST_PATH).get());
    }

    @Test(expected = PrimaryNotFoundException.class)
    public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
        testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), transactionProxy -> {
            return transactionProxy.exists(TestModel.TEST_PATH);
        });
    }

    @Test(expected = ReadFailedException.class)
    public void testExistsWithInvalidReplyMessageType() throws Throwable {
        ((ActorUtils) Mockito.doReturn(Futures.successful(new Object())).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(setupActorContextWithInitialCreateTransaction(getSystem(), TransactionType.READ_ONLY))), eqDataExists(), (Timeout) ArgumentMatchers.any(Timeout.class));
        try {
            new TransactionProxy(this.mockComponentFactory, TransactionType.READ_ONLY).exists(TestModel.TEST_PATH).get(5L, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            throw e.getCause();
        }
    }

    @Test(expected = TestException.class)
    public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
        ((ActorUtils) Mockito.doReturn(Futures.failed(new TestException())).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(setupActorContextWithInitialCreateTransaction(getSystem(), TransactionType.READ_ONLY))), eqDataExists(), (Timeout) ArgumentMatchers.any(Timeout.class));
        propagateReadFailedExceptionCause(new TransactionProxy(this.mockComponentFactory, TransactionType.READ_ONLY).exists(TestModel.TEST_PATH));
    }

    @Test
    public void testExistsWithPriorRecordingOperationSuccessful() throws Exception {
        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), TransactionType.READ_WRITE);
        ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
        expectBatchedModifications(actorRef, 1);
        ((ActorUtils) Mockito.doReturn(dataExistsReply(true)).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), eqDataExists(), (Timeout) ArgumentMatchers.any(Timeout.class));
        TransactionProxy transactionProxy = new TransactionProxy(this.mockComponentFactory, TransactionType.READ_WRITE);
        transactionProxy.write(TestModel.TEST_PATH, containerNode);
        Assert.assertEquals("Exists response", Boolean.TRUE, (Boolean) transactionProxy.exists(TestModel.TEST_PATH).get());
        InOrder inOrder = Mockito.inOrder(new Object[]{this.mockActorContext});
        ((ActorUtils) inOrder.verify(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), ArgumentMatchers.isA(BatchedModifications.class), (Timeout) ArgumentMatchers.any(Timeout.class));
        ((ActorUtils) inOrder.verify(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), eqDataExists(), (Timeout) ArgumentMatchers.any(Timeout.class));
    }

    @Test(expected = IllegalStateException.class)
    public void testExistsPreConditionCheck() {
        new TransactionProxy(this.mockComponentFactory, TransactionType.WRITE_ONLY).exists(TestModel.TEST_PATH);
    }

    @Test
    public void testWrite() {
        this.dataStoreContextBuilder.shardBatchedModificationCount(1);
        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), TransactionType.WRITE_ONLY);
        ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
        expectBatchedModifications(actorRef, 1);
        new TransactionProxy(this.mockComponentFactory, TransactionType.WRITE_ONLY).write(TestModel.TEST_PATH, containerNode);
        verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, containerNode), false);
    }

    @Test
    public void testWriteAfterAsyncRead() throws Exception {
        ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(), "default");
        Promise promise = Futures.promise();
        ((ActorUtils) Mockito.doReturn(promise).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction("mock-member", TransactionType.READ_WRITE), (Timeout) ArgumentMatchers.any(Timeout.class));
        ((ActorUtils) Mockito.doReturn(readDataReply(null)).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), eqReadData(), (Timeout) ArgumentMatchers.any(Timeout.class));
        expectBatchedModificationsReady(actorRef);
        final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
        final TransactionProxy transactionProxy = new TransactionProxy(this.mockComponentFactory, TransactionType.READ_WRITE);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final AtomicReference atomicReference = new AtomicReference();
        com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH), new FutureCallback<Optional<NormalizedNode>>() { // from class: org.opendaylight.controller.cluster.datastore.TransactionProxyTest.1
            public void onSuccess(Optional<NormalizedNode> optional) {
                try {
                    transactionProxy.write(TestModel.TEST_PATH, containerNode);
                } catch (Exception e) {
                    atomicReference.set(e);
                } finally {
                    countDownLatch.countDown();
                }
            }

            public void onFailure(Throwable th) {
                atomicReference.set(th);
                countDownLatch.countDown();
            }
        }, MoreExecutors.directExecutor());
        promise.success(createTransactionReply(actorRef, (short) 12));
        Uninterruptibles.awaitUninterruptibly(countDownLatch, 5L, TimeUnit.SECONDS);
        Throwable th = (Throwable) atomicReference.get();
        if (th != null) {
            Throwables.propagateIfPossible(th, Exception.class);
            throw new RuntimeException(th);
        }
        transactionProxy.ready();
        verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, containerNode), true);
    }

    @Test(expected = IllegalStateException.class)
    public void testWritePreConditionCheck() {
        new TransactionProxy(this.mockComponentFactory, TransactionType.READ_ONLY).write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
    }

    @Test(expected = IllegalStateException.class)
    public void testWriteAfterReadyPreConditionCheck() {
        TransactionProxy transactionProxy = new TransactionProxy(this.mockComponentFactory, TransactionType.WRITE_ONLY);
        transactionProxy.ready();
        transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
    }

    @Test
    public void testMerge() {
        this.dataStoreContextBuilder.shardBatchedModificationCount(1);
        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), TransactionType.WRITE_ONLY);
        ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
        expectBatchedModifications(actorRef, 1);
        new TransactionProxy(this.mockComponentFactory, TransactionType.WRITE_ONLY).merge(TestModel.TEST_PATH, containerNode);
        verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, containerNode), false);
    }

    @Test
    public void testDelete() {
        this.dataStoreContextBuilder.shardBatchedModificationCount(1);
        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), TransactionType.WRITE_ONLY);
        expectBatchedModifications(actorRef, 1);
        new TransactionProxy(this.mockComponentFactory, TransactionType.WRITE_ONLY).delete(TestModel.TEST_PATH);
        verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
    }

    @Test
    public void testReadWrite() {
        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), TransactionType.READ_WRITE);
        ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
        ((ActorUtils) Mockito.doReturn(readDataReply(null)).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), eqReadData(), (Timeout) ArgumentMatchers.any(Timeout.class));
        expectBatchedModifications(actorRef, 1);
        TransactionProxy transactionProxy = new TransactionProxy(this.mockComponentFactory, TransactionType.READ_WRITE);
        transactionProxy.read(TestModel.TEST_PATH);
        transactionProxy.write(TestModel.TEST_PATH, containerNode);
        transactionProxy.read(TestModel.TEST_PATH);
        transactionProxy.read(TestModel.TEST_PATH);
        List<BatchedModifications> captureBatchedModifications = captureBatchedModifications(actorRef);
        Assert.assertEquals("Captured BatchedModifications count", 1L, captureBatchedModifications.size());
        verifyBatchedModifications(captureBatchedModifications.get(0), false, new WriteModification(TestModel.TEST_PATH, containerNode));
    }

    @Test
    public void testReadyWithReadWrite() {
        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), TransactionType.READ_WRITE);
        ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
        ((ActorUtils) Mockito.doReturn(readDataReply(null)).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), eqReadData(), (Timeout) ArgumentMatchers.any(Timeout.class));
        expectBatchedModificationsReady(actorRef, true);
        TransactionProxy transactionProxy = new TransactionProxy(this.mockComponentFactory, TransactionType.READ_WRITE);
        transactionProxy.read(TestModel.TEST_PATH);
        transactionProxy.write(TestModel.TEST_PATH, containerNode);
        SingleCommitCohortProxy ready = transactionProxy.ready();
        Assert.assertTrue(ready instanceof SingleCommitCohortProxy);
        verifyCohortFutures(ready, new CommitTransactionReply().toSerializable());
        List<BatchedModifications> captureBatchedModifications = captureBatchedModifications(actorRef);
        Assert.assertEquals("Captured BatchedModifications count", 1L, captureBatchedModifications.size());
        verifyBatchedModifications(captureBatchedModifications.get(0), true, true, new WriteModification(TestModel.TEST_PATH, containerNode));
        Assert.assertEquals("getTotalMessageCount", 1L, captureBatchedModifications.get(0).getTotalMessagesSent());
    }

    @Test
    public void testReadyWithNoModifications() {
        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), TransactionType.READ_WRITE);
        ((ActorUtils) Mockito.doReturn(readDataReply(null)).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), eqReadData(), (Timeout) ArgumentMatchers.any(Timeout.class));
        expectBatchedModificationsReady(actorRef, true);
        TransactionProxy transactionProxy = new TransactionProxy(this.mockComponentFactory, TransactionType.READ_WRITE);
        transactionProxy.read(TestModel.TEST_PATH);
        SingleCommitCohortProxy ready = transactionProxy.ready();
        Assert.assertTrue(ready instanceof SingleCommitCohortProxy);
        verifyCohortFutures(ready, new CommitTransactionReply().toSerializable());
        List<BatchedModifications> captureBatchedModifications = captureBatchedModifications(actorRef);
        Assert.assertEquals("Captured BatchedModifications count", 1L, captureBatchedModifications.size());
        verifyBatchedModifications(captureBatchedModifications.get(0), true, true, new Modification[0]);
    }

    @Test
    public void testReadyWithMultipleShardWrites() {
        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), TransactionType.WRITE_ONLY);
        ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), TransactionType.WRITE_ONLY, TestModel.JUNK_QNAME.getLocalName());
        expectBatchedModificationsReady(actorRef);
        expectBatchedModificationsReady(actorRef2);
        ActorRef actorOf = getSystem().actorOf(Props.create(DoNothingActor.class, new Object[0]));
        ((ActorUtils) Mockito.doReturn(getSystem().actorSelection(actorOf.path())).when(this.mockActorContext)).actorSelection(actorOf.path().toString());
        ((ActorUtils) Mockito.doReturn(Futures.successful(newPrimaryShardInfo(actorOf, createDataTree()))).when(this.mockActorContext)).findPrimaryShardAsync((String) ArgumentMatchers.eq(CarsModel.BASE_QNAME.getLocalName()));
        expectReadyLocalTransaction(actorOf, false);
        TransactionProxy transactionProxy = new TransactionProxy(this.mockComponentFactory, TransactionType.WRITE_ONLY);
        transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
        transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
        transactionProxy.write(CarsModel.BASE_PATH, ImmutableNodes.containerNode(CarsModel.BASE_QNAME));
        ThreePhaseCommitCohortProxy ready = transactionProxy.ready();
        Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
        verifyCohortFutures(ready, actorSelection(actorRef), actorSelection(actorRef2), actorSelection(actorOf));
        ImmutableSortedSet of = ImmutableSortedSet.of("default", TestModel.JUNK_QNAME.getLocalName(), CarsModel.BASE_QNAME.getLocalName());
        ArgumentCaptor forClass = ArgumentCaptor.forClass(BatchedModifications.class);
        ((ActorUtils) Mockito.verify(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), forClass.capture(), (Timeout) ArgumentMatchers.any(Timeout.class));
        Assert.assertEquals("Participating shards", Optional.of(of), ((BatchedModifications) forClass.getValue()).getParticipatingShardNames());
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(BatchedModifications.class);
        ((ActorUtils) Mockito.verify(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef2)), forClass2.capture(), (Timeout) ArgumentMatchers.any(Timeout.class));
        Assert.assertEquals("Participating shards", Optional.of(of), ((BatchedModifications) forClass2.getValue()).getParticipatingShardNames());
        ArgumentCaptor forClass3 = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
        ((ActorUtils) Mockito.verify(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorOf)), forClass3.capture(), (Timeout) ArgumentMatchers.any(Timeout.class));
        Assert.assertEquals("Participating shards", Optional.of(of), ((ReadyLocalTransaction) forClass3.getValue()).getParticipatingShardNames());
    }

    @Test
    public void testReadyWithWriteOnlyAndLastBatchPending() {
        this.dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), TransactionType.WRITE_ONLY);
        ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
        expectBatchedModificationsReady(actorRef, true);
        TransactionProxy transactionProxy = new TransactionProxy(this.mockComponentFactory, TransactionType.WRITE_ONLY);
        transactionProxy.write(TestModel.TEST_PATH, containerNode);
        SingleCommitCohortProxy ready = transactionProxy.ready();
        Assert.assertTrue(ready instanceof SingleCommitCohortProxy);
        verifyCohortFutures(ready, new CommitTransactionReply().toSerializable());
        List<BatchedModifications> captureBatchedModifications = captureBatchedModifications(actorRef);
        Assert.assertEquals("Captured BatchedModifications count", 1L, captureBatchedModifications.size());
        verifyBatchedModifications(captureBatchedModifications.get(0), true, true, new WriteModification(TestModel.TEST_PATH, containerNode));
    }

    @Test
    public void testReadyWithWriteOnlyAndLastBatchEmpty() {
        this.dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), TransactionType.WRITE_ONLY);
        ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
        expectBatchedModificationsReady(actorRef, true);
        TransactionProxy transactionProxy = new TransactionProxy(this.mockComponentFactory, TransactionType.WRITE_ONLY);
        transactionProxy.write(TestModel.TEST_PATH, containerNode);
        SingleCommitCohortProxy ready = transactionProxy.ready();
        Assert.assertTrue(ready instanceof SingleCommitCohortProxy);
        verifyCohortFutures(ready, new CommitTransactionReply().toSerializable());
        List<BatchedModifications> captureBatchedModifications = captureBatchedModifications(actorRef);
        Assert.assertEquals("Captured BatchedModifications count", 2L, captureBatchedModifications.size());
        verifyBatchedModifications(captureBatchedModifications.get(0), false, new WriteModification(TestModel.TEST_PATH, containerNode));
        verifyBatchedModifications(captureBatchedModifications.get(1), true, true, new Modification[0]);
    }

    @Test
    public void testReadyWithReplyFailure() {
        this.dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), TransactionType.WRITE_ONLY);
        ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
        expectFailedBatchedModifications(actorRef);
        TransactionProxy transactionProxy = new TransactionProxy(this.mockComponentFactory, TransactionType.WRITE_ONLY);
        transactionProxy.merge(TestModel.TEST_PATH, containerNode);
        SingleCommitCohortProxy ready = transactionProxy.ready();
        Assert.assertTrue(ready instanceof SingleCommitCohortProxy);
        verifyCohortFutures(ready, TestException.class);
    }

    @Test
    public void testReadyWithDebugContextEnabled() {
        this.dataStoreContextBuilder.transactionDebugContextEnabled(true);
        expectBatchedModificationsReady(setupActorContextWithInitialCreateTransaction(getSystem(), TransactionType.READ_WRITE), true);
        TransactionProxy transactionProxy = new TransactionProxy(this.mockComponentFactory, TransactionType.READ_WRITE);
        transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
        DebugThreePhaseCommitCohort ready = transactionProxy.ready();
        Assert.assertTrue(ready instanceof DebugThreePhaseCommitCohort);
        verifyCohortFutures(ready, new CommitTransactionReply().toSerializable());
    }

    @Test
    public void testReadyWithLocalTransaction() {
        ActorRef actorOf = getSystem().actorOf(Props.create(DoNothingActor.class, new Object[0]));
        ((ActorUtils) Mockito.doReturn(getSystem().actorSelection(actorOf.path())).when(this.mockActorContext)).actorSelection(actorOf.path().toString());
        ((ActorUtils) Mockito.doReturn(Futures.successful(newPrimaryShardInfo(actorOf, createDataTree()))).when(this.mockActorContext)).findPrimaryShardAsync((String) ArgumentMatchers.eq("default"));
        TransactionProxy transactionProxy = new TransactionProxy(this.mockComponentFactory, TransactionType.WRITE_ONLY);
        expectReadyLocalTransaction(actorOf, true);
        transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
        SingleCommitCohortProxy ready = transactionProxy.ready();
        Assert.assertTrue(ready instanceof SingleCommitCohortProxy);
        verifyCohortFutures(ready, new CommitTransactionReply().toSerializable());
        ArgumentCaptor forClass = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
        ((ActorUtils) Mockito.verify(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorOf)), forClass.capture(), (Timeout) ArgumentMatchers.any(Timeout.class));
        Assert.assertFalse("Participating shards present", ((ReadyLocalTransaction) forClass.getValue()).getParticipatingShardNames().isPresent());
    }

    @Test
    public void testReadyWithLocalTransactionWithFailure() {
        ActorRef actorOf = getSystem().actorOf(Props.create(DoNothingActor.class, new Object[0]));
        ((ActorUtils) Mockito.doReturn(getSystem().actorSelection(actorOf.path())).when(this.mockActorContext)).actorSelection(actorOf.path().toString());
        DataTree createDataTree = createDataTree();
        ((DataTreeModification) Mockito.doThrow(new Throwable[]{new RuntimeException("mock")}).when(createDataTree.takeSnapshot().newModification())).ready();
        ((ActorUtils) Mockito.doReturn(Futures.successful(newPrimaryShardInfo(actorOf, createDataTree))).when(this.mockActorContext)).findPrimaryShardAsync((String) ArgumentMatchers.eq("default"));
        TransactionProxy transactionProxy = new TransactionProxy(this.mockComponentFactory, TransactionType.WRITE_ONLY);
        expectReadyLocalTransaction(actorOf, true);
        transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
        SingleCommitCohortProxy ready = transactionProxy.ready();
        Assert.assertTrue(ready instanceof SingleCommitCohortProxy);
        verifyCohortFutures(ready, RuntimeException.class);
    }

    private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception exc) {
        ((ActorUtils) Mockito.doReturn(Futures.failed(exc)).when(this.mockActorContext)).findPrimaryShardAsync(ArgumentMatchers.anyString());
        TransactionProxy transactionProxy = new TransactionProxy(this.mockComponentFactory, TransactionType.WRITE_ONLY);
        ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
        transactionProxy.merge(TestModel.TEST_PATH, containerNode);
        transactionProxy.write(TestModel.TEST_PATH, containerNode);
        transactionProxy.delete(TestModel.TEST_PATH);
        SingleCommitCohortProxy ready = transactionProxy.ready();
        Assert.assertTrue(ready instanceof SingleCommitCohortProxy);
        verifyCohortFutures(ready, exc.getClass());
    }

    @Test
    public void testWriteOnlyTxWithPrimaryNotFoundException() {
        testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
    }

    @Test
    public void testWriteOnlyTxWithNotInitializedException() {
        testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
    }

    @Test
    public void testWriteOnlyTxWithNoShardLeaderException() {
        testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
    }

    @Test
    public void testReadyWithInvalidReplyMessageType() {
        this.dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), TransactionType.WRITE_ONLY);
        ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), TransactionType.WRITE_ONLY, TestModel.JUNK_QNAME.getLocalName());
        ((ActorUtils) Mockito.doReturn(Futures.successful(new Object())).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), ArgumentMatchers.isA(BatchedModifications.class), (Timeout) ArgumentMatchers.any(Timeout.class));
        expectBatchedModificationsReady(actorRef2);
        TransactionProxy transactionProxy = new TransactionProxy(this.mockComponentFactory, TransactionType.WRITE_ONLY);
        transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
        transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
        ThreePhaseCommitCohortProxy ready = transactionProxy.ready();
        Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
        verifyCohortFutures(ready, actorSelection(actorRef2), IllegalArgumentException.class);
    }

    @Test
    public void testGetIdentifier() {
        setupActorContextWithInitialCreateTransaction(getSystem(), TransactionType.READ_ONLY);
        Object identifier = new TransactionProxy(this.mockComponentFactory, TransactionType.READ_ONLY).getIdentifier();
        Assert.assertNotNull("getIdentifier returned null", identifier);
        Assert.assertTrue("Invalid identifier: " + identifier, identifier.toString().contains("mock-member"));
    }

    @Test
    public void testClose() {
        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), TransactionType.READ_WRITE);
        ((ActorUtils) Mockito.doReturn(readDataReply(null)).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), eqReadData(), (Timeout) ArgumentMatchers.any(Timeout.class));
        TransactionProxy transactionProxy = new TransactionProxy(this.mockComponentFactory, TransactionType.READ_WRITE);
        transactionProxy.read(TestModel.TEST_PATH);
        transactionProxy.close();
        ((ActorUtils) Mockito.verify(this.mockActorContext)).sendOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), ArgumentMatchers.isA(CloseTransaction.class));
    }

    private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef) {
        return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), (short) 12);
    }

    private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef, DataTree dataTree) {
        return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), (short) 12, dataTree);
    }

    private void throttleOperation(TransactionProxyOperation transactionProxyOperation) {
        throttleOperation(transactionProxyOperation, 1, true);
    }

    private void throttleOperation(TransactionProxyOperation transactionProxyOperation, int i, boolean z) {
        throttleOperation(transactionProxyOperation, i, z, TimeUnit.MILLISECONDS.toNanos(this.mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
    }

    private void throttleOperation(TransactionProxyOperation transactionProxyOperation, int i, boolean z, long j) {
        ActorSystem system = getSystem();
        ActorRef actorOf = system.actorOf(Props.create(DoNothingActor.class, new Object[0]));
        ((ActorUtils) Mockito.doReturn(this.dataStoreContextBuilder.operationTimeoutInSeconds(2).shardBatchedModificationCount(i - 1).build()).when(this.mockActorContext)).getDatastoreContext();
        ((ActorUtils) Mockito.doReturn(system.actorSelection(actorOf.path())).when(this.mockActorContext)).actorSelection(actorOf.path().toString());
        if (z) {
            ((ActorUtils) Mockito.doReturn(Futures.successful(newPrimaryShardInfo(actorOf))).when(this.mockActorContext)).findPrimaryShardAsync((String) ArgumentMatchers.eq("default"));
            ((ActorUtils) Mockito.doReturn(Futures.successful(newPrimaryShardInfo(actorOf))).when(this.mockActorContext)).findPrimaryShardAsync((String) ArgumentMatchers.eq("cars"));
        } else {
            ((ActorUtils) Mockito.doReturn(Futures.failed(new Exception("not found"))).when(this.mockActorContext)).findPrimaryShardAsync((String) ArgumentMatchers.eq("default"));
        }
        ((ActorUtils) Mockito.doReturn(incompleteFuture()).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(system.actorSelection(actorOf.path())), eqCreateTransaction("mock-member", TransactionType.READ_WRITE), (Timeout) ArgumentMatchers.any(Timeout.class));
        TransactionProxy transactionProxy = new TransactionProxy(this.mockComponentFactory, TransactionType.READ_WRITE);
        long nanoTime = System.nanoTime();
        transactionProxyOperation.run(transactionProxy);
        long nanoTime2 = System.nanoTime();
        Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", Long.valueOf(j), Long.valueOf(nanoTime2 - nanoTime)), nanoTime2 - nanoTime > j && nanoTime2 - nanoTime < j * 2);
    }

    private void completeOperation(TransactionProxyOperation transactionProxyOperation) {
        completeOperation(transactionProxyOperation, true);
    }

    private void completeOperation(TransactionProxyOperation transactionProxyOperation, boolean z) {
        ActorSystem system = getSystem();
        ActorRef actorOf = system.actorOf(Props.create(DoNothingActor.class, new Object[0]));
        ((ActorUtils) Mockito.doReturn(system.actorSelection(actorOf.path())).when(this.mockActorContext)).actorSelection(actorOf.path().toString());
        if (z) {
            ((ActorUtils) Mockito.doReturn(Futures.successful(newPrimaryShardInfo(actorOf))).when(this.mockActorContext)).findPrimaryShardAsync((String) ArgumentMatchers.eq("default"));
        } else {
            ((ActorUtils) Mockito.doReturn(Futures.failed(new PrimaryNotFoundException("test"))).when(this.mockActorContext)).findPrimaryShardAsync((String) ArgumentMatchers.eq("default"));
        }
        String obj = system.actorOf(Props.create(DoNothingActor.class, new Object[0])).path().toString();
        CreateTransactionReply createTransactionReply = new CreateTransactionReply(obj, nextTransactionId(), (short) 12);
        ((ActorUtils) Mockito.doReturn(system.actorSelection(obj)).when(this.mockActorContext)).actorSelection(obj);
        ((ActorUtils) Mockito.doReturn(Futures.successful(createTransactionReply)).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(system.actorSelection(actorOf.path())), eqCreateTransaction("mock-member", TransactionType.READ_WRITE), (Timeout) ArgumentMatchers.any(Timeout.class));
        TransactionProxy transactionProxy = new TransactionProxy(this.mockComponentFactory, TransactionType.READ_WRITE);
        long nanoTime = System.nanoTime();
        transactionProxyOperation.run(transactionProxy);
        long nanoTime2 = System.nanoTime();
        long nanos = TimeUnit.MILLISECONDS.toNanos(this.mockActorContext.getDatastoreContext().getOperationTimeoutInMillis());
        Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", Long.valueOf(nanos), Long.valueOf(nanoTime2 - nanoTime)), nanoTime2 - nanoTime <= nanos);
    }

    private void completeOperationLocal(TransactionProxyOperation transactionProxyOperation, DataTree dataTree) {
        ActorSystem system = getSystem();
        ActorRef actorOf = system.actorOf(Props.create(DoNothingActor.class, new Object[0]));
        ((ActorUtils) Mockito.doReturn(system.actorSelection(actorOf.path())).when(this.mockActorContext)).actorSelection(actorOf.path().toString());
        ((ActorUtils) Mockito.doReturn(Futures.successful(newPrimaryShardInfo(actorOf, dataTree))).when(this.mockActorContext)).findPrimaryShardAsync((String) ArgumentMatchers.eq("default"));
        TransactionProxy transactionProxy = new TransactionProxy(this.mockComponentFactory, TransactionType.READ_WRITE);
        long nanoTime = System.nanoTime();
        transactionProxyOperation.run(transactionProxy);
        long nanoTime2 = System.nanoTime();
        long nanos = TimeUnit.MILLISECONDS.toNanos(this.mockActorContext.getDatastoreContext().getOperationTimeoutInMillis());
        Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", Long.valueOf(nanos), Long.valueOf(nanoTime2 - nanoTime)), nanoTime2 - nanoTime <= nanos);
    }

    private static DataTree createDataTree() {
        DataTree dataTree = (DataTree) Mockito.mock(DataTree.class);
        DataTreeSnapshot dataTreeSnapshot = (DataTreeSnapshot) Mockito.mock(DataTreeSnapshot.class);
        DataTreeModification dataTreeModification = (DataTreeModification) Mockito.mock(DataTreeModification.class);
        ((DataTree) Mockito.doReturn(dataTreeSnapshot).when(dataTree)).takeSnapshot();
        ((DataTreeSnapshot) Mockito.doReturn(dataTreeModification).when(dataTreeSnapshot)).newModification();
        return dataTree;
    }

    private static DataTree createDataTree(NormalizedNode normalizedNode) {
        DataTree dataTree = (DataTree) Mockito.mock(DataTree.class);
        DataTreeSnapshot dataTreeSnapshot = (DataTreeSnapshot) Mockito.mock(DataTreeSnapshot.class);
        DataTreeModification dataTreeModification = (DataTreeModification) Mockito.mock(DataTreeModification.class);
        ((DataTree) Mockito.doReturn(dataTreeSnapshot).when(dataTree)).takeSnapshot();
        ((DataTreeSnapshot) Mockito.doReturn(dataTreeModification).when(dataTreeSnapshot)).newModification();
        ((DataTreeModification) Mockito.doReturn(Optional.of(normalizedNode)).when(dataTreeModification)).readNode((YangInstanceIdentifier) ArgumentMatchers.any(YangInstanceIdentifier.class));
        return dataTree;
    }

    @Test
    public void testWriteCompletionForLocalShard() {
        completeOperationLocal(transactionProxy -> {
            ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
            transactionProxy.write(TestModel.TEST_PATH, containerNode);
            transactionProxy.write(TestModel.TEST_PATH, containerNode);
        }, createDataTree());
    }

    @Test
    public void testWriteThrottlingWhenShardFound() {
        throttleOperation(transactionProxy -> {
            ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
            expectIncompleteBatchedModifications();
            transactionProxy.write(TestModel.TEST_PATH, containerNode);
            transactionProxy.write(TestModel.TEST_PATH, containerNode);
        });
    }

    @Test
    public void testWriteThrottlingWhenShardNotFound() {
        completeOperation(transactionProxy -> {
            ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
            expectBatchedModifications(2);
            transactionProxy.write(TestModel.TEST_PATH, containerNode);
            transactionProxy.write(TestModel.TEST_PATH, containerNode);
        }, false);
    }

    @Test
    public void testWriteCompletion() {
        completeOperation(transactionProxy -> {
            ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
            expectBatchedModifications(2);
            transactionProxy.write(TestModel.TEST_PATH, containerNode);
            transactionProxy.write(TestModel.TEST_PATH, containerNode);
        });
    }

    @Test
    public void testMergeThrottlingWhenShardFound() {
        throttleOperation(transactionProxy -> {
            ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
            expectIncompleteBatchedModifications();
            transactionProxy.merge(TestModel.TEST_PATH, containerNode);
            transactionProxy.merge(TestModel.TEST_PATH, containerNode);
        });
    }

    @Test
    public void testMergeThrottlingWhenShardNotFound() {
        completeOperation(transactionProxy -> {
            ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
            expectBatchedModifications(2);
            transactionProxy.merge(TestModel.TEST_PATH, containerNode);
            transactionProxy.merge(TestModel.TEST_PATH, containerNode);
        }, false);
    }

    @Test
    public void testMergeCompletion() {
        completeOperation(transactionProxy -> {
            ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
            expectBatchedModifications(2);
            transactionProxy.merge(TestModel.TEST_PATH, containerNode);
            transactionProxy.merge(TestModel.TEST_PATH, containerNode);
        });
    }

    @Test
    public void testMergeCompletionForLocalShard() {
        completeOperationLocal(transactionProxy -> {
            ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
            transactionProxy.merge(TestModel.TEST_PATH, containerNode);
            transactionProxy.merge(TestModel.TEST_PATH, containerNode);
        }, createDataTree());
    }

    @Test
    public void testDeleteThrottlingWhenShardFound() {
        throttleOperation(transactionProxy -> {
            expectIncompleteBatchedModifications();
            transactionProxy.delete(TestModel.TEST_PATH);
            transactionProxy.delete(TestModel.TEST_PATH);
        });
    }

    @Test
    public void testDeleteThrottlingWhenShardNotFound() {
        completeOperation(transactionProxy -> {
            expectBatchedModifications(2);
            transactionProxy.delete(TestModel.TEST_PATH);
            transactionProxy.delete(TestModel.TEST_PATH);
        }, false);
    }

    @Test
    public void testDeleteCompletionForLocalShard() {
        completeOperationLocal(transactionProxy -> {
            transactionProxy.delete(TestModel.TEST_PATH);
            transactionProxy.delete(TestModel.TEST_PATH);
        }, createDataTree());
    }

    @Test
    public void testDeleteCompletion() {
        completeOperation(transactionProxy -> {
            expectBatchedModifications(2);
            transactionProxy.delete(TestModel.TEST_PATH);
            transactionProxy.delete(TestModel.TEST_PATH);
        });
    }

    @Test
    public void testReadThrottlingWhenShardFound() {
        throttleOperation(transactionProxy -> {
            ((ActorUtils) Mockito.doReturn(incompleteFuture()).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.any(ActorSelection.class), eqReadData());
            transactionProxy.read(TestModel.TEST_PATH);
            transactionProxy.read(TestModel.TEST_PATH);
        });
    }

    @Test
    public void testReadThrottlingWhenShardNotFound() {
        completeOperation(transactionProxy -> {
            ((ActorUtils) Mockito.doReturn(incompleteFuture()).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.any(ActorSelection.class), eqReadData());
            transactionProxy.read(TestModel.TEST_PATH);
            transactionProxy.read(TestModel.TEST_PATH);
        }, false);
    }

    @Test
    public void testReadCompletion() {
        completeOperation(transactionProxy -> {
            ((ActorUtils) Mockito.doReturn(readDataReply(ImmutableNodes.containerNode(TestModel.TEST_QNAME))).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.any(ActorSelection.class), eqReadData(), (Timeout) ArgumentMatchers.any(Timeout.class));
            transactionProxy.read(TestModel.TEST_PATH);
            transactionProxy.read(TestModel.TEST_PATH);
        });
    }

    @Test
    public void testReadCompletionForLocalShard() {
        completeOperationLocal(transactionProxy -> {
            transactionProxy.read(TestModel.TEST_PATH);
            transactionProxy.read(TestModel.TEST_PATH);
        }, createDataTree(ImmutableNodes.containerNode(TestModel.TEST_QNAME)));
    }

    @Test
    public void testReadCompletionForLocalShardWhenExceptionOccurs() {
        completeOperationLocal(transactionProxy -> {
            transactionProxy.read(TestModel.TEST_PATH);
            transactionProxy.read(TestModel.TEST_PATH);
        }, createDataTree());
    }

    @Test
    public void testExistsThrottlingWhenShardFound() {
        throttleOperation(transactionProxy -> {
            ((ActorUtils) Mockito.doReturn(incompleteFuture()).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.any(ActorSelection.class), eqDataExists());
            transactionProxy.exists(TestModel.TEST_PATH);
            transactionProxy.exists(TestModel.TEST_PATH);
        });
    }

    @Test
    public void testExistsThrottlingWhenShardNotFound() {
        completeOperation(transactionProxy -> {
            ((ActorUtils) Mockito.doReturn(incompleteFuture()).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.any(ActorSelection.class), eqDataExists());
            transactionProxy.exists(TestModel.TEST_PATH);
            transactionProxy.exists(TestModel.TEST_PATH);
        }, false);
    }

    @Test
    public void testExistsCompletion() {
        completeOperation(transactionProxy -> {
            ((ActorUtils) Mockito.doReturn(dataExistsReply(true)).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.any(ActorSelection.class), eqDataExists(), (Timeout) ArgumentMatchers.any(Timeout.class));
            transactionProxy.exists(TestModel.TEST_PATH);
            transactionProxy.exists(TestModel.TEST_PATH);
        });
    }

    @Test
    public void testExistsCompletionForLocalShard() {
        completeOperationLocal(transactionProxy -> {
            transactionProxy.exists(TestModel.TEST_PATH);
            transactionProxy.exists(TestModel.TEST_PATH);
        }, createDataTree(ImmutableNodes.containerNode(TestModel.TEST_QNAME)));
    }

    @Test
    public void testExistsCompletionForLocalShardWhenExceptionOccurs() {
        completeOperationLocal(transactionProxy -> {
            transactionProxy.exists(TestModel.TEST_PATH);
            transactionProxy.exists(TestModel.TEST_PATH);
        }, createDataTree());
    }

    @Test
    public void testReadyThrottling() {
        throttleOperation(transactionProxy -> {
            ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
            expectBatchedModifications(1);
            transactionProxy.write(TestModel.TEST_PATH, containerNode);
            transactionProxy.ready();
        });
    }

    @Test
    public void testReadyThrottlingWithTwoTransactionContexts() {
        throttleOperation(transactionProxy -> {
            ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
            ContainerNode containerNode2 = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
            expectBatchedModifications(2);
            transactionProxy.write(TestModel.TEST_PATH, containerNode);
            transactionProxy.write(CarsModel.BASE_PATH, containerNode2);
            transactionProxy.ready();
        }, 1, true, TimeUnit.MILLISECONDS.toNanos(this.mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()) * 2);
    }

    private void testModificationOperationBatching(TransactionType transactionType) {
        this.dataStoreContextBuilder.shardBatchedModificationCount(3);
        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), transactionType);
        expectBatchedModifications(actorRef, 3);
        YangInstanceIdentifier yangInstanceIdentifier = TestModel.TEST_PATH;
        ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
        YangInstanceIdentifier yangInstanceIdentifier2 = TestModel.OUTER_LIST_PATH;
        ContainerNode containerNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
        YangInstanceIdentifier yangInstanceIdentifier3 = TestModel.INNER_LIST_PATH;
        ContainerNode containerNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
        YangInstanceIdentifier yangInstanceIdentifier4 = TestModel.TEST_PATH;
        ContainerNode containerNode4 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
        YangInstanceIdentifier yangInstanceIdentifier5 = TestModel.OUTER_LIST_PATH;
        ContainerNode containerNode5 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
        YangInstanceIdentifier yangInstanceIdentifier6 = TestModel.INNER_LIST_PATH;
        ContainerNode containerNode6 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
        YangInstanceIdentifier yangInstanceIdentifier7 = TestModel.TEST_PATH;
        YangInstanceIdentifier yangInstanceIdentifier8 = TestModel.OUTER_LIST_PATH;
        TransactionProxy transactionProxy = new TransactionProxy(this.mockComponentFactory, transactionType);
        transactionProxy.write(yangInstanceIdentifier, containerNode);
        transactionProxy.write(yangInstanceIdentifier2, containerNode2);
        transactionProxy.delete(yangInstanceIdentifier7);
        transactionProxy.merge(yangInstanceIdentifier4, containerNode4);
        transactionProxy.merge(yangInstanceIdentifier5, containerNode5);
        transactionProxy.write(yangInstanceIdentifier3, containerNode3);
        transactionProxy.merge(yangInstanceIdentifier6, containerNode6);
        transactionProxy.delete(yangInstanceIdentifier8);
        transactionProxy.ready();
        List<BatchedModifications> captureBatchedModifications = captureBatchedModifications(actorRef);
        Assert.assertEquals("Captured BatchedModifications count", 3L, captureBatchedModifications.size());
        verifyBatchedModifications(captureBatchedModifications.get(0), false, new WriteModification(yangInstanceIdentifier, containerNode), new WriteModification(yangInstanceIdentifier2, containerNode2), new DeleteModification(yangInstanceIdentifier7));
        verifyBatchedModifications(captureBatchedModifications.get(1), false, new MergeModification(yangInstanceIdentifier4, containerNode4), new MergeModification(yangInstanceIdentifier5, containerNode5), new WriteModification(yangInstanceIdentifier3, containerNode3));
        verifyBatchedModifications(captureBatchedModifications.get(2), true, true, new MergeModification(yangInstanceIdentifier6, containerNode6), new DeleteModification(yangInstanceIdentifier8));
        Assert.assertEquals("getTotalMessageCount", 3L, captureBatchedModifications.get(2).getTotalMessagesSent());
    }

    @Test
    public void testReadWriteModificationOperationBatching() {
        testModificationOperationBatching(TransactionType.READ_WRITE);
    }

    @Test
    public void testWriteOnlyModificationOperationBatching() {
        testModificationOperationBatching(TransactionType.WRITE_ONLY);
    }

    @Test
    public void testOptimizedWriteOnlyModificationOperationBatching() {
        this.dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
        testModificationOperationBatching(TransactionType.WRITE_ONLY);
    }

    @Test
    public void testModificationOperationBatchingWithInterleavedReads() throws Exception {
        this.dataStoreContextBuilder.shardBatchedModificationCount(10);
        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), TransactionType.READ_WRITE);
        expectBatchedModifications(actorRef, 10);
        YangInstanceIdentifier yangInstanceIdentifier = TestModel.TEST_PATH;
        ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
        YangInstanceIdentifier yangInstanceIdentifier2 = TestModel.OUTER_LIST_PATH;
        ContainerNode containerNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
        YangInstanceIdentifier yangInstanceIdentifier3 = TestModel.TEST_PATH;
        ContainerNode containerNode3 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
        YangInstanceIdentifier yangInstanceIdentifier4 = TestModel.INNER_LIST_PATH;
        ContainerNode containerNode4 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
        YangInstanceIdentifier yangInstanceIdentifier5 = TestModel.OUTER_LIST_PATH;
        ((ActorUtils) Mockito.doReturn(readDataReply(containerNode2)).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), eqReadData(yangInstanceIdentifier2), (Timeout) ArgumentMatchers.any(Timeout.class));
        ((ActorUtils) Mockito.doReturn(readDataReply(containerNode4)).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), eqReadData(yangInstanceIdentifier4), (Timeout) ArgumentMatchers.any(Timeout.class));
        ((ActorUtils) Mockito.doReturn(dataExistsReply(true)).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), eqDataExists(), (Timeout) ArgumentMatchers.any(Timeout.class));
        TransactionProxy transactionProxy = new TransactionProxy(this.mockComponentFactory, TransactionType.READ_WRITE);
        transactionProxy.write(yangInstanceIdentifier, containerNode);
        transactionProxy.write(yangInstanceIdentifier2, containerNode2);
        Assert.assertEquals("Response NormalizedNode", Optional.of(containerNode2), (Optional) transactionProxy.read(yangInstanceIdentifier2).get(5L, TimeUnit.SECONDS));
        transactionProxy.merge(yangInstanceIdentifier3, containerNode3);
        transactionProxy.merge(yangInstanceIdentifier4, containerNode4);
        Optional optional = (Optional) transactionProxy.read(yangInstanceIdentifier4).get(5L, TimeUnit.SECONDS);
        transactionProxy.delete(yangInstanceIdentifier5);
        Assert.assertEquals("Exists response", Boolean.TRUE, (Boolean) transactionProxy.exists(TestModel.TEST_PATH).get());
        Assert.assertEquals("Response NormalizedNode", Optional.of(containerNode4), optional);
        List<BatchedModifications> captureBatchedModifications = captureBatchedModifications(actorRef);
        Assert.assertEquals("Captured BatchedModifications count", 3L, captureBatchedModifications.size());
        verifyBatchedModifications(captureBatchedModifications.get(0), false, new WriteModification(yangInstanceIdentifier, containerNode), new WriteModification(yangInstanceIdentifier2, containerNode2));
        verifyBatchedModifications(captureBatchedModifications.get(1), false, new MergeModification(yangInstanceIdentifier3, containerNode3), new MergeModification(yangInstanceIdentifier4, containerNode4));
        verifyBatchedModifications(captureBatchedModifications.get(2), false, new DeleteModification(yangInstanceIdentifier5));
        InOrder inOrder = Mockito.inOrder(new Object[]{this.mockActorContext});
        ((ActorUtils) inOrder.verify(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), ArgumentMatchers.isA(BatchedModifications.class), (Timeout) ArgumentMatchers.any(Timeout.class));
        ((ActorUtils) inOrder.verify(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), eqReadData(yangInstanceIdentifier2), (Timeout) ArgumentMatchers.any(Timeout.class));
        ((ActorUtils) inOrder.verify(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), ArgumentMatchers.isA(BatchedModifications.class), (Timeout) ArgumentMatchers.any(Timeout.class));
        ((ActorUtils) inOrder.verify(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), eqReadData(yangInstanceIdentifier4), (Timeout) ArgumentMatchers.any(Timeout.class));
        ((ActorUtils) inOrder.verify(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), ArgumentMatchers.isA(BatchedModifications.class), (Timeout) ArgumentMatchers.any(Timeout.class));
        ((ActorUtils) inOrder.verify(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorRef)), eqDataExists(), (Timeout) ArgumentMatchers.any(Timeout.class));
    }

    @Test
    public void testReadRoot() throws InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
        EffectiveModelContext full = SchemaContextHelper.full();
        Configuration configuration = (Configuration) Mockito.mock(Configuration.class);
        ((ActorUtils) Mockito.doReturn(configuration).when(this.mockActorContext)).getConfiguration();
        ((ActorUtils) Mockito.doReturn(full).when(this.mockActorContext)).getSchemaContext();
        ((Configuration) Mockito.doReturn(Sets.newHashSet(new String[]{"test", "cars"})).when(configuration)).getAllShardNames();
        ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
        ContainerNode containerNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
        setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(containerNode, full));
        setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(containerNode2, full));
        ((ActorUtils) Mockito.doReturn(MemberName.forName("mock-member")).when(this.mockActorContext)).getCurrentMemberName();
        ((ActorUtils) Mockito.doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(this.mockActorContext)).getClientDispatcher();
        Optional optional = (Optional) new TransactionProxy(this.mockComponentFactory, TransactionType.READ_ONLY).read(YangInstanceIdentifier.empty()).get(5L, TimeUnit.SECONDS);
        Assert.assertTrue("NormalizedNode isPresent", optional.isPresent());
        NormalizedNode normalizedNode = (NormalizedNode) optional.orElseThrow();
        Assert.assertTrue("Expect value to be a Collection", normalizedNode.body() instanceof Collection);
        Collection<NormalizedNode> collection = (Collection) normalizedNode.body();
        for (NormalizedNode normalizedNode2 : collection) {
            Assert.assertTrue("Expected " + normalizedNode2 + " to be a ContainerNode", normalizedNode2 instanceof ContainerNode);
        }
        Assert.assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found", NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
        Assert.assertEquals(containerNode, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
        Assert.assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found", NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
        Assert.assertEquals(containerNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
    }

    private void setUpReadData(String str, NormalizedNode normalizedNode) {
        ActorSystem system = getSystem();
        ActorRef actorOf = getSystem().actorOf(Props.create(DoNothingActor.class, new Object[0]));
        ((ActorUtils) Mockito.doReturn(getSystem().actorSelection(actorOf.path())).when(this.mockActorContext)).actorSelection(actorOf.path().toString());
        ((ActorUtils) Mockito.doReturn(primaryShardInfoReply(getSystem(), actorOf)).when(this.mockActorContext)).findPrimaryShardAsync((String) ArgumentMatchers.eq(str));
        ActorRef actorOf2 = system.actorOf(Props.create(DoNothingActor.class, new Object[0]));
        ((ActorUtils) Mockito.doReturn(system.actorSelection(actorOf2.path())).when(this.mockActorContext)).actorSelection(actorOf2.path().toString());
        ((ActorUtils) Mockito.doReturn(Futures.successful(createTransactionReply(actorOf2, (short) 12))).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(system.actorSelection(actorOf.path())), eqCreateTransaction("mock-member", TransactionType.READ_ONLY), (Timeout) ArgumentMatchers.any(Timeout.class));
        ((ActorUtils) Mockito.doReturn(readDataReply(normalizedNode)).when(this.mockActorContext)).executeOperationAsync((ActorSelection) ArgumentMatchers.eq(actorSelection(actorOf2)), eqReadData(YangInstanceIdentifier.empty()), (Timeout) ArgumentMatchers.any(Timeout.class));
    }
}
