/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.transaction;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.matadata.AbortTxnMetadata;
import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class TopicTransactionBufferRecoverTest
extends TransactionTestBase {
    private static final Logger log = LoggerFactory.getLogger(TopicTransactionBufferRecoverTest.class);
    private static final String TENANT = "tnx";
    private static final String NAMESPACE1 = "tnx/ns1";
    private static final String RECOVER_COMMIT = "tnx/ns1/recover-commit";
    private static final String RECOVER_ABORT = "tnx/ns1/recover-abort";
    private static final String SUBSCRIPTION_NAME = "test-recover";
    private static final String TAKE_SNAPSHOT = "tnx/ns1/take-snapshot";
    private static final String ABORT_DELETE = "tnx/ns1/abort-delete";

    @BeforeMethod
    protected void setup() throws Exception {
        this.internalSetup();
        String[] brokerServiceUrlArr = this.getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length - 1];
        this.admin.clusters().createCluster("test", ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
        this.admin.tenants().createTenant(TENANT, (TenantInfo)new TenantInfoImpl((Set)Sets.newHashSet((Object[])new String[]{"appid1"}), (Set)Sets.newHashSet((Object[])new String[]{"test"})));
        this.admin.namespaces().createNamespace(NAMESPACE1);
        this.admin.topics().createNonPartitionedTopic(RECOVER_COMMIT);
        this.admin.topics().createNonPartitionedTopic(RECOVER_ABORT);
        this.admin.topics().createNonPartitionedTopic(TAKE_SNAPSHOT);
        this.admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), (TenantInfo)new TenantInfoImpl((Set)Sets.newHashSet((Object[])new String[]{"appid1"}), (Set)Sets.newHashSet((Object[])new String[]{"test"})));
        this.admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
        this.admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16);
        if (this.pulsarClient != null) {
            this.pulsarClient.shutdown();
        }
        this.pulsarClient = PulsarClient.builder().serviceUrl(this.getPulsarServiceList().get(0).getBrokerServiceUrl()).statsInterval(0L, TimeUnit.SECONDS).enableTransaction(true).build();
        Thread.sleep(3000L);
    }

    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        if (this.pulsarClient != null) {
            this.pulsarClient.shutdown();
            this.pulsarClient = null;
        }
        super.internalCleanup();
    }

    @DataProvider(name="testTopic")
    public Object[] testTopic() {
        return new Object[]{RECOVER_ABORT, RECOVER_COMMIT};
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="testTopic")
    private void recoverTest(String testTopic) throws Exception {
        PulsarClient pulsarClient = this.pulsarClient;
        Transaction tnx1 = (Transaction)pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
        Transaction tnx2 = (Transaction)pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
        Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(new String[]{testTopic}).subscriptionName(SUBSCRIPTION_NAME).subscribe();
        try {
            Producer producer = pulsarClient.newProducer(Schema.STRING).topic(testTopic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
            try {
                int messageCnt = 10;
                String content = "Hello Txn - ";
                for (int i = 0; i < messageCnt; ++i) {
                    MessageId messageId;
                    String msg = content + i;
                    if (i % 2 == 0) {
                        messageId = producer.newMessage(tnx1).value((Object)msg).send();
                        log.info("Txn1 send message : {}, messageId : {}", (Object)msg, (Object)messageId);
                        continue;
                    }
                    messageId = producer.newMessage(tnx2).value((Object)msg).send();
                    log.info("Txn2 send message : {}, messageId : {}", (Object)msg, (Object)messageId);
                }
                Message message = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertNull((Object)message);
                tnx1.commit();
                message = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertNotNull((Object)message);
                log.info("Txn1 commit receive message : {}, messageId : {}", message.getValue(), (Object)message.getMessageId());
                consumer.acknowledge(message);
                message = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertNull((Object)message);
                this.admin.topics().unload(RECOVER_COMMIT);
                Awaitility.await().until(() -> {
                    for (int i = 0; i < this.getPulsarServiceList().size(); ++i) {
                        Optional topic;
                        Field field = BrokerService.class.getDeclaredField("topics");
                        field.setAccessible(true);
                        ConcurrentOpenHashMap topics = (ConcurrentOpenHashMap)field.get(this.getPulsarServiceList().get(i).getBrokerService());
                        CompletableFuture completableFuture = (CompletableFuture)topics.get((Object)("persistent://" + testTopic));
                        if (completableFuture == null || !(topic = (Optional)completableFuture.get()).isPresent()) continue;
                        PersistentTopic persistentTopic = (PersistentTopic)topic.get();
                        field = PersistentTopic.class.getDeclaredField("transactionBuffer");
                        field.setAccessible(true);
                        TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer)field.get(persistentTopic);
                        if (topicTransactionBuffer.checkIfReady()) {
                            return true;
                        }
                        return false;
                    }
                    return false;
                });
                if (testTopic.equals(RECOVER_COMMIT)) {
                    tnx2.commit().get();
                    for (int i = messageCnt; i > 1; --i) {
                        message = consumer.receive();
                        log.info("Txn2 commit receive message : {}, messageId : {}", message.getValue(), (Object)message.getMessageId());
                        consumer.acknowledge(message);
                    }
                    message = consumer.receive(2, TimeUnit.SECONDS);
                    Assert.assertNull((Object)message);
                } else {
                    tnx2.abort().get();
                    for (int i = messageCnt / 2; i > 1; --i) {
                        message = consumer.receive();
                        log.info("Txn2 commit receive message : {}, messageId : {}", message.getValue(), (Object)message.getMessageId());
                        consumer.acknowledge(message);
                    }
                    message = consumer.receive(2, TimeUnit.SECONDS);
                    Assert.assertNull((Object)message);
                }
                consumer.close();
                producer.close();
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(consumer).get(0) != null) {
                consumer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    private void testTakeSnapshot() throws IOException, ExecutionException, InterruptedException {
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(TAKE_SNAPSHOT).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
        try {
            Transaction tnx1 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
            Transaction tnx2 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
            Transaction tnx3 = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
            Transaction abortTxn = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
            ReaderBuilder readerBuilder = this.pulsarClient.newReader(Schema.AVRO(TransactionBufferSnapshot.class)).startMessageId(MessageId.earliest).topic("tnx/ns1/__transaction_buffer_snapshot");
            Reader reader = readerBuilder.create();
            MessageId messageId1 = producer.newMessage(tnx1).value((Object)"test").send();
            tnx1.commit().get();
            TransactionBufferSnapshot transactionBufferSnapshot = (TransactionBufferSnapshot)reader.readNext().getValue();
            Assert.assertEquals((long)transactionBufferSnapshot.getMaxReadPositionEntryId(), (long)(((MessageIdImpl)messageId1).getEntryId() + 1L));
            Assert.assertEquals((long)transactionBufferSnapshot.getMaxReadPositionLedgerId(), (long)((MessageIdImpl)messageId1).getLedgerId());
            Assert.assertFalse((boolean)reader.hasMessageAvailable());
            MessageId messageId2 = producer.newMessage(tnx2).value((Object)"test").send();
            tnx2.commit().get();
            MessageId messageId3 = producer.newMessage(tnx3).value((Object)"test").send();
            tnx3.commit().get();
            TransactionBufferSnapshot snapshot = (TransactionBufferSnapshot)reader.readNext().getValue();
            Assert.assertEquals((long)snapshot.getMaxReadPositionEntryId(), (long)(((MessageIdImpl)messageId3).getEntryId() + 1L));
            Assert.assertEquals((long)snapshot.getMaxReadPositionLedgerId(), (long)((MessageIdImpl)messageId3).getLedgerId());
            Assert.assertEquals((int)snapshot.getAborts().size(), (int)0);
            Assert.assertFalse((boolean)reader.hasMessageAvailable());
            MessageId messageId4 = producer.newMessage(abortTxn).value((Object)"test").send();
            abortTxn.abort().get();
            transactionBufferSnapshot = (TransactionBufferSnapshot)reader.readNext().getValue();
            Assert.assertEquals((long)transactionBufferSnapshot.getMaxReadPositionEntryId(), (long)(((MessageIdImpl)messageId4).getEntryId() + 1L));
            Assert.assertEquals((long)transactionBufferSnapshot.getMaxReadPositionLedgerId(), (long)((MessageIdImpl)messageId4).getLedgerId());
            Assert.assertEquals((int)transactionBufferSnapshot.getAborts().size(), (int)1);
            Assert.assertEquals((long)((AbortTxnMetadata)transactionBufferSnapshot.getAborts().get(0)).getTxnIdLeastBits(), (long)((TransactionImpl)abortTxn).getTxnIdLeastBits());
            Assert.assertEquals((long)((AbortTxnMetadata)transactionBufferSnapshot.getAborts().get(0)).getTxnIdMostBits(), (long)((TransactionImpl)abortTxn).getTxnIdMostBits());
            Assert.assertFalse((boolean)reader.hasMessageAvailable());
            reader.close();
            producer.close();
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    private void testTopicTransactionBufferDeleteAbort() throws Exception {
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(ABORT_DELETE).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
        try {
            Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{ABORT_DELETE}).subscriptionName(SUBSCRIPTION_NAME).subscribe();
            try {
                Transaction tnx = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(2L, TimeUnit.SECONDS).build().get();
                String value = "Hello Pulsar!";
                MessageId messageId1 = producer.newMessage(tnx).value((Object)value).send();
                tnx.abort().get();
                this.admin.topics().unload(ABORT_DELETE);
                tnx = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(2L, TimeUnit.SECONDS).build().get();
                value = "Hello";
                producer.newMessage(tnx).value((Object)value).send();
                tnx.commit().get();
                Message message = consumer.receive(2, TimeUnit.SECONDS);
                System.out.println("consumer receive message" + message.getMessageId());
                Assert.assertNotNull((Object)message.getValue(), (String)value);
                consumer.acknowledge(message);
                tnx = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(2L, TimeUnit.SECONDS).build().get();
                MessageId messageId2 = producer.newMessage(tnx).value((Object)value).send();
                tnx.abort().get();
                Assert.assertTrue((((MessageIdImpl)messageId2).getLedgerId() != ((MessageIdImpl)messageId1).getLedgerId() ? 1 : 0) != 0);
                boolean exist = false;
                for (int i = 0; i < this.getPulsarServiceList().size(); ++i) {
                    Optional topic;
                    Field field = BrokerService.class.getDeclaredField("topics");
                    field.setAccessible(true);
                    ConcurrentOpenHashMap topics = (ConcurrentOpenHashMap)field.get(this.getPulsarServiceList().get(i).getBrokerService());
                    CompletableFuture completableFuture = (CompletableFuture)topics.get((Object)"persistent://tnx/ns1/abort-delete");
                    if (completableFuture == null || !(topic = (Optional)completableFuture.get()).isPresent()) continue;
                    PersistentTopic persistentTopic = (PersistentTopic)topic.get();
                    field = ManagedLedgerImpl.class.getDeclaredField("ledgers");
                    field.setAccessible(true);
                    NavigableMap ledgers = (NavigableMap)field.get(persistentTopic.getManagedLedger());
                    ledgers.remove(((MessageIdImpl)messageId1).getLedgerId());
                    tnx = (Transaction)this.pulsarClient.newTransaction().withTransactionTimeout(2L, TimeUnit.SECONDS).build().get();
                    producer.newMessage(tnx).value((Object)value).send();
                    tnx.commit().get();
                    field = PersistentTopic.class.getDeclaredField("transactionBuffer");
                    field.setAccessible(true);
                    TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer)field.get(persistentTopic);
                    field = TopicTransactionBuffer.class.getDeclaredField("aborts");
                    field.setAccessible(true);
                    LinkedMap linkedMap = (LinkedMap)field.get(topicTransactionBuffer);
                    Assert.assertEquals((int)linkedMap.size(), (int)1);
                    Assert.assertEquals((long)((PositionImpl)linkedMap.get(linkedMap.firstKey())).getLedgerId(), (long)((MessageIdImpl)message.getMessageId()).getLedgerId());
                    exist = true;
                }
                Assert.assertTrue((boolean)exist);
            }
            finally {
                if (Collections.singletonList(consumer).get(0) != null) {
                    consumer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }
}

