/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.transaction;

import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.protocol.Commands;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class TransactionProduceTest
extends TransactionTestBase {
    private static final Logger log = LoggerFactory.getLogger(TransactionProduceTest.class);
    private static final int TOPIC_PARTITION = 3;
    private static final String TENANT = "tnx";
    private static final String NAMESPACE1 = "tnx/ns1";
    private static final String PRODUCE_COMMIT_TOPIC = "tnx/ns1/produce-commit";
    private static final String PRODUCE_ABORT_TOPIC = "tnx/ns1/produce-abort";
    private static final String ACK_COMMIT_TOPIC = "tnx/ns1/ack-commit";
    private static final String ACK_ABORT_TOPIC = "tnx/ns1/ack-abort";

    @BeforeMethod
    protected void setup() throws Exception {
        this.internalSetup();
        String[] brokerServiceUrlArr = this.getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length - 1];
        this.admin.clusters().createCluster("test", new ClusterData("http://localhost:" + webServicePort));
        this.admin.tenants().createTenant(TENANT, new TenantInfo((Set)Sets.newHashSet((Object[])new String[]{"appid1"}), (Set)Sets.newHashSet((Object[])new String[]{"test"})));
        this.admin.namespaces().createNamespace(NAMESPACE1);
        this.admin.topics().createPartitionedTopic(PRODUCE_COMMIT_TOPIC, 3);
        this.admin.topics().createPartitionedTopic(PRODUCE_ABORT_TOPIC, 3);
        this.admin.topics().createPartitionedTopic(ACK_COMMIT_TOPIC, 3);
        this.admin.topics().createPartitionedTopic(ACK_ABORT_TOPIC, 3);
        this.admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), new TenantInfo((Set)Sets.newHashSet((Object[])new String[]{"appid1"}), (Set)Sets.newHashSet((Object[])new String[]{"test"})));
        this.admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
        this.admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16);
        if (this.pulsarClient != null) {
            this.pulsarClient.shutdown();
        }
        this.pulsarClient = PulsarClient.builder().serviceUrl(this.getPulsarServiceList().get(0).getBrokerServiceUrl()).statsInterval(0L, TimeUnit.SECONDS).enableTransaction(true).build();
        Thread.sleep(3000L);
    }

    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void produceAndCommitTest() throws Exception {
        this.produceTest(true);
    }

    @Test
    public void produceAndAbortTest() throws Exception {
        this.produceTest(false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void produceTest(boolean endAction) throws Exception {
        String topic = endAction ? PRODUCE_COMMIT_TOPIC : PRODUCE_ABORT_TOPIC;
        PulsarClient pulsarClient = this.pulsarClient;
        Transaction tnx = (Transaction)pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
        long txnIdMostBits = ((TransactionImpl)tnx).getTxnIdMostBits();
        long txnIdLeastBits = ((TransactionImpl)tnx).getTxnIdLeastBits();
        Assert.assertTrue((txnIdMostBits > -1L ? 1 : 0) != 0);
        Assert.assertTrue((txnIdLeastBits > -1L ? 1 : 0) != 0);
        Producer outProducer = pulsarClient.newProducer().topic(topic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
        try {
            List entries;
            ReadOnlyCursor originTopicCursor;
            int i;
            int messageCntPerPartition = 3;
            int messageCnt = 3 * messageCntPerPartition;
            String content = "Hello Txn - ";
            HashSet<String> messageSet = new HashSet<String>();
            ArrayList<CompletableFuture<MessageId>> futureList = new ArrayList<CompletableFuture<MessageId>>();
            for (i = 0; i < messageCnt; ++i) {
                String msg = content + i;
                messageSet.add(msg);
                CompletableFuture produceFuture = outProducer.newMessage(tnx).value((Object)msg.getBytes(StandardCharsets.UTF_8)).sendAsync();
                futureList.add(produceFuture);
            }
            this.checkMessageId(futureList, true);
            for (i = 0; i < 3; ++i) {
                originTopicCursor = this.getOriginTopicCursor(topic, i);
                Assert.assertNotNull((Object)originTopicCursor);
                log.info("entries count: {}", (Object)originTopicCursor.getNumberOfEntries());
                Assert.assertEquals((long)messageCntPerPartition, (long)originTopicCursor.getNumberOfEntries());
                entries = originTopicCursor.readEntries(messageCnt);
                for (int j = 0; j < messageCntPerPartition; ++j) {
                    MessageMetadata messageMetadata = Commands.parseMessageMetadata((ByteBuf)((Entry)entries.get(j)).getDataBuffer());
                    Assert.assertEquals((long)messageMetadata.getTxnidMostBits(), (long)txnIdMostBits);
                    Assert.assertEquals((long)messageMetadata.getTxnidLeastBits(), (long)txnIdLeastBits);
                    byte[] bytes = new byte[((Entry)entries.get(j)).getDataBuffer().readableBytes()];
                    ((Entry)entries.get(j)).getDataBuffer().readBytes(bytes);
                    System.out.println(new String(bytes));
                    Assert.assertTrue((boolean)messageSet.remove(new String(bytes)));
                }
                originTopicCursor.close();
            }
            if (endAction) {
                tnx.commit().get();
            } else {
                tnx.abort().get();
            }
            for (i = 0; i < 3; ++i) {
                originTopicCursor = this.getOriginTopicCursor(topic, i);
                entries = originTopicCursor.readEntries((int)originTopicCursor.getNumberOfEntries());
                Assert.assertEquals((int)(messageCntPerPartition + 1), (int)entries.size());
                MessageMetadata messageMetadata = Commands.parseMessageMetadata((ByteBuf)((Entry)entries.get(messageCntPerPartition)).getDataBuffer());
                if (endAction) {
                    Assert.assertEquals((int)21, (int)messageMetadata.getMarkerType());
                    continue;
                }
                Assert.assertEquals((int)22, (int)messageMetadata.getMarkerType());
            }
            Assert.assertEquals((int)0, (int)messageSet.size());
            log.info("produce and {} test finished.", (Object)(endAction ? "commit" : "abort"));
        }
        finally {
            if (Collections.singletonList(outProducer).get(0) != null) {
                outProducer.close();
            }
        }
    }

    private void checkMessageId(List<CompletableFuture<MessageId>> futureList, boolean isFinished) {
        futureList.forEach(messageIdFuture -> {
            try {
                MessageId messageId = (MessageId)messageIdFuture.get(1L, TimeUnit.SECONDS);
                if (isFinished) {
                    Assert.assertNotNull((Object)messageId);
                    log.info("Tnx finished success! messageId: {}", (Object)messageId);
                } else {
                    Assert.fail((String)"MessageId shouldn't be get before txn abort.");
                }
            }
            catch (Exception e) {
                if (!isFinished) {
                    if (e instanceof TimeoutException) {
                        log.info("This is a expected exception.");
                    } else {
                        log.error("This exception is not expected.", (Throwable)e);
                        Assert.fail((String)"This exception is not expected.");
                    }
                }
                log.error("Tnx commit failed!", (Throwable)e);
                Assert.fail((String)"Tnx commit failed!");
            }
        });
    }

    private ReadOnlyCursor getOriginTopicCursor(String topic, int partition) {
        try {
            if (partition >= 0) {
                topic = TopicName.get((String)topic).toString() + "-partition-" + partition;
            }
            return this.getPulsarServiceList().get(0).getManagedLedgerFactory().openReadOnlyCursor(TopicName.get((String)topic).getPersistenceNamingEncoding(), (Position)PositionImpl.earliest, new ManagedLedgerConfig());
        }
        catch (Exception e) {
            log.error("Failed to get origin topic readonly cursor.", (Throwable)e);
            Assert.fail((String)"Failed to get origin topic readonly cursor.");
            return null;
        }
    }

    @Test
    public void ackCommitTest() throws Exception {
        String subscriptionName = "ackCommitTest";
        Transaction txn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
        log.info("init transaction {}.", (Object)txn);
        Producer incomingProducer = this.pulsarClient.newProducer().topic(ACK_COMMIT_TOPIC).batchingMaxMessages(1).roundRobinRouterBatchingPartitionSwitchFrequency(1).create();
        int incomingMessageCnt = 10;
        for (int i = 0; i < incomingMessageCnt; ++i) {
            incomingProducer.newMessage().value((Object)"Hello Txn.".getBytes()).sendAsync();
        }
        log.info("prepare incoming messages finished.");
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{ACK_COMMIT_TOPIC}).subscriptionName("ackCommitTest").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).enableBatchIndexAcknowledgment(true).subscriptionType(SubscriptionType.Shared).subscribe();
        Awaitility.await().until(() -> ((Consumer)consumer).isConnected());
        for (int i = 0; i < incomingMessageCnt; ++i) {
            Message message = consumer.receive();
            log.info("receive messageId: {}", (Object)message.getMessageId());
            consumer.acknowledgeAsync(message.getMessageId(), txn);
        }
        Thread.sleep(1000L);
        Assert.assertEquals((int)this.getPendingAckCount(ACK_COMMIT_TOPIC, "ackCommitTest"), (int)incomingMessageCnt);
        consumer.redeliverUnacknowledgedMessages();
        Message message = consumer.receive(2, TimeUnit.SECONDS);
        Assert.assertNull((Object)message);
        Assert.assertEquals((int)this.getPendingAckCount(ACK_COMMIT_TOPIC, "ackCommitTest"), (int)incomingMessageCnt);
        txn.commit().get();
        Thread.sleep(1000L);
        Assert.assertEquals((int)this.getPendingAckCount(ACK_COMMIT_TOPIC, "ackCommitTest"), (int)0);
        consumer.redeliverUnacknowledgedMessages();
        for (int i = 0; i < incomingMessageCnt; ++i) {
            message = consumer.receive(2, TimeUnit.SECONDS);
            Assert.assertNull((Object)message);
        }
        log.info("finish test ackCommitTest");
    }

    @Test
    public void ackAbortTest() throws Exception {
        String subscriptionName = "ackAbortTest";
        Transaction txn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
        log.info("init transaction {}.", (Object)txn);
        Producer incomingProducer = this.pulsarClient.newProducer().topic(ACK_ABORT_TOPIC).batchingMaxMessages(1).roundRobinRouterBatchingPartitionSwitchFrequency(1).create();
        int incomingMessageCnt = 10;
        for (int i = 0; i < incomingMessageCnt; ++i) {
            incomingProducer.newMessage().value((Object)"Hello Txn.".getBytes()).send();
        }
        log.info("prepare incoming messages finished.");
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{ACK_ABORT_TOPIC}).subscriptionName("ackAbortTest").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).enableBatchIndexAcknowledgment(true).subscriptionType(SubscriptionType.Shared).subscribe();
        Awaitility.await().until(() -> ((Consumer)consumer).isConnected());
        for (int i = 0; i < incomingMessageCnt; ++i) {
            Message message = consumer.receive();
            log.info("receive messageId: {}", (Object)message.getMessageId());
            consumer.acknowledgeAsync(message.getMessageId(), txn);
        }
        Thread.sleep(1000L);
        Assert.assertEquals((int)this.getPendingAckCount(ACK_ABORT_TOPIC, "ackAbortTest"), (int)incomingMessageCnt);
        consumer.redeliverUnacknowledgedMessages();
        Message message = consumer.receive(2, TimeUnit.SECONDS);
        Assert.assertNull((Object)message);
        Assert.assertEquals((int)this.getPendingAckCount(ACK_ABORT_TOPIC, "ackAbortTest"), (int)incomingMessageCnt);
        txn.abort().get();
        Thread.sleep(1000L);
        Assert.assertEquals((int)this.getPendingAckCount(ACK_ABORT_TOPIC, "ackAbortTest"), (int)0);
        consumer.redeliverUnacknowledgedMessages();
        for (int i = 0; i < incomingMessageCnt; ++i) {
            message = consumer.receive(2, TimeUnit.SECONDS);
            Assert.assertNotNull((Object)message);
            log.info("second receive messageId: {}", (Object)message.getMessageId());
        }
        log.info("finish test ackAbortTest");
    }

    private int getPendingAckCount(String topic, String subscriptionName) throws Exception {
        Class<PersistentSubscription> clazz = PersistentSubscription.class;
        int pendingAckCount = 0;
        for (PulsarService pulsarService : this.getPulsarServiceList()) {
            for (String key : pulsarService.getBrokerService().getTopics().keys()) {
                if (!key.contains(topic)) continue;
                Field field = clazz.getDeclaredField("pendingAckHandle");
                field.setAccessible(true);
                PersistentSubscription subscription = (PersistentSubscription)((Topic)((Optional)((CompletableFuture)pulsarService.getBrokerService().getTopics().get((Object)key)).get()).get()).getSubscription(subscriptionName);
                PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl)field.get(subscription);
                field = PendingAckHandleImpl.class.getDeclaredField("individualAckPositions");
                field.setAccessible(true);
                Map map = (Map)field.get(pendingAckHandle);
                if (map == null) continue;
                pendingAckCount += map.size();
            }
        }
        log.info("subscriptionName: {}, pendingAckCount: {}", (Object)subscriptionName, (Object)pendingAckCount);
        return pendingAckCount;
    }
}

