package org.apache.pulsar.broker.transaction.buffer;

import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.class */
public class TopicTransactionBufferTest extends TransactionTestBase {
    @BeforeMethod(alwaysRun = true)
    protected void setup() throws Exception {
        setBrokerCount(1);
        setUpBase(1, 16, "persistent://tnx/ns1/test", 0);
        Map stores = getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores();
        Awaitility.await().until(() -> {
            if (stores.size() != 16) {
                return false;
            }
            Iterator it = stores.keySet().iterator();
            while (it.hasNext()) {
                if (((MLTransactionMetadataStore) stores.get((TransactionCoordinatorID) it.next())).getState() != TransactionMetadataStoreState.State.Ready) {
                    return false;
                }
            }
            return true;
        });
    }

    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testTransactionBufferAppendMarkerWriteFailState() throws Exception {
        Transaction transaction = (Transaction) this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
        this.pulsarClient.newProducer().topic("persistent://tnx/ns1/testPendingAckManageLedgerWriteFailState").sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create().newMessage(transaction).value("test".getBytes()).send();
        FieldUtils.writeField(((PersistentTopic) ((Optional) getPulsarServiceList().get(0).getBrokerService().getTopic(TopicName.get("persistent://tnx/ns1/testPendingAckManageLedgerWriteFailState").toString(), false).get()).get()).getManagedLedger(), "state", ManagedLedgerImpl.State.WriteFailed, true);
        transaction.commit().get();
    }

    @Test
    public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Exception {
        String str = "persistent://tnx/ns1/test_" + UUID.randomUUID();
        BrokerService brokerService = (BrokerService) Mockito.spy(this.pulsarServiceList.get(0).getBrokerService());
        AtomicReference atomicReference = new AtomicReference();
        ((BrokerService) Mockito.doAnswer(invocationOnMock -> {
            String str2 = (String) invocationOnMock.getArgument(0);
            ManagedLedger managedLedger = (ManagedLedger) invocationOnMock.getArgument(1);
            BrokerService brokerService2 = (BrokerService) invocationOnMock.getArgument(2);
            if (!((Class) invocationOnMock.getArgument(3)).equals(PersistentTopic.class)) {
                return new NonPersistentTopic(str2, brokerService2);
            }
            PersistentTopic persistentTopic = (PersistentTopic) Mockito.spy(new PersistentTopic(str2, managedLedger, brokerService2));
            ((PersistentTopic) Mockito.doReturn(CompletableFuture.failedFuture(new ManagedLedgerException("This is an exception"))).when(persistentTopic)).checkDeduplicationStatus();
            atomicReference.set(persistentTopic);
            return persistentTopic;
        }).when(brokerService)).newTopic((String) Mockito.eq(str), (ManagedLedger) ArgumentMatchers.any(), (BrokerService) Mockito.eq(brokerService), (Class) Mockito.eq(PersistentTopic.class));
        brokerService.createPersistentTopic0(str, true, new CompletableFuture(), Collections.emptyMap());
        Awaitility.waitAtMost(1L, TimeUnit.MINUTES).until(() -> {
            return Boolean.valueOf(atomicReference.get() != null);
        });
        TopicTransactionBuffer transactionBuffer = ((PersistentTopic) atomicReference.get()).getTransactionBuffer();
        Assert.assertTrue(transactionBuffer instanceof TopicTransactionBuffer);
        TopicTransactionBuffer topicTransactionBuffer = transactionBuffer;
        Assert.assertEquals(topicTransactionBuffer.getState(), TopicTransactionBufferState.State.Close);
    }

