package org.apache.pulsar.broker.transaction;

import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/transaction/TransactionProduceTest.class */
public class TransactionProduceTest extends TransactionTestBase {
    private static final Logger log = LoggerFactory.getLogger(TransactionProduceTest.class);
    private static final int TOPIC_PARTITION = 3;
    private static final String PRODUCE_COMMIT_TOPIC = "tnx/ns1/produce-commit";
    private static final String PRODUCE_ABORT_TOPIC = "tnx/ns1/produce-abort";
    private static final String ACK_COMMIT_TOPIC = "tnx/ns1/ack-commit";
    private static final String ACK_ABORT_TOPIC = "tnx/ns1/ack-abort";
    private static final int NUM_PARTITIONS = 16;

    @BeforeClass
    protected void setup() throws Exception {
        setUpBase(1, NUM_PARTITIONS, PRODUCE_COMMIT_TOPIC, 3);
        this.admin.topics().createPartitionedTopic(PRODUCE_ABORT_TOPIC, 3);
        this.admin.topics().createPartitionedTopic(ACK_COMMIT_TOPIC, 3);
        this.admin.topics().createPartitionedTopic(ACK_ABORT_TOPIC, 3);
    }

    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void produceAndCommitTest() throws Exception {
        produceTest(true);
    }

    @Test
    public void produceAndAbortTest() throws Exception {
        produceTest(false);
    }

