package org.opendaylight.controller.cluster.datastore;

import akka.actor.ActorSystem;
import akka.actor.AddressFromURIString;
import akka.cluster.Cluster;
import akka.testkit.javadsl.TestKit;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.ConfigFactory;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.mdsal.common.api.ReadFailedException;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;

/**
 * Integration tests for transactions that are created while the backing shard is still held up
 * reading its journal, i.e. before the shard is ready to serve requests.
 *
 * <p>The class is run by the JUnit {@link Parameterized} runner; every test is executed once per
 * parameter set returned by {@link #data()}.
 */
@RunWith(Parameterized.class)
public class DistributedDataStoreIntegrationTest extends AbstractDistributedDataStoreIntegrationTest {
    /** Name of the single shard used by every test in this class. */
    private static final String SHARD_NAME = "test-1";

    /** Format of the journal persistence id: shard name first, then the test (data store type) name. */
    private static final String PERSISTENT_ID_FORMAT = "member-1-shard-%s-%s";

    /** Address the test actor system joins in {@link #setUp()}. */
    private static final String SEED_NODE_ADDRESS = "akka://cluster-test@127.0.0.1:2558";

    /** Upper bound, in seconds, for every blocking wait performed by these tests. */
    private static final long WAIT_SECONDS = 5L;

    /** Transaction operations executed on a separate thread by {@link #runOnSeparateThread}. */
    @FunctionalInterface
    private interface TxOperations {
        void run() throws Exception;
    }

    /**
     * Supplies the parameter sets for the {@link Parameterized} runner.
     *
     * @return one {@code Object[]} per run, each holding the data store class under test
     */
    @Parameterized.Parameters(name = "{0}")
    public static Collection<Object[]> data() {
        // Must be an array of arrays: each inner array is one set of test parameters.
        return Arrays.asList(new Object[][] {{TestClientBackedDataStore.class}});
    }

    /**
     * Clears the in-memory persistence stores, creates the actor system from the {@code Member1}
     * configuration section and joins it to the cluster at {@link #SEED_NODE_ADDRESS}.
     */
    @Before
    public void setUp() {
        InMemorySnapshotStore.clear();
        InMemoryJournal.clear();
        this.system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
        Cluster.get(this.system).join(AddressFromURIString.parse(SEED_NODE_ADDRESS));
    }

    /** Shuts down the actor system created by {@link #setUp()} and drops the reference to it. */
    @After
    public void tearDown() {
        TestKit.shutdownActorSystem(this.system, true);
        this.system = null;
    }

    /**
     * Builds the journal persistence id of this class's shard for the given test.
     *
     * @param testName name of the test, also used as the data store type name
     * @return the persistence id string
     */
    private static String persistentIdFor(final String testName) {
        return String.format(PERSISTENT_ID_FORMAT, SHARD_NAME, testName);
    }

    /**
     * Runs {@code operations} on a newly started thread and waits up to {@link #WAIT_SECONDS}
     * seconds for them to finish.
     *
     * @param doneMessage assertion message used if the operations do not finish in time
     * @param operations the work to execute on the separate thread
     * @throws Exception the exception thrown by {@code operations}, rethrown on the calling thread
     */
    private static void runOnSeparateThread(final String doneMessage, final TxOperations operations)
            throws Exception {
        final AtomicReference<Exception> caughtEx = new AtomicReference<>();
        final CountDownLatch finished = new CountDownLatch(1);

        new Thread(() -> {
            try {
                operations.run();
            } catch (Exception e) {
                caughtEx.set(e);
            } finally {
                // Always release the waiter, even if an Error escapes the operations.
                finished.countDown();
            }
        }).start();

        final boolean done = Uninterruptibles.awaitUninterruptibly(finished, WAIT_SECONDS, TimeUnit.SECONDS);
        if (caughtEx.get() != null) {
            throw caughtEx.get();
        }

        Assert.assertTrue(doneMessage, done);
    }

