package org.apache.pulsar.broker.transaction;

import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.impl.SingleSnapshotAbortedTxnProcessorImpl;
import org.apache.pulsar.broker.transaction.buffer.impl.SnapshotSegmentAbortedTxnProcessorImpl;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Transactions;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.SegmentStats;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.class */
public class SegmentAbortedTxnProcessorTest extends TransactionTestBase {
    private static final Logger log = LoggerFactory.getLogger(SegmentAbortedTxnProcessorTest.class);
    private static final String PROCESSOR_TOPIC = "persistent://tnx/ns1/abortedTxnProcessor";
    private static final int SEGMENT_SIZE = 5;
    private PulsarService pulsarService = null;

    @BeforeClass
    protected void setup() throws Exception {
        setUpBase(1, 1, null, 0);
        this.pulsarService = getPulsarServiceList().get(0);
        this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
        this.pulsarService.getConfig().setTransactionBufferSnapshotSegmentSize(8 + PROCESSOR_TOPIC.length() + 15);
        this.admin.topics().createNonPartitionedTopic(PROCESSOR_TOPIC);
        Assert.assertTrue(getSnapshotAbortedTxnProcessor(PROCESSOR_TOPIC) instanceof SnapshotSegmentAbortedTxnProcessorImpl);
    }

    @AfterClass
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testPutAbortedTxnIntoProcessor() throws Exception {
        PersistentTopic persistentTopic = (PersistentTopic) ((Optional) this.pulsarService.getBrokerService().getTopic(PROCESSOR_TOPIC, false).get()).get();
        SnapshotSegmentAbortedTxnProcessorImpl snapshotSegmentAbortedTxnProcessorImpl = new SnapshotSegmentAbortedTxnProcessorImpl(persistentTopic);
        for (int i = 0; i < 10; i++) {
            snapshotSegmentAbortedTxnProcessorImpl.putAbortedTxnAndPosition(new TxnID(0L, i), PositionFactory.create(0L, i));
        }
        for (int i2 = 10; i2 < 14; i2++) {
            snapshotSegmentAbortedTxnProcessorImpl.putAbortedTxnAndPosition(new TxnID(0L, i2), PositionFactory.create(0L, i2));
        }
        verifyAbortedTxnIDAndSegmentIndex(snapshotSegmentAbortedTxnProcessorImpl, 0, 14);
        SnapshotSegmentAbortedTxnProcessorImpl snapshotSegmentAbortedTxnProcessorImpl2 = new SnapshotSegmentAbortedTxnProcessorImpl(persistentTopic);
        Position create = PositionFactory.create(0L, 14L);
        waitTaskExecuteCompletely(snapshotSegmentAbortedTxnProcessorImpl);
        snapshotSegmentAbortedTxnProcessorImpl.takeAbortedTxnsSnapshot(create).get();
        snapshotSegmentAbortedTxnProcessorImpl2.recoverFromSnapshot().get();
        verifyAbortedTxnIDAndSegmentIndex(snapshotSegmentAbortedTxnProcessorImpl2, 0, 14);
        Field declaredField = ManagedLedgerImpl.class.getDeclaredField("ledgers");
        declaredField.setAccessible(true);
        NavigableMap navigableMap = (NavigableMap) declaredField.get(persistentTopic.getManagedLedger());
        navigableMap.forEach((l, ledgerInfo) -> {
            navigableMap.remove(l);
        });
        snapshotSegmentAbortedTxnProcessorImpl2.trimExpiredAbortedTxns();
        Awaitility.await().untilAsserted(() -> {
            verifyAbortedTxnIDAndSegmentIndex(snapshotSegmentAbortedTxnProcessorImpl2, 11, 4);
        });
        snapshotSegmentAbortedTxnProcessorImpl.closeAsync().get(5L, TimeUnit.SECONDS);
    }