    @Test
    public void testCloseTransactionBufferWhenTimeout() throws Exception {
        String str = "persistent://tnx/ns1/test_" + UUID.randomUUID();
        PulsarService pulsarService = this.pulsarServiceList.get(0);
        BrokerService brokerService = (BrokerService) Mockito.spy(pulsarService.getBrokerService());
        AtomicReference atomicReference = new AtomicReference();
        pulsarService.getConfiguration().setTopicLoadTimeoutSeconds(10L);
        long millis = TimeUnit.SECONDS.toMillis(pulsarService.getConfiguration().getTopicLoadTimeoutSeconds() + 1);
        ((BrokerService) Mockito.doAnswer(invocationOnMock -> {
            Thread.sleep(millis);
            PersistentTopic persistentTopic = (PersistentTopic) invocationOnMock.callRealMethod();
            atomicReference.set(persistentTopic);
            return persistentTopic;
        }).when(brokerService)).newTopic((String) Mockito.eq(str), (ManagedLedger) ArgumentMatchers.any(), (BrokerService) Mockito.eq(brokerService), (Class) Mockito.eq(PersistentTopic.class));
        CompletableFuture topic = brokerService.getTopic(str, true);
        Awaitility.waitAtMost(20L, TimeUnit.SECONDS).pollInterval(Duration.ofSeconds(2L)).until(() -> {
            return Boolean.valueOf(atomicReference.get() != null);
        });
        TopicTransactionBuffer transactionBuffer = ((PersistentTopic) atomicReference.get()).getTransactionBuffer();
        Assert.assertTrue(transactionBuffer instanceof TopicTransactionBuffer);
        TopicTransactionBuffer topicTransactionBuffer = transactionBuffer;
        Assert.assertEquals(topicTransactionBuffer.getState(), TopicTransactionBufferState.State.Close);
        Assert.assertTrue(topic.isCompletedExceptionally());
    }

