package org.apache.pulsar.broker.transaction;

import io.netty.buffer.Unpooled;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.common.util.Bytes;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.reflect.MethodUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
import org.apache.pulsar.broker.service.BacklogQuotaManager;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferRecoverCallBack;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckReplyCallBack;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ClientCnxTest;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessagesImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
import org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/transaction/TransactionTest.class */
public class TransactionTest extends TransactionTestBase {
    // Class-wide SLF4J logger used by the tests below.
    private static final Logger log = LoggerFactory.getLogger(TransactionTest.class);
    // NOTE(review): neither constant below is referenced in the visible part of this class;
    // setup() passes the literal 1 twice, so presumably the compiler inlined them there.
    // Confirm against setUpBase's parameter list before relying on these names.
    private static final int NUM_BROKERS = 1;
    private static final int NUM_PARTITIONS = 1;

    /**
     * Class-level fixture: runs before the tests of this class and delegates to the
     * inherited {@code setUpBase}.
     *
     * <p>NOTE(review): the meaning of the four arguments is not visible from this file. The two
     * leading 1s presumably correspond to {@code NUM_BROKERS} and {@code NUM_PARTITIONS};
     * confirm against the {@code setUpBase} signature.
     *
     * @throws Exception if the base set-up fails
     */
    @BeforeClass
    protected void setup() throws Exception {
        setUpBase(1, 1, "tnx/ns1/test", 0);
    }

    /**
     * Class-level tear-down. {@code alwaysRun = true} asks TestNG to invoke it even when
     * earlier configuration methods or tests failed. Delegates to the inherited
     * {@code internalCleanup()}.
     *
     * @throws Exception if the inherited clean-up fails
     */
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Produces one message in each of three transactions (one committed, one aborted, one left
     * open) and checks that the topic stats report exactly one committed, one aborted and one
     * ongoing transaction.
     *
     * <p>The producer is managed by try-with-resources so that a failure while closing it can
     * no longer replace the assertion failure that caused the test to abort.
     *
     * @throws Exception on any client, broker or assertion failure
     */
    @Test
    public void testTopicTransactionMetrics() throws Exception {
        final String topic = "persistent://tnx/ns1/test_transaction_topic";
        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .sendTimeout(0, TimeUnit.SECONDS)
                .create()) {
            Transaction committedTxn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(1L, TimeUnit.MINUTES).build().get();
            producer.newMessage(committedTxn)
                    .value(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8)).send();
            committedTxn.commit().get();

