/*
 * Decompiled with CFR 0.152.
 */
package org.opendaylight.controller.cluster.datastore;

import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.AddressFromURIString;
import akka.cluster.Cluster;
import akka.testkit.javadsl.TestKit;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
import org.opendaylight.controller.cluster.datastore.AbstractDistributedDataStoreIntegrationTest;
import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.mdsal.common.api.ReadFailedException;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;

/**
 * Integration tests exercising datastore transactions that are issued while the backing shard's
 * journal recovery is being held up by a latch registered with {@link InMemoryJournal}.
 *
 * <p>Each test creates its transaction operations on a separate thread and waits (bounded to
 * 5 seconds) for that thread to finish, so a transaction call that blocks shows up as a test
 * failure rather than hanging the test run.
 */
@RunWith(Parameterized.class)
public class DistributedDataStoreIntegrationTest extends AbstractDistributedDataStoreIntegrationTest {

    /**
     * Supplies the datastore implementation classes the suite is run against.
     *
     * @return a single parameter set containing {@link TestClientBackedDataStore}
     */
    @Parameterized.Parameters(name="{0}")
    public static Collection<Object[]> data() {
        return Arrays.asList(new Object[][]{{TestClientBackedDataStore.class}});
    }

    /**
     * Clears the in-memory snapshot store and journal, then starts a fresh actor system from the
     * {@code Member1} configuration section and joins it to the cluster at its own address.
     */
    @Before
    public void setUp() {
        InMemorySnapshotStore.clear();
        InMemoryJournal.clear();

        final Config member1Config = ConfigFactory.load().getConfig("Member1");
        this.system = ActorSystem.create("cluster-test", member1Config);

        final Address member1Address = AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
        Cluster.get(this.system).join(member1Address);
    }

    /**
     * Shuts the actor system down (verifying the shutdown) and drops the reference to it.
     */
    @After
    public void tearDown() {
        TestKit.shutdownActorSystem(this.system, true);
        this.system = null;
    }

    /**
     * Shared body for the write tests: performs writes and readies a transaction while the
     * recovery latch is still held, then releases the latch, commits and verifies the data.
     *
     * @param testName name used as the datastore type and as part of the shard persistence id
     * @param writeOnly {@code true} to use a write-only transaction, {@code false} for read-write
     * @throws Exception any exception raised on the transaction thread, or a read/commit failure
     */
    private void testTransactionWritesWithShardNotInitiallyReady(final String testName, final boolean writeOnly)
            throws Exception {
        final IntegrationTestKit testKit = new IntegrationTestKit(this.getSystem(), this.datastoreContextBuilder);
        final String shardName = "test-1";

        // Register a latch with the journal for this shard's persistence id; judging by the API
        // name this holds up journal reads (and thus recovery) until the latch is released.
        final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
        final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
        InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);

        try (ClientBackedDataStore dataStore = testKit.setupDataStore(this.testParameter, testName, false,
                shardName)) {
            final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction()
                    : dataStore.newReadWriteTransaction();
            Assert.assertNotNull("newReadWriteTransaction returned null", writeTx);

            final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();

            // Run the operations on a separate thread so a blocking call cannot hang the test.
            final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
            final AtomicReference<Exception> caughtEx = new AtomicReference<>();
            final CountDownLatch txReady = new CountDownLatch(1);
            final Thread txThread = new Thread(() -> {
                try {
                    writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
                    writeTx.merge(TestModel.OUTER_LIST_PATH,
                            ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
                                    .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME,
                                            TestModel.ID_QNAME, 42))
                                    .build());
                    writeTx.write(listEntryPath,
                            ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
                    writeTx.delete(listEntryPath);
                    txCohort.set(writeTx.ready());
                } catch (Exception e) {
                    caughtEx.set(e);
                } finally {
                    txReady.countDown();
                }
            });
            txThread.start();

