package org.apache.pulsar.broker.service;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Tests how a subscription's mark-delete position behaves when messages are
 * published inside transactions, and that acknowledging a message does not
 * read entries back from the managed ledger when the transaction coordinator
 * is reported as disabled.
 */
@Test(groups = {"broker"})
public class TransactionMarkerDeleteTest extends TransactionTestBase {
    private static final int TOPIC_PARTITION = 3;
    private static final String TOPIC_OUTPUT = "tnx/ns1/output";
    private static final int NUM_PARTITIONS = 16;

    /**
     * Prepares the shared test environment through {@code setUpBase}.
     *
     * @throws Exception if the base set-up fails
     */
    @BeforeMethod
    protected void setup() throws Exception {
        // NOTE(review): the last argument is presumably the partition count of
        // TOPIC_OUTPUT - confirm against TransactionTestBase#setUpBase.
        setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION);
    }

    /**
     * Releases whatever the base class set up; runs even when a test fails.
     *
     * @throws Exception if the base clean-up fails
     */
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Acknowledges a single entry on a {@link PersistentSubscription} whose
     * broker configuration reports the transaction coordinator as disabled,
     * then verifies that {@code asyncReadEntry} was never invoked on the
     * managed ledger.
     *
     * @throws Exception if the managed ledger or cursor cannot be opened
     */
    @Test
    public void testMarkerDeleteTimes() throws Exception {
        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) Mockito.spy(
                getPulsarServiceList().get(0).getDefaultManagedLedgerFactory().open("test"));

        // Mock chain: topic -> broker service -> pulsar service -> configuration.
        PersistentTopic topic = Mockito.mock(PersistentTopic.class);
        BrokerService brokerService = Mockito.mock(BrokerService.class);
        PulsarService pulsarService = Mockito.mock(PulsarService.class);
        ServiceConfiguration configuration = Mockito.mock(ServiceConfiguration.class);
        Mockito.doReturn(brokerService).when(topic).getBrokerService();
        Mockito.doReturn(pulsarService).when(brokerService).getPulsar();
        Mockito.doReturn(configuration).when(pulsarService).getConfig();
        Mockito.doReturn(false).when(configuration).isTransactionCoordinatorEnabled();
        Mockito.doReturn(managedLedger).when(topic).getManagedLedger();

        // The cursor is opened before the entry is added, as in the original flow.
        PersistentSubscription subscription = BrokerTestUtil.spyWithClassAndConstructorArgs(
                PersistentSubscription.class, topic, "test", managedLedger.openCursor("test"), false);
        Position position = managedLedger.addEntry("test".getBytes(StandardCharsets.UTF_8));

        subscription.acknowledgeMessage(
                Collections.singletonList(position), CommandAck.AckType.Individual, Collections.emptyMap());

        Mockito.verify(managedLedger, Mockito.never()).asyncReadEntry(
                ArgumentMatchers.<Position>any(),
                ArgumentMatchers.<AsyncCallbacks.ReadEntryCallback>any(),
                ArgumentMatchers.any());
    }

    /**
     * Publishes four transactional messages, commits the first three one at a
     * time and aborts the last, checking the subscription's mark-delete
     * position after each step.
     *
     * @throws Exception if any client, transaction or admin call fails
     */
    @Test
    public void testMarkerDelete() throws Exception {
        final String topicName = "tnx/ns1/testMarkerDelete";
        final String subName = "testMarkerDelete";

        // Both client resources are closed on every exit path, including failed assertions.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topic(topicName)
                     .subscriptionName(subName)
                     .isAckReceiptEnabled(true)
                     .subscriptionType(SubscriptionType.Shared)
                     .subscribe();
             Producer<byte[]> producer = this.pulsarClient.newProducer()
                     .sendTimeout(0, TimeUnit.SECONDS)
                     .topic(topicName)
                     .create()) {
            Transaction txn1 = getTxn();
            Transaction txn2 = getTxn();
            Transaction txn3 = getTxn();
            Transaction txn4 = getTxn();

            MessageIdImpl msgId1 = (MessageIdImpl) producer.newMessage(txn1).send();
            MessageIdImpl msgId2 = (MessageIdImpl) producer.newMessage(txn2).send();
            // Nothing is delivered while both transactions are still open.
            Assert.assertNull(consumer.receive(1, TimeUnit.SECONDS));

            txn1.commit().get();
            receiveAndAckOnlyMessage(consumer);
            Assert.assertEquals(markDeletePosition(topicName, subName), positionString(msgId1, 0));

            MessageIdImpl msgId3 = (MessageIdImpl) producer.newMessage(txn3).send();
            txn2.commit().get();
            receiveAndAckOnlyMessage(consumer);
            // NOTE(review): the +1 offsets presumably account for the transaction
            // marker entry written after the message - confirm against the broker.
            Assert.assertEquals(markDeletePosition(topicName, subName), positionString(msgId2, 1));

            MessageIdImpl msgId4 = (MessageIdImpl) producer.newMessage(txn4).send();
            txn3.commit().get();
            receiveAndAckOnlyMessage(consumer);
            Assert.assertEquals(markDeletePosition(topicName, subName), positionString(msgId3, 1));

            txn4.abort().get();
            // After the abort the position is polled until it reaches the expected value.
            Awaitility.await().untilAsserted(() ->
                    Assert.assertEquals(markDeletePosition(topicName, subName), positionString(msgId4, 2)));
        }
    }

    /** Receives one message, acknowledges it and asserts that no further message arrives within a second. */
    private static void receiveAndAckOnlyMessage(Consumer<byte[]> consumer) throws Exception {
        consumer.acknowledgeAsync(consumer.receive()).get();
        Assert.assertNull(consumer.receive(1, TimeUnit.SECONDS));
    }

    /** Returns the mark-delete position the admin API reports for the given subscription cursor. */
    private String markDeletePosition(String topicName, String subName) throws Exception {
        ManagedLedgerInternalStats.CursorStats cursorStats =
                this.admin.topics().getInternalStats(topicName).cursors.get(subName);
        return cursorStats.markDeletePosition;
    }

    /** Returns the string form of the position {@code entryOffset} entries after the given message id. */
    private static String positionString(MessageIdImpl messageId, long entryOffset) {
        return PositionFactory.create(messageId.getLedgerId(), messageId.getEntryId() + entryOffset).toString();
    }

    /**
     * Starts a new transaction with a 10 second timeout.
     *
     * @return the newly built transaction
     * @throws Exception if the transaction cannot be created
     */
    private Transaction getTxn() throws Exception {
        return this.pulsarClient.newTransaction().withTransactionTimeout(10L, TimeUnit.SECONDS).build().get();
    }
}