    private void produceTest(boolean z) throws Exception {
        String str = z ? PRODUCE_COMMIT_TOPIC : PRODUCE_ABORT_TOPIC;
        PulsarClient pulsarClient = this.pulsarClient;
        TransactionImpl transactionImpl = (Transaction) pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
        long txnIdMostBits = transactionImpl.getTxnIdMostBits();
        long txnIdLeastBits = transactionImpl.getTxnIdLeastBits();
        Assert.assertTrue(txnIdMostBits > -1);
        Assert.assertTrue(txnIdLeastBits > -1);
        Producer create = pulsarClient.newProducer().topic(str).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
        try {
            int i = 3 * 3;
            HashSet hashSet = new HashSet();
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < i; i2++) {
                String str2 = "Hello Txn - " + i2;
                hashSet.add(str2);
                arrayList.add(create.newMessage(transactionImpl).value(str2.getBytes(StandardCharsets.UTF_8)).sendAsync());
            }
            checkMessageId(arrayList, true);
            for (int i3 = 0; i3 < 3; i3++) {
                ReadOnlyCursor originTopicCursor = getOriginTopicCursor(str, i3);
                Assert.assertNotNull(originTopicCursor);
                log.info("entries count: {}", Long.valueOf(originTopicCursor.getNumberOfEntries()));
                Assert.assertEquals(3, originTopicCursor.getNumberOfEntries());
                List readEntries = originTopicCursor.readEntries(i);
                for (int i4 = 0; i4 < 3; i4++) {
                    MessageMetadata parseMessageMetadata = Commands.parseMessageMetadata(((Entry) readEntries.get(i4)).getDataBuffer());
                    Assert.assertEquals(parseMessageMetadata.getTxnidMostBits(), txnIdMostBits);
                    Assert.assertEquals(parseMessageMetadata.getTxnidLeastBits(), txnIdLeastBits);
                    byte[] bArr = new byte[((Entry) readEntries.get(i4)).getDataBuffer().readableBytes()];
                    ((Entry) readEntries.get(i4)).getDataBuffer().readBytes(bArr);
                    System.out.println(new String(bArr));
                    Assert.assertTrue(hashSet.remove(new String(bArr)));
                }
                originTopicCursor.close();
            }
            if (z) {
                transactionImpl.commit().get();
            } else {
                transactionImpl.abort().get();
            }
            for (int i5 = 0; i5 < 3; i5++) {
                ReadOnlyCursor originTopicCursor2 = getOriginTopicCursor(str, i5);
                List readEntries2 = originTopicCursor2.readEntries((int) originTopicCursor2.getNumberOfEntries());
                Assert.assertEquals(3 + 1, readEntries2.size());
                MessageMetadata parseMessageMetadata2 = Commands.parseMessageMetadata(((Entry) readEntries2.get(3)).getDataBuffer());
                if (z) {
                    Assert.assertEquals(21, parseMessageMetadata2.getMarkerType());
                } else {
                    Assert.assertEquals(22, parseMessageMetadata2.getMarkerType());
                }
            }
            Assert.assertEquals(0, hashSet.size());
            log.info("produce and {} test finished.", z ? "commit" : "abort");
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    private void checkMessageId(List<CompletableFuture<MessageId>> list, boolean z) {
        list.forEach(completableFuture -> {
            try {
                MessageId messageId = (MessageId) completableFuture.get(1L, TimeUnit.SECONDS);
                if (z) {
                    Assert.assertNotNull(messageId);
                    log.info("Tnx finished success! messageId: {}", messageId);
                } else {
                    Assert.fail("MessageId shouldn't be get before txn abort.");
                }
            } catch (Exception e) {
                if (z) {
                    log.error("Tnx commit failed!", e);
                    Assert.fail("Tnx commit failed!");
                } else if (e instanceof TimeoutException) {
                    log.info("This is a expected exception.");
                } else {
                    log.error("This exception is not expected.", e);
                    Assert.fail("This exception is not expected.");
                }
            }
        });
    }

    private ReadOnlyCursor getOriginTopicCursor(String str, int i) {
        if (i >= 0) {
            try {
                str = TopicName.get(str).toString() + "-partition-" + i;
            } catch (Exception e) {
                log.error("Failed to get origin topic readonly cursor.", e);
                Assert.fail("Failed to get origin topic readonly cursor.");
                return null;
            }
        }
        return getPulsarServiceList().get(0).getManagedLedgerFactory().openReadOnlyCursor(TopicName.get(str).getPersistenceNamingEncoding(), PositionImpl.EARLIEST, new ManagedLedgerConfig());
    }

    @Test
    public void ackCommitTest() throws Exception {
        Transaction transaction = (Transaction) this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
        log.info("init transaction {}.", transaction);
        Producer create = this.pulsarClient.newProducer().topic(ACK_COMMIT_TOPIC).batchingMaxMessages(1).roundRobinRouterBatchingPartitionSwitchFrequency(1).create();
        int i = 10;
        for (int i2 = 0; i2 < 10; i2++) {
            create.newMessage().value("Hello Txn.".getBytes()).sendAsync();
        }
        log.info("prepare incoming messages finished.");
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{ACK_COMMIT_TOPIC}).subscriptionName("ackCommitTest").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).enableBatchIndexAcknowledgment(true).subscriptionType(SubscriptionType.Shared).subscribe();
        ConditionFactory await = Awaitility.await();
        Objects.requireNonNull(subscribe);
        await.until(subscribe::isConnected);
        for (int i3 = 0; i3 < 10; i3++) {
            Message receive = subscribe.receive();
            log.info("receive messageId: {}", receive.getMessageId());
            subscribe.acknowledgeAsync(receive.getMessageId(), transaction);
        }
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(getPendingAckCount(ACK_COMMIT_TOPIC, "ackCommitTest"), i);
        });
        subscribe.redeliverUnacknowledgedMessages();
        Assert.assertNull(subscribe.receive(2, TimeUnit.SECONDS));
        Assert.assertEquals(getPendingAckCount(ACK_COMMIT_TOPIC, "ackCommitTest"), 10);
        transaction.commit().get();
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(getPendingAckCount(ACK_COMMIT_TOPIC, "ackCommitTest"), 0);
        });
        subscribe.redeliverUnacknowledgedMessages();
        for (int i4 = 0; i4 < 10; i4++) {
            Assert.assertNull(subscribe.receive(2, TimeUnit.SECONDS));
        }
        log.info("finish test ackCommitTest");
    }

    @Test
    public void ackAbortTest() throws Exception {
        Transaction transaction = (Transaction) this.pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
        log.info("init transaction {}.", transaction);
        Producer create = this.pulsarClient.newProducer().topic(ACK_ABORT_TOPIC).batchingMaxMessages(1).roundRobinRouterBatchingPartitionSwitchFrequency(1).create();
        int i = 10;
        for (int i2 = 0; i2 < 10; i2++) {
            create.newMessage().value("Hello Txn.".getBytes()).send();
        }
        log.info("prepare incoming messages finished.");
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{ACK_ABORT_TOPIC}).subscriptionName("ackAbortTest").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).enableBatchIndexAcknowledgment(true).subscriptionType(SubscriptionType.Shared).subscribe();
        ConditionFactory await = Awaitility.await();
        Objects.requireNonNull(subscribe);
        await.until(subscribe::isConnected);
        for (int i3 = 0; i3 < 10; i3++) {
            Message receive = subscribe.receive();
            log.info("receive messageId: {}", receive.getMessageId());
            subscribe.acknowledgeAsync(receive.getMessageId(), transaction);
        }
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(getPendingAckCount(ACK_ABORT_TOPIC, "ackAbortTest"), i);
        });
        subscribe.redeliverUnacknowledgedMessages();
        Assert.assertNull(subscribe.receive(2, TimeUnit.SECONDS));
        Assert.assertEquals(getPendingAckCount(ACK_ABORT_TOPIC, "ackAbortTest"), 10);
        transaction.abort().get();
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(getPendingAckCount(ACK_ABORT_TOPIC, "ackAbortTest"), 0);
        });
        subscribe.redeliverUnacknowledgedMessages();
        for (int i4 = 0; i4 < 10; i4++) {
            Message receive2 = subscribe.receive(2, TimeUnit.SECONDS);
            Assert.assertNotNull(receive2);
            log.info("second receive messageId: {}", receive2.getMessageId());
        }
        log.info("finish test ackAbortTest");
    }

    private int getPendingAckCount(String str, String str2) throws Exception {
        int i = 0;
        for (PulsarService pulsarService : getPulsarServiceList()) {
            for (String str3 : pulsarService.getBrokerService().getTopics().keys()) {
                if (str3.contains(str)) {
                    Field declaredField = PersistentSubscription.class.getDeclaredField("pendingAckHandle");
                    declaredField.setAccessible(true);
                    PendingAckHandleImpl pendingAckHandleImpl = (PendingAckHandleImpl) declaredField.get(((Topic) ((Optional) ((CompletableFuture) pulsarService.getBrokerService().getTopics().get(str3)).get()).get()).getSubscription(str2));
                    Field declaredField2 = PendingAckHandleImpl.class.getDeclaredField("individualAckPositions");
                    declaredField2.setAccessible(true);
                    Map map = (Map) declaredField2.get(pendingAckHandleImpl);
                    if (map != null) {
                        i += map.size();
                    }
                }
            }
        }
        log.info("subscriptionName: {}, pendingAckCount: {}", str2, Integer.valueOf(i));
        return i;
    }

    @Test
    public void testCommitFailure() throws Exception {
        Transaction transaction = (Transaction) this.pulsarClient.newTransaction().build().get();
        Producer create = this.pulsarClient.newProducer().topic("tnx/ns1/test-commit-failure").create();
        try {
            create.newMessage(transaction).value(new byte[10485760]).sendAsync();
            try {
                transaction.commit().get();
                Assert.fail();
            } catch (ExecutionException e) {
                Assert.assertTrue(e.getCause() instanceof PulsarClientException.TransactionHasOperationFailedException);
                Assert.assertEquals(transaction.getState(), Transaction.State.ABORTED);
            }
            try {
                getPulsarServiceList().get(0).getTransactionMetadataStoreService().getTxnMeta(transaction.getTxnID()).getNow(null);
                Assert.fail();
            } catch (CompletionException e2) {
                Assert.assertTrue(e2.getCause() instanceof CoordinatorException.TransactionNotFoundException);
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }
}
