/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.transaction.buffer;

import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class TransactionLowWaterMarkTest
extends TransactionTestBase {
    private static final Logger log = LoggerFactory.getLogger(TransactionLowWaterMarkTest.class);
    private static final String TENANT = "tnx";
    private static final String NAMESPACE1 = "tnx/ns1";
    private static final String TOPIC = "tnx/ns1/test-topic";

    @BeforeMethod(alwaysRun=true)
    protected void setup() throws Exception {
        this.setBrokerCount(1);
        this.internalSetup();
        String[] brokerServiceUrlArr = this.getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length - 1];
        this.admin.clusters().createCluster("test", ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
        this.admin.tenants().createTenant(TENANT, (TenantInfo)new TenantInfoImpl((Set)Sets.newHashSet((Object[])new String[]{"appid1"}), (Set)Sets.newHashSet((Object[])new String[]{"test"})));
        this.admin.namespaces().createNamespace(NAMESPACE1);
        this.admin.topics().createNonPartitionedTopic(TOPIC);
        this.admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), (TenantInfo)new TenantInfoImpl((Set)Sets.newHashSet((Object[])new String[]{"appid1"}), (Set)Sets.newHashSet((Object[])new String[]{"test"})));
        this.admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
        this.admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16);
        if (this.pulsarClient != null) {
            this.pulsarClient.shutdown();
        }
        this.pulsarClient = PulsarClient.builder().serviceUrl(this.getPulsarServiceList().get(0).getBrokerServiceUrl()).statsInterval(0L, TimeUnit.SECONDS).enableTransaction(true).build();
        Map stores = this.getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores();
        Awaitility.await().until(() -> {
            if (stores.size() == 16) {
                for (TransactionCoordinatorID transactionCoordinatorID : stores.keySet()) {
                    if (((MLTransactionMetadataStore)stores.get(transactionCoordinatorID)).getState() == TransactionMetadataStoreState.State.Ready) continue;
                    return false;
                }
                return true;
            }
            return false;
        });
    }

    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testTransactionBufferLowWaterMark() throws Exception {
        Transaction txn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
        Producer producer = this.pulsarClient.newProducer().topic(TOPIC).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{TOPIC}).subscriptionName("test").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).enableBatchIndexAcknowledgment(true).subscriptionType(SubscriptionType.Failover).subscribe();
        String TEST1 = "test1";
        String TEST2 = "test2";
        String TEST3 = "test3";
        producer.newMessage(txn).value((Object)"test1".getBytes()).send();
        txn.commit().get();
        Message message = consumer.receive(2, TimeUnit.SECONDS);
        Assert.assertEquals((String)new String(message.getData()), (String)"test1");
        message = consumer.receive(2, TimeUnit.SECONDS);
        Assert.assertNull((Object)message);
        Field field = TransactionImpl.class.getDeclaredField("state");
        field.setAccessible(true);
        field.set(txn, TransactionImpl.State.OPEN);
        producer.newMessage(txn).value((Object)"test2".getBytes()).send();
        message = consumer.receive(2, TimeUnit.SECONDS);
        Assert.assertNull((Object)message);
        PartitionedTopicMetadata partitionedTopicMetadata = (PartitionedTopicMetadata)((PulsarClientImpl)this.pulsarClient).getLookup().getPartitionedTopicMetadata(TopicName.TRANSACTION_COORDINATOR_ASSIGN).get();
        Transaction lowWaterMarkTxn = null;
        for (int i = 0; i < partitionedTopicMetadata.partitions && ((TransactionImpl)(lowWaterMarkTxn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get())).getTxnIdMostBits() != ((TransactionImpl)txn).getTxnIdMostBits(); ++i) {
        }
        if (lowWaterMarkTxn != null && ((TransactionImpl)lowWaterMarkTxn).getTxnIdMostBits() == ((TransactionImpl)txn).getTxnIdMostBits()) {
            producer.newMessage(lowWaterMarkTxn).value((Object)"test3".getBytes()).send();
            message = consumer.receive(2, TimeUnit.SECONDS);
            Assert.assertNull((Object)message);
            lowWaterMarkTxn.commit().get();
            message = consumer.receive();
            Assert.assertEquals((String)new String(message.getData()), (String)"test3");
        } else {
            Assert.fail();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPendingAckLowWaterMark() throws Exception {
        String subName = "test";
        Transaction txn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
        Producer producer = this.pulsarClient.newProducer().topic(TOPIC).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
        try {
            Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{TOPIC}).subscriptionName(subName).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).enableBatchIndexAcknowledgment(true).subscriptionType(SubscriptionType.Failover).subscribe();
            try {
                String TEST1 = "test1";
                String TEST2 = "test2";
                String TEST3 = "test3";
                producer.send((Object)"test1".getBytes());
                producer.send((Object)"test2".getBytes());
                producer.send((Object)"test3".getBytes());
                Message message = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertEquals((String)new String(message.getData()), (String)"test1");
                consumer.acknowledgeAsync(message.getMessageId(), txn).get();
                LinkedMap individualAckOfTransaction = null;
                for (int i = 0; i < this.getPulsarServiceList().size(); ++i) {
                    Optional topic;
                    Field field = BrokerService.class.getDeclaredField("topics");
                    field.setAccessible(true);
                    ConcurrentOpenHashMap topics = (ConcurrentOpenHashMap)field.get(this.getPulsarServiceList().get(i).getBrokerService());
                    CompletableFuture completableFuture = (CompletableFuture)topics.get((Object)"persistent://tnx/ns1/test-topic");
                    if (completableFuture == null || !(topic = (Optional)completableFuture.get()).isPresent()) continue;
                    PersistentSubscription persistentSubscription = (PersistentSubscription)((Topic)topic.get()).getSubscription(subName);
                    field = PersistentSubscription.class.getDeclaredField("pendingAckHandle");
                    field.setAccessible(true);
                    PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl)field.get(persistentSubscription);
                    field = PendingAckHandleImpl.class.getDeclaredField("individualAckOfTransaction");
                    field.setAccessible(true);
                    individualAckOfTransaction = (LinkedMap)field.get(pendingAckHandle);
                }
                Assert.assertTrue((boolean)individualAckOfTransaction.containsKey((Object)new TxnID(((TransactionImpl)txn).getTxnIdMostBits(), ((TransactionImpl)txn).getTxnIdLeastBits())));
                txn.commit().get();
                Field field = TransactionImpl.class.getDeclaredField("state");
                field.setAccessible(true);
                field.set(txn, TransactionImpl.State.OPEN);
                Assert.assertFalse((boolean)individualAckOfTransaction.containsKey((Object)new TxnID(((TransactionImpl)txn).getTxnIdMostBits(), ((TransactionImpl)txn).getTxnIdLeastBits())));
                message = consumer.receive();
                Assert.assertEquals((String)new String(message.getData()), (String)"test2");
                consumer.acknowledgeAsync(message.getMessageId(), txn).get();
                Assert.assertTrue((boolean)individualAckOfTransaction.containsKey((Object)new TxnID(((TransactionImpl)txn).getTxnIdMostBits(), ((TransactionImpl)txn).getTxnIdLeastBits())));
                PartitionedTopicMetadata partitionedTopicMetadata = (PartitionedTopicMetadata)((PulsarClientImpl)this.pulsarClient).getLookup().getPartitionedTopicMetadata(TopicName.TRANSACTION_COORDINATOR_ASSIGN).get();
                Transaction lowWaterMarkTxn = null;
                for (int i = 0; i < partitionedTopicMetadata.partitions && ((TransactionImpl)(lowWaterMarkTxn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get())).getTxnIdMostBits() != ((TransactionImpl)txn).getTxnIdMostBits(); ++i) {
                }
                if (lowWaterMarkTxn != null && ((TransactionImpl)lowWaterMarkTxn).getTxnIdMostBits() == ((TransactionImpl)txn).getTxnIdMostBits()) {
                    producer.newMessage(lowWaterMarkTxn).value((Object)"test3".getBytes()).send();
                    message = consumer.receive(2, TimeUnit.SECONDS);
                    Assert.assertEquals((String)new String(message.getData()), (String)"test3");
                    consumer.acknowledgeAsync(message.getMessageId(), lowWaterMarkTxn).get();
                    Assert.assertTrue((boolean)individualAckOfTransaction.containsKey((Object)new TxnID(((TransactionImpl)txn).getTxnIdMostBits(), ((TransactionImpl)txn).getTxnIdLeastBits())));
                    Assert.assertTrue((boolean)individualAckOfTransaction.containsKey((Object)new TxnID(((TransactionImpl)lowWaterMarkTxn).getTxnIdMostBits(), ((TransactionImpl)lowWaterMarkTxn).getTxnIdLeastBits())));
                    lowWaterMarkTxn.commit().get();
                    Assert.assertFalse((boolean)individualAckOfTransaction.containsKey((Object)new TxnID(((TransactionImpl)txn).getTxnIdMostBits(), ((TransactionImpl)txn).getTxnIdLeastBits())));
                    Assert.assertFalse((boolean)individualAckOfTransaction.containsKey((Object)new TxnID(((TransactionImpl)lowWaterMarkTxn).getTxnIdMostBits(), ((TransactionImpl)lowWaterMarkTxn).getTxnIdLeastBits())));
                } else {
                    Assert.fail();
                }
            }
            finally {
                if (Collections.singletonList(consumer).get(0) != null) {
                    consumer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }
}

