package org.apache.pulsar.broker;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/LedgerLostAndSkipNonRecoverableTest.class */
public class LedgerLostAndSkipNonRecoverableTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(LedgerLostAndSkipNonRecoverableTest.class);
    private static final String DEFAULT_NAMESPACE = "my-property/my-ns";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/broker/LedgerLostAndSkipNonRecoverableTest$ConsumerAndReceivedMessages.class */
    /**
     * Immutable pair of a consumer and the message ids that were read through it.
     *
     * <p>The consumer is handed over as-is; this holder never closes it, so whoever
     * receives an instance is responsible for closing {@code consumer}.
     */
    public static class ConsumerAndReceivedMessages {
        /** Consumer the ids were received on. */
        private final Consumer<String> consumer;
        /** Received message ids, as an array of lists supplied by the creator of this object. */
        private final List<MessageIdImpl>[] messageIds;

        /**
         * @param consumer consumer the ids were received on
         * @param listArr  received message ids; stored by reference, not copied
         */
        public ConsumerAndReceivedMessages(Consumer<String> consumer, List<MessageIdImpl>[] listArr) {
            this.consumer = consumer;
            this.messageIds = listArr;
        }
    }

    /**
     * Class-level TestNG setup hook (overrides the hook declared in
     * {@code MockedPulsarServiceBaseTest}).
     *
     * <p>Runs the inherited {@code internalSetup()} followed by {@code producerBaseSetup()}.
     *
     * @throws Exception if either inherited setup step fails
     */
    @BeforeClass
    @Override
    protected void setup() throws Exception {
        // Order matters only in that it mirrors the original: base setup first, then producer setup.
        super.internalSetup();
        super.producerBaseSetup();
    }

    /**
     * Class-level TestNG teardown hook (overrides the hook declared in
     * {@code MockedPulsarServiceBaseTest}); delegates to the inherited {@code internalCleanup()}.
     *
     * @throws Exception if the inherited cleanup fails
     */
    @AfterClass
    @Override
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Configuration hook: switches {@code autoSkipNonRecoverableData} on in the inherited
     * {@code conf} object, which is the behaviour the test in this class exercises.
     *
     * <p>NOTE(review): the base implementation is not invoked here; confirm that no base
     * defaults are expected to be applied by {@code super.doInitConf()}.
     *
     * @throws Exception declared by the overridden hook; nothing in this body throws explicitly
     */
    @Override
    public void doInitConf() throws Exception {
        conf.setAutoSkipNonRecoverableData(true);
    }

    /**
     * TestNG data provider named {@code "batchEnabled"}.
     *
     * <p>Supplies two single-argument rows so that a test using it runs once with the
     * boolean flag {@code true} and once with {@code false}.
     *
     * @return {@code {{true}, {false}}}
     */
    @DataProvider(name = "batchEnabled")
    public Object[][] batchEnabled() {
        // Must be a genuine two-dimensional array: an expression of static type Object[]
        // (as the decompiler emitted) is not assignable to the Object[][] return type.
        return new Object[][] {{true}, {false}};
    }

    /**
     * Checks that a subscription's mark-deleted position keeps moving forward after one of
     * the ledgers holding unacknowledged messages has been deleted.
     *
     * <p>Flow:
     * <ol>
     *   <li>Create the topic/subscription by subscribing once and closing the consumer.</li>
     *   <li>Publish three rounds of messages (25 per round at 5 per entry when {@code z} is
     *       true, otherwise 5 per round at 1 per entry).</li>
     *   <li>Consume everything without acknowledging, then acknowledge one id from the second
     *       received group and the first entry of the first group, and wait until the
     *       persistent mark-deleted position equals that first entry.</li>
     *   <li>Close the topic and delete, from the mock BookKeeper, the ledger of the
     *       individually acknowledged message.</li>
     *   <li>Publish again, consume and acknowledge everything, and wait until the
     *       mark-deleted position is at or beyond a position taken from the third group.</li>
     * </ol>
     *
     * <p>NOTE(review): the TestNG timeout is 30 s, yet the final wait allows up to 45 s and
     * each consume pass ends with a 5 s idle poll; confirm the timeout is intentional.
     *
     * @param z value from the {@code batchEnabled} data provider; selects batched publishing
     *          and batch-index acknowledgment
     * @throws Exception on any client, broker or assertion failure
     */
    @Test(timeOut = 30000, dataProvider = "batchEnabled")
    public void testMarkDeletedPositionCanForwardAfterTopicLedgerLost(boolean z) throws Exception {
        final boolean batched = z;
        final String topicSimpleName = UUID.randomUUID().toString().replaceAll("-", "");
        final String subscription = UUID.randomUUID().toString().replaceAll("-", "");
        final String topic = String.format("persistent://%s/%s", DEFAULT_NAMESPACE, topicSimpleName);

        log.info("create topic and subscription.");
        Consumer<String> bootstrapConsumer = createConsumer(topic, subscription, batched);
        bootstrapConsumer.redeliverUnacknowledgedMessages();
        bootstrapConsumer.close();

        log.info("send many messages.");
        final int messagesPerLedger;
        final int messagesPerEntry;
        if (batched) {
            messagesPerLedger = 25;
            messagesPerEntry = 5;
        } else {
            messagesPerLedger = 5;
            messagesPerEntry = 1;
        }
        List<MessageIdImpl>[] published = sendManyMessages(topic, 3, messagesPerLedger, messagesPerEntry);
        int publishedCount = 0;
        for (List<MessageIdImpl> round : published) {
            publishedCount += round.size();
        }
        log.info("send {} messages", publishedCount);

        log.info("make individual ack.");
        ConsumerAndReceivedMessages firstPass = waitConsumeAndAllMessages(topic, subscription, batched, false);
        List<MessageIdImpl>[] received = firstPass.messageIds;
        // Last message of the first entry in the second group: acked on its own, leaving a hole before it.
        MessageIdImpl individuallyAcked = received[1].get(messagesPerEntry - 1);
        MessageIdImpl firstReceived = received[0].get(0);
        // Batch index -1 addresses the whole entry rather than one message inside it.
        MessageIdImpl expectedMarkDeleted =
                new MessageIdImpl(firstReceived.getLedgerId(), firstReceived.getEntryId(), -1);
        MessageIdImpl thirdGroupAnchor = received[2].get(4);
        MessageIdImpl minimumFinalPosition =
                new MessageIdImpl(thirdGroupAnchor.getLedgerId(), thirdGroupAnchor.getEntryId(), -1);
        firstPass.consumer.acknowledge(individuallyAcked);
        firstPass.consumer.acknowledge(expectedMarkDeleted);
        waitPersistentCursorLedger(topic, subscription,
                expectedMarkDeleted.getLedgerId(), expectedMarkDeleted.getEntryId());
        firstPass.consumer.close();

        log.info("Make lost ledger [{}].", individuallyAcked.getLedgerId());
        Topic loadedTopic = pulsar.getBrokerService().getTopic(topic, false).get().get();
        loadedTopic.close(false);
        pulsarTestContext.getMockBookKeeper().deleteLedger(individuallyAcked.getLedgerId());

        log.info("send some messages.");
        // NOTE(review): with batching on this asks for 3 messages at 5 per entry; the overload
        // computes 3 / 5 == 0 entries, so it appears nothing is published in that case — confirm.
        sendManyMessages(topic, 3, messagesPerEntry);

        log.info("receive all messages then verify mark deleted position");
        ConsumerAndReceivedMessages secondPass = waitConsumeAndAllMessages(topic, subscription, batched, true);
        waitMarkDeleteLargeAndEquals(topic, subscription,
                minimumFinalPosition.getLedgerId(), minimumFinalPosition.getEntryId());

        // Cleanup.
        secondPass.consumer.close();
        admin.topics().delete(topic);
    }

    /**
     * Fetches the cursor of subscription {@code str2} on topic {@code str}.
     *
     * <p>The topic is looked up with {@code getTopic(str, false)}; an empty lookup result makes
     * {@link Optional#get()} throw {@link java.util.NoSuchElementException}.
     *
     * @param str  topic name
     * @param str2 subscription name
     * @return the subscription's cursor
     * @throws Exception if the topic lookup future fails or the topic is absent
     */
    private ManagedCursorImpl getCursor(String str, String str2) throws Exception {
        Optional<Topic> lookup = pulsar.getBrokerService().getTopic(str, false).get();
        Topic topic = lookup.get();
        return topic.getSubscription(str2).getCursor();
    }

    /**
     * Polls, for at most 45 seconds, until the subscription's mark-deleted position is at or
     * beyond {@code j:j2}.
     *
     * <p>"At or beyond" means: a larger ledger id, or the same ledger id with an entry id that
     * is greater than or equal to {@code j2}. Each poll logs the observed and expected values.
     *
     * @param str  topic name
     * @param str2 subscription name
     * @param j    minimum expected ledger id
     * @param j2   minimum expected entry id when the ledger id equals {@code j}
     * @throws Exception if the condition is not met in time
     */
    private void waitMarkDeleteLargeAndEquals(String str, String str2, long j, long j2) throws Exception {
        Awaitility.await().atMost(Duration.ofSeconds(45L)).untilAsserted(() -> {
            Position current = getCursor(str, str2).getMarkDeletedPosition();
            long currentLedger = current.getLedgerId();
            long currentEntry = current.getEntryId();
            log.info("markDeletedPosition {}:{}, expected {}:{}", currentLedger, currentEntry, j, j2);
            Assert.assertTrue(currentLedger >= j);
            // Entry ids are only comparable inside the same ledger; a later ledger already satisfies the check.
            if (currentLedger == j) {
                Assert.assertTrue(currentEntry >= j2);
            }
        });
    }

    /**
     * Polls (using Awaitility's default timeout) until the cursor's persistent mark-deleted
     * position is exactly {@code j:j2}.
     *
     * @param str  topic name
     * @param str2 subscription name
     * @param j    expected ledger id
     * @param j2   expected entry id
     * @throws Exception if the position does not match in time
     */
    private void waitPersistentCursorLedger(String str, String str2, long j, long j2) throws Exception {
        Awaitility.await().untilAsserted(() -> {
            Position persisted = getCursor(str, str2).getPersistentMarkDeletedPosition();
            long persistedLedger = persisted.getLedgerId();
            long persistedEntry = persisted.getEntryId();
            Assert.assertEquals(persistedLedger, j);
            Assert.assertEquals(persistedEntry, j2);
        });
    }

    /**
     * Publishes {@code i} rounds of messages, unloading the topic before every round.
     *
     * <p>A round sends {@code i2} individual messages when {@code i3 == 1}; otherwise it sends
     * {@code i2 / i3} batched entries of {@code i3} messages each (integer division).
     * Presumably the unload is there so each round lands in its own ledger — confirm.
     *
     * @param str topic name
     * @param i   number of rounds
     * @param i2  messages per round
     * @param i3  messages per entry; {@code 1} selects non-batched publishing
     * @return one list of published message ids per round, in round order
     * @throws Exception if unloading or publishing fails
     */
    private List<MessageIdImpl>[] sendManyMessages(String str, int i, int i2, int i3) throws Exception {
        @SuppressWarnings("unchecked") // generic array creation; only List<MessageIdImpl> values are stored
        List<MessageIdImpl>[] idsPerRound = new List[i];
        for (int round = 0; round < i; round++) {
            admin.topics().unload(str);
            idsPerRound[round] = (i3 == 1)
                    ? sendManyMessages(str, i2)
                    : sendManyBatchedMessages(str, i3, i2 / i3);
        }
        return idsPerRound;
    }

    /**
     * Publishes {@code i} messages to {@code str}, batched or not depending on {@code i2}.
     *
     * <p>With {@code i2 == 1} the messages are sent individually. Otherwise {@code i / i2}
     * batched entries of {@code i2} messages are sent; because this is integer division,
     * nothing is sent when {@code i < i2}.
     *
     * @param str topic name
     * @param i   total number of messages requested
     * @param i2  messages per entry; {@code 1} selects non-batched publishing
     * @return ids of the published messages
     * @throws Exception if publishing fails
     */
    private List<MessageIdImpl> sendManyMessages(String str, int i, int i2) throws Exception {
        if (i2 == 1) {
            return sendManyMessages(str, i);
        }
        return sendManyBatchedMessages(str, i2, i / i2);
    }

    /**
     * Synchronously publishes {@code i} non-batched messages to {@code str}.
     *
     * <p>Keys and values are {@code "Key-<ts>-<n>"} and {@code "Msg-<ts>-<n>"}, where
     * {@code <ts>} is the wall-clock millisecond timestamp taken once before the loop and
     * {@code <n>} is the zero-based message index.
     *
     * <p>The producer is managed by try-with-resources so it is closed even when a send
     * fails; previously a failed {@code send()} leaked it.
     *
     * @param str topic name
     * @param i   number of messages to publish
     * @return ids of the published messages, in send order
     * @throws Exception if creating the producer, sending, or closing fails
     */
    private List<MessageIdImpl> sendManyMessages(String str, int i) throws Exception {
        List<MessageIdImpl> sentIds = new ArrayList<>();
        try (Producer<String> producer = pulsarClient.newProducer(Schema.JSON(String.class))
                .topic(str)
                .enableBatching(false)
                .create()) {
            long timestamp = System.currentTimeMillis();
            for (int seq = 0; seq < i; seq++) {
                String suffix = String.format("%s-%s", timestamp, seq);
                // The explicit cast keeps the list honest: a non-MessageIdImpl id fails here, not at a later read.
                MessageIdImpl sentId = (MessageIdImpl) producer.newMessage()
                        .key(String.format("Key-%s", suffix))
                        .value(String.format("Msg-%s", suffix))
                        .send();
                sentIds.add(sentId);
            }
        }
        return sentIds;
    }

    /**
     * Publishes {@code i2} batched entries of {@code i} messages each to {@code str}.
     *
     * <p>Both the batching delay and the batch size limit are set to
     * {@code Integer.MAX_VALUE}, and {@code flush()} is called after every group of
     * {@code i} asynchronous sends; the intent appears to be that each flush produces exactly
     * one entry — confirm against the client's batching rules.
     *
     * <p>The producer is managed by try-with-resources so it is closed even when a send or
     * flush fails; previously such a failure leaked it. As before, the producer is closed
     * before the send futures are awaited.
     *
     * @param str topic name
     * @param i   messages per batched entry
     * @param i2  number of entries
     * @return ids of the published messages, in send order
     * @throws Exception if creating the producer, sending, flushing, or closing fails
     */
    private List<MessageIdImpl> sendManyBatchedMessages(String str, int i, int i2) throws Exception {
        List<CompletableFuture<?>> pendingSends = new ArrayList<>();
        try (Producer<String> producer = pulsarClient.newProducer(Schema.JSON(String.class))
                .topic(str)
                .enableBatching(true)
                .batchingMaxPublishDelay(Integer.MAX_VALUE, TimeUnit.SECONDS)
                .batchingMaxMessages(Integer.MAX_VALUE)
                .create()) {
            for (int entrySeq = 0; entrySeq < i2; entrySeq++) {
                for (int batchIndex = 0; batchIndex < i; batchIndex++) {
                    String payload = String.format("entry-seq[%s], batch_index[%s]", entrySeq, batchIndex);
                    pendingSends.add(producer.newMessage().value(payload).sendAsync());
                }
                producer.flush();
            }
        }
        FutureUtil.waitForAll(pendingSends).get();
        return pendingSends.stream()
                .map(sendFuture -> (MessageIdImpl) sendFuture.join())
                .collect(Collectors.toList());
    }

    /**
     * Subscribes to {@code str} and drains it until a 5-second receive returns nothing.
     *
     * <p>Every received message id is recorded; each message is acknowledged only when
     * {@code z2} is true. The consumer is returned open — the caller must close it.
     *
     * @param str  topic name
     * @param str2 subscription name
     * @param z    passed to consumer creation and to the id sorter (batched ids expected when true)
     * @param z2   whether to acknowledge each message as it is received
     * @return the open consumer together with the received ids arranged by {@code sortMessageId}
     * @throws Exception if subscribing, receiving or acknowledging fails
     */
    private ConsumerAndReceivedMessages waitConsumeAndAllMessages(String str, String str2, boolean z, boolean z2) throws Exception {
        List<MessageIdImpl> receivedIds = new ArrayList<>();
        Consumer<String> consumer = createConsumer(str, str2, z);
        Message<String> message;
        // A null result means the 5 s poll timed out, which is treated as "topic drained".
        while ((message = consumer.receive(5, TimeUnit.SECONDS)) != null) {
            receivedIds.add((MessageIdImpl) message.getMessageId());
            if (z2) {
                consumer.acknowledge(message);
            }
        }
        log.info("receive {} messages", receivedIds.size());
        return new ConsumerAndReceivedMessages(consumer, sortMessageId(receivedIds, z));
    }

    /**
     * Groups message ids by ledger and sorts each group into position order.
     *
     * <p>The returned array has one list per distinct ledger id, in ascending ledger-id order.
     * Inside each list ids are ordered by entry id and, when {@code z} is true, by batch index
     * for ids sharing an entry.
     *
     * <p>The previous comparator computed {@code left - rightLedgerPart + rightEntryPart}, i.e.
     * it added the right operand's entry id and batch index instead of subtracting them. For
     * ids in the same ledger that value is never negative, which violates the
     * {@link java.util.Comparator} contract, and the result was also narrowed from
     * {@code long} to {@code int}. It is replaced by a field-by-field comparison.
     *
     * @param list ids to arrange; must not be {@code null}
     * @param z    whether ids are {@link BatchMessageIdImpl} instances whose batch index takes part in ordering
     * @return per-ledger sorted id lists, ascending by ledger id
     */
    private List<MessageIdImpl>[] sortMessageId(List<MessageIdImpl> list, boolean z) {
        Map<Long, List<MessageIdImpl>> grouped =
                list.stream().collect(Collectors.groupingBy(MessageIdImpl::getLedgerId));
        // TreeMap iteration gives the groups in ascending ledger-id order.
        TreeMap<Long, List<MessageIdImpl>> byLedger = new TreeMap<>(grouped);
        @SuppressWarnings("unchecked") // generic array creation; values() only yields List<MessageIdImpl>
        List<MessageIdImpl>[] idsPerLedger = byLedger.values().toArray(new List[0]);
        for (List<MessageIdImpl> ledgerIds : idsPerLedger) {
            ledgerIds.sort((left, right) -> compareMessageIds(left, right, z));
        }
        return idsPerLedger;
    }

    /** Orders two ids by ledger id, then entry id, then (only when {@code batched}) batch index. */
    private static int compareMessageIds(MessageIdImpl left, MessageIdImpl right, boolean batched) {
        int byLedger = Long.compare(left.getLedgerId(), right.getLedgerId());
        if (byLedger != 0) {
            return byLedger;
        }
        int byEntry = Long.compare(left.getEntryId(), right.getEntryId());
        if (byEntry != 0 || !batched) {
            return byEntry;
        }
        return Integer.compare(
                ((BatchMessageIdImpl) left).getBatchIndex(),
                ((BatchMessageIdImpl) right).getBatchIndex());
    }

    /**
     * Subscribes a JSON-string consumer to topic {@code str} under subscription {@code str2}.
     *
     * <p>Settings applied: auto-scaled receiver queue off, {@code Failover} subscription type,
     * ack receipts on, receiver queue size 1000, and batch-index acknowledgment set to {@code z}.
     *
     * @param str  topic name
     * @param str2 subscription name
     * @param z    whether batch-index acknowledgment is enabled
     * @return the subscribed consumer; the caller is responsible for closing it
     * @throws Exception if the subscription cannot be established
     */
    private Consumer<String> createConsumer(String str, String str2, boolean z) throws Exception {
        return pulsarClient.newConsumer(Schema.JSON(String.class))
                .autoScaledReceiverQueueSizeEnabled(false)
                .subscriptionType(SubscriptionType.Failover)
                .isAckReceiptEnabled(true)
                .enableBatchIndexAcknowledgment(z)
                .receiverQueueSize(1000)
                .topic(str)
                .subscriptionName(str2)
                .subscribe();
    }
}
