/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"flaky"})
public class TransactionEndToEndTest
extends TransactionTestBase {
    private static final Logger log = LoggerFactory.getLogger(TransactionEndToEndTest.class);
    private static final int TOPIC_PARTITION = 3;
    private static final String TENANT = "tnx";
    private static final String NAMESPACE1 = "tnx/ns1";
    private static final String TOPIC_OUTPUT = "tnx/ns1/output";
    private static final String TOPIC_MESSAGE_ACK_TEST = "tnx/ns1/message-ack-test";
    private static final int NUM_PARTITIONS = 16;

    @BeforeMethod
    protected void setup() throws Exception {
        this.setBrokerCount(1);
        this.internalSetup();
        String[] brokerServiceUrlArr = this.getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length - 1];
        this.admin.clusters().createCluster("test", ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
        this.admin.tenants().createTenant(TENANT, (TenantInfo)new TenantInfoImpl((Set)Sets.newHashSet((Object[])new String[]{"appid1"}), (Set)Sets.newHashSet((Object[])new String[]{"test"})));
        this.admin.namespaces().createNamespace(NAMESPACE1);
        this.admin.topics().createPartitionedTopic(TOPIC_OUTPUT, 3);
        this.admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1);
        this.admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), (TenantInfo)new TenantInfoImpl((Set)Sets.newHashSet((Object[])new String[]{"appid1"}), (Set)Sets.newHashSet((Object[])new String[]{"test"})));
        this.admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
        this.admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16);
        if (this.pulsarClient != null) {
            this.pulsarClient.close();
        }
        this.pulsarClient = PulsarClient.builder().serviceUrl(this.getPulsarServiceList().get(0).getBrokerServiceUrl()).statsInterval(0L, TimeUnit.SECONDS).enableTransaction(true).build();
        this.waitForCoordinatorToBeAvailable(16);
    }

    @AfterMethod(alwaysRun=true)
    protected void cleanup() {
        super.internalCleanup();
    }

    @Test
    public void noBatchProduceCommitTest() throws Exception {
        this.produceCommitTest(false);
    }

    @Test
    public void batchProduceCommitTest() throws Exception {
        this.produceCommitTest(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void produceCommitTest(boolean enableBatch) throws Exception {
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{TOPIC_OUTPUT}).subscriptionName("test").enableBatchIndexAcknowledgment(true).subscribe();
        try {
            Awaitility.await().until(() -> ((Consumer)consumer).isConnected());
            ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic(TOPIC_OUTPUT).enableBatching(enableBatch).sendTimeout(0, TimeUnit.SECONDS);
            Producer producer = producerBuilder.create();
            try {
                Transaction txn1 = this.getTxn();
                Transaction txn2 = this.getTxn();
                int txnMessageCnt = 0;
                int messageCnt = 1000;
                for (int i = 0; i < messageCnt; ++i) {
                    if (i % 5 == 0) {
                        producer.newMessage(txn1).value((Object)("Hello Txn - " + i).getBytes(StandardCharsets.UTF_8)).sendAsync();
                    } else {
                        producer.newMessage(txn2).value((Object)("Hello Txn - " + i).getBytes(StandardCharsets.UTF_8)).sendAsync();
                    }
                    ++txnMessageCnt;
                }
                Message message = consumer.receive(5, TimeUnit.SECONDS);
                Assert.assertNull((Object)message);
                txn1.commit().get();
                txn2.commit().get();
                int receiveCnt = 0;
                for (int i = 0; i < txnMessageCnt; ++i) {
                    message = consumer.receive();
                    Assert.assertNotNull((Object)message);
                    ++receiveCnt;
                }
                Assert.assertEquals((int)txnMessageCnt, (int)receiveCnt);
                message = consumer.receive(5, TimeUnit.SECONDS);
                Assert.assertNull((Object)message);
                message = consumer.receive(5, TimeUnit.SECONDS);
                Assert.assertNull((Object)message);
                log.info("message commit test enableBatch {}", (Object)enableBatch);
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(consumer).get(0) != null) {
                consumer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void produceAbortTest() throws Exception {
        Transaction txn = this.getTxn();
        String subName = "test";
        Producer producer = this.pulsarClient.newProducer().topic(TOPIC_OUTPUT).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
        try {
            int messageCnt = 10;
            for (int i = 0; i < messageCnt; ++i) {
                producer.newMessage(txn).value((Object)("Hello Txn - " + i).getBytes(StandardCharsets.UTF_8)).send();
            }
            Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{TOPIC_OUTPUT}).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscriptionName(subName).enableBatchIndexAcknowledgment(true).subscribe();
            try {
                Awaitility.await().until(() -> ((Consumer)consumer).isConnected());
                Message message = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertNull((Object)message);
                txn.abort().get();
                message = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertNull((Object)message);
                Awaitility.await().until(() -> {
                    boolean flag = true;
                    for (int partition = 0; partition < 3; ++partition) {
                        String topic = TopicName.get((String)TOPIC_OUTPUT).getPartition(partition).toString();
                        boolean exist = false;
                        for (int i = 0; i < this.getPulsarServiceList().size(); ++i) {
                            PositionImpl markerPosition;
                            Optional topicOptional;
                            Field field = BrokerService.class.getDeclaredField("topics");
                            field.setAccessible(true);
                            ConcurrentOpenHashMap topics = (ConcurrentOpenHashMap)field.get(this.getPulsarServiceList().get(i).getBrokerService());
                            CompletableFuture topicFuture = (CompletableFuture)topics.get((Object)topic);
                            if (topicFuture == null || !(topicOptional = (Optional)topicFuture.get()).isPresent()) continue;
                            PersistentSubscription persistentSubscription = (PersistentSubscription)((Topic)topicOptional.get()).getSubscription(subName);
                            Position markDeletePosition = persistentSubscription.getCursor().getMarkDeletedPosition();
                            Position lastConfirmedEntry = persistentSubscription.getCursor().getManagedLedger().getLastConfirmedEntry();
                            exist = true;
                            if (markDeletePosition.equals(lastConfirmedEntry) || (markerPosition = ((ManagedLedgerImpl)persistentSubscription.getCursor().getManagedLedger()).getNextValidPosition((PositionImpl)markDeletePosition)).equals(lastConfirmedEntry)) continue;
                            log.error("Mark delete position is not commit marker position!");
                            flag = false;
                        }
                        Assert.assertTrue((boolean)exist);
                    }
                    return flag;
                });
                log.info("finished test partitionAbortTest");
            }
            finally {
                if (Collections.singletonList(consumer).get(0) != null) {
                    consumer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    @Test
    public void txnIndividualAckTestNoBatchAndSharedSub() throws Exception {
        this.txnAckTest(false, 1, SubscriptionType.Shared);
    }

    @Test
    public void txnIndividualAckTestBatchAndSharedSub() throws Exception {
        this.txnAckTest(true, 200, SubscriptionType.Shared);
    }

    @Test
    public void txnIndividualAckTestNoBatchAndFailoverSub() throws Exception {
        this.txnAckTest(false, 1, SubscriptionType.Failover);
    }

    @Test
    public void txnIndividualAckTestBatchAndFailoverSub() throws Exception {
        this.txnAckTest(true, 200, SubscriptionType.Failover);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void txnAckTest(boolean batchEnable, int maxBatchSize, SubscriptionType subscriptionType) throws Exception {
        String normalTopic = "tnx/ns1/normal-topic";
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{normalTopic}).subscriptionName("test").enableBatchIndexAcknowledgment(true).subscriptionType(subscriptionType).subscribe();
        try {
            Awaitility.await().until(() -> ((Consumer)consumer).isConnected());
            Producer producer = this.pulsarClient.newProducer().topic(normalTopic).enableBatching(batchEnable).batchingMaxMessages(maxBatchSize).create();
            try {
                for (int retryCnt = 0; retryCnt < 2; ++retryCnt) {
                    int i;
                    Transaction txn = this.getTxn();
                    int messageCnt = 1000;
                    for (i = 0; i < messageCnt; ++i) {
                        producer.newMessage().value((Object)"hello".getBytes()).sendAsync();
                    }
                    for (i = 0; i < messageCnt; ++i) {
                        Message message = consumer.receive();
                        Assert.assertNotNull((Object)message);
                        log.info("receive msgId: {}, count : {}", (Object)message.getMessageId(), (Object)i);
                        consumer.acknowledgeAsync(message.getMessageId(), txn).get();
                    }
                    Message message = consumer.receive(2, TimeUnit.SECONDS);
                    Assert.assertNull((Object)message);
                    txn.abort().get();
                    Transaction commitTxn = this.getTxn();
                    for (int i2 = 0; i2 < messageCnt; ++i2) {
                        message = consumer.receive(2, TimeUnit.SECONDS);
                        Assert.assertNotNull((Object)message);
                        consumer.acknowledgeAsync(message.getMessageId(), commitTxn).get();
                        log.info("receive msgId: {}, count: {}", (Object)message.getMessageId(), (Object)i2);
                    }
                    commitTxn.commit().get();
                    message = consumer.receive(2, TimeUnit.SECONDS);
                    Assert.assertNull((Object)message);
                    Field field = TransactionImpl.class.getDeclaredField("state");
                    field.setAccessible(true);
                    field.set(commitTxn, TransactionImpl.State.OPEN);
                    try {
                        commitTxn.commit().get();
                        Assert.fail((String)"recommit one transaction should be failed.");
                        continue;
                    }
                    catch (Exception reCommitError) {
                        log.info("expected exception for recommit one transaction.");
                        Assert.assertNotNull((Object)reCommitError);
                        Assert.assertTrue((boolean)(reCommitError.getCause() instanceof TransactionCoordinatorClientException.TransactionNotFoundException));
                    }
                }
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(consumer).get(0) != null) {
                consumer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void txnMessageAckTest() throws Exception {
        String topic = TOPIC_MESSAGE_ACK_TEST;
        String subName = "test";
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName("test").enableBatchIndexAcknowledgment(true).acknowledgmentGroupTime(0L, TimeUnit.MILLISECONDS).subscribe();
        try {
            Awaitility.await().until(() -> ((Consumer)consumer).isConnected());
            Producer producer = this.pulsarClient.newProducer().topic(topic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
            try {
                Transaction txn = this.getTxn();
                int messageCnt = 10;
                for (int i = 0; i < messageCnt; ++i) {
                    producer.newMessage(txn).value((Object)("Hello Txn - " + i).getBytes(StandardCharsets.UTF_8)).sendAsync();
                }
                log.info("produce transaction messages finished");
                Message message = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertNull((Object)message);
                log.info("transaction messages can't be received before transaction committed");
                txn.commit().get();
                int ackedMessageCount = 0;
                int receiveCnt = 0;
                for (int i = 0; i < messageCnt; ++i) {
                    message = consumer.receive();
                    Assert.assertNotNull((Object)message);
                    ++receiveCnt;
                    if (i % 2 != 0) continue;
                    consumer.acknowledge(message);
                    ++ackedMessageCount;
                }
                Assert.assertEquals((int)messageCnt, (int)receiveCnt);
                message = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertNull((Object)message);
                String checkTopic = TopicName.get((String)topic).getPartition(0).toString();
                PersistentTopicInternalStats stats = this.admin.topics().getInternalStats(checkTopic, false);
                Assert.assertNotEquals((Object)((ManagedLedgerInternalStats.CursorStats)stats.cursors.get((Object)"test")).markDeletePosition, (Object)stats.lastConfirmedEntry);
                consumer.redeliverUnacknowledgedMessages();
                receiveCnt = 0;
                for (int i = 0; i < messageCnt - ackedMessageCount; ++i) {
                    message = consumer.receive(2, TimeUnit.SECONDS);
                    Assert.assertNotNull((Object)message);
                    consumer.acknowledge(message);
                    ++receiveCnt;
                }
                Assert.assertEquals((int)(messageCnt - ackedMessageCount), (int)receiveCnt);
                message = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertNull((Object)message);
                topic = TopicName.get((String)topic).getPartition(0).toString();
                boolean exist = false;
                for (int i = 0; i < this.getPulsarServiceList().size(); ++i) {
                    PositionImpl markerPosition;
                    Optional topicOptional;
                    Field field = BrokerService.class.getDeclaredField("topics");
                    field.setAccessible(true);
                    ConcurrentOpenHashMap topics = (ConcurrentOpenHashMap)field.get(this.getPulsarServiceList().get(i).getBrokerService());
                    CompletableFuture topicFuture = (CompletableFuture)topics.get((Object)topic);
                    if (topicFuture == null || !(topicOptional = (Optional)topicFuture.get()).isPresent()) continue;
                    PersistentSubscription persistentSubscription = (PersistentSubscription)((Topic)topicOptional.get()).getSubscription("test");
                    Position markDeletePosition = persistentSubscription.getCursor().getMarkDeletedPosition();
                    Position lastConfirmedEntry = persistentSubscription.getCursor().getManagedLedger().getLastConfirmedEntry();
                    exist = true;
                    if (markDeletePosition.equals(lastConfirmedEntry) || (markerPosition = ((ManagedLedgerImpl)persistentSubscription.getCursor().getManagedLedger()).getNextValidPosition((PositionImpl)markDeletePosition)).equals(lastConfirmedEntry)) continue;
                    log.error("Mark delete position is not commit marker position!");
                    Assert.fail();
                }
                Assert.assertTrue((boolean)exist);
                log.info("receive transaction messages count: {}", (Object)receiveCnt);
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(consumer).get(0) != null) {
                consumer.close();
            }
        }
    }

    @Test
    public void txnAckTestBatchAndCumulativeSub() throws Exception {
        this.txnCumulativeAckTest(true, 200, SubscriptionType.Failover);
    }

    @Test
    public void txnAckTestNoBatchAndCumulativeSub() throws Exception {
        this.txnCumulativeAckTest(false, 1, SubscriptionType.Failover);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void txnCumulativeAckTest(boolean batchEnable, int maxBatchSize, SubscriptionType subscriptionType) throws Exception {
        String normalTopic = "tnx/ns1/normal-topic";
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{normalTopic}).subscriptionName("test").enableBatchIndexAcknowledgment(true).subscriptionType(subscriptionType).ackTimeout(1L, TimeUnit.MINUTES).subscribe();
        try {
            Awaitility.await().until(() -> ((Consumer)consumer).isConnected());
            Producer producer = this.pulsarClient.newProducer().topic(normalTopic).enableBatching(batchEnable).batchingMaxMessages(maxBatchSize).batchingMaxPublishDelay(1L, TimeUnit.SECONDS).create();
            try {
                for (int retryCnt = 0; retryCnt < 2; ++retryCnt) {
                    Transaction abortTxn = this.getTxn();
                    int messageCnt = 1000;
                    for (int i = 0; i < messageCnt; ++i) {
                        producer.newMessage().value((Object)"hello".getBytes()).sendAsync();
                    }
                    Message message = null;
                    Thread.sleep(1000L);
                    for (int i = 0; i < messageCnt; ++i) {
                        message = consumer.receive(1, TimeUnit.SECONDS);
                        Assert.assertNotNull((Object)message);
                        if (i % 3 == 0) {
                            consumer.acknowledgeCumulativeAsync(message.getMessageId(), abortTxn).get();
                        }
                        log.info("receive msgId abort: {}, retryCount : {}, count : {}", new Object[]{message.getMessageId(), retryCnt, i});
                    }
                    try {
                        consumer.acknowledgeCumulativeAsync(message.getMessageId(), abortTxn).get();
                        Assert.fail((String)"not ack conflict ");
                    }
                    catch (Exception e) {
                        Assert.assertTrue((boolean)(e.getCause() instanceof PulsarClientException.TransactionConflictException));
                    }
                    try {
                        consumer.acknowledgeCumulativeAsync(DefaultImplementation.newMessageId((long)((MessageIdImpl)message.getMessageId()).getLedgerId(), (long)(((MessageIdImpl)message.getMessageId()).getEntryId() - 1L), (int)-1), abortTxn).get();
                        Assert.fail((String)"not ack conflict ");
                    }
                    catch (Exception e) {
                        Assert.assertTrue((boolean)(e.getCause() instanceof PulsarClientException.TransactionConflictException));
                    }
                    message = consumer.receive(2, TimeUnit.SECONDS);
                    Assert.assertNull((Object)message);
                    abortTxn.abort().get();
                    Transaction commitTxn = this.getTxn();
                    for (int i = 0; i < messageCnt; ++i) {
                        message = consumer.receive(1, TimeUnit.SECONDS);
                        Assert.assertNotNull((Object)message);
                        if (i % 3 == 0) {
                            consumer.acknowledgeCumulativeAsync(message.getMessageId(), commitTxn).get();
                        }
                        log.info("receive msgId abort: {}, retryCount : {}, count : {}", new Object[]{message.getMessageId(), retryCnt, i});
                    }
                    commitTxn.commit().get();
                    Field field = TransactionImpl.class.getDeclaredField("state");
                    field.setAccessible(true);
                    field.set(commitTxn, TransactionImpl.State.OPEN);
                    try {
                        commitTxn.commit().get();
                        Assert.fail((String)"recommit one transaction should be failed.");
                    }
                    catch (Exception reCommitError) {
                        log.info("expected exception for recommit one transaction.");
                        Assert.assertNotNull((Object)reCommitError);
                        Assert.assertTrue((boolean)(reCommitError.getCause() instanceof TransactionCoordinatorClientException.TransactionNotFoundException));
                    }
                    message = consumer.receive(1, TimeUnit.SECONDS);
                    Assert.assertNull((Object)message);
                }
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(consumer).get(0) != null) {
                consumer.close();
            }
        }
    }

    private Transaction getTxn() throws Exception {
        return (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(10L, TimeUnit.SECONDS).build().get();
    }

    private void markDeletePositionCheck(String topic, String subName, boolean equalsWithLastConfirm) throws Exception {
        for (int i = 0; i < 3; ++i) {
            PersistentTopicInternalStats stats = null;
            String checkTopic = TopicName.get((String)topic).getPartition(i).toString();
            for (int j = 0; j < 10; ++j) {
                stats = this.admin.topics().getInternalStats(checkTopic, false);
                if (stats.lastConfirmedEntry.equals(((ManagedLedgerInternalStats.CursorStats)stats.cursors.get((Object)subName)).markDeletePosition)) break;
                Thread.sleep(200L);
            }
            if (equalsWithLastConfirm) {
                Assert.assertEquals((String)((ManagedLedgerInternalStats.CursorStats)stats.cursors.get((Object)subName)).markDeletePosition, (String)stats.lastConfirmedEntry);
                continue;
            }
            Assert.assertNotEquals((Object)((ManagedLedgerInternalStats.CursorStats)stats.cursors.get((Object)subName)).markDeletePosition, (Object)stats.lastConfirmedEntry);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void txnMetadataHandlerRecoverTest() throws Exception {
        String topic = "tnx/ns1/tc-metadata-handler-recover";
        Producer producer = this.pulsarClient.newProducer().topic(topic).sendTimeout(0, TimeUnit.SECONDS).create();
        try {
            ArrayList<TxnID> txnIDList = new ArrayList<TxnID>();
            int txnCnt = 20;
            int messageCnt = 10;
            for (int i = 0; i < txnCnt; ++i) {
                TransactionImpl txn = (TransactionImpl)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.MINUTES).build().get();
                for (int j = 0; j < messageCnt; ++j) {
                    producer.newMessage((Transaction)txn).value((Object)"Hello".getBytes()).sendAsync().get();
                }
                txnIDList.add(new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits()));
            }
            PulsarClientImpl recoverPulsarClient = (PulsarClientImpl)PulsarClient.builder().serviceUrl(this.getPulsarServiceList().get(0).getBrokerServiceUrl()).statsInterval(0L, TimeUnit.SECONDS).enableTransaction(true).build();
            try {
                TransactionCoordinatorClientImpl tcClient = recoverPulsarClient.getTcClient();
                for (TxnID txnID : txnIDList) {
                    tcClient.commit(txnID);
                }
                Consumer consumer = recoverPulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName("test").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
                try {
                    Awaitility.await().until(() -> ((Consumer)consumer).isConnected());
                    for (int i = 0; i < txnCnt * messageCnt; ++i) {
                        Message message = consumer.receive();
                        Assert.assertNotNull((Object)message);
                    }
                }
                finally {
                    if (Collections.singletonList(consumer).get(0) != null) {
                        consumer.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(recoverPulsarClient).get(0) != null) {
                    recoverPulsarClient.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void produceTxnMessageOrderTest() throws Exception {
        String topic = "tnx/ns1/txn-produce-order";
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName("test").subscribe();
        try {
            Awaitility.await().until(() -> ((Consumer)consumer).isConnected());
            Producer producer = this.pulsarClient.newProducer().topic(topic).sendTimeout(0, TimeUnit.SECONDS).producerName("txn-publish-order").create();
            try {
                for (int ti = 0; ti < 10; ++ti) {
                    int i;
                    Transaction txn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(2L, TimeUnit.SECONDS).build().get();
                    for (i = 0; i < 1000; ++i) {
                        producer.newMessage(txn).value((Object)("" + i).getBytes()).sendAsync();
                    }
                    txn.commit().get();
                    for (i = 0; i < 1000; ++i) {
                        Message message = consumer.receive(5, TimeUnit.SECONDS);
                        Assert.assertNotNull((Object)message);
                        Assert.assertEquals((Object)Integer.valueOf(new String(message.getData())), (Object)new Integer(i));
                    }
                }
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(consumer).get(0) != null) {
                consumer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void produceAndConsumeCloseStateTxnTest() throws Exception {
        String topic = "tnx/ns1/txn-close-state";
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName("test").subscribe();
        try {
            Producer producer = this.pulsarClient.newProducer().topic(topic).sendTimeout(0, TimeUnit.SECONDS).producerName("txn-close-state").create();
            try {
                Transaction produceTxn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(2L, TimeUnit.SECONDS).build().get();
                Transaction consumeTxn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(2L, TimeUnit.SECONDS).build().get();
                producer.newMessage(produceTxn).value((Object)"Hello Pulsar!".getBytes()).sendAsync().get();
                produceTxn.commit().get();
                try {
                    producer.newMessage(produceTxn).value((Object)"Hello Pulsar!".getBytes()).sendAsync().get();
                    Assert.fail();
                }
                catch (Exception e) {
                    Assert.assertTrue((boolean)(e.getCause() instanceof TransactionCoordinatorClientException.InvalidTxnStatusException));
                }
                try {
                    produceTxn.commit().get();
                    Assert.fail();
                }
                catch (Exception e) {
                    Assert.assertTrue((boolean)(e.getCause() instanceof TransactionCoordinatorClientException.InvalidTxnStatusException));
                }
                Message message = consumer.receive(5, TimeUnit.SECONDS);
                consumer.acknowledgeAsync(message.getMessageId(), consumeTxn).get();
                consumeTxn.commit().get();
                try {
                    consumer.acknowledgeAsync(message.getMessageId(), consumeTxn).get();
                    Assert.fail();
                }
                catch (Exception e) {
                    Assert.assertTrue((boolean)(e.getCause() instanceof TransactionCoordinatorClientException.InvalidTxnStatusException));
                }
                try {
                    consumeTxn.commit().get();
                    Assert.fail();
                }
                catch (Exception e) {
                    Assert.assertTrue((boolean)(e.getCause() instanceof TransactionCoordinatorClientException.InvalidTxnStatusException));
                }
                Transaction timeoutTxn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(1L, TimeUnit.SECONDS).build().get();
                AtomicReference transactionMetadataStore = new AtomicReference();
                this.getPulsarServiceList().forEach(pulsarService -> {
                    if (pulsarService.getTransactionMetadataStoreService().getStores().containsKey(TransactionCoordinatorID.get((long)((TransactionImpl)timeoutTxn).getTxnIdMostBits()))) {
                        transactionMetadataStore.set(pulsarService.getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get((long)((TransactionImpl)timeoutTxn).getTxnIdMostBits())));
                    }
                });
                Awaitility.await().until(() -> {
                    try {
                        ((TransactionMetadataStore)transactionMetadataStore.get()).getTxnMeta(new TxnID(((TransactionImpl)timeoutTxn).getTxnIdMostBits(), ((TransactionImpl)timeoutTxn).getTxnIdLeastBits())).get();
                        return false;
                    }
                    catch (Exception e) {
                        return true;
                    }
                });
                try {
                    timeoutTxn.commit().get();
                    Assert.fail();
                }
                catch (Exception e) {
                    Assert.assertTrue((boolean)(e.getCause() instanceof TransactionCoordinatorClientException.TransactionNotFoundException));
                }
                Field field = TransactionImpl.class.getDeclaredField("state");
                field.setAccessible(true);
                TransactionImpl.State state = (TransactionImpl.State)field.get(timeoutTxn);
                Assert.assertEquals((Object)state, (Object)TransactionImpl.State.ERROR);
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(consumer).get(0) != null) {
                consumer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void transactionTimeoutTest() throws Exception {
        String topic = "tnx/ns1/txn-timeout";
        Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{topic}).subscriptionName("test").subscribe();
        try {
            Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(topic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).producerName("txn-timeout").create();
            try {
                producer.send((Object)"Hello Pulsar!");
                Transaction consumeTimeoutTxn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(3L, TimeUnit.SECONDS).build().get();
                Message message = consumer.receive();
                consumer.acknowledgeAsync(message.getMessageId(), consumeTimeoutTxn).get();
                Message reReceiveMessage = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertNull((Object)reReceiveMessage);
                reReceiveMessage = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertEquals((String)((String)reReceiveMessage.getValue()), (String)((String)message.getValue()));
                Assert.assertEquals((Object)reReceiveMessage.getMessageId(), (Object)message.getMessageId());
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(consumer).get(0) != null) {
                consumer.close();
            }
        }
    }

    @DataProvider(name="ackType")
    public static Object[] ackType() {
        return new Object[]{CommandAck.AckType.Cumulative, CommandAck.AckType.Individual};
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="ackType")
    public void txnTransactionRedeliverNullDispatcher(CommandAck.AckType ackType) throws Exception {
        String topic = "tnx/ns1/txnTransactionRedeliverNullDispatcher";
        String subName = "test";
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName("test").enableBatchIndexAcknowledgment(true).acknowledgmentGroupTime(0L, TimeUnit.MILLISECONDS).subscribe();
        try {
            Awaitility.await().until(() -> ((Consumer)consumer).isConnected());
            Producer producer = this.pulsarClient.newProducer().topic(topic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
            try {
                int messageCnt = 10;
                for (int i = 0; i < messageCnt; ++i) {
                    producer.send((Object)("Hello Txn - " + i).getBytes(StandardCharsets.UTF_8));
                }
                Transaction txn = this.getTxn();
                if (ackType == CommandAck.AckType.Individual) {
                    consumer.acknowledgeAsync(consumer.receive().getMessageId(), txn);
                } else {
                    consumer.acknowledgeCumulativeAsync(consumer.receive().getMessageId(), txn);
                }
                topic = TopicName.get((String)topic).toString();
                boolean exist = false;
                for (int i = 0; i < this.getPulsarServiceList().size(); ++i) {
                    Optional topicOptional;
                    Field field = BrokerService.class.getDeclaredField("topics");
                    field.setAccessible(true);
                    ConcurrentOpenHashMap topics = (ConcurrentOpenHashMap)field.get(this.getPulsarServiceList().get(i).getBrokerService());
                    CompletableFuture topicFuture = (CompletableFuture)topics.get((Object)topic);
                    if (topicFuture == null || !(topicOptional = (Optional)topicFuture.get()).isPresent()) continue;
                    PersistentSubscription persistentSubscription = (PersistentSubscription)((Topic)topicOptional.get()).getSubscription("test");
                    field = persistentSubscription.getClass().getDeclaredField("dispatcher");
                    field.setAccessible(true);
                    field.set(persistentSubscription, null);
                    exist = true;
                }
                txn.abort().get();
                Assert.assertTrue((boolean)exist);
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(consumer).get(0) != null) {
                consumer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void oneTransactionOneTopicWithMultiSubTest() throws Exception {
        String topic = "tnx/ns1/oneTransactionOneTopicWithMultiSubTest";
        String subName1 = "test1";
        String subName2 = "test2";
        Consumer consumer1 = this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName("test1").acknowledgmentGroupTime(0L, TimeUnit.MILLISECONDS).subscribe();
        try {
            Awaitility.await().until(() -> ((Consumer)consumer1).isConnected());
            Consumer consumer2 = this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName("test2").acknowledgmentGroupTime(0L, TimeUnit.MILLISECONDS).subscribe();
            try {
                Awaitility.await().until(() -> ((Consumer)consumer2).isConnected());
                Producer producer = this.pulsarClient.newProducer().topic(topic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
                try {
                    MessageId messageId = producer.send((Object)"Hello Pulsar".getBytes(StandardCharsets.UTF_8));
                    TransactionImpl txn = (TransactionImpl)this.getTxn();
                    consumer1.acknowledgeAsync(messageId, (Transaction)txn).get();
                    consumer2.acknowledgeAsync(messageId, (Transaction)txn).get();
                    boolean flag = false;
                    for (int i = 0; i < this.getPulsarServiceList().size(); ++i) {
                        TransactionMetadataStoreService transactionMetadataStoreService = this.getPulsarServiceList().get(i).getTransactionMetadataStoreService();
                        if (!transactionMetadataStoreService.getStores().containsKey(TransactionCoordinatorID.get((long)txn.getTxnIdMostBits()))) continue;
                        List list = ((TxnMeta)transactionMetadataStoreService.getTxnMeta(new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits())).get()).ackedPartitions();
                        flag = true;
                        Assert.assertEquals((int)list.size(), (int)2);
                        if (((TransactionSubscription)list.get(0)).getSubscription().equals("test1")) {
                            Assert.assertEquals((String)((TransactionSubscription)list.get(1)).getSubscription(), (String)"test2");
                            continue;
                        }
                        Assert.assertEquals((String)((TransactionSubscription)list.get(0)).getSubscription(), (String)"test2");
                        Assert.assertEquals((String)((TransactionSubscription)list.get(1)).getSubscription(), (String)"test1");
                    }
                    Assert.assertTrue((boolean)flag);
                }
                finally {
                    if (Collections.singletonList(producer).get(0) != null) {
                        producer.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(consumer2).get(0) != null) {
                    consumer2.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(consumer1).get(0) != null) {
                consumer1.close();
            }
        }
    }
}

