package org.apache.pulsar.broker.service;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.testng.collections.Sets;

/**
 * Checks how acknowledging messages through a {@link PersistentSubscription} affects the cursor's
 * mark-delete position when transaction commit/abort marker entries are stored in the managed ledger.
 */
@Test(groups = {"broker"})
public class TransactionMarkerDeleteTest extends BrokerTestBase {

    /**
     * Enables the transaction coordinator in the configuration, runs the base-class setup, and
     * creates the {@code public} tenant together with the {@code public/default} namespace.
     *
     * @throws Exception if the base setup or any admin call fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        this.conf.setTransactionCoordinatorEnabled(true);
        super.baseSetup();
        this.admin.tenants().createTenant("public",
                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
        this.admin.namespaces().createNamespace("public/default");
    }

    /**
     * Delegates to the base-class cleanup; runs even when the test method failed.
     *
     * @throws Exception if the base cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * With the transaction coordinator reported as disabled by the mocked configuration, an
     * individual acknowledgement must not trigger any {@code asyncReadEntry} call on the managed ledger.
     *
     * @throws Exception if the managed ledger cannot be opened or written to
     */
    @Test
    public void testMarkerDeleteTimes() throws Exception {
        // The ledger is wrapped in a spy so that calls made on it can be verified afterwards.
        ManagedLedgerImpl managedLedger =
                (ManagedLedgerImpl) Mockito.spy(this.pulsar.getManagedLedgerFactory().open("test"));

        // Mock chain: topic -> broker service -> pulsar service -> configuration.
        PersistentTopic topic = Mockito.mock(PersistentTopic.class);
        BrokerService brokerService = Mockito.mock(BrokerService.class);
        PulsarService pulsarService = Mockito.mock(PulsarService.class);
        ServiceConfiguration configuration = Mockito.mock(ServiceConfiguration.class);
        Mockito.doReturn(brokerService).when(topic).getBrokerService();
        Mockito.doReturn(pulsarService).when(brokerService).getPulsar();
        Mockito.doReturn(configuration).when(pulsarService).getConfig();
        Mockito.doReturn(false).when(configuration).isTransactionCoordinatorEnabled();
        Mockito.doReturn(managedLedger).when(topic).getManagedLedger();

        ManagedCursor cursor = managedLedger.openCursor("test");
        PersistentSubscription subscription =
                Mockito.spy(new PersistentSubscription(topic, "test", cursor, false));

        // Explicit charset so the stored bytes do not depend on the platform default.
        Position position = managedLedger.addEntry("test".getBytes(StandardCharsets.UTF_8));
        subscription.acknowledgeMessage(Collections.singletonList(position),
                CommandAck.AckType.Individual, Collections.emptyMap());

        Mockito.verify(managedLedger, Mockito.times(0)).asyncReadEntry(
                (PositionImpl) ArgumentMatchers.any(),
                (AsyncCallbacks.ReadEntryCallback) ArgumentMatchers.any(),
                ArgumentMatchers.any());
    }

    /**
     * Interleaves ordinary messages with transaction markers and asserts that, after each
     * acknowledgement, the mark-delete position ends up on the marker entry that follows the
     * acknowledged message.
     *
     * @throws Exception if a ledger operation, a transactional ack, or an await fails
     */
    @Test
    public void testMarkerDelete() throws Exception {
        MessageMetadata metadata = new MessageMetadata().clear()
                .setPublishTime(1L)
                .setProducerName("test")
                .setSequenceId(0L);
        ByteBuf serializedMessage = Commands.serializeMetadataAndPayload(
                Commands.ChecksumType.Crc32c, metadata, PooledByteBufAllocator.DEFAULT.buffer(0));

        ManagedLedger managedLedger = this.pulsar.getManagedLedgerFactory().open("test");
        PersistentTopic topic = Mockito.mock(PersistentTopic.class);
        Mockito.doReturn(this.pulsar.getBrokerService()).when(topic).getBrokerService();
        Mockito.doReturn(managedLedger).when(topic).getManagedLedger();
        Mockito.doReturn("test").when(topic).getName();

        ManagedCursor cursor = managedLedger.openCursor("test");
        PersistentSubscription subscription =
                new PersistentSubscription(topic, "test", managedLedger.openCursor("test"), false);

        // Ledger layout: message, commit marker, message, abort marker, message.
        byte[] messageBytes = toBytes(serializedMessage);
        PositionImpl firstMessage = (PositionImpl) managedLedger.addEntry(messageBytes);
        Position commitMarker = managedLedger.addEntry(toBytes(Markers.newTxnCommitMarker(1L, 1L, 1L)));
        Position secondMessage = managedLedger.addEntry(messageBytes);
        Position abortMarker = managedLedger.addEntry(toBytes(Markers.newTxnAbortMarker(1L, 1L, 1L)));
        PositionImpl thirdMessage = (PositionImpl) managedLedger.addEntry(messageBytes);

        Assert.assertEquals(cursor.getNumberOfEntriesInBacklog(true), 5L);
        Assert.assertTrue(((PositionImpl) cursor.getMarkDeletedPosition()).compareTo(firstMessage) < 0);

        // Ack the first message: the mark-delete position must reach the commit marker and
        // stay there for a full second (it must not run ahead of the un-acked second message).
        subscription.acknowledgeMessage(Collections.singletonList(firstMessage),
                CommandAck.AckType.Individual, Collections.emptyMap());
        Awaitility.await().during(1L, TimeUnit.SECONDS).until(
                () -> markDeletedPosition(subscription).compareTo((PositionImpl) commitMarker) == 0);

        // Ack the second message: the mark-delete position must reach the abort marker.
        subscription.acknowledgeMessage(Collections.singletonList(secondMessage),
                CommandAck.AckType.Individual, Collections.emptyMap());
        Awaitility.await().until(
                () -> markDeletedPosition(subscription).compareTo((PositionImpl) abortMarker) == 0);

        // Append three consecutive markers after the third message.
        managedLedger.addEntry(toBytes(Markers.newTxnCommitMarker(1L, 1L, 1L)));
        managedLedger.addEntry(toBytes(Markers.newTxnAbortMarker(1L, 1L, 1L)));
        Position lastMarker = managedLedger.addEntry(toBytes(Markers.newTxnAbortMarker(1L, 1L, 1L)));

        // Ack the third message inside transaction (0, 0), then end that transaction.
        // NOTE(review): the third endTxn argument (0) is presumably the commit action - confirm.
        subscription.transactionIndividualAcknowledge(new TxnID(0L, 0L),
                Collections.singletonList(MutablePair.of(thirdMessage, 0))).get();
        subscription.endTxn(0L, 0L, 0, 0L).get();
        Awaitility.await().until(
                () -> markDeletedPosition(subscription).compareTo((PositionImpl) lastMarker) == 0);
    }

    /** Returns the mark-delete position of the subscription's cursor, cast to {@link PositionImpl}. */
    private static PositionImpl markDeletedPosition(PersistentSubscription subscription) {
        return (PositionImpl) subscription.getCursor().getMarkDeletedPosition();
    }

    /**
     * Copies the readable bytes of {@code byteBuf} into a new array and then releases the buffer.
     *
     * @param byteBuf buffer to drain; it is released by this method and must not be reused by the caller
     * @return a new array holding the bytes that were readable when the method was called
     */
    static byte[] toBytes(ByteBuf byteBuf) {
        try {
            byte[] copy = new byte[byteBuf.readableBytes()];
            byteBuf.readBytes(copy);
            return copy;
        } finally {
            // Release even when the copy fails so the buffer is not leaked.
            byteBuf.release();
        }
    }
}