    private void waitTaskExecuteCompletely(AbortedTxnProcessor abortedTxnProcessor) throws Exception {
        Field declaredField = SnapshotSegmentAbortedTxnProcessorImpl.class.getDeclaredField("persistentWorker");
        declaredField.setAccessible(true);
        SnapshotSegmentAbortedTxnProcessorImpl.PersistentWorker persistentWorker = (SnapshotSegmentAbortedTxnProcessorImpl.PersistentWorker) declaredField.get(abortedTxnProcessor);
        Field declaredField2 = SnapshotSegmentAbortedTxnProcessorImpl.PersistentWorker.class.getDeclaredField("taskQueue");
        declaredField2.setAccessible(true);
        Queue queue = (Queue) declaredField2.get(persistentWorker);
        Awaitility.await().untilAsserted(() -> {
            AssertJUnit.assertEquals(queue.size(), 0);
        });
    }

    private void verifyAbortedTxnIDAndSegmentIndex(AbortedTxnProcessor abortedTxnProcessor, int i, int i2) throws Exception {
        for (int i3 = i; i3 < i2; i3++) {
            Assert.assertTrue(abortedTxnProcessor.checkAbortedTransaction(new TxnID(0L, i3)));
        }
        Field declaredField = SnapshotSegmentAbortedTxnProcessorImpl.class.getDeclaredField("unsealedTxnIds");
        Field declaredField2 = SnapshotSegmentAbortedTxnProcessorImpl.class.getDeclaredField("segmentIndex");
        declaredField.setAccessible(true);
        declaredField2.setAccessible(true);
        LinkedList linkedList = (LinkedList) declaredField.get(abortedTxnProcessor);
        LinkedMap linkedMap = (LinkedMap) declaredField2.get(abortedTxnProcessor);
        AssertJUnit.assertEquals(linkedList.size(), i2 % 5);
        AssertJUnit.assertEquals(linkedMap.size(), i2 / 5);
    }

    @Test
    public void testFuturesCanCompleteWhenItIsCanceled() throws Exception {
        SnapshotSegmentAbortedTxnProcessorImpl snapshotSegmentAbortedTxnProcessorImpl = new SnapshotSegmentAbortedTxnProcessorImpl((PersistentTopic) ((Optional) this.pulsarService.getBrokerService().getTopic(PROCESSOR_TOPIC, false).get()).get());
        Field declaredField = SnapshotSegmentAbortedTxnProcessorImpl.class.getDeclaredField("persistentWorker");
        declaredField.setAccessible(true);
        SnapshotSegmentAbortedTxnProcessorImpl.PersistentWorker persistentWorker = (SnapshotSegmentAbortedTxnProcessorImpl.PersistentWorker) declaredField.get(snapshotSegmentAbortedTxnProcessorImpl);
        Field declaredField2 = SnapshotSegmentAbortedTxnProcessorImpl.PersistentWorker.class.getDeclaredField("taskQueue");
        declaredField2.setAccessible(true);
        ((Queue) declaredField2.get(persistentWorker)).add(new MutablePair(SnapshotSegmentAbortedTxnProcessorImpl.PersistentWorker.OperationType.WriteSegment, new MutablePair(new CompletableFuture(), CompletableFuture::new)));
        try {
            try {
                snapshotSegmentAbortedTxnProcessorImpl.takeAbortedTxnsSnapshot(PositionFactory.create(1L, 10L)).get(2L, TimeUnit.SECONDS);
                Assert.fail("The update index operation should fail.");
                snapshotSegmentAbortedTxnProcessorImpl.closeAsync().get(5L, TimeUnit.SECONDS);
            } catch (Exception e) {
                Assert.assertTrue(e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException);
                snapshotSegmentAbortedTxnProcessorImpl.closeAsync().get(5L, TimeUnit.SECONDS);
            }
        } catch (Throwable th) {
            snapshotSegmentAbortedTxnProcessorImpl.closeAsync().get(5L, TimeUnit.SECONDS);
            throw th;
        }
    }