            Transaction abortedTxn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(1L, TimeUnit.MINUTES).build().get();
            producer.newMessage(abortedTxn)
                    .value(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8)).send();
            abortedTxn.abort().get();

            // Deliberately neither committed nor aborted: it must show up as "ongoing".
            Transaction ongoingTxn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(1L, TimeUnit.MINUTES).build().get();
            producer.newMessage(ongoingTxn)
                    .value(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8)).send();

            Optional<Topic> topicRef =
                    this.pulsarServiceList.get(0).getBrokerService().getTopic(topic, false).get();
            Assert.assertTrue(topicRef.isPresent());
            TopicStatsImpl stats = ((PersistentTopic) topicRef.get()).getStats(false, false, false);
            Assert.assertEquals(stats.committedTxnCount, 1L);
            Assert.assertEquals(stats.abortedTxnCount, 1L);
            Assert.assertEquals(stats.ongoingTxnCount, 1L);
        }
    }

    /**
     * Checks that clients and admins cannot create or inspect the pending-ack system topic
     * directly.
     *
     * <ol>
     *   <li>Acknowledging an arbitrary message id inside a transaction either succeeds or fails
     *       with a {@code TransactionConflictException}.</li>
     *   <li>The namespace topic list contains no {@code __transaction_pending_ack} topic.</li>
     *   <li>Subscribing to, listing subscriptions of, or creating (partitioned or not) the
     *       pending-ack topic name is rejected with the expected message.</li>
     * </ol>
     *
     * <p>Consumers are opened with try-with-resources so they are closed on the failure paths
     * too (previously they leaked whenever the body threw).
     *
     * @throws Exception on any unexpected client, admin or assertion failure
     */
    @Test
    public void testCreateTransactionSystemTopic() throws Exception {
        final String subName = "test";
        final String topicName = TopicName.get("tnx/ns1/testCreateTransactionSystemTopic").toString();
        this.admin.namespaces().deleteNamespace(ClientCnxTest.NAMESPACE, true);
        this.admin.namespaces().createNamespace(ClientCnxTest.NAMESPACE);

        try (Consumer<byte[]> consumer = getConsumer(topicName, subName)) {
            Transaction txn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(10L, TimeUnit.SECONDS).build().get();
            consumer.acknowledgeAsync(new MessageIdImpl(10L, 10L, 10), txn).get();
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException);
        }

        final String pendingAckTopic = MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName, subName);
        List<String> topics = this.admin.topics().getList(ClientCnxTest.NAMESPACE);
        Assert.assertFalse(topics.isEmpty());
        topics.forEach(name -> Assert.assertFalse(name.contains("__transaction_pending_ack")));

        // Subscribing to the system topic must be rejected before a consumer is handed out.
        try (Consumer<byte[]> ignored = getConsumer(pendingAckTopic, subName)) {
            Assert.fail();
        } catch (PulsarClientException.NotAllowedException e) {
            Assert.assertTrue(e.getMessage().contains("Can not create transaction system topic"));
        }

        try {
            this.admin.topics().getSubscriptions(pendingAckTopic);
            Assert.fail();
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getMessage(), "Can not create transaction system topic " + pendingAckTopic);
        }

        try {
            this.admin.topics().createPartitionedTopic(pendingAckTopic, 3);
            Assert.fail();
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
        }

        try {
            this.admin.topics().createNonPartitionedTopic(pendingAckTopic);
            Assert.fail();
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
        }
    }

    /**
     * With segmented transaction-buffer snapshots enabled, creates a namespace, touches a topic
     * in it through a producer, and checks that the namespace can still be force-deleted.
     *
     * <p>The segmented-snapshot flag lives on the broker configuration shared by every test in
     * this class, so it is switched back off in a {@code finally} block; previously a failure
     * left it enabled for the remaining tests.
     *
     * @throws Exception on any client, admin or assertion failure
     */
    @Test
    public void testCanDeleteNamespaceWhenEnableTxnSegmentedSnapshot() throws Exception {
        final String namespace = "tnx/testSegmentedSnapshotWithoutCreatingOldSnapshotTopic";
        final String topic = "persistent://" + namespace + "/newTopic";
        final ServiceConfiguration config = this.pulsarServiceList.get(0).getConfig();
        config.setTransactionBufferSegmentedSnapshotEnabled(true);
        config.setForceDeleteNamespaceAllowed(true);
        try {
            this.admin.namespaces().createNamespace(namespace);
            try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).create()) {
                // Close explicitly before the delete, as the original flow did; the implicit
                // close at the end of the try block is then a repeated close.
                producer.close();
                this.admin.namespaces().deleteNamespace(namespace, true);
            }
        } finally {
            config.setTransactionBufferSegmentedSnapshotEnabled(false);
        }
    }

    /**
     * After unloading the system namespace and the test namespace, re-subscribing and starting a
     * transaction must not cause the broker to register the transaction-log topic, the
     * coordinator-assign partition or the pending-ack topic in its topic map.
     *
     * <p>Both consumers are handled by try-with-resources, replacing the nested manual close
     * blocks that could mask the original failure.
     *
     * @throws Exception on any client, admin or assertion failure
     */
    @Test
    public void brokerNotInitTxnManagedLedgerTopic() throws Exception {
        final String subName = "test";
        final String topicName = TopicName.get("tnx/ns1/test").toString();

        try (Consumer<byte[]> consumer = getConsumer(topicName, subName)) {
            consumer.close();
            Awaitility.await().until(this::canStartTransaction);
        }

        this.admin.namespaces().unload(NamespaceName.SYSTEM_NAMESPACE.toString());
        this.admin.namespaces().unload(ClientCnxTest.NAMESPACE);

        try (Consumer<byte[]> ignored = getConsumer(topicName, subName)) {
            Awaitility.await().until(this::canStartTransaction);

            ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics =
                    getPulsarServiceList().get(0).getBrokerService().getTopics();
            String transactionLogTopic = TopicName.get(
                    TopicDomain.persistent.value(), NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_").toString() + "0";
            Assert.assertNull(topics.get(transactionLogTopic));
            Assert.assertNull(topics.get(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString()));
            Assert.assertNull(topics.get(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName, subName)));
        }
    }

    /**
     * Polling condition: tries to open a transaction with a 30 second timeout.
     *
     * @return {@code true} if the transaction was created, {@code false} on any failure so the
     *         caller can retry
     */
    private boolean canStartTransaction() {
        try {
            this.pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
            return true;
        } catch (Exception retryable) {
            // Intentionally swallowed: Awaitility re-invokes this until it succeeds or times out.
            return false;
        }
    }

    /**
     * Subscribes a {@code Shared} consumer with batch-index acknowledgment enabled.
     *
     * @param str  topic to subscribe to
     * @param str2 subscription name
     * @return the subscribed consumer; the caller is responsible for closing it
     * @throws PulsarClientException if the subscription cannot be established
     */
    public Consumer<byte[]> getConsumer(String str, String str2) throws PulsarClientException {
        return this.pulsarClient.newConsumer()
                .topic(str)
                .subscriptionName(str2)
                .subscriptionType(SubscriptionType.Shared)
                .enableBatchIndexAcknowledgment(true)
                .subscribe();
    }

    /**
     * Issues transactional sends and acks concurrently from 30 threads (10 of each per thread)
     * against one transaction, commits it, and checks that the transaction's internal
     * {@code opCount} is back to zero and that every send/ack future has completed. A second,
     * empty transaction is then committed to show the client is still usable.
     *
     * <p>Compared with the earlier version, the worker pool is shut down and the producer and
     * consumer are closed when the test ends, and a timeout while waiting for the workers now
     * fails the test immediately instead of surfacing later as an unrelated error.
     *
     * @throws Exception on any client, reflection or assertion failure
     */
    @Test
    public void testAsyncSendOrAckForSingleFuture() throws Exception {
        final String topic = "tnx/ns1/testSingleFuture";
        final int messagesPerThread = 10;
        final int threadCount = 30;
        final int totalMessages = messagesPerThread * threadCount;

        getPulsarServiceList().get(0).getConfig().setBrokerDeduplicationEnabled(false);
        final ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                     .topic(topic)
                     .producerName("producer")
                     .sendTimeout(0, TimeUnit.SECONDS)
                     .create();
             Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topic(topic)
                     .subscriptionType(SubscriptionType.Exclusive)
                     .subscriptionName("subscription")
                     .subscribe()) {
            final List<CompletableFuture<MessageId>> sendFutures = new CopyOnWriteArrayList<>();
            final List<CompletableFuture<Void>> ackFutures = new CopyOnWriteArrayList<>();
            final Transaction txn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(10L, TimeUnit.SECONDS).build().get();

            // Pre-publish one plain message per ack the workers will perform.
            for (int sent = 0; sent < totalMessages; sent++) {
                producer.newMessage().send();
            }

            final CountDownLatch workersDone = new CountDownLatch(threadCount);
            for (int worker = 0; worker < threadCount; worker++) {
                executor.submit(() -> {
                    try {
                        for (int round = 0; round < messagesPerThread; round++) {
                            sendFutures.add(producer.newMessage(txn).sendAsync());
                            ackFutures.add(consumer.acknowledgeAsync(consumer.receive().getMessageId(), txn));
                        }
                    } catch (Exception e) {
                        log.error("Failed to send/ack messages with transaction.", e);
                    } finally {
                        // Count down on every exit path so the test thread is never left waiting.
                        workersDone.countDown();
                    }
                });
            }
            Assert.assertTrue(workersDone.await(10L, TimeUnit.SECONDS),
                    "worker threads did not finish within 10 seconds");

            txn.commit().get();

            Field opCountField = TransactionImpl.class.getDeclaredField("opCount");
            opCountField.setAccessible(true);
            Assert.assertEquals(((Long) opCountField.get(txn)).longValue(), 0L);
            for (int index = 0; index < totalMessages; index++) {
                Assert.assertTrue(sendFutures.get(index).isDone());
                Assert.assertTrue(ackFutures.get(index).isDone());
            }

            final Transaction emptyTxn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(10L, TimeUnit.SECONDS).build().get();
            Awaitility.await().until(() -> {
                emptyTxn.commit().get();
                return true;
            });
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Opens two transactions back to back and checks that the second id's least-significant
     * bits are exactly one greater than the first's, and that its most-significant bits are 0.
     * The first transaction's abort is issued without waiting for it to complete.
     *
     * @throws Exception on any client or assertion failure
     */
    @Test
    public void testGetTxnID() throws Exception {
        Transaction firstTxn = this.pulsarClient.newTransaction().build().get();
        TxnID firstId = firstTxn.getTxnID();
        firstTxn.abort();

        Transaction secondTxn = this.pulsarClient.newTransaction().build().get();
        TxnID secondId = secondTxn.getTxnID();

        Assert.assertEquals(secondId.getLeastSigBits(), firstId.getLeastSigBits() + 1);
        Assert.assertEquals(secondId.getMostSigBits(), 0L);
    }

    /**
     * Drops a topic from the broker's in-memory maps (topic map and managed-ledger cache),
     * checks that it cannot be created again, sets retention on it, subscribes, and then
     * inspects the retention size seen by the subscription's pending-ack managed ledger and by
     * a freshly created pending-ack store.
     *
     * <p>Changes from the earlier version: the innermost lambda parameter no longer redeclares
     * the enclosing lambda's {@code managedLedger} parameter (which Java rejects), the raw map
     * cast is gone, the retention values are named, and the first {@code assertEquals} passes
     * its arguments in TestNG's (actual, expected) order.
     *
     * <p>NOTE(review): the future chain started by {@code getTopicIfExists} is not awaited, so
     * an assertion failing inside a callback completes that future exceptionally and does not
     * fail the test. Whether the assertions hold when enforced could not be verified from this
     * file, so the non-blocking behavior is kept as it was.
     *
     * <p>NOTE(review): the consumer created here is never closed; it is left open because the
     * callbacks above may still be using the subscription.
     */
    @Test
    public void testSubscriptionRecreateTopic() throws PulsarAdminException, NoSuchFieldException, IllegalAccessException, PulsarClientException {
        final String topic = "persistent://pulsar/system/testSubscriptionRecreateTopic";
        final String subName = "sub_testReCreateTopic";
        // Values given to the topic's own retention policy (first and second constructor argument).
        final int topicRetentionFirstArg = 6;
        final int topicRetentionSizeInMb = 6;
        // Values given to the pending-ack topic's retention policy.
        final int pendingAckRetentionFirstArg = 5;
        final int pendingAckRetentionSizeInMb = 5;

        this.admin.topics().createNonPartitionedTopic(topic);
        PulsarService pulsarService = super.getPulsarServiceList().get(0);
        pulsarService.getBrokerService().getTopics().clear();
        ManagedLedgerFactory managedLedgerFactory = pulsarService.getBrokerService().getManagedLedgerFactory();
        Field ledgersField = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
        ledgersField.setAccessible(true);
        ConcurrentHashMap<?, ?> cachedLedgers = (ConcurrentHashMap<?, ?>) ledgersField.get(managedLedgerFactory);
        cachedLedgers.remove(TopicName.get(topic).getPersistenceNamingEncoding());

        try {
            this.admin.topics().createNonPartitionedTopic(topic);
            Assert.fail();
        } catch (PulsarAdminException.ConflictException e) {
            log.info("Cann`t create topic again");
        }

        this.admin.topics().setRetention(topic, new RetentionPolicies(topicRetentionFirstArg, topicRetentionSizeInMb));
        this.pulsarClient.newConsumer().topic(topic).subscriptionName(subName).subscribe();

        pulsarService.getBrokerService().getTopicIfExists(topic).thenAccept(topicRef -> {
            if (!topicRef.isPresent()) {
                log.error("Failed o get Topic named: {}", topic);
                Assert.fail();
            }
            PersistentTopic persistentTopic = (PersistentTopic) topicRef.get();
            try {
                this.admin.topics().setRetention(
                        MLPendingAckStore.getTransactionPendingAckStoreSuffix(persistentTopic.getName(), subName),
                        new RetentionPolicies(pendingAckRetentionFirstArg, pendingAckRetentionSizeInMb));
            } catch (PulsarAdminException adminFailure) {
                log.error("Failed to get./setRetention of topic with Exception:" + adminFailure);
                Assert.fail();
            }
            PersistentSubscription subscription = persistentTopic.getSubscription(subName);
            subscription.getPendingAckManageLedger().thenAccept(pendingAckLedger -> {
                long retentionSizeInMB = pendingAckLedger.getConfig().getRetentionSizeInMB();
                if (!persistentTopic.getTopicPolicies().isPresent()) {
                    log.error("Failed to getTopicPolicies of :" + persistentTopic);
                    Assert.fail();
                }
                Assert.assertEquals(retentionSizeInMB, topicRetentionSizeInMb);
                new MLPendingAckStoreProvider().newPendingAckStore(subscription).thenAccept(pendingAckStore -> {
                    ((MLPendingAckStore) pendingAckStore).getManagedLedger().thenAccept(storeLedger -> {
                        Assert.assertEquals(storeLedger.getConfig().getRetentionSizeInMB(), pendingAckRetentionSizeInMb);
                    });
                });
            });
        });
    }

    /**
     * Reads the namespace's transaction-buffer snapshot topic and checks that:
     * <ol>
     *   <li>creating the topic alone publishes no snapshot (nothing is available to read);</li>
     *   <li>after a producer attaches, a snapshot with max-read-position entry id -1 is
     *       eventually readable;</li>
     *   <li>after two plain messages, a snapshot with entry id 1 is eventually readable.</li>
     * </ol>
     *
     * <p>The reader and producer are now closed when the test ends; previously both leaked.
     *
     * @throws Exception on any client, admin or assertion failure
     */
    @Test
    public void testTakeSnapshotBeforeBuildTxnProducer() throws Exception {
        final String topic = "persistent://tnx/ns1/testSnapShot";
        this.admin.topics().createNonPartitionedTopic(topic);

        try (Reader<TransactionBufferSnapshot> snapshotReader = this.pulsarClient
                .newReader(Schema.AVRO(TransactionBufferSnapshot.class))
                .startMessageId(MessageId.latest)
                .topic("tnx/ns1/__transaction_buffer_snapshot")
                .create()) {
            long waitMillis = getPulsarServiceList().get(0).getConfiguration()
                    .getTransactionBufferSnapshotMinTimeInMillis() * 2;
            Awaitility.await().atMost(waitMillis, TimeUnit.MILLISECONDS)
                    .untilAsserted(() -> Assert.assertFalse(snapshotReader.hasMessageAvailable()));

            try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                    .producerName("testSnapshot")
                    .sendTimeout(0, TimeUnit.SECONDS)
                    .topic(topic)
                    .enableBatching(true)
                    .create()) {
                Awaitility.await().untilAsserted(() ->
                        Assert.assertEquals(snapshotReader.readNext().getValue().getMaxReadPositionEntryId(), -1L));

                producer.newMessage(Schema.STRING).value("common message send").send();
                producer.newMessage(Schema.STRING).value("common message send").send();

                Awaitility.await().untilAsserted(() ->
                        Assert.assertEquals(snapshotReader.readNext().getValue().getMaxReadPositionEntryId(), 1L));
            }
        }
    }

    /**
     * Closes a topic's managed ledger, publishes a transactional message to it, and checks that
     * the publish callback receives an exception whose cause is a
     * {@code ManagedLedgerAlreadyClosedException}.
     *
     * <p>The callback now only records the exception and releases the latch; the assertion runs
     * on the test thread. Previously it ran on the callback thread, so a failing assertion (or a
     * {@code null} exception) skipped the count-down and showed up only as an opaque timeout.
     *
     * @throws Exception on any admin, broker or assertion failure
     */
    @Test
    public void testAppendBufferWithNotManageLedgerExceptionCanCastToMLE() throws Exception {
        final String topic = "persistent://pulsar/system/testReCreateTopic";
        this.admin.topics().createNonPartitionedTopic(topic);
        Optional<Topic> topicRef = this.pulsarServiceList.get(0).getBrokerService().getTopic(topic, false).get();
        PersistentTopic persistentTopic = (PersistentTopic) topicRef.get();

        final CountDownLatch completedLatch = new CountDownLatch(1);
        final AtomicReference<Exception> publishFailure = new AtomicReference<>();
        Topic.PublishContext publishContext = new Topic.PublishContext() {
            public String getProducerName() {
                return "test";
            }

            public long getSequenceId() {
                return 30L;
            }

            public String getOriginalProducerName() {
                return "test";
            }

            public long getOriginalSequenceId() {
                return 30L;
            }

            public long getHighestSequenceId() {
                return 30L;
            }

            public long getOriginalHighestSequenceId() {
                return 30L;
            }

            public long getNumberOfMessages() {
                return 30L;
            }

            public void completed(Exception exc, long j, long j2) {
                // Record first, then release the waiter; never assert on this thread.
                publishFailure.set(exc);
                completedLatch.countDown();
            }
        };

        persistentTopic.getManagedLedger().close();
        persistentTopic.publishTxnMessage(
                new TxnID(123L, 321L), Unpooled.copiedBuffer("message", StandardCharsets.UTF_8), publishContext);

        Assert.assertTrue(completedLatch.await(10L, TimeUnit.SECONDS),
                "publish callback was not invoked within 10 seconds");
        Exception failure = publishFailure.get();
        Assert.assertNotNull(failure, "publish on a closed managed ledger unexpectedly succeeded");
        Assert.assertTrue(failure.getCause() instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException);
    }

    /**
     * Tracks the transaction buffer's max read position while plain and transactional publishes
     * are interleaved on one topic:
     * <ul>
     *   <li>with no snapshot yet, a plain publish moves the max read position to that message;</li>
     *   <li>while a transaction is open, neither the transactional message nor a later plain
     *       message moves it;</li>
     *   <li>after commit it is one entry past the last plain message;</li>
     *   <li>a further plain publish moves it to that message;</li>
     *   <li>once the buffer state is forced to {@code Initializing}, a plain publish no longer
     *       moves it.</li>
     * </ul>
     *
     * <p>Changes from the earlier version: the non-transactional client and both producers are
     * closed through try-with-resources (the manual close could mask the original failure and
     * the producers were not closed explicitly), and the downcasts to
     * {@code TopicTransactionBuffer} and {@code MessageIdImpl} are written out.
     *
     * @throws Exception on any client, admin, reflection or assertion failure
     */
    @Test
    public void testMaxReadPositionForNormalPublish() throws Exception {
        final String topic = "persistent://tnx/ns1/NormalPublish";
        this.admin.topics().createNonPartitionedTopic(topic);
        Optional<Topic> topicRef = getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false).get();
        PersistentTopic persistentTopic = (PersistentTopic) topicRef.get();
        final TopicTransactionBuffer buffer = (TopicTransactionBuffer) persistentTopic.getTransactionBuffer();

        try (PulsarClient noTxnClient = PulsarClient.builder()
                     .enableTransaction(false)
                     .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
                     .build();
             Producer<String> normalProducer = noTxnClient.newProducer(Schema.STRING)
                     .producerName("testNormalPublish")
                     .topic(topic)
                     .sendTimeout(0, TimeUnit.SECONDS)
                     .create()) {
            Awaitility.await().untilAsserted(() -> Assert.assertTrue(buffer.checkIfNoSnapshot()));

            MessageIdImpl firstNormal = (MessageIdImpl) normalProducer.newMessage().value("normal message").send();
            PositionImpl afterFirstNormal = buffer.getMaxReadPosition();
            Assert.assertEquals(afterFirstNormal.getLedgerId(), firstNormal.getLedgerId());
            Assert.assertEquals(afterFirstNormal.getEntryId(), firstNormal.getEntryId());

            try (Producer<String> txnProducer = this.pulsarClient.newProducer(Schema.STRING)
                    .producerName("testTransactionPublish")
                    .topic(topic)
                    .sendTimeout(0, TimeUnit.SECONDS)
                    .create()) {
                Awaitility.await().untilAsserted(() -> Assert.assertTrue(buffer.checkIfReady()));

                Transaction txn = this.pulsarClient.newTransaction()
                        .withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
                txnProducer.newMessage(txn).value("txn message").send();
                PositionImpl afterTxnSend = buffer.getMaxReadPosition();
                Assert.assertEquals(afterTxnSend.getLedgerId(), firstNormal.getLedgerId());
                Assert.assertEquals(afterTxnSend.getEntryId(), firstNormal.getEntryId());

                // A plain message behind an open transaction must not advance the position.
                MessageIdImpl secondNormal = (MessageIdImpl) normalProducer.newMessage().value("normal message").send();
                PositionImpl afterSecondNormal = buffer.getMaxReadPosition();
                Assert.assertEquals(afterSecondNormal.getLedgerId(), firstNormal.getLedgerId());
                Assert.assertEquals(afterSecondNormal.getEntryId(), firstNormal.getEntryId());

                txn.commit().get();
                PositionImpl afterCommit = buffer.getMaxReadPosition();
                Assert.assertEquals(afterCommit.getLedgerId(), secondNormal.getLedgerId());
                Assert.assertEquals(afterCommit.getEntryId(), secondNormal.getEntryId() + 1);

                MessageIdImpl thirdNormal = (MessageIdImpl) normalProducer.newMessage().value("normal message").send();
                PositionImpl afterThirdNormal = buffer.getMaxReadPosition();
                Assert.assertEquals(afterThirdNormal.getLedgerId(), thirdNormal.getLedgerId());
                Assert.assertEquals(afterThirdNormal.getEntryId(), thirdNormal.getEntryId());

                // Force the buffer back to Initializing and read the raw field directly.
                Field stateField = buffer.getClass().getSuperclass().getDeclaredField("state");
                stateField.setAccessible(true);
                Field maxReadPositionField = TopicTransactionBuffer.class.getDeclaredField("maxReadPosition");
                maxReadPositionField.setAccessible(true);
                stateField.set(buffer, TopicTransactionBufferState.State.Initializing);

                normalProducer.newMessage().value("normal message").send();
                PositionImpl whileInitializing = (PositionImpl) maxReadPositionField.get(buffer);
                Assert.assertEquals(whileInitializing.getLedgerId(), thirdNormal.getLedgerId());
                Assert.assertEquals(whileInitializing.getEntryId(), thirdNormal.getEntryId());
            }
        }
    }

    /**
     * Replaces the topic's {@code transaction-buffer-sub} cursor with a mock whose reads always
     * fail, then checks that a newly constructed {@code TopicTransactionBuffer} still reaches
     * the {@code "Ready"} state, first with a {@code ManagedLedgerFencedException} and then with
     * a {@code CursorAlreadyClosedException}.
     *
     * <p>The producer is now managed by try-with-resources (the manual close could mask the
     * original failure); statement order is otherwise unchanged because the stubbing and the
     * cursor swaps depend on it.
     *
     * <p>NOTE(review): the first {@code asyncReadEntries} stub (NonRecoverableLedgerException)
     * is replaced by the fenced-exception stub before any buffer is constructed, so it is
     * presumably never exercised. Confirm whether a recovery run was meant to happen in between.
     *
     * <p>NOTE(review): the assertion inside {@code getInternalStats(...).thenAccept(...)} is
     * not awaited, so a failure there does not fail the test.
     *
     * @throws Exception on any client, admin, reflection or assertion failure
     */
    @Test
    public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception {
        final String topic = "tnx/ns1/testEndTBRecoveringWhenManagerLedgerDisReadable";
        final String cursorName = "transaction-buffer-sub";
        this.admin.topics().createNonPartitionedTopic(topic);

        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                .producerName("test")
                .enableBatching(false)
                .sendTimeout(0, TimeUnit.SECONDS)
                .topic(topic)
                .create()) {
            Transaction txn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(10L, TimeUnit.SECONDS).build().get();
            producer.newMessage(txn).value("test").send();

            Optional<Topic> topicRef = getPulsarServiceList().get(0).getBrokerService()
                    .getTopic("persistent://" + topic, false).get();
            PersistentTopic persistentTopic = (PersistentTopic) topicRef.get();
            persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);

            ManagedCursorImpl mockCursor = Mockito.mock(ManagedCursorImpl.class);
            Mockito.doReturn(cursorName).when(mockCursor).getName();
            Mockito.doReturn(true).when(mockCursor).hasMoreEntries();
            Mockito.doAnswer(invocation -> {
                ((AsyncCallbacks.ReadEntriesCallback) invocation.getArgument(1)).readEntriesFailed(
                        new ManagedLedgerException.NonRecoverableLedgerException("No ledger exist"), (Object) null);
                return null;
            }).when(mockCursor).asyncReadEntries(ArgumentMatchers.anyInt(),
                    (AsyncCallbacks.ReadEntriesCallback) ArgumentMatchers.any(),
                    ArgumentMatchers.any(), (PositionImpl) ArgumentMatchers.any());

            // Swap the real cursor for the mock inside the managed ledger's cursor container.
            Field cursorsField = ManagedLedgerImpl.class.getDeclaredField("cursors");
            cursorsField.setAccessible(true);
            ManagedCursorContainer cursors = (ManagedCursorContainer) cursorsField.get(persistentTopic.getManagedLedger());
            cursors.removeCursor(cursorName);
            cursors.add(mockCursor, mockCursor.getMarkDeletedPosition());

            Mockito.doAnswer(invocation -> {
                ((AsyncCallbacks.ReadEntriesCallback) invocation.getArgument(1)).readEntriesFailed(
                        new ManagedLedgerException.ManagedLedgerFencedException(), (Object) null);
                return null;
            }).when(mockCursor).asyncReadEntries(ArgumentMatchers.anyInt(),
                    (AsyncCallbacks.ReadEntriesCallback) ArgumentMatchers.any(),
                    ArgumentMatchers.any(), (PositionImpl) ArgumentMatchers.any());

            TopicTransactionBuffer bufferAfterFenced = new TopicTransactionBuffer(persistentTopic);
            Awaitility.await().atMost(30L, TimeUnit.SECONDS).untilAsserted(() ->
                    Assert.assertEquals(bufferAfterFenced.getStats(false).state, "Ready"));

            cursors.removeCursor(cursorName);
            Mockito.doAnswer(invocation -> {
                ((AsyncCallbacks.ReadEntriesCallback) invocation.getArgument(1)).readEntriesFailed(
                        new ManagedLedgerException.CursorAlreadyClosedException("test"), (Object) null);
                return null;
            }).when(mockCursor).asyncReadEntries(ArgumentMatchers.anyInt(),
                    (AsyncCallbacks.ReadEntriesCallback) ArgumentMatchers.any(),
                    ArgumentMatchers.any(), (PositionImpl) ArgumentMatchers.any());
            cursors.add(mockCursor, mockCursor.getMarkDeletedPosition());

            TopicTransactionBuffer bufferAfterCursorClosed = new TopicTransactionBuffer(persistentTopic);
            Awaitility.await().atMost(30L, TimeUnit.SECONDS).untilAsserted(() ->
                    Assert.assertEquals(bufferAfterCursorClosed.getStats(false).state, "Ready"));

            persistentTopic.getInternalStats(false).thenAccept(internalStats ->
                    Assert.assertTrue(internalStats.cursors.isEmpty()));
            cursors.removeCursor(cursorName);
        }
    }

    /**
     * Checks that a {@link PendingAckHandleImpl} reports the {@code "Ready"} state even though every read of
     * its pending-ack log fails during recovery.
     *
     * <p>A mocked cursor answers each {@code asyncReadEntries} call with {@code readEntriesFailed}, using in
     * turn a {@code NonRecoverableLedgerException}, a {@code ManagedLedgerFencedException} and a
     * {@code CursorAlreadyClosedException}; a fresh handle is created for each failure. For the duration of
     * the test the broker's {@code transactionPendingAckStoreProvider} field is replaced, via reflection, by a
     * mock that hands out a store backed by that cursor.
     *
     * <p>The timer, the swapped provider and the producer are all released in {@code finally} /
     * try-with-resources so that a failing assertion cannot leak state into other tests.
     */
    @Test
    public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception {
        final String topicName = "tnx/ns1/testEndTPRecoveringWhenManagerLedgerDisReadable";
        TxnLogBufferedWriterConfig bufferedWriterConfig = new TxnLogBufferedWriterConfig();
        HashedWheelTimer transactionTimer =
                new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"), 1L, TimeUnit.MILLISECONDS);
        try {
            this.admin.topics().createNonPartitionedTopic(topicName);
            try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                    .producerName("test")
                    .enableBatching(false)
                    .sendTimeout(0, TimeUnit.SECONDS)
                    .topic(topicName)
                    .create()) {
                producer.newMessage().send();

                PulsarService broker = getPulsarServiceList().get(0);
                PersistentTopic persistentTopic =
                        (PersistentTopic) broker.getBrokerService().getTopic(topicName, false).get().get();
                persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
                PersistentSubscription persistentSubscription = (PersistentSubscription) persistentTopic
                        .createSubscription("test", CommandSubscribe.InitialPosition.Earliest, false,
                                (Map<String, String>) null)
                        .get();

                // Cursor that claims to have entries but whose reads are stubbed to fail (see loop below).
                ManagedCursorImpl failingCursor = Mockito.mock(ManagedCursorImpl.class);
                Mockito.doReturn(true).when(failingCursor).hasMoreEntries();
                Mockito.doReturn(false).when(failingCursor).isClosed();
                Mockito.doReturn(new PositionImpl(-1L, -1L)).when(failingCursor).getMarkDeletedPosition();

                MLPendingAckStore pendingAckStore = new MLPendingAckStore(persistentTopic.getManagedLedger(),
                        failingCursor, (ManagedCursor) null, 500L, bufferedWriterConfig, transactionTimer,
                        DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS);
                TransactionPendingAckStoreProvider mockedProvider =
                        Mockito.mock(TransactionPendingAckStoreProvider.class);
                Mockito.doReturn(CompletableFuture.completedFuture(pendingAckStore))
                        .when(mockedProvider).newPendingAckStore((PersistentSubscription) ArgumentMatchers.any());
                Mockito.doReturn(CompletableFuture.completedFuture(true))
                        .when(mockedProvider).checkInitializedBefore((PersistentSubscription) ArgumentMatchers.any());

                Field providerField = PulsarService.class.getDeclaredField("transactionPendingAckStoreProvider");
                providerField.setAccessible(true);
                TransactionPendingAckStoreProvider originalProvider =
                        (TransactionPendingAckStoreProvider) providerField.get(broker);
                providerField.set(broker, mockedProvider);
                try {
                    ManagedLedgerException[] readFailures = {
                        new ManagedLedgerException.NonRecoverableLedgerException("No ledger exist"),
                        new ManagedLedgerException.ManagedLedgerFencedException(),
                        new ManagedLedgerException.CursorAlreadyClosedException("test"),
                    };
                    for (ManagedLedgerException readFailure : readFailures) {
                        Mockito.doAnswer(invocation -> {
                            AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
                            callback.readEntriesFailed(readFailure, null);
                            return null;
                        }).when(failingCursor).asyncReadEntries(ArgumentMatchers.anyInt(),
                                (AsyncCallbacks.ReadEntriesCallback) ArgumentMatchers.any(), ArgumentMatchers.any(),
                                (PositionImpl) ArgumentMatchers.any());

                        PendingAckHandleImpl pendingAckHandle = new PendingAckHandleImpl(persistentSubscription);
                        Awaitility.await().untilAsserted(
                                () -> Assert.assertEquals(pendingAckHandle.getStats(false).state, "Ready"));
                    }
                } finally {
                    // Always hand the real provider back to the broker, even if an assertion failed.
                    providerField.set(broker, originalProvider);
                }
            }
        } finally {
            transactionTimer.stop();
        }
    }

    /**
     * Checks that an {@link MLTransactionMetadataStore} finishes {@code init} and reports the
     * {@code "Ready"} coordinator state even though every read of the transaction log fails.
     *
     * <p>A mocked {@link ManagedCursor} is injected into an {@link MLTransactionLogImpl} via reflection and
     * answers each {@code asyncReadEntries} call with {@code readEntriesFailed}, using in turn a
     * {@code NonRecoverableLedgerException}, a {@code ManagedLedgerFencedException} and a
     * {@code CursorAlreadyClosedException}. A new metadata store is initialised for each failure.
     *
     * <p>The timer is stopped in {@code finally} so it is not leaked when an assertion fails.
     */
    @Test
    public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception {
        final String topicName = "tnx/ns1/testEndTCRecoveringWhenManagerLedgerDisReadable";
        HashedWheelTimer transactionTimer =
                new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"), 1L, TimeUnit.MILLISECONDS);
        try {
            this.admin.topics().createNonPartitionedTopic(topicName);
            PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
                    .getBrokerService().getTopic(topicName, false).get().get();
            persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
            Map<String, String> ledgerProperties = new HashMap<>();
            ledgerProperties.put("max_local_txn_id", "1");
            persistentTopic.getManagedLedger().setProperties(ledgerProperties);

            // Cursor that claims to have entries but whose reads are stubbed to fail (see loop below).
            ManagedCursor failingCursor = Mockito.mock(ManagedCursor.class);
            Mockito.doReturn(true).when(failingCursor).hasMoreEntries();

            MLTransactionSequenceIdGenerator sequenceIdGenerator = new MLTransactionSequenceIdGenerator();
            persistentTopic.getManagedLedger().getConfig().setManagedLedgerInterceptor(sequenceIdGenerator);
            MLTransactionLogImpl transactionLog = new MLTransactionLogImpl(new TransactionCoordinatorID(1L),
                    (ManagedLedgerFactory) null, persistentTopic.getManagedLedger().getConfig(),
                    new TxnLogBufferedWriterConfig(), transactionTimer,
                    DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS);

            // The log is built without a ledger factory, so its cursor and ledger are injected directly.
            Field cursorField = MLTransactionLogImpl.class.getDeclaredField("cursor");
            cursorField.setAccessible(true);
            cursorField.set(transactionLog, failingCursor);
            Field managedLedgerField = MLTransactionLogImpl.class.getDeclaredField("managedLedger");
            managedLedgerField.setAccessible(true);
            managedLedgerField.set(transactionLog, persistentTopic.getManagedLedger());

            TransactionRecoverTracker recoverTracker = Mockito.mock(TransactionRecoverTracker.class);
            Mockito.doNothing().when(recoverTracker).appendOpenTransactionToTimeoutTracker();
            Mockito.doNothing().when(recoverTracker).handleCommittingAndAbortingTransaction();
            TransactionTimeoutTracker timeoutTracker = Mockito.mock(TransactionTimeoutTracker.class);
            Mockito.doNothing().when(timeoutTracker).start();

            ManagedLedgerException[] readFailures = {
                new ManagedLedgerException.NonRecoverableLedgerException("No ledger exist"),
                new ManagedLedgerException.ManagedLedgerFencedException(),
                new ManagedLedgerException.CursorAlreadyClosedException("test"),
            };
            for (ManagedLedgerException readFailure : readFailures) {
                Mockito.doAnswer(invocation -> {
                    AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
                    callback.readEntriesFailed(readFailure, null);
                    return null;
                }).when(failingCursor).asyncReadEntries(ArgumentMatchers.anyInt(),
                        (AsyncCallbacks.ReadEntriesCallback) ArgumentMatchers.any(), ArgumentMatchers.any(),
                        (PositionImpl) ArgumentMatchers.any());

                MLTransactionMetadataStore metadataStore = new MLTransactionMetadataStore(
                        new TransactionCoordinatorID(1L), transactionLog, timeoutTracker, sequenceIdGenerator, 0L);
                metadataStore.init(recoverTracker).get();
                Awaitility.await().untilAsserted(
                        () -> Assert.assertEquals(metadataStore.getCoordinatorStats().state, "Ready"));
            }
        } finally {
            transactionTimer.stop();
        }
    }

    /**
     * Forces two client transactions into the {@code COMMITTING} and {@code ABORTING} states through
     * reflection, then ends them and checks the counters exposed by the broker's
     * {@link CounterBrokerInterceptor}: two transactions seen, one aborted, one committed.
     */
    @Test
    public void testEndTxnWhenCommittingOrAborting() throws Exception {
        CounterBrokerInterceptor interceptor =
                (CounterBrokerInterceptor) getPulsarServiceList().get(0).getBrokerInterceptor();
        interceptor.reset();

        Transaction toCommit = (Transaction) this.pulsarClient.newTransaction()
                .withTransactionTimeout(5L, TimeUnit.SECONDS)
                .build()
                .get();
        Transaction toAbort = (Transaction) this.pulsarClient.newTransaction()
                .withTransactionTimeout(5L, TimeUnit.SECONDS)
                .build()
                .get();

        // Overwrite the client-side state directly before either transaction is ended.
        Field stateField = TransactionImpl.class.getDeclaredField("state");
        stateField.setAccessible(true);
        stateField.set(toCommit, Transaction.State.COMMITTING);
        stateField.set(toAbort, Transaction.State.ABORTING);

        Awaitility.await().untilAsserted(() -> Assert.assertEquals(interceptor.getTxnCount(), 2));

        toAbort.abort().get();
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(interceptor.getAbortedTxnCount(), 1));

        toCommit.commit().get();
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(interceptor.getCommittedTxnCount(), 1));
    }

    /**
     * Checks that a new {@link TopicTransactionBuffer} becomes ready when the recovery cursor reports that
     * there is nothing to read.
     *
     * <p>The topic's {@code ledger} field is replaced by a Mockito spy whose {@code getLastConfirmedEntry}
     * returns {@code PositionImpl.LATEST} and whose {@code newNonDurableCursor} returns a mocked cursor with
     * {@code hasMoreEntries() == false}. A snapshot of aborted transactions is taken first through the
     * existing buffer's {@code snapshotAbortedTxnProcessor}.
     *
     * <p>The original ledger is put back in {@code finally}: the topic {@code tnx/ns1/test} is also used by
     * other tests in this class, which must not keep running against the stubbed spy.
     */
    @Test
    public void testNoEntryCanBeReadWhenRecovery() throws Exception {
        PersistentTopic persistentTopic = (PersistentTopic) this.pulsarServiceList.get(0)
                .getBrokerService().getTopic(TopicName.get("tnx/ns1/test").toString(), true).get().get();

        Field ledgerField = PersistentTopic.class.getDeclaredField("ledger");
        Field transactionBufferField = PersistentTopic.class.getDeclaredField("transactionBuffer");
        ledgerField.setAccessible(true);
        transactionBufferField.setAccessible(true);

        Object originalLedger = ledgerField.get(persistentTopic);
        ManagedLedgerImpl spiedLedger = (ManagedLedgerImpl) Mockito.spy(originalLedger);
        ledgerField.set(persistentTopic, spiedLedger);
        try {
            TopicTransactionBuffer currentBuffer =
                    (TopicTransactionBuffer) transactionBufferField.get(persistentTopic);
            Field processorField = TopicTransactionBuffer.class.getDeclaredField("snapshotAbortedTxnProcessor");
            processorField.setAccessible(true);
            AbortedTxnProcessor abortedTxnProcessor = (AbortedTxnProcessor) processorField.get(currentBuffer);
            abortedTxnProcessor.takeAbortedTxnsSnapshot(currentBuffer.getMaxReadPosition()).get();

            Mockito.doReturn(PositionImpl.LATEST).when(spiedLedger).getLastConfirmedEntry();
            ManagedCursorImpl emptyCursor = Mockito.mock(ManagedCursorImpl.class);
            Mockito.doReturn(false).when(emptyCursor).hasMoreEntries();
            Mockito.doReturn(emptyCursor).when(spiedLedger)
                    .newNonDurableCursor((Position) ArgumentMatchers.any(), (String) ArgumentMatchers.any());

            TopicTransactionBuffer recoveredBuffer = new TopicTransactionBuffer(persistentTopic);
            Awaitility.await().untilAsserted(() -> Assert.assertTrue(recoveredBuffer.checkIfReady()));
        } finally {
            // Undo the reflective swap so the shared topic uses its real managed ledger again.
            ledgerField.set(persistentTopic, originalLedger);
        }
    }

    /**
     * Checks that a commit issued while the transaction metadata stores are not ready completes once they
     * become ready again.
     *
     * <p>Every store's {@code state} field is set to {@code Initializing} through reflection; the commit
     * future is then expected not to complete within five seconds. After the stores are set back to
     * {@code Ready} the same future must complete within five seconds.
     *
     * <p>Reflection errors now fail the test instead of being printed and ignored, and the stores are reset
     * to {@code Ready} in {@code finally} so that an unexpected early completion cannot leave the broker's
     * stores stuck in {@code Initializing}.
     */
    @Test
    public void testRetryExceptionOfEndTxn() throws Exception {
        Transaction transaction = this.pulsarClient.newTransaction()
                .withTransactionTimeout(10L, TimeUnit.SECONDS)
                .build()
                .get();
        Field stateField = TransactionMetadataStoreState.class.getDeclaredField("state");
        stateField.setAccessible(true);

        CompletableFuture<Void> commitFuture;
        try {
            for (TransactionMetadataStore store
                    : getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores().values()) {
                stateField.set(store, TransactionMetadataStoreState.State.Initializing);
            }
            commitFuture = transaction.commit();
            try {
                commitFuture.get(5L, TimeUnit.SECONDS);
                Assert.fail("commit must not complete while the metadata stores are Initializing");
            } catch (TimeoutException expected) {
                // Expected: the commit stays pending until the stores are Ready again.
            }
        } finally {
            for (TransactionMetadataStore store
                    : getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores().values()) {
                stateField.set(store, TransactionMetadataStoreState.State.Ready);
            }
        }
        commitFuture.get(5L, TimeUnit.SECONDS);
    }

    /**
     * Checks that the {@code timeout} task held by a {@link TransactionImpl} is cancelled once the
     * transaction has been committed, and likewise once a second transaction has been aborted.
     */
    @Test
    public void testCancelTxnTimeout() throws Exception {
        Transaction committedTxn = (Transaction) this.pulsarClient.newTransaction()
                .withTransactionTimeout(10L, TimeUnit.SECONDS)
                .build()
                .get();
        committedTxn.commit().get();

        Field timeoutField = TransactionImpl.class.getDeclaredField("timeout");
        timeoutField.setAccessible(true);
        Timeout committedTimeout = (Timeout) timeoutField.get(committedTxn);
        Assert.assertTrue(committedTimeout.isCancelled());

        Transaction abortedTxn = (Transaction) this.pulsarClient.newTransaction()
                .withTransactionTimeout(10L, TimeUnit.SECONDS)
                .build()
                .get();
        abortedTxn.abort().get();
        Timeout abortedTimeout = (Timeout) timeoutField.get(abortedTxn);
        Assert.assertTrue(abortedTimeout.isCancelled());
    }

    /**
     * Checks that {@code syncMaxReadPositionForNormalPublish} leaves the buffer's
     * {@code changeMaxReadPositionAndAddAbortTimes} counter at zero while the buffer is in the
     * {@code NoSnapshot} state.
     *
     * <p>A topic with a random name suffix is loaded, the test waits until the buffer's {@code state} field
     * reads {@code NoSnapshot}, and the counter is read through reflection before and after the call.
     */
    @Test
    public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot() throws Exception {
        String topicName = "tnx/ns1/changeMaxReadPositionAndAddAbortTimes" + UUID.randomUUID();
        PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
                .getBrokerService().getTopic(topicName, true).get().get();
        TransactionBuffer transactionBuffer = persistentTopic.getTransactionBuffer();

        Field counterField = TopicTransactionBuffer.class.getDeclaredField("changeMaxReadPositionAndAddAbortTimes");
        counterField.setAccessible(true);
        AtomicLong changeCounter = (AtomicLong) counterField.get(transactionBuffer);

        Field stateField = TopicTransactionBufferState.class.getDeclaredField("state");
        stateField.setAccessible(true);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                (TopicTransactionBufferState.State) stateField.get(transactionBuffer),
                TopicTransactionBufferState.State.NoSnapshot));

        Assert.assertEquals(changeCounter.get(), 0L);
        transactionBuffer.syncMaxReadPositionForNormalPublish(new PositionImpl(1L, 1L));
        Assert.assertEquals(changeCounter.get(), 0L);
    }

    /**
     * Checks that schemas exist for the {@code TRANSACTION_BUFFER_SNAPSHOT} and {@code TOPIC_POLICY} system
     * topics of a freshly created namespace while {@code allowAutoUpdateSchemaEnabled} is switched off on
     * every broker.
     *
     * <p>The flag is switched back on in {@code finally}, so a timeout while waiting for the schemas does not
     * leave the brokers with the flag disabled for later tests.
     */
    @Test
    public void testAutoCreateSchemaForTransactionSnapshot() throws Exception {
        final String namespace = "tnx/ns2";
        final String topic = namespace + "/test";
        this.pulsarServiceList.forEach(
                broker -> broker.getConfiguration().setAllowAutoUpdateSchemaEnabled(false));
        try {
            this.admin.namespaces().createNamespace(namespace);
            this.admin.topics().createNonPartitionedTopic(topic);
            TopicName snapshotTopic = NamespaceEventsSystemTopicFactory.getSystemTopicName(
                    TopicName.get(topic).getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT);
            TopicName policyTopic = NamespaceEventsSystemTopicFactory.getSystemTopicName(
                    TopicName.get(topic).getNamespaceObject(), EventType.TOPIC_POLICY);
            Awaitility.await().untilAsserted(() -> {
                Assert.assertNotNull(this.admin.schemas().getSchemaInfo(snapshotTopic.toString()));
                Assert.assertNotNull(this.admin.schemas().getSchemaInfo(policyTopic.toString()));
            });
        } finally {
            this.pulsarServiceList.forEach(
                    broker -> broker.getConfiguration().setAllowAutoUpdateSchemaEnabled(true));
        }
    }

    /**
     * Checks where the {@code __pending_ack_state} cursor's mark-delete position sits after one
     * transactional ack has been committed and a second one is still open.
     *
     * <p>With {@code transactionPendingAckLogIndexMinLag} set to 1 and the managed-ledger mark-delete rate
     * limit set to 5.0, the pending-ack internal stats are polled until the cursor's mark-delete position is
     * in the same ledger as the last confirmed entry and exactly two entries behind it.
     *
     * <p>Producer and consumer are closed by try-with-resources, consumer first.
     */
    @Test
    public void testPendingAckMarkDeletePosition() throws Exception {
        getPulsarServiceList().get(0).getConfig().setTransactionPendingAckLogIndexMinLag(1L);
        getPulsarServiceList().get(0).getConfiguration().setManagedLedgerDefaultMarkDeleteRateLimit(5.0d);
        final String topic = "tnx/ns1/testPendingAckMarkDeletePosition";
        final String subscription = "sub";
        try (Producer<byte[]> producer = this.pulsarClient.newProducer(Schema.BYTES)
                     .topic(topic)
                     .sendTimeout(0, TimeUnit.SECONDS)
                     .create();
             Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topic(topic)
                     .subscriptionName(subscription)
                     .subscribe()) {
            // First message: acknowledged inside a transaction that is committed.
            producer.newMessage().value("test".getBytes(StandardCharsets.UTF_8)).send();
            Transaction committedTxn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(5L, TimeUnit.MINUTES)
                    .build()
                    .get();
            Message<byte[]> firstMessage = consumer.receive(10, TimeUnit.SECONDS);
            Assert.assertNotNull(firstMessage, "first message was not delivered within 10 seconds");
            consumer.acknowledgeAsync(firstMessage.getMessageId(), committedTxn);
            committedTxn.commit().get();

            // Second message: acknowledged inside a transaction that is left open.
            Transaction openTxn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(5L, TimeUnit.MINUTES)
                    .build()
                    .get();
            producer.newMessage().value("test".getBytes(StandardCharsets.UTF_8)).send();
            Message<byte[]> secondMessage = consumer.receive(10, TimeUnit.SECONDS);
            Assert.assertNotNull(secondMessage, "second message was not delivered within 10 seconds");
            consumer.acknowledgeAsync(secondMessage.getMessageId(), openTxn);

            Awaitility.await().untilAsserted(() -> {
                ManagedLedgerInternalStats logStats = this.admin.transactions()
                        .getPendingAckInternalStats(topic, subscription, false)
                        .pendingAckLogStats.managedLedgerInternalStats;
                // Positions are rendered as "<ledgerId>:<entryId>".
                String[] markDelete = logStats.cursors.get("__pending_ack_state").markDeletePosition.split(":");
                String[] lastConfirmed = logStats.lastConfirmedEntry.split(":");
                Assert.assertEquals(markDelete[0], lastConfirmed[0]);
                Assert.assertEquals(Integer.parseInt(markDelete[1]), Integer.parseInt(lastConfirmed[1]) - 2);
            });
        }
    }

    /**
     * Closes the cursor of the transaction log behind coordinator 0 (reached through reflection) and then
     * builds and commits a new transaction. The test passes when the commit future completes normally.
     */
    @Test
    public void testConsistencyOfTransactionStatsAtEndTxn() throws Exception {
        TransactionCoordinatorID coordinatorId = new TransactionCoordinatorID(0L);
        TransactionMetadataStore metadataStore = (TransactionMetadataStore) getPulsarServiceList().get(0)
                .getTransactionMetadataStoreService()
                .getStores()
                .get(coordinatorId);

        Field transactionLogField = MLTransactionMetadataStore.class.getDeclaredField("transactionLog");
        transactionLogField.setAccessible(true);
        MLTransactionLogImpl transactionLog = (MLTransactionLogImpl) transactionLogField.get(metadataStore);

        Field cursorField = MLTransactionLogImpl.class.getDeclaredField("cursor");
        cursorField.setAccessible(true);
        ManagedCursorImpl logCursor = (ManagedCursorImpl) cursorField.get(transactionLog);
        logCursor.close();

        Transaction transaction = (Transaction) this.pulsarClient.newTransaction()
                .withTransactionTimeout(5L, TimeUnit.SECONDS)
                .build()
                .get();
        transaction.commit().get();
    }

    /**
     * Exercises {@code acknowledgeAsync(List<MessageId>, Transaction)} in three phases on the same
     * subscription:
     * <ol>
     *   <li>five messages are sent, four are received and acknowledged in a transaction that is aborted;</li>
     *   <li>a new consumer must receive those same four message ids again and acknowledges them in a
     *       transaction that is committed;</li>
     *   <li>a third consumer must receive exactly one further message whose id is not among the four, and
     *       then nothing within five seconds.</li>
     * </ol>
     *
     * <p>Each consumer lives in its own try-with-resources block, so it is closed before the next one
     * subscribes and is also closed when an assertion fails.
     */
    @Test(timeOut = 30000)
    public void testTransactionAckMessageList() throws Exception {
        final String topic = "persistent://tnx/ns1/test";
        final String subscription = "testSub";
        final int sentCount = 5;
        final int ackedCount = 4;
        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .sendTimeout(5, TimeUnit.SECONDS)
                .create()) {
            List<MessageId> ackedIds = new ArrayList<>();

            // Phase 1: acknowledge four messages in a transaction, then abort it.
            try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionName(subscription)
                    .subscribe()) {
                for (int i = 0; i < sentCount; i++) {
                    producer.newMessage().send();
                }
                for (int i = 0; i < ackedCount; i++) {
                    ackedIds.add(consumer.receive().getMessageId());
                }
                Transaction abortedTxn = this.pulsarClient.newTransaction()
                        .withTransactionTimeout(5L, TimeUnit.MINUTES)
                        .build()
                        .get();
                consumer.acknowledgeAsync(ackedIds, abortedTxn);
                abortedTxn.abort().get();
            }

            // Phase 2: the same four messages come back; acknowledge them again and commit.
            try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionName(subscription)
                    .subscribe()) {
                for (int i = 0; i < ackedCount; i++) {
                    Assert.assertTrue(ackedIds.contains(consumer.receive().getMessageId()));
                }
                Transaction committedTxn = this.pulsarClient.newTransaction()
                        .withTransactionTimeout(5L, TimeUnit.MINUTES)
                        .build()
                        .get();
                consumer.acknowledgeAsync(ackedIds, committedTxn);
                committedTxn.commit().get();
            }

            // Phase 3: only the message that was never acknowledged is left.
            try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionName(subscription)
                    .subscribe()) {
                Message<byte[]> remaining = consumer.receive(5, TimeUnit.SECONDS);
                Assert.assertNotNull(remaining);
                Assert.assertFalse(ackedIds.contains(remaining.getMessageId()));
                Assert.assertNull(consumer.receive(5, TimeUnit.SECONDS));
            }
        }
    }

    /**
     * Exercises {@code acknowledgeAsync(Messages, Transaction)} on a two-partition topic in three phases on
     * the same subscription:
     * <ol>
     *   <li>four messages are sent and received, placed into a {@link MessagesImpl} obtained from the
     *       consumer's {@code getNewMessagesImpl} method (its {@code messageList} field is set through
     *       reflection), and acknowledged in a transaction that is aborted;</li>
     *   <li>a new consumer must receive those same four message ids again and acknowledges the same
     *       {@link MessagesImpl} in a transaction that is committed;</li>
     *   <li>a third consumer must receive nothing within five seconds.</li>
     * </ol>
     *
     * <p>Each consumer lives in its own try-with-resources block, so it is closed before the next one
     * subscribes and is also closed when an assertion fails.
     */
    @Test(timeOut = 30000)
    public void testTransactionAckMessages() throws Exception {
        final String topic = "persistent://tnx/ns1/testTransactionAckMessages";
        final String subscription = "testSub";
        final int messageCount = 4;
        this.admin.topics().createPartitionedTopic(topic, 2);
        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .sendTimeout(5, TimeUnit.SECONDS)
                .create()) {
            List<Message<byte[]>> received = new ArrayList<>();
            MessagesImpl<?> batch;

            // Phase 1: acknowledge the four messages as one batch in a transaction, then abort it.
            try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionName(subscription)
                    .subscribe()) {
                for (int i = 0; i < messageCount; i++) {
                    producer.newMessage().send();
                }
                Method newMessagesImpl = ConsumerBase.class.getDeclaredMethod("getNewMessagesImpl");
                newMessagesImpl.setAccessible(true);
                Field messageListField = MessagesImpl.class.getDeclaredField("messageList");
                messageListField.setAccessible(true);
                batch = (MessagesImpl<?>) newMessagesImpl.invoke(consumer);
                for (int i = 0; i < messageCount; i++) {
                    received.add(consumer.receive());
                }
                messageListField.set(batch, received);

                Transaction abortedTxn = this.pulsarClient.newTransaction()
                        .withTransactionTimeout(5L, TimeUnit.MINUTES)
                        .build()
                        .get();
                consumer.acknowledgeAsync(batch, abortedTxn);
                abortedTxn.abort().get();
            }

            // Phase 2: the same four messages come back; acknowledge the batch again and commit.
            try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionName(subscription)
                    .subscribe()) {
                List<MessageId> receivedIds = new ArrayList<>();
                for (Message<byte[]> message : received) {
                    receivedIds.add(message.getMessageId());
                }
                for (int i = 0; i < messageCount; i++) {
                    Assert.assertTrue(
                            receivedIds.contains(consumer.receive(5, TimeUnit.SECONDS).getMessageId()));
                }
                Transaction committedTxn = this.pulsarClient.newTransaction()
                        .withTransactionTimeout(5L, TimeUnit.MINUTES)
                        .build()
                        .get();
                consumer.acknowledgeAsync(batch, committedTxn);
                committedTxn.commit().get();
            }

            // Phase 3: nothing is left to deliver.
            try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionName(subscription)
                    .subscribe()) {
                Assert.assertNull(consumer.receive(5, TimeUnit.SECONDS));
            }
        }
    }

    /**
     * Checks that a transactional acknowledgement fails with
     * {@link PulsarClientException.ConnectException} after the consumer has been told its connection closed.
     *
     * <p>Ten messages are sent, then the consumer's {@code connectionClosed} method is invoked through
     * reflection with the connection returned by its {@code cnx} method. Acknowledging the next received
     * message inside a new transaction must complete exceptionally with a {@code ConnectException} cause.
     *
     * <p>Producer and consumer are closed by try-with-resources. Receiving the message and building the
     * transaction happen outside the {@code catch}, so only a failure of the acknowledgement itself is
     * inspected.
     */
    @Test
    public void testGetConnectExceptionForAckMsgWhenCnxIsNull() throws Exception {
        final String topic = "tnx/ns1/testGetConnectExceptionForAckMsgWhenCnxIsNull";
        try (Producer<byte[]> producer = this.pulsarClient.newProducer(Schema.BYTES)
                     .topic(topic)
                     .sendTimeout(0, TimeUnit.SECONDS)
                     .create();
             Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topic(topic)
                     .subscriptionName("sub")
                     .subscribe()) {
            for (int i = 0; i < 10; i++) {
                producer.newMessage().value(Bytes.toBytes(i)).send();
            }

            ClientCnx cnx = (ClientCnx) MethodUtils.invokeMethod(consumer, true, "cnx");
            MethodUtils.invokeMethod(consumer, true, "connectionClosed", cnx);

            Message<byte[]> message = consumer.receive();
            Transaction transaction = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(5L, TimeUnit.SECONDS)
                    .build()
                    .get();
            try {
                consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
                Assert.fail("acknowledgement must fail once the consumer's connection has been closed");
            } catch (ExecutionException e) {
                Assert.assertTrue(e.getCause() instanceof PulsarClientException.ConnectException);
            }
        }
    }

    /**
     * Checks that a message of a batch can be acknowledged in a new transaction after an earlier
     * transactional acknowledgement of that same message was aborted.
     *
     * <p>Five messages are sent asynchronously with batching enabled and then flushed. The first received
     * message is acknowledged in a transaction that is committed; the second is acknowledged in a transaction
     * that is aborted and then acknowledged once more in a third transaction. The third transaction is never
     * committed or aborted by this test; only the acknowledgement future is awaited.
     *
     * <p>Payloads are encoded as UTF-8 explicitly, and producer and consumer are closed by
     * try-with-resources.
     */
    @Test
    public void testPendingAckBatchMessageCommit() throws Exception {
        this.conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
        final String topic = "tnx/ns1/testPendingAckBatchMessageCommit";
        try (Producer<byte[]> producer = this.pulsarClient.newProducer(Schema.BYTES)
                     .topic(topic)
                     .enableBatching(true)
                     .batchingMaxPublishDelay(3L, TimeUnit.SECONDS)
                     .sendTimeout(0, TimeUnit.SECONDS)
                     .create();
             Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .subscriptionType(SubscriptionType.Shared)
                     .topic(topic)
                     .subscriptionName("sub")
                     .subscribe()) {
            for (int i = 0; i < 5; i++) {
                producer.sendAsync(("test" + i).getBytes(StandardCharsets.UTF_8));
            }
            producer.flush();

            Transaction committedTxn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(10L, TimeUnit.MINUTES)
                    .build()
                    .get();
            consumer.acknowledgeAsync(consumer.receive().getMessageId(), committedTxn).get();

            Transaction abortedTxn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(10L, TimeUnit.MINUTES)
                    .build()
                    .get();
            MessageId reAckedId = consumer.receive().getMessageId();
            consumer.acknowledgeAsync(reAckedId, abortedTxn).get();

            committedTxn.commit().get();
            abortedTxn.abort().get();

            // The abort released the message, so a fresh transaction must be able to acknowledge it.
            Transaction retryTxn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(10L, TimeUnit.MINUTES)
                    .build()
                    .get();
            consumer.acknowledgeAsync(reAckedId, retryTxn).get();
        }
    }

    /**
     * Checks that adding a consumer fails when the pending-ack handle is closed while its replay
     * is still being completed.
     *
     * <p>All broker collaborators are Mockito mocks. The mocked {@link PendingAckStore} answers
     * {@code replayAsync} by scheduling a task that first closes the handle and then reports the
     * replay as complete. {@code addConsumer} is then expected to fail with a
     * {@link BrokerServiceException} as the cause.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws TimeoutException     if {@code addConsumer} does not complete within five seconds
     */
    @Test
    public void testPendingAckReplayChangeStateError() throws InterruptedException, TimeoutException {
        final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            // Broker configuration with the transaction coordinator switched on.
            ServiceConfiguration config = Mockito.mock(ServiceConfiguration.class);
            Mockito.when(config.isTransactionCoordinatorEnabled()).thenReturn(true);

            // Every executor lookup resolves to the single-threaded executor above.
            ExecutorProvider executorProvider = Mockito.mock(ExecutorProvider.class);
            Mockito.when(executorProvider.getExecutor()).thenReturn(executor);
            Mockito.when(executorProvider.getExecutor(ArgumentMatchers.any(Object.class))).thenReturn(executor);

            // Pending-ack store whose replay closes the handle before signalling completion.
            PendingAckStore pendingAckStore = Mockito.mock(PendingAckStore.class);
            Mockito.doAnswer(invocation -> {
                executor.execute(() -> {
                    PendingAckHandleImpl handle = (PendingAckHandleImpl) invocation.getArguments()[0];
                    handle.closeAsync();
                    new MLPendingAckReplyCallBack(handle).replayComplete();
                });
                return null;
            }).when(pendingAckStore).replayAsync(
                    (PendingAckHandleImpl) ArgumentMatchers.any(), (ExecutorService) ArgumentMatchers.any());

            TransactionPendingAckStoreProvider storeProvider = Mockito.mock(TransactionPendingAckStoreProvider.class);
            Mockito.when(storeProvider.checkInitializedBefore((PersistentSubscription) ArgumentMatchers.any()))
                    .thenReturn(CompletableFuture.completedFuture(true));
            Mockito.when(storeProvider.newPendingAckStore((PersistentSubscription) ArgumentMatchers.any()))
                    .thenReturn(CompletableFuture.completedFuture(pendingAckStore));

            PulsarService pulsar = Mockito.mock(PulsarService.class);
            Mockito.when(pulsar.getConfig()).thenReturn(config);
            Mockito.when(pulsar.getTransactionExecutorProvider()).thenReturn(executorProvider);
            Mockito.when(pulsar.getTransactionPendingAckStoreProvider()).thenReturn(storeProvider);

            BrokerService brokerService = Mockito.mock(BrokerService.class);
            Mockito.when(brokerService.getPulsar()).thenReturn(pulsar);
            Mockito.when(brokerService.pulsar()).thenReturn(pulsar);

            PersistentTopic topic = Mockito.mock(PersistentTopic.class);
            Mockito.when(topic.getBrokerService()).thenReturn(brokerService);
            Mockito.when(topic.getName()).thenReturn("topic-a");

            // The cursor throws from updateLastActive(); the message "1" carries no meaning here.
            ManagedCursor cursor = Mockito.mock(ManagedCursor.class);
            Mockito.doThrow(new RuntimeException("1")).when(cursor).updateLastActive();

            PersistentSubscription subscription =
                    new PersistentSubscription(topic, "sub-a", cursor, false, Collections.emptyMap());
            org.apache.pulsar.broker.service.Consumer consumer =
                    Mockito.mock(org.apache.pulsar.broker.service.Consumer.class);
            try {
                subscription.addConsumer(consumer).get(5L, TimeUnit.SECONDS);
                Assert.fail("Expect failure by PendingAckHandle closed, but success");
            } catch (ExecutionException e) {
                Assert.assertTrue(e.getCause() instanceof BrokerServiceException);
            }
        } finally {
            // The executor is private to this test; stop its worker thread so it is not leaked.
            executor.shutdownNow();
        }
    }

    /**
     * Checks that waiting for transaction-buffer recovery fails when the buffer is closed before
     * the recovery callback reports completion.
     *
     * <p>The mocked executor answers every {@code execute} call by starting a thread that reads
     * the {@code callBack} field of the submitted recover task, closes the topic's transaction
     * buffer, and only then calls {@code recoverComplete()}.
     * {@code checkIfTransactionBufferRecoverCompletely(true)} is then expected to fail with
     * {@link BrokerServiceException.ServiceUnitNotReadyException} as the cause.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws TimeoutException     if the recovery check does not complete within five seconds
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"}) // the snapshot service/writer mocks are used as raw types
    public void testTBRecoverChangeStateError() throws InterruptedException, TimeoutException {
        // Filled in at the end of the setup; the executor answer below reads it lazily.
        final AtomicReference<PersistentTopic> topicRef = new AtomicReference<>();

        ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class);
        ServiceConfiguration config = Mockito.mock(ServiceConfiguration.class);
        Mockito.when(config.isEnableReplicatedSubscriptions()).thenReturn(false);
        Mockito.when(config.isTransactionCoordinatorEnabled()).thenReturn(true);
        ExecutorProvider executorProvider = Mockito.mock(ExecutorProvider.class);
        Mockito.when(executorProvider.getExecutor(ArgumentMatchers.any(Object.class))).thenReturn(executor);
        PendingAckStore pendingAckStore = Mockito.mock(PendingAckStore.class);

        Mockito.doAnswer(invocation -> {
            new Thread(() -> {
                TopicTransactionBufferRecoverCallBack callBack = null;
                try {
                    TopicTransactionBuffer.TopicTransactionBufferRecover recoverTask =
                            (TopicTransactionBuffer.TopicTransactionBufferRecover) invocation.getArguments()[0];
                    callBack = (TopicTransactionBufferRecoverCallBack)
                            FieldUtils.readField(recoverTask, "callBack", true);
                    // Close the buffer first so that completion is reported on a closed buffer.
                    topicRef.get().getTransactionBuffer().closeAsync().get();
                } catch (Exception e) {
                    // A Runnable cannot throw checked exceptions; surface them unchecked.
                    throw new RuntimeException(e);
                } finally {
                    // Report completion exactly once, whether or not closing succeeded.
                    if (callBack != null) {
                        callBack.recoverComplete();
                    }
                }
            }).start();
            return null;
        }).when(executor).execute((Runnable) ArgumentMatchers.any());

        // Stubbed but not wired into the PulsarService mock in this test.
        TransactionPendingAckStoreProvider storeProvider = Mockito.mock(TransactionPendingAckStoreProvider.class);
        Mockito.when(storeProvider.checkInitializedBefore((PersistentSubscription) ArgumentMatchers.any()))
                .thenReturn(CompletableFuture.completedFuture(true));
        Mockito.when(storeProvider.newPendingAckStore((PersistentSubscription) ArgumentMatchers.any()))
                .thenReturn(CompletableFuture.completedFuture(pendingAckStore));

        // Snapshot service that hands out a reference-counted writer backed by a mock writer.
        SystemTopicTxnBufferSnapshotService snapshotService = Mockito.mock(SystemTopicTxnBufferSnapshotService.class);
        SystemTopicClient.Writer writer = Mockito.mock(SystemTopicClient.Writer.class);
        Mockito.when(writer.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
        SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter refCountedWriter =
                Mockito.mock(SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter.class);
        Mockito.doReturn(CompletableFuture.completedFuture(writer)).when(refCountedWriter).getFuture();
        Mockito.when(snapshotService.getReferenceWriter((NamespaceName) ArgumentMatchers.any()))
                .thenReturn(refCountedWriter);
        TransactionBufferSnapshotServiceFactory snapshotServiceFactory =
                Mockito.mock(TransactionBufferSnapshotServiceFactory.class);
        Mockito.when(snapshotServiceFactory.getTxnBufferSnapshotService()).thenReturn(snapshotService);

        PulsarService pulsar = Mockito.mock(PulsarService.class);
        Mockito.when(pulsar.getConfiguration()).thenReturn(config);
        Mockito.when(pulsar.getConfig()).thenReturn(config);
        Mockito.when(pulsar.getTransactionExecutorProvider()).thenReturn(executorProvider);
        Mockito.when(pulsar.getTransactionBufferSnapshotServiceFactory()).thenReturn(snapshotServiceFactory);
        // The real provider is used so that a real transaction buffer is created for the topic.
        Mockito.when(pulsar.getTransactionBufferProvider()).thenReturn(new TopicTransactionBufferProvider());

        BacklogQuotaManager backlogQuotaManager = Mockito.mock(BacklogQuotaManager.class);
        BrokerService brokerService = Mockito.mock(BrokerService.class);
        Mockito.when(brokerService.getPulsar()).thenReturn(pulsar);
        Mockito.when(brokerService.pulsar()).thenReturn(pulsar);
        Mockito.when(brokerService.getBacklogQuotaManager()).thenReturn(backlogQuotaManager);

        ManagedLedgerImpl managedLedger = Mockito.mock(ManagedLedgerImpl.class);
        Mockito.when(managedLedger.getCursors()).thenReturn(new ManagedCursorContainer());
        Mockito.when(managedLedger.getLastConfirmedEntry()).thenReturn(PositionImpl.EARLIEST);

        topicRef.set(new PersistentTopic("topic-a", managedLedger, brokerService));
        try {
            topicRef.get().checkIfTransactionBufferRecoverCompletely(true).get(5L, TimeUnit.SECONDS);
            Assert.fail("Expect failure by TB closed, but it is finished.");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException);
        }
    }

    /**
     * Walks a client transaction through each state asserted below and checks
     * {@link Transaction#getState()} for every one: OPEN, TIME_OUT, ABORTED, COMMITTED, ERROR,
     * COMMITTING and ABORTING.
     *
     * @throws Exception if a transaction cannot be created or a blocking commit/abort fails
     */
    @Test
    public void testGetTxnState() throws Exception {
        // OPEN right after creation, then TIME_OUT once the one-second timeout has passed.
        Transaction timedOutTxn = this.pulsarClient.newTransaction()
                .withTransactionTimeout(1L, TimeUnit.SECONDS).build().get();
        Assert.assertEquals(timedOutTxn.getState(), Transaction.State.OPEN);
        Awaitility.await().until(() -> timedOutTxn.getState() == Transaction.State.TIME_OUT);

        // ABORTED after a completed abort.
        Transaction abortedTxn = this.pulsarClient.newTransaction()
                .withTransactionTimeout(3L, TimeUnit.SECONDS).build().get();
        abortedTxn.abort().get();
        Assert.assertEquals(abortedTxn.getState(), Transaction.State.ABORTED);

        // COMMITTED after a completed commit.
        Transaction committedTxn = this.pulsarClient.newTransaction()
                .withTransactionTimeout(3L, TimeUnit.SECONDS).build().get();
        committedTxn.commit().get();
        Assert.assertEquals(committedTxn.getState(), Transaction.State.COMMITTED);

        // ERROR: the transaction is ended through the first broker's metadata store service
        // before the client commits. The action code 0 is presumably "commit" - confirm against
        // the TxnAction constants. The commit future is not awaited; only the state is polled.
        Transaction erroredTxn = this.pulsarClient.newTransaction()
                .withTransactionTimeout(1L, TimeUnit.SECONDS).build().get();
        this.pulsarServiceList.get(0).getTransactionMetadataStoreService()
                .endTransaction(erroredTxn.getTxnID(), 0, false);
        erroredTxn.commit();
        Awaitility.await().until(() -> erroredTxn.getState() == Transaction.State.ERROR);

        // COMMITTING: a registered send operation that never completes is outstanding when
        // commit() is called.
        TransactionImpl committingTxn = (TransactionImpl) this.pulsarClient.newTransaction()
                .withTransactionTimeout(3L, TimeUnit.SECONDS).build().get();
        committingTxn.registerSendOp(new CompletableFuture<>());
        committingTxn.commit();
        Awaitility.await().until(() -> committingTxn.getState() == Transaction.State.COMMITTING);

        // ABORTING: same setup, but abort() is called instead.
        TransactionImpl abortingTxn = (TransactionImpl) this.pulsarClient.newTransaction()
                .withTransactionTimeout(3L, TimeUnit.SECONDS).build().get();
        abortingTxn.registerSendOp(new CompletableFuture<>());
        abortingTxn.abort();
        Awaitility.await().until(() -> abortingTxn.getState() == Transaction.State.ABORTING);
    }

    /**
     * Sends one transactional message to a topic in a namespace whose encryption-required flag
     * is set, using a producer configured with an encryption key and a crypto key reader.
     *
     * @throws Exception if namespace/topic setup, producer creation, or the send fails
     */
    @Test
    public void testEncryptionRequired() throws Exception {
        final String namespace = "tnx/testEncryptionRequired";
        final String topic = "persistent://tnx/testEncryptionRequired/test_transaction_topic";
        this.admin.namespaces().createNamespace(namespace);
        this.admin.namespaces().setEncryptionRequiredStatus(namespace, true);
        this.admin.topics().createNonPartitionedTopic(topic);

        // try-with-resources closes the producer exactly once on every exit path.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .sendTimeout(5, TimeUnit.SECONDS)
                .addEncryptionKey("my-app-key")
                .defaultCryptoKeyReader("file:./src/test/resources/certificate/public-key.client-rsa.pem")
                .create()) {
            Transaction txn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
            producer.newMessage(txn)
                    .value(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8))
                    .send();
            // The commit future is not awaited; the test only requires the send to succeed.
            txn.commit();
        }
    }

    /**
     * Checks that a namespace can be force-deleted after a transaction has both acknowledged a
     * message from one of its topics and produced a message to another.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test
    public void testDeleteNamespace() throws Exception {
        final String namespace = "tnx/ns-" + RandomStringUtils.randomAlphabetic(5);
        final String topic = namespace + "/test-delete-ns";
        this.admin.namespaces().createNamespace(namespace);

        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).create()) {
            producer.newMessage().value("test".getBytes(StandardCharsets.UTF_8)).send();

            Transaction txn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();

            // Consume side of the transaction. The consumer is closed even when receive/ack fails.
            try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscriptionName("sub")
                    .subscribe()) {
                consumer.acknowledgeAsync(consumer.receive().getMessageId(), txn).get();
            }

            // Produce side of the transaction, on a second topic in the same namespace.
            try (Producer<byte[]> outputProducer =
                         this.pulsarClient.newProducer().topic(topic + "-out").create()) {
                outputProducer.newMessage(txn).value("output".getBytes(StandardCharsets.UTF_8)).send();
            }

            // The commit future is not awaited before the namespace is deleted.
            txn.commit();
        }

        // All clients opened by this test are closed at this point.
        this.admin.namespaces().deleteNamespace(namespace, true);
    }

    /**
     * Replaces the reference-counted snapshot writers of every broker with writers backed by a
     * future that the test controls, then checks that:
     * <ul>
     *   <li>producer creation is still pending after three seconds while that future is
     *       incomplete;</li>
     *   <li>after the future fails, the producer is created and transactions can be committed;</li>
     *   <li>{@code checkSnapshotPublisherCount} passes with 1 before, and with 0 after, the
     *       producer is closed and the topic unloaded.</li>
     * </ul>
     *
     * @throws Exception if any admin, client or reflective operation fails
     */
    @Test(timeOut = 10000)
    public void testTBSnapshotWriter() throws Exception {
        final String namespace = "tnx/ns-" + RandomStringUtils.randomAlphabetic(5);
        this.admin.namespaces().createNamespace(namespace, 16);
        final String topic = namespace + "/test-create-snapshot-writer-failed";
        final int partitionCount = 20;
        this.admin.topics().createPartitionedTopic(topic, partitionCount);

        // The writer map is not accessible from the test, so it is reached through reflection.
        Field writerMapField = SystemTopicTxnBufferSnapshotService.class.getDeclaredField("refCountedWriterMap");
        writerMapField.setAccessible(true);

        // One shared, initially incomplete future backs every injected writer.
        CompletableFuture pendingWriter = new CompletableFuture();
        for (PulsarService pulsar : this.pulsarServiceList) {
            TransactionBufferSnapshotServiceFactory factory = pulsar.getTransactionBufferSnapshotServiceFactory();
            SystemTopicTxnBufferSnapshotService[] services = {
                factory.getTxnBufferSnapshotService(),
                factory.getTxnBufferSnapshotSegmentService(),
                factory.getTxnBufferSnapshotIndexService(),
            };
            for (SystemTopicTxnBufferSnapshotService service : services) {
                NamespaceName namespaceName = NamespaceName.get(namespace);
                ConcurrentHashMap writerMap = (ConcurrentHashMap) writerMapField.get(service);
                writerMap.put(namespaceName,
                        new SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter(
                                namespaceName, pendingWriter, service));
            }
        }

        CompletableFuture<Producer<byte[]>> producerFuture = this.pulsarClient.newProducer()
                .topic(topic)
                .sendTimeout(0, TimeUnit.SECONDS)
                .createAsync();
        getTopic("persistent://" + topic + "-partition-0");
        Thread.sleep(3000L);
        Assert.assertFalse(producerFuture.isDone());

        pendingWriter.completeExceptionally(new PulsarClientException.TopicTerminatedException("failed writer"));
        Producer<byte[]> producer = producerFuture.get();
        final int transactionCount = partitionCount * 2;
        for (int round = 0; round < transactionCount; round++) {
            Transaction txn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(1L, TimeUnit.MINUTES).build().get();
            producer.newMessage(txn).value("test".getBytes()).sendAsync();
            txn.commit().get();
        }

        checkSnapshotPublisherCount(namespace, 1);
        producer.close();
        this.admin.topics().unload(topic);
        checkSnapshotPublisherCount(namespace, 0);
    }

    /**
     * Waits up to five seconds, polling every 100 ms, until at least one broker in
     * {@code pulsarServiceList} holds a topic reference for the given name.
     *
     * @param str the topic name to look up
     */
    private void getTopic(String str) {
        Awaitility.await()
                .atMost(5L, TimeUnit.SECONDS)
                .pollInterval(100L, TimeUnit.MILLISECONDS)
                .until(() -> this.pulsarServiceList.stream()
                        .anyMatch(pulsar -> pulsar.getBrokerService().getTopicReference(str).isPresent()));
    }

    /**
     * Checks that a {@code readCompacted} consumer only sees committed data: before the
     * transaction commits it receives just the non-transactional message, and after the commit it
     * receives the two transactional messages.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test
    public void testReadCommittedWithReadCompacted() throws Exception {
        final String namespace = "tnx/testReadCommittedWithReadCompacted";
        final String topic = "persistent://tnx/testReadCommittedWithReadCompacted/test_transaction_topic";
        this.admin.namespaces().createNamespace(namespace);
        this.admin.topics().createNonPartitionedTopic(topic);
        this.admin.topicPolicies().setCompactionThreshold(topic, 104857600L);

        // The consumer is opened before the producer; try-with-resources closes them in reverse
        // order, each exactly once.
        try (Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                     .topic(topic)
                     .subscriptionName("sub")
                     .subscriptionType(SubscriptionType.Exclusive)
                     .readCompacted(true)
                     .subscribe();
             Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                     .topic(topic)
                     .create()) {
            producer.newMessage().key("K1").value("V1").send();

            Transaction txn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(1L, TimeUnit.MINUTES).build().get();
            producer.newMessage(txn).key("K2").value("V2").send();
            producer.newMessage(txn).key("K3").value("V3").send();

            // Drain until a receive times out: only the non-transactional message is visible.
            List<String> received = new ArrayList<>();
            Message<String> message;
            while ((message = consumer.receive(5, TimeUnit.SECONDS)) != null) {
                received.add(message.getValue());
            }
            Assert.assertEquals(received, List.of("V1"));

            // Wait for the commit to finish so the second drain does not race with it.
            txn.commit().get();

            received.clear();
            while ((message = consumer.receive(5, TimeUnit.SECONDS)) != null) {
                received.add(message.getValue());
            }
            Assert.assertEquals(received, List.of("V2", "V3"));
        }
    }

    /**
     * Checks that a {@code readCompacted} consumer reading from the earliest position receives
     * only the latest value per key after compaction, for both transactional and
     * non-transactional writes.
     *
     * <p>Keys K1..K3 are each written twice (K1 outside a transaction, K2 and K3 inside two
     * committed transactions). After compaction reports SUCCESS, the consumer is expected to
     * receive exactly V4, V5, V6.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test
    public void testReadCommittedWithCompaction() throws Exception {
        final String topic = "persistent://tnx/ns-prechecks/test_transaction_topic" + UUID.randomUUID();
        this.admin.namespaces().createNamespace("tnx/ns-prechecks");
        this.admin.topics().createNonPartitionedTopic(topic);
        this.admin.topicPolicies().setCompactionThreshold(topic, 104857600L);

        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(topic).create()) {
            // First round of values.
            producer.newMessage().key("K1").value("V1").send();
            Transaction firstTxn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(1L, TimeUnit.MINUTES).build().get();
            producer.newMessage(firstTxn).key("K2").value("V2").send();
            producer.newMessage(firstTxn).key("K3").value("V3").send();
            firstTxn.commit().get();

            // Second round overwrites every key.
            producer.newMessage().key("K1").value("V4").send();
            Transaction secondTxn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(1L, TimeUnit.MINUTES).build().get();
            producer.newMessage(secondTxn).key("K2").value("V5").send();
            producer.newMessage(secondTxn).key("K3").value("V6").send();
            secondTxn.commit().get();

            this.admin.topics().triggerCompaction(topic);
            Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                    this.admin.topics().compactionStatus(topic).status,
                    LongRunningProcessStatus.Status.SUCCESS));

            // The consumer is created only after compaction; it is closed exactly once on every
            // exit path.
            try (Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                    .topic(topic)
                    .subscriptionName("sub")
                    .subscriptionType(SubscriptionType.Exclusive)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .readCompacted(true)
                    .subscribe()) {
                // Drain until a receive times out after two seconds.
                List<String> received = new ArrayList<>();
                Message<String> message;
                while ((message = consumer.receive(2, TimeUnit.SECONDS)) != null) {
                    received.add(message.getValue());
                }
                Assert.assertEquals(received, List.of("V4", "V5", "V6"));
            }
        }
    }
}
