package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.socks5.auth.DefaultPasswordAuthImpl;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.class */
public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        ServiceConfiguration defaultConf = getDefaultConf();
        defaultConf.setTransactionCoordinatorEnabled(true);
        super.baseSetup(defaultConf);
        this.admin.tenants().createTenant(DefaultPasswordAuthImpl.DEFAULT_PASSWORD, new TenantInfoImpl(Sets.newHashSet(new String[]{"appid1"}), Sets.newHashSet(new String[]{"test"})));
        this.admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
        createTransactionCoordinatorAssign(16);
        this.admin.lookups().lookupPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testAddAndRemoveTransactionMetadataStore() throws Exception {
        TransactionMetadataStoreService transactionMetadataStoreService = this.pulsar.getTransactionMetadataStoreService();
        Assert.assertNotNull(transactionMetadataStoreService);
        this.admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString());
        transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(0L));
        Awaitility.await().until(() -> {
            return Boolean.valueOf(transactionMetadataStoreService.getStores().size() == 1);
        });
        transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(0L));
        Awaitility.await().until(() -> {
            return Boolean.valueOf(transactionMetadataStoreService.getStores().size() == 0);
        });
    }

    @Test
    public void testNewTransaction() throws Exception {
        TransactionMetadataStoreService transactionMetadataStoreService = this.pulsar.getTransactionMetadataStoreService();
        transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(0L));
        transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(1L));
        transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(2L));
        Awaitility.await().until(() -> {
            return Boolean.valueOf(transactionMetadataStoreService.getStores().size() == 3);
        });
        checkTransactionMetadataStoreReady((MLTransactionMetadataStore) this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get(0L)));
        checkTransactionMetadataStoreReady((MLTransactionMetadataStore) this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get(1L)));
        checkTransactionMetadataStoreReady((MLTransactionMetadataStore) this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get(2L)));
        TxnID txnID = (TxnID) transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0L), 5L, (String) null).get();
        TxnID txnID2 = (TxnID) transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(1L), 5L, (String) null).get();
        TxnID txnID3 = (TxnID) transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(2L), 5L, (String) null).get();
        Assert.assertEquals(txnID.getMostSigBits(), 0L);
        Assert.assertEquals(txnID2.getMostSigBits(), 1L);
        Assert.assertEquals(txnID3.getMostSigBits(), 2L);
        transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(0L));
        transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(1L));
        transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(2L));
        Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 0);
    }

    @Test
    public void testAddProducedPartitionToTxn() throws Exception {
        TransactionMetadataStoreService transactionMetadataStoreService = this.pulsar.getTransactionMetadataStoreService();
        transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(0L));
        Awaitility.await().until(() -> {
            return Boolean.valueOf(transactionMetadataStoreService.getStores().size() == 1);
        });
        checkTransactionMetadataStoreReady((MLTransactionMetadataStore) this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get(0L)));
        TxnID txnID = (TxnID) transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0L), 5000L, (String) null).get();
        ArrayList arrayList = new ArrayList();
        arrayList.add("ptn-0");
        arrayList.add("ptn-1");
        arrayList.add("ptn-2");
        transactionMetadataStoreService.addProducedPartitionToTxn(txnID, arrayList);
        Assert.assertEquals(((TxnMeta) transactionMetadataStoreService.getTxnMeta(txnID).get()).status(), TxnStatus.OPEN);
        transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(0L));
        Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 0);
    }

    @Test
    public void testAddAckedPartitionToTxn() throws Exception {
        TransactionMetadataStoreService transactionMetadataStoreService = this.pulsar.getTransactionMetadataStoreService();
        transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(0L)).get();
        Awaitility.await().until(() -> {
            return Boolean.valueOf(transactionMetadataStoreService.getStores().size() == 1);
        });
        checkTransactionMetadataStoreReady((MLTransactionMetadataStore) this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get(0L)));
        TxnID txnID = (TxnID) transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0L), 5000L, (String) null).get();
        ArrayList arrayList = new ArrayList();
        arrayList.add(TransactionSubscription.builder().topic("ptn-1").subscription("sub-1").build());
        arrayList.add(TransactionSubscription.builder().topic("ptn-2").subscription("sub-1").build());
        arrayList.add(TransactionSubscription.builder().topic("ptn-3").subscription("sub-1").build());
        transactionMetadataStoreService.addAckedPartitionToTxn(txnID, arrayList);
        Assert.assertEquals(((TxnMeta) transactionMetadataStoreService.getTxnMeta(txnID).get()).status(), TxnStatus.OPEN);
        transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(0L));
        Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 0);
    }

    @Test
    public void testTimeoutTracker() throws Exception {
        this.pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0L));
        Awaitility.await().until(() -> {
            return Boolean.valueOf(this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get(0L)) != null);
        });
        MLTransactionMetadataStore mLTransactionMetadataStore = (MLTransactionMetadataStore) this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get(0L));
        checkTransactionMetadataStoreReady(mLTransactionMetadataStore);
        Field declaredField = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
        declaredField.setAccessible(true);
        ConcurrentSkipListMap concurrentSkipListMap = (ConcurrentSkipListMap) declaredField.get(mLTransactionMetadataStore);
        int i = -1;
        while (true) {
            i++;
            if (i >= 1000) {
                concurrentSkipListMap.forEach((l, pair) -> {
                    Assert.assertEquals(((TxnMeta) pair.getLeft()).status(), TxnStatus.OPEN);
                });
                Awaitility.await().atLeast(1000L, TimeUnit.MICROSECONDS).until(() -> {
                    return Boolean.valueOf(concurrentSkipListMap.size() == 0);
                });
                return;
            }
            try {
                newTransactionWithTimeoutOf(2000L);
            } catch (Exception e) {
            }
        }
    }

    private TxnID newTransactionWithTimeoutOf(long j) throws InterruptedException, ExecutionException {
        return (TxnID) ((MLTransactionMetadataStore) this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get(0L))).newTransaction(j, (String) null).get();
    }

    @Test
    public void testTimeoutTrackerExpired() throws Exception {
        this.pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0L));
        Awaitility.await().until(() -> {
            return Boolean.valueOf(this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get(0L)) != null);
        });
        MLTransactionMetadataStore mLTransactionMetadataStore = (MLTransactionMetadataStore) this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get(0L));
        checkTransactionMetadataStoreReady(mLTransactionMetadataStore);
        Field declaredField = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
        declaredField.setAccessible(true);
        ConcurrentSkipListMap concurrentSkipListMap = (ConcurrentSkipListMap) declaredField.get(mLTransactionMetadataStore);
        newTransactionWithTimeoutOf(2000L);
        Assert.assertEquals(concurrentSkipListMap.size(), 1);
        concurrentSkipListMap.forEach((l, pair) -> {
            Assert.assertEquals(((TxnMeta) pair.getLeft()).status(), TxnStatus.OPEN);
        });
        Awaitility.await().atLeast(1000L, TimeUnit.MICROSECONDS).until(() -> {
            return Boolean.valueOf(concurrentSkipListMap.size() == 0);
        });
        newTransactionWithTimeoutOf(2000L);
        Assert.assertEquals(concurrentSkipListMap.size(), 1);
        concurrentSkipListMap.forEach((l2, pair2) -> {
            Assert.assertEquals(((TxnMeta) pair2.getLeft()).status(), TxnStatus.OPEN);
        });
        Awaitility.await().atLeast(1000L, TimeUnit.MICROSECONDS).until(() -> {
            return Boolean.valueOf(concurrentSkipListMap.size() == 0);
        });
    }

    @Test
    public void testTimeoutTrackerMultiThreading() throws Exception {
        this.pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0L));
        Awaitility.await().until(() -> {
            return Boolean.valueOf(this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get(0L)) != null);
        });
        MLTransactionMetadataStore mLTransactionMetadataStore = (MLTransactionMetadataStore) this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get(0L));
        checkTransactionMetadataStoreReady(mLTransactionMetadataStore);
        Field declaredField = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
        declaredField.setAccessible(true);
        ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> concurrentSkipListMap = (ConcurrentSkipListMap) declaredField.get(mLTransactionMetadataStore);
        new Thread(() -> {
            int i = -1;
            while (true) {
                i++;
                if (i >= 100) {
                    return;
                } else {
                    try {
                        newTransactionWithTimeoutOf(1000L);
                    } catch (Exception e) {
                    }
                }
            }
        }).start();
        new Thread(() -> {
            int i = -1;
            while (true) {
                i++;
                if (i >= 100) {
                    return;
                } else {
                    try {
                        newTransactionWithTimeoutOf(2000L);
                    } catch (Exception e) {
                    }
                }
            }
        }).start();
        new Thread(() -> {
            int i = -1;
            while (true) {
                i++;
                if (i >= 100) {
                    return;
                } else {
                    try {
                        newTransactionWithTimeoutOf(3000L);
                    } catch (Exception e) {
                    }
                }
            }
        }).start();
        new Thread(() -> {
            int i = -1;
            while (true) {
                i++;
                if (i >= 100) {
                    return;
                } else {
                    try {
                        newTransactionWithTimeoutOf(4000L);
                    } catch (Exception e) {
                    }
                }
            }
        }).start();
        checkoutTimeout(concurrentSkipListMap, 300);
        checkoutTimeout(concurrentSkipListMap, 200);
        checkoutTimeout(concurrentSkipListMap, 100);
        checkoutTimeout(concurrentSkipListMap, 0);
    }

    private void checkoutTimeout(ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> concurrentSkipListMap, int i) {
        Awaitility.await().atLeast(1000L, TimeUnit.MICROSECONDS).until(() -> {
            return Boolean.valueOf(concurrentSkipListMap.size() == i);
        });
    }

    @Test
    public void transactionTimeoutRecoverTest() throws Exception {
        this.pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0L));
        Awaitility.await().until(() -> {
            return Boolean.valueOf(this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get(0L)) != null);
        });
        checkTransactionMetadataStoreReady((MLTransactionMetadataStore) this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get(0L)));
        newTransactionWithTimeoutOf(2000L);
        this.pulsar.getTransactionMetadataStoreService().removeTransactionMetadataStore(TransactionCoordinatorID.get(0L));
        this.pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0L));
        Awaitility.await().until(() -> {
            return Boolean.valueOf(this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get(0L)) != null);
        });
        MLTransactionMetadataStore mLTransactionMetadataStore = (MLTransactionMetadataStore) this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get(0L));
        checkTransactionMetadataStoreReady(mLTransactionMetadataStore);
        Field declaredField = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
        declaredField.setAccessible(true);
        ConcurrentSkipListMap concurrentSkipListMap = (ConcurrentSkipListMap) declaredField.get(mLTransactionMetadataStore);
        Awaitility.await().until(() -> {
            return Boolean.valueOf(concurrentSkipListMap.size() == 0);
        });
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "txnStatus")
    public Object[][] txnStatus() {
        return new Object[]{new Object[]{TxnStatus.OPEN}, new Object[]{TxnStatus.ABORTING}, new Object[]{TxnStatus.COMMITTING}};
    }

    @Test(dataProvider = "txnStatus")
    public void testEndTransactionOpRetry(TxnStatus txnStatus) throws Exception {
        this.pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0L));
        Awaitility.await().until(() -> {
            return Boolean.valueOf(this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get(0L)) != null);
        });
        MLTransactionMetadataStore mLTransactionMetadataStore = (MLTransactionMetadataStore) this.pulsar.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get(0L));
        checkTransactionMetadataStoreReady(mLTransactionMetadataStore);
        TxnID newTransactionWithTimeoutOf = newTransactionWithTimeoutOf(3000 - 2000);
        TxnMeta txnMeta = (TxnMeta) mLTransactionMetadataStore.getTxnMeta(newTransactionWithTimeoutOf).get();
        txnMeta.updateTxnStatus(txnStatus, TxnStatus.OPEN);
        Field declaredField = TransactionMetadataStoreState.class.getDeclaredField("state");
        declaredField.setAccessible(true);
        declaredField.set(mLTransactionMetadataStore, TransactionMetadataStoreState.State.None);
        try {
            this.pulsar.getTransactionMetadataStoreService().endTransaction(newTransactionWithTimeoutOf, TxnAction.COMMIT.getValue(), false).get(5L, TimeUnit.SECONDS);
            Assert.fail();
        } catch (Exception e) {
            if (txnStatus == TxnStatus.OPEN || txnStatus == TxnStatus.COMMITTING) {
                Assert.assertTrue(e instanceof TimeoutException);
            } else if (txnStatus == TxnStatus.ABORTING) {
                Assert.assertTrue(e.getCause() instanceof CoordinatorException.InvalidTxnStatusException);
            } else {
                Assert.fail();
            }
        }
        Assert.assertEquals(txnMeta.status(), txnStatus);
        Field declaredField2 = TransactionMetadataStoreState.class.getDeclaredField("state");
        declaredField2.setAccessible(true);
        declaredField2.set(mLTransactionMetadataStore, TransactionMetadataStoreState.State.Ready);
        if (txnStatus == TxnStatus.ABORTING) {
            this.pulsar.getTransactionMetadataStoreService().endTransaction(newTransactionWithTimeoutOf, TxnAction.ABORT.getValue(), false).get();
        }
        Awaitility.await().atMost(3000, TimeUnit.MILLISECONDS).until(() -> {
            try {
                mLTransactionMetadataStore.getTxnMeta(newTransactionWithTimeoutOf).get();
                return false;
            } catch (ExecutionException e2) {
                return Boolean.valueOf(e2.getCause() instanceof CoordinatorException.TransactionNotFoundException);
            }
        });
    }

    private void checkTransactionMetadataStoreReady(MLTransactionMetadataStore mLTransactionMetadataStore) throws NoSuchMethodException {
        Method declaredMethod = TransactionMetadataStoreState.class.getDeclaredMethod("checkIfReady", new Class[0]);
        declaredMethod.setAccessible(true);
        Awaitility.await().until(() -> {
            return (Boolean) declaredMethod.invoke(mLTransactionMetadataStore, new Object[0]);
        });
    }
}