    @Test
    public void testClearSnapshotSegments() throws Exception {
        SnapshotSegmentAbortedTxnProcessorImpl snapshotSegmentAbortedTxnProcessorImpl = new SnapshotSegmentAbortedTxnProcessorImpl((PersistentTopic) ((Optional) this.pulsarService.getBrokerService().getTopic(PROCESSOR_TOPIC, false).get()).get());
        for (int i = 0; i < 10; i++) {
            snapshotSegmentAbortedTxnProcessorImpl.putAbortedTxnAndPosition(new TxnID(0L, i), PositionFactory.create(0L, i));
        }
        Awaitility.await().untilAsserted(() -> {
            verifySnapshotSegmentsSize(PROCESSOR_TOPIC, 2);
        });
        Field declaredField = SnapshotSegmentAbortedTxnProcessorImpl.class.getDeclaredField("persistentWorker");
        declaredField.setAccessible(true);
        SnapshotSegmentAbortedTxnProcessorImpl.PersistentWorker persistentWorker = (SnapshotSegmentAbortedTxnProcessorImpl.PersistentWorker) declaredField.get(snapshotSegmentAbortedTxnProcessorImpl);
        Field declaredField2 = SnapshotSegmentAbortedTxnProcessorImpl.PersistentWorker.class.getDeclaredField("snapshotIndexWriter");
        declaredField2.setAccessible(true);
        SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter referenceCountedWriter = (SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter) declaredField2.get(persistentWorker);
        referenceCountedWriter.release();
        ((SystemTopicClient.Writer) referenceCountedWriter.getFuture().get()).close();
        for (int i2 = 0; i2 < 5; i2++) {
            snapshotSegmentAbortedTxnProcessorImpl.putAbortedTxnAndPosition(new TxnID(0L, i2), PositionFactory.create(0L, i2));
        }
        Awaitility.await().untilAsserted(() -> {
            verifySnapshotSegmentsSize(PROCESSOR_TOPIC, 3);
        });
        try {
            snapshotSegmentAbortedTxnProcessorImpl.clearAbortedTxnSnapshot().get();
            Assert.fail();
        } catch (Exception e) {
        }
        TopicName systemTopicName = NamespaceEventsSystemTopicFactory.getSystemTopicName(TopicName.get(PROCESSOR_TOPIC).getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
        TopicName systemTopicName2 = NamespaceEventsSystemTopicFactory.getSystemTopicName(TopicName.get(PROCESSOR_TOPIC).getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT_INDEXES);
        doCompaction(systemTopicName);
        doCompaction(systemTopicName2);
        verifySnapshotSegmentsSize(PROCESSOR_TOPIC, 0);
        verifySnapshotSegmentsIndexSize(PROCESSOR_TOPIC, 1);
        snapshotSegmentAbortedTxnProcessorImpl.closeAsync().get(5L, TimeUnit.SECONDS);
    }

    @Test
    public void testTxnSegmentStats() throws Exception {
        String str = "persistent://" + "tnx/testTxnSegmentStats" + "/testTxnSegmentStats";
        this.pulsarService.getConfig().setTransactionBufferSnapshotSegmentSize(8 + str.length() + 15);
        Transactions transactions = this.admin.transactions();
        this.admin.namespaces().createNamespace("tnx/testTxnSegmentStats");
        this.admin.topics().createNonPartitionedTopic(str);
        Producer create = this.pulsarClient.newProducer().topic(str).create();
        try {
            Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{str}).subscriptionName("my-sub").subscribe();
            try {
                long currentTimeMillis = System.currentTimeMillis();
                Transaction transaction = null;
                for (int i = 0; i < 5; i++) {
                    transaction = (Transaction) this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.HOURS).build().get();
                    create.newMessage(transaction).send();
                    transaction.abort().get();
                }
                Transaction transaction2 = (Transaction) this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.HOURS).build().get();
                create.newMessage(transaction2).send();
                transaction2.abort().get();
                TransactionBufferStats transactionBufferStats = transactions.getTransactionBufferStats(str, false, false);
                AssertJUnit.assertNotNull(transactionBufferStats);
                AssertJUnit.assertNotNull(transactionBufferStats.segmentsStats);
                Assert.assertNull(transactionBufferStats.segmentsStats.segmentStats);
                AssertJUnit.assertEquals(transactionBufferStats.snapshotType, AbortedTxnProcessor.SnapshotType.Segment.toString());
                AssertJUnit.assertEquals(transactionBufferStats.segmentsStats.segmentsSize, 1L);
                AssertJUnit.assertEquals(transactionBufferStats.segmentsStats.unsealedAbortTxnIDSize, 1L);
                AssertJUnit.assertEquals(transactionBufferStats.segmentsStats.currentSegmentCapacity, 5L);
                AssertJUnit.assertEquals(transactionBufferStats.totalAbortedTransactions, 6L);
                Assert.assertTrue(transactionBufferStats.segmentsStats.lastTookSnapshotSegmentTimestamp >= currentTimeMillis);
                TransactionBufferStats transactionBufferStats2 = transactions.getTransactionBufferStats(str, false, true);
                AssertJUnit.assertNotNull(transactionBufferStats2);
                AssertJUnit.assertNotNull(transactionBufferStats2.segmentsStats.segmentStats);
                AssertJUnit.assertEquals(transactionBufferStats2.segmentsStats.segmentStats.size(), 1);
                AssertJUnit.assertEquals(((SegmentStats) transactionBufferStats2.segmentsStats.segmentStats.get(0)).lastTxnID, transaction.getTxnID().toString());
                for (int i2 = 0; i2 < 15; i2++) {
                    Transaction transaction3 = (Transaction) this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.HOURS).build().get();
                    create.newMessage(transaction3).send();
                    transaction3.abort().get();
                }
                TransactionBufferStats transactionBufferStats3 = transactions.getTransactionBufferStats(str, false, true);
                AssertJUnit.assertEquals(transactionBufferStats3.segmentsStats.segmentsSize, 4L);
                AssertJUnit.assertEquals(transactionBufferStats3.segmentsStats.unsealedAbortTxnIDSize, 1L);
                AssertJUnit.assertEquals(transactionBufferStats3.totalAbortedTransactions, 21L);
                this.pulsarService.getConfig().setTransactionBufferSnapshotSegmentSize(8 + PROCESSOR_TOPIC.length() + 15);
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    private void verifySnapshotSegmentsSize(String str, int i) throws Exception {
        SystemTopicClient.Reader reader = (SystemTopicClient.Reader) this.pulsarService.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotSegmentService().createReader(TopicName.get(str)).get();
        int i2 = 0;
        while (reader.hasMoreEvents()) {
            if (str.equals(((TransactionBufferSnapshotSegment) ((Message) reader.readNextAsync().get(5L, TimeUnit.SECONDS)).getValue()).getTopicName())) {
                i2++;
            }
        }
        AssertJUnit.assertEquals(i2, i);
    }