    @Test
    public void testGetMaxPositionAfterTBReady() throws Exception {
        TransactionBuffer transactionBuffer = (TransactionBuffer) Mockito.spy(TransactionBuffer.class);
        Mockito.when(transactionBuffer.checkIfTBRecoverCompletely(ArgumentMatchers.anyBoolean())).thenReturn(CompletableFuture.completedFuture(null)).thenReturn(CompletableFuture.failedFuture(new Throwable("Mock fail"))).thenReturn(CompletableFuture.completedFuture(null));
        TransactionBufferProvider transactionBufferProvider = (TransactionBufferProvider) Mockito.spy(TransactionBufferProvider.class);
        ((TransactionBufferProvider) Mockito.doReturn(transactionBuffer).when(transactionBufferProvider)).newTransactionBuffer((Topic) ArgumentMatchers.any());
        TransactionBufferProvider transactionBufferProvider2 = getPulsarServiceList().get(0).getTransactionBufferProvider();
        ((PulsarService) Mockito.doReturn(transactionBufferProvider).when(getPulsarServiceList().get(0))).getTransactionBufferProvider();
        this.admin.topics().createNonPartitionedTopic("persistent://tnx/ns1/testGetMaxReadyPositionAfterTBReady");
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://tnx/ns1/testGetMaxReadyPositionAfterTBReady"}).subscriptionName("sub").subscribe();
        try {
            Producer create = this.pulsarClient.newProducer().topic("persistent://tnx/ns1/testGetMaxReadyPositionAfterTBReady").create();
            try {
                MessageIdImpl send = create.newMessage().send();
                create.newMessage().send();
                ((TransactionBuffer) Mockito.doReturn(new PositionImpl(send.getLedgerId(), send.getEntryId())).when(transactionBuffer)).getMaxReadPosition();
                try {
                    subscribe.getLastMessageIds();
                    AssertJUnit.fail();
                } catch (PulsarClientException e) {
                    Assert.assertTrue(e.getMessage().contains("Failed to recover Transaction Buffer."));
                }
                List lastMessageIds = subscribe.getLastMessageIds();
                AssertJUnit.assertEquals(lastMessageIds.size(), 1);
                TopicMessageIdImpl topicMessageIdImpl = (TopicMessageIdImpl) lastMessageIds.get(0);
                AssertJUnit.assertEquals(send.getLedgerId(), topicMessageIdImpl.getLedgerId());
                AssertJUnit.assertEquals(send.getEntryId(), topicMessageIdImpl.getEntryId());
                ((PulsarService) Mockito.doReturn(transactionBufferProvider2).when(getPulsarServiceList().get(0))).getTransactionBufferProvider();
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        }
    }

    @Test
    public void testGetLastMessageIdsWithOngoingTransactions() throws Exception {
        Producer create = this.pulsarClient.newProducer().topic("persistent://tnx/ns1/testGetLastMessageIdsWithOngoingTransactions").create();
        try {
            Consumer<?> subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://tnx/ns1/testGetLastMessageIdsWithOngoingTransactions"}).subscriptionName("my-subscription").subscribe();
            MessageIdImpl messageIdImpl = null;
            for (int i = 0; i < 3; i++) {
                messageIdImpl = (MessageIdImpl) create.newMessage().send();
            }
            assertMessageId(subscribe, messageIdImpl, 0);
            Transaction transaction = (Transaction) this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.HOURS).build().get();
            Transaction transaction2 = (Transaction) this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.HOURS).build().get();
            create.newMessage(transaction).send();
            MessageIdImpl messageIdImpl2 = (MessageIdImpl) create.newMessage().send();
            create.newMessage(transaction2).send();
            MessageIdImpl messageIdImpl3 = (MessageIdImpl) create.newMessage().send();
            assertMessageId(subscribe, messageIdImpl, 0);
            transaction.commit().get(5L, TimeUnit.SECONDS);
            assertMessageId(subscribe, messageIdImpl2, 0);
            transaction2.abort().get(5L, TimeUnit.SECONDS);
            assertMessageId(subscribe, messageIdImpl3, 2);
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    @Test
    public void testGetLastMessageIdsWithOpenTransactionAtLedgerHead() throws Exception {
        Producer create = this.pulsarClient.newProducer().topic("persistent://tnx/ns1/testGetLastMessageIdsWithOpenTransactionAtLedgerHead").create();
        try {
            this.pulsarClient.newConsumer().topic(new String[]{"persistent://tnx/ns1/testGetLastMessageIdsWithOpenTransactionAtLedgerHead"}).subscriptionName("my-subscription").subscribe();
            for (int i = 0; i < 3; i++) {
                System.out.println("expectedLastMessageID: " + create.newMessage().value(String.valueOf(i).getBytes()).send());
            }
            triggerLedgerSwitch("persistent://tnx/ns1/testGetLastMessageIdsWithOpenTransactionAtLedgerHead");
            create.newMessage((Transaction) this.pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.HOURS).build().get()).send();
            Assert.assertTrue(this.pulsarClient.newReader().topic("persistent://tnx/ns1/testGetLastMessageIdsWithOpenTransactionAtLedgerHead").startMessageId(MessageId.earliest).create().hasMessageAvailable());
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    private void triggerLedgerSwitch(String str) throws Exception {
        this.admin.topics().unload(str);
        Awaitility.await().until(() -> {
            CompletableFuture topic = getPulsarServiceList().get(0).getBrokerService().getTopic(str, false);
            if (!topic.isDone() || topic.isCompletedExceptionally()) {
                return false;
            }
            Optional optional = (Optional) topic.join();
            if (optional.isPresent()) {
                return Boolean.valueOf(((PersistentTopic) optional.get()).getManagedLedger().getState() == ManagedLedgerImpl.State.LedgerOpened);
            }
            return false;
        });
    }

    private void assertMessageId(Consumer<?> consumer, MessageIdImpl messageIdImpl, int i) throws Exception {
        TopicMessageIdImpl topicMessageIdImpl = (TopicMessageIdImpl) consumer.getLastMessageIds().get(0);
        AssertJUnit.assertEquals(messageIdImpl.getEntryId(), topicMessageIdImpl.getEntryId() - i);
        AssertJUnit.assertEquals(messageIdImpl.getLedgerId(), topicMessageIdImpl.getLedgerId());
    }
}