    /**
     * Writes, merges and deletes data in a transaction that is readied while the journal latch is
     * still held, then releases the latch, commits and verifies the resulting data.
     *
     * @param testName name of the test, used as the data store type name and in the persistence id
     * @param writeOnly {@code true} to use a write-only transaction, {@code false} for read-write
     * @throws Exception if any step of the test fails
     */
    private void testTransactionWritesWithShardNotInitiallyReady(final String testName, final boolean writeOnly)
            throws Exception {
        final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), this.datastoreContextBuilder);

        // NOTE(review): presumably holds back the shard's journal reads (recovery) until the latch
        // is counted down - inferred from the InMemoryJournal method name; confirm.
        final CountDownLatch blockRecovery = new CountDownLatch(1);
        InMemoryJournal.addBlockReadMessagesLatch(persistentIdFor(testName), blockRecovery);

        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
                this.testParameter, testName, false, SHARD_NAME)) {
            final DOMStoreWriteTransaction writeTx = writeOnly
                    ? dataStore.newWriteOnlyTransaction() : dataStore.newReadWriteTransaction();
            Assert.assertNotNull("newReadWriteTransaction returned null", writeTx);

            final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();

            // Modify and ready the transaction on a separate thread while the latch is still held.
            final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
            runOnSeparateThread("Tx ready", () -> {
                writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
                writeTx.merge(TestModel.OUTER_LIST_PATH,
                        ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
                                .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
                                .build());
                writeTx.write(listEntryPath,
                        ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
                writeTx.delete(listEntryPath);
                txCohort.set(writeTx.ready());
            });

            // Release the journal latch, then commit the readied transaction.
            blockRecovery.countDown();
            testKit.doCommit(txCohort.get());

            // The container and the list must exist; the written-then-deleted entry must not.
            final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
            Assert.assertTrue("isPresent",
                    readTx.read(TestModel.TEST_PATH).get(WAIT_SECONDS, TimeUnit.SECONDS).isPresent());
            Assert.assertTrue("isPresent",
                    readTx.read(TestModel.OUTER_LIST_PATH).get(WAIT_SECONDS, TimeUnit.SECONDS).isPresent());
            Assert.assertFalse("isPresent",
                    readTx.read(listEntryPath).get(WAIT_SECONDS, TimeUnit.SECONDS).isPresent());
        }
    }

    /** Runs the write scenario with a write-only transaction and write-only optimizations enabled. */
    @Test
    public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
        this.datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
        testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
    }

    /** Runs the write scenario with a read-write transaction. */
    @Test
    public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
        testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
    }

    /**
     * Issues {@code exists} and {@code read} on a read-write transaction while the journal latch is
     * held, then releases the latch and verifies that both futures complete with the written data.
     */
    @Test
    public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
        final String testName = "testTransactionReadsWithShardNotInitiallyReady";
        final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), this.datastoreContextBuilder);

        // NOTE(review): presumably holds back the shard's journal reads until counted down - confirm.
        final CountDownLatch blockRecovery = new CountDownLatch(1);
        InMemoryJournal.addBlockReadMessagesLatch(persistentIdFor(testName), blockRecovery);

        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
                this.testParameter, testName, false, SHARD_NAME)) {
            final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
            Assert.assertNotNull("newReadWriteTransaction returned null", readWriteTx);

            // Start the reads on a separate thread; only the futures are collected here.
            final AtomicReference<FluentFuture<Boolean>> txExistsFuture = new AtomicReference<>();
            final AtomicReference<FluentFuture<? extends Optional<?>>> txReadFuture = new AtomicReference<>();
            runOnSeparateThread("Tx reads done", () -> {
                readWriteTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
                txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
                txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
            });

            // Release the journal latch so the pending reads can be answered.
            blockRecovery.countDown();

            Assert.assertEquals("exists", Boolean.TRUE,
                    txExistsFuture.get().get(WAIT_SECONDS, TimeUnit.SECONDS));
            Assert.assertTrue("read", txReadFuture.get().get(WAIT_SECONDS, TimeUnit.SECONDS).isPresent());

            readWriteTx.close();
        }
    }

    /**
     * Readies a write-only transaction while the journal latch is never released before the commit
     * attempt, and expects {@code canCommit} to fail with a {@link NotInitializedException} root cause.
     */
    @Test(expected = NotInitializedException.class)
    public void testTransactionCommitFailureWithShardNotInitialized() throws Exception {
        final String testName = "testTransactionCommitFailureWithShardNotInitialized";
        final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), this.datastoreContextBuilder);

        // Keep the shard initialization timeout well below the waits used by this test.
        this.datastoreContextBuilder.shardInitializationTimeout(300L, TimeUnit.MILLISECONDS);

        // NOTE(review): presumably holds back the shard's journal reads until counted down - confirm.
        final String persistentId = persistentIdFor(testName);
        final CountDownLatch blockRecovery = new CountDownLatch(1);
        InMemoryJournal.addBlockReadMessagesLatch(persistentId, blockRecovery);
        InMemoryJournal.addEntry(persistentId, 1L, "Dummy data so akka will read from persistence");

        // The data store is closed on exit, matching the other tests in this class.
        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
                this.testParameter, testName, false, SHARD_NAME)) {
            final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
            Assert.assertNotNull("newReadWriteTransaction returned null", writeTx);

            final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
            runOnSeparateThread("Tx ready", () -> {
                writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
                txCohort.set(writeTx.ready());
            });

            try {
                txCohort.get().canCommit().get(WAIT_SECONDS, TimeUnit.SECONDS);
                Assert.fail("Expected NotInitializedException");
            } catch (Exception e) {
                // Surface the root cause so the @Test(expected = ...) check sees it directly.
                final Throwable root = Throwables.getRootCause(e);
                Throwables.throwIfUnchecked(root);
                throw new RuntimeException(root);
            } finally {
                blockRecovery.countDown();
            }
        }
    }

    /**
     * Issues a read on a read-write transaction while the journal latch is never released before
     * the result is awaited, and expects the read to fail with a {@link ReadFailedException} whose
     * root cause is a {@link NotInitializedException}.
     */
    @Test(expected = NotInitializedException.class)
    public void testTransactionReadFailureWithShardNotInitialized() throws Exception {
        final String testName = "testTransactionReadFailureWithShardNotInitialized";
        final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), this.datastoreContextBuilder);

        // Keep the shard initialization timeout well below the waits used by this test.
        this.datastoreContextBuilder.shardInitializationTimeout(300L, TimeUnit.MILLISECONDS);

        // NOTE(review): presumably holds back the shard's journal reads until counted down - confirm.
        final String persistentId = persistentIdFor(testName);
        final CountDownLatch blockRecovery = new CountDownLatch(1);
        InMemoryJournal.addBlockReadMessagesLatch(persistentId, blockRecovery);
        InMemoryJournal.addEntry(persistentId, 1L, "Dummy data so akka will read from persistence");

        try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
                this.testParameter, testName, false, SHARD_NAME)) {
            final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
            Assert.assertNotNull("newReadWriteTransaction returned null", readWriteTx);

            final AtomicReference<FluentFuture<?>> txReadFuture = new AtomicReference<>();
            runOnSeparateThread("Tx read done", () -> {
                readWriteTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
                txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
                readWriteTx.close();
            });

            try {
                txReadFuture.get().get(WAIT_SECONDS, TimeUnit.SECONDS);
            } catch (ExecutionException e) {
                Assert.assertTrue("Expected ReadFailedException cause: " + e.getCause(),
                        e.getCause() instanceof ReadFailedException);
                // Surface the root cause so the @Test(expected = ...) check sees it directly.
                final Throwable root = Throwables.getRootCause(e);
                Throwables.throwIfUnchecked(root);
                throw new RuntimeException(root);
            } finally {
                blockRecovery.countDown();
            }
        }
    }
}