    private void verifySnapshotSegmentsIndexSize(String str, int i) throws Exception {
        SystemTopicClient.Reader reader = (SystemTopicClient.Reader) this.pulsarService.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotIndexService().createReader(TopicName.get(str)).get();
        int i2 = 0;
        while (reader.hasMoreEvents()) {
            Message message = (Message) reader.readNextAsync().get(5L, TimeUnit.SECONDS);
            if (str.equals(((TransactionBufferSnapshotIndexes) message.getValue()).getTopicName())) {
                i2++;
            }
            System.out.printf("message.getValue().getTopicName() :" + ((TransactionBufferSnapshotIndexes) message.getValue()).getTopicName(), new Object[0]);
        }
        AssertJUnit.assertEquals(i2, i);
    }

    private void doCompaction(TopicName topicName) throws Exception {
        PersistentTopic persistentTopic = (PersistentTopic) ((Optional) this.pulsarService.getBrokerService().getTopic(topicName.toString(), false).get()).get();
        Field declaredField = PersistentTopic.class.getDeclaredField("currentCompaction");
        declaredField.setAccessible(true);
        persistentTopic.triggerCompaction();
        CompletableFuture completableFuture = (CompletableFuture) declaredField.get(persistentTopic);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertTrue(completableFuture.isDone());
        });
    }

    @Test
    public void testSnapshotProcessorUpgrade() throws Exception {
        String str = "tnx/ns2";
        this.admin.namespaces().createNamespace("tnx/ns2");
        this.pulsarService = getPulsarServiceList().get(0);
        this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(false);
        String str2 = "persistent://" + "tnx/ns2" + "/testSnapshotProcessorUpgrade";
        Producer create = this.pulsarClient.newProducer().topic(str2).create();
        try {
            Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{str2}).subscriptionName("test-sub").subscribe();
            try {
                Assert.assertTrue(getSnapshotAbortedTxnProcessor(str2) instanceof SingleSnapshotAbortedTxnProcessorImpl);
                for (int i = 0; i < 10; i++) {
                    create.send(("test-message-" + i).getBytes());
                }
                for (int i2 = 0; i2 < 10; i2++) {
                    Transaction transaction = (Transaction) this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
                    create.newMessage(transaction).value(("test-txn-message-" + i2).getBytes()).sendAsync();
                    transaction.abort().get();
                }
                for (int i3 = 10; i3 < 20; i3++) {
                    create.send(("test-message-" + i3).getBytes());
                }
                for (int i4 = 0; i4 < 20; i4++) {
                    AssertJUnit.assertEquals("test-message-" + i4, new String(subscribe.receive(5, TimeUnit.SECONDS).getData()));
                }
                this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
                this.pulsarService.getConfig().setTransactionBufferSnapshotSegmentSize(8 + PROCESSOR_TOPIC.length() + 15);
                this.admin.topics().unload(str2);
                Assert.assertTrue(getSnapshotAbortedTxnProcessor(str2) instanceof SnapshotSegmentAbortedTxnProcessorImpl);
                Transaction transaction2 = (Transaction) this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
                create.newMessage(transaction2).value("test-message-new".getBytes()).send();
                transaction2.abort().get();
                Awaitility.await().untilAsserted(() -> {
                    AssertJUnit.assertEquals(1L, this.admin.topics().getStats("persistent://" + str + "/__transaction_buffer_snapshot_segments").getMsgInCounter());
                });
                subscribe.close();
                subscribe = this.pulsarClient.newConsumer().topic(new String[]{str2}).subscriptionName("test-sub").subscribe();
                for (int i5 = 0; i5 < 20; i5++) {
                    AssertJUnit.assertEquals("test-message-" + i5, new String(subscribe.receive(5, TimeUnit.SECONDS).getData()));
                }
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    @Test
    public void testSegmentedSnapshotWithoutCreatingOldSnapshotTopic() throws Exception {
        this.pulsarService = getPulsarServiceList().get(0);
        this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
        this.admin.namespaces().createNamespace("tnx/testSegmentedSnapshotWithoutCreatingOldSnapshotTopic");
        String str = "persistent://" + "tnx/testSegmentedSnapshotWithoutCreatingOldSnapshotTopic" + "/newTopic";
        Producer create = this.pulsarClient.newProducer().topic(str).create();
        try {
            create.close();
            Assert.assertTrue(getSnapshotAbortedTxnProcessor(str) instanceof SnapshotSegmentAbortedTxnProcessorImpl);
            try {
                this.admin.topics().getStats("persistent://" + "tnx/testSegmentedSnapshotWithoutCreatingOldSnapshotTopic" + "/__transaction_buffer_snapshot");
                Assert.fail("The __transaction_buffer_snapshot topic should not exist");
            } catch (PulsarAdminException e) {
                AssertJUnit.assertEquals(e.getStatusCode(), 404);
            }
            this.admin.namespaces().deleteNamespace("tnx/testSegmentedSnapshotWithoutCreatingOldSnapshotTopic", true);
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    private AbortedTxnProcessor getSnapshotAbortedTxnProcessor(String str) {
        return (AbortedTxnProcessor) WhiteboxImpl.getInternalState(getPersistentTopic(str).getTransactionBuffer(), "snapshotAbortedTxnProcessor");
    }

    private PersistentTopic getPersistentTopic(String str) {
        Iterator<PulsarService> it = getPulsarServiceList().iterator();
        while (it.hasNext()) {
            CompletableFuture topic = it.next().getBrokerService().getTopic(str, false);
            if (topic != null) {
                return (PersistentTopic) ((Optional) topic.join()).get();
            }
        }
        throw new NullPointerException("topic[" + str + "] not found");
    }
}