            // Wait for the transaction thread; surface its exception in preference to a timeout.
            final boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5L, TimeUnit.SECONDS);
            final Exception txFailure = caughtEx.get();
            if (txFailure != null) {
                throw txFailure;
            }
            Assert.assertTrue("Tx ready", done);

            // Release the latch so the commit below can proceed.
            blockRecoveryLatch.countDown();
            testKit.doCommit(txCohort.get());

            // The container and the list must exist; the entry written and then deleted must not.
            final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
            Optional<?> optional = readTx.read(TestModel.TEST_PATH).get(5L, TimeUnit.SECONDS);
            Assert.assertTrue("isPresent", optional.isPresent());
            optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5L, TimeUnit.SECONDS);
            Assert.assertTrue("isPresent", optional.isPresent());
            optional = readTx.read(listEntryPath).get(5L, TimeUnit.SECONDS);
            Assert.assertFalse("isPresent", optional.isPresent());
        }
    }

    /**
     * Runs the shared write scenario with a write-only transaction and the write-only
     * transaction optimizations switched on in the datastore context.
     */
    @Test
    public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
        this.datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
        this.testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
    }

    /**
     * Runs the shared write scenario with a read-write transaction.
     */
    @Test
    public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
        this.testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
    }

    /**
     * Issues a write followed by {@code exists} and {@code read} while the recovery latch is
     * held, then releases the latch and checks that both futures complete with the written data.
     */
    @Test
    public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
        final IntegrationTestKit testKit = new IntegrationTestKit(this.getSystem(), this.datastoreContextBuilder);
        final String testName = "testTransactionReadsWithShardNotInitiallyReady";
        final String shardName = "test-1";

        // Register the latch the journal waits on for this shard's persistence id.
        final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
        final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
        InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);

        try (ClientBackedDataStore dataStore = testKit.setupDataStore(this.testParameter, testName, false,
                shardName)) {
            final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
            Assert.assertNotNull("newReadWriteTransaction returned null", readWriteTx);

            // Run the operations on a separate thread so a blocking call cannot hang the test.
            final AtomicReference<FluentFuture<Boolean>> txExistsFuture = new AtomicReference<>();
            final AtomicReference<FluentFuture<? extends Optional<?>>> txReadFuture = new AtomicReference<>();
            final AtomicReference<Exception> caughtEx = new AtomicReference<>();
            final CountDownLatch txReadsDone = new CountDownLatch(1);
            final Thread txThread = new Thread(() -> {
                try {
                    readWriteTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
                    txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
                    txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
                } catch (Exception e) {
                    caughtEx.set(e);
                } finally {
                    txReadsDone.countDown();
                }
            });
            txThread.start();

            // Wait for the transaction thread; surface its exception in preference to a timeout.
            final boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5L, TimeUnit.SECONDS);
            final Exception txFailure = caughtEx.get();
            if (txFailure != null) {
                throw txFailure;
            }
            Assert.assertTrue("Tx reads done", done);

            // Release the latch; the futures captured above are then expected to complete.
            blockRecoveryLatch.countDown();

            Assert.assertEquals("exists", Boolean.TRUE, txExistsFuture.get().get(5L, TimeUnit.SECONDS));
            Assert.assertTrue("read", txReadFuture.get().get(5L, TimeUnit.SECONDS).isPresent());
            readWriteTx.close();
        }
    }

    /**
     * Verifies that {@code canCommit} fails with {@link NotInitializedException} as its root
     * cause when the latch is never released before the (shortened) shard initialization timeout.
     */
    @Test(expected=NotInitializedException.class)
    public void testTransactionCommitFailureWithShardNotInitialized() throws Exception {
        final IntegrationTestKit testKit = new IntegrationTestKit(this.getSystem(), this.datastoreContextBuilder);
        final String testName = "testTransactionCommitFailureWithShardNotInitialized";
        final String shardName = "test-1";

        // Keep the shard initialization timeout short so the failure surfaces quickly.
        this.datastoreContextBuilder.shardInitializationTimeout(300L, TimeUnit.MILLISECONDS);

        // Register the latch the journal waits on, plus a dummy journal entry to be read back.
        final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
        final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
        InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
        InMemoryJournal.addEntry(persistentID, 1L, "Dummy data so akka will read from persistence");

        // Close the datastore when done, matching the other tests in this class.
        try (ClientBackedDataStore dataStore = testKit.setupDataStore(this.testParameter, testName, false,
                shardName)) {
            final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
            Assert.assertNotNull("newReadWriteTransaction returned null", writeTx);

            // Run the operations on a separate thread so a blocking call cannot hang the test.
            final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
            final AtomicReference<Exception> caughtEx = new AtomicReference<>();
            final CountDownLatch txReady = new CountDownLatch(1);
            final Thread txThread = new Thread(() -> {
                try {
                    writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
                    txCohort.set(writeTx.ready());
                } catch (Exception e) {
                    caughtEx.set(e);
                } finally {
                    txReady.countDown();
                }
            });
            txThread.start();

            // Wait for the transaction thread; surface its exception in preference to a timeout.
            final boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5L, TimeUnit.SECONDS);
            final Exception txFailure = caughtEx.get();
            if (txFailure != null) {
                throw txFailure;
            }
            Assert.assertTrue("Tx ready", done);

            try {
                txCohort.get().canCommit().get(5L, TimeUnit.SECONDS);
                Assert.fail("Expected NotInitializedException");
            } catch (Exception e) {
                // Unwrap to the root cause so the @Test(expected=...) check sees it directly.
                final Throwable root = Throwables.getRootCause(e);
                Throwables.throwIfUnchecked(root);
                throw new RuntimeException(root);
            } finally {
                // Release the latch before the datastore is closed.
                blockRecoveryLatch.countDown();
            }
        }
    }

    /**
     * Verifies that a read fails with a {@link ReadFailedException} whose root cause is
     * {@link NotInitializedException} when the latch is never released before the (shortened)
     * shard initialization timeout.
     */
    @Test(expected=NotInitializedException.class)
    public void testTransactionReadFailureWithShardNotInitialized() throws Exception {
        final IntegrationTestKit testKit = new IntegrationTestKit(this.getSystem(), this.datastoreContextBuilder);
        final String testName = "testTransactionReadFailureWithShardNotInitialized";
        final String shardName = "test-1";

        // Keep the shard initialization timeout short so the failure surfaces quickly.
        this.datastoreContextBuilder.shardInitializationTimeout(300L, TimeUnit.MILLISECONDS);

        // Register the latch the journal waits on, plus a dummy journal entry to be read back.
        final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
        final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
        InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
        InMemoryJournal.addEntry(persistentID, 1L, "Dummy data so akka will read from persistence");

        try (ClientBackedDataStore dataStore = testKit.setupDataStore(this.testParameter, testName, false,
                shardName)) {
            final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
            Assert.assertNotNull("newReadWriteTransaction returned null", readWriteTx);

            // Run the operations on a separate thread so a blocking call cannot hang the test.
            final AtomicReference<FluentFuture<? extends Optional<?>>> txReadFuture = new AtomicReference<>();
            final AtomicReference<Exception> caughtEx = new AtomicReference<>();
            final CountDownLatch txReadDone = new CountDownLatch(1);
            final Thread txThread = new Thread(() -> {
                try {
                    readWriteTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
                    txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
                    readWriteTx.close();
                } catch (Exception e) {
                    caughtEx.set(e);
                } finally {
                    txReadDone.countDown();
                }
            });
            txThread.start();

            // Wait for the transaction thread; surface its exception in preference to a timeout.
            final boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5L, TimeUnit.SECONDS);
            final Exception txFailure = caughtEx.get();
            if (txFailure != null) {
                throw txFailure;
            }
            Assert.assertTrue("Tx read done", done);

            try {
                txReadFuture.get().get(5L, TimeUnit.SECONDS);
            } catch (ExecutionException e) {
                Assert.assertTrue("Expected ReadFailedException cause: " + e.getCause(),
                        e.getCause() instanceof ReadFailedException);
                // Unwrap to the root cause so the @Test(expected=...) check sees it directly.
                final Throwable root = Throwables.getRootCause(e);
                Throwables.throwIfUnchecked(root);
                throw new RuntimeException(root);
            } finally {
                // Release the latch before the datastore is closed.
                blockRecoveryLatch.countDown();
            }
        }
    }
}

