/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelPromise;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.testng.PowerMockObjectFactory;
import org.testng.Assert;
import org.testng.IObjectFactory;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;

@PrepareForTest(value={DispatchRateLimiter.class})
@PowerMockIgnore(value={"org.apache.logging.log4j.*"})
@Test(groups={"broker"})
public class PersistentStickyKeyDispatcherMultipleConsumersTest {
    private PulsarService pulsarMock;
    private BrokerService brokerMock;
    private ManagedCursorImpl cursorMock;
    private Consumer consumerMock;
    private PersistentTopic topicMock;
    private PersistentSubscription subscriptionMock;
    private ServiceConfiguration configMock;
    private ChannelPromise channelMock;
    private PersistentStickyKeyDispatcherMultipleConsumers persistentDispatcher;
    final String topicName = "persistent://public/default/testTopic";
    final String subscriptionName = "testSubscription";

    @ObjectFactory
    public IObjectFactory getObjectFactory() {
        return new PowerMockObjectFactory();
    }

    @BeforeMethod
    public void setup() throws Exception {
        this.configMock = (ServiceConfiguration)Mockito.mock(ServiceConfiguration.class);
        ((ServiceConfiguration)Mockito.doReturn((Object)true).when((Object)this.configMock)).isSubscriptionRedeliveryTrackerEnabled();
        ((ServiceConfiguration)Mockito.doReturn((Object)100).when((Object)this.configMock)).getDispatcherMaxReadBatchSize();
        ((ServiceConfiguration)Mockito.doReturn((Object)true).when((Object)this.configMock)).isSubscriptionKeySharedUseConsistentHashing();
        ((ServiceConfiguration)Mockito.doReturn((Object)1).when((Object)this.configMock)).getSubscriptionKeySharedConsistentHashingReplicaPoints();
        this.pulsarMock = (PulsarService)Mockito.mock(PulsarService.class);
        ((PulsarService)Mockito.doReturn((Object)this.configMock).when((Object)this.pulsarMock)).getConfiguration();
        this.brokerMock = (BrokerService)Mockito.mock(BrokerService.class);
        ((BrokerService)Mockito.doReturn((Object)this.pulsarMock).when((Object)this.brokerMock)).pulsar();
        this.topicMock = (PersistentTopic)Mockito.mock(PersistentTopic.class);
        ((PersistentTopic)Mockito.doReturn((Object)this.brokerMock).when((Object)this.topicMock)).getBrokerService();
        ((PersistentTopic)Mockito.doReturn((Object)"persistent://public/default/testTopic").when((Object)this.topicMock)).getName();
        this.cursorMock = (ManagedCursorImpl)Mockito.mock(ManagedCursorImpl.class);
        ((ManagedCursorImpl)Mockito.doReturn(null).when((Object)this.cursorMock)).getLastIndividualDeletedRange();
        ((ManagedCursorImpl)Mockito.doReturn((Object)"testSubscription").when((Object)this.cursorMock)).getName();
        this.consumerMock = (Consumer)Mockito.mock(Consumer.class);
        this.channelMock = (ChannelPromise)Mockito.mock(ChannelPromise.class);
        ((Consumer)Mockito.doReturn((Object)"consumer1").when((Object)this.consumerMock)).consumerName();
        ((Consumer)Mockito.doReturn((Object)1000).when((Object)this.consumerMock)).getAvailablePermits();
        ((Consumer)Mockito.doReturn((Object)true).when((Object)this.consumerMock)).isWritable();
        ((Consumer)Mockito.doReturn((Object)this.channelMock).when((Object)this.consumerMock)).sendMessages(Mockito.anyList(), (EntryBatchSizes)ArgumentMatchers.any(EntryBatchSizes.class), (EntryBatchIndexesAcks)ArgumentMatchers.any(EntryBatchIndexesAcks.class), Mockito.anyInt(), Mockito.anyLong(), Mockito.anyLong(), (RedeliveryTracker)ArgumentMatchers.any(RedeliveryTracker.class));
        this.subscriptionMock = (PersistentSubscription)Mockito.mock(PersistentSubscription.class);
        PowerMockito.mockStatic(DispatchRateLimiter.class, (Class[])new Class[0]);
        PowerMockito.when((Object)DispatchRateLimiter.isDispatchRateNeeded((BrokerService)((BrokerService)ArgumentMatchers.any(BrokerService.class)), (Optional)((Optional)ArgumentMatchers.any(Optional.class)), (String)Mockito.anyString(), (DispatchRateLimiter.Type)((DispatchRateLimiter.Type)ArgumentMatchers.any(DispatchRateLimiter.Type.class)))).thenReturn((Object)false);
        this.persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(this.topicMock, (ManagedCursor)this.cursorMock, (Subscription)this.subscriptionMock, this.configMock, new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT));
    }

    @Test
    public void testSendMarkerMessage() {
        try {
            this.persistentDispatcher.addConsumer(this.consumerMock);
            this.persistentDispatcher.consumerFlow(this.consumerMock, 1000);
        }
        catch (Exception e) {
            Assert.fail((String)"Failed to add mock consumer", (Throwable)e);
        }
        ArrayList<EntryImpl> entries = new ArrayList<EntryImpl>();
        ByteBuf markerMessage = Markers.newReplicatedSubscriptionsSnapshotRequest((String)"testSnapshotId", (String)"testSourceCluster");
        entries.add(EntryImpl.create((long)1L, (long)1L, (ByteBuf)markerMessage));
        entries.add(EntryImpl.create((long)1L, (long)2L, (ByteBuf)this.createMessage("message1", 1)));
        entries.add(EntryImpl.create((long)1L, (long)3L, (ByteBuf)this.createMessage("message2", 2)));
        entries.add(EntryImpl.create((long)1L, (long)4L, (ByteBuf)this.createMessage("message3", 3)));
        entries.add(EntryImpl.create((long)1L, (long)5L, (ByteBuf)this.createMessage("message4", 4)));
        entries.add(EntryImpl.create((long)1L, (long)6L, (ByteBuf)this.createMessage("message5", 5)));
        try {
            this.persistentDispatcher.readEntriesComplete(entries, (Object)PersistentDispatcherMultipleConsumers.ReadType.Normal);
        }
        catch (Exception e) {
            Assert.fail((String)"Failed to readEntriesComplete.", (Throwable)e);
        }
        ArgumentCaptor totalMessagesCaptor = ArgumentCaptor.forClass(Integer.class);
        ((Consumer)Mockito.verify((Object)this.consumerMock, (VerificationMode)Mockito.times((int)1))).sendMessages(Mockito.anyList(), (EntryBatchSizes)ArgumentMatchers.any(EntryBatchSizes.class), (EntryBatchIndexesAcks)ArgumentMatchers.any(EntryBatchIndexesAcks.class), ((Integer)totalMessagesCaptor.capture()).intValue(), Mockito.anyLong(), Mockito.anyLong(), (RedeliveryTracker)ArgumentMatchers.any(RedeliveryTracker.class));
        List allTotalMessagesCaptor = totalMessagesCaptor.getAllValues();
        Assert.assertEquals((int)((Integer)allTotalMessagesCaptor.get(0)), (int)5);
    }

    @Test(timeOut=10000L)
    public void testSendMessage() {
        KeySharedMeta keySharedMeta = new KeySharedMeta().setKeySharedMode(KeySharedMode.STICKY);
        PersistentStickyKeyDispatcherMultipleConsumers persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(this.topicMock, (ManagedCursor)this.cursorMock, (Subscription)this.subscriptionMock, this.configMock, keySharedMeta);
        try {
            keySharedMeta.addHashRange().setStart(0).setEnd(9);
            Consumer consumerMock = (Consumer)Mockito.mock(Consumer.class);
            ((Consumer)Mockito.doReturn((Object)keySharedMeta).when((Object)consumerMock)).getKeySharedMeta();
            persistentDispatcher.addConsumer(consumerMock);
            persistentDispatcher.consumerFlow(consumerMock, 1000);
        }
        catch (Exception e) {
            Assert.fail((String)"Failed to add mock consumer", (Throwable)e);
        }
        ArrayList<EntryImpl> entries = new ArrayList<EntryImpl>();
        entries.add(EntryImpl.create((long)1L, (long)1L, (ByteBuf)this.createMessage("message1", 1)));
        entries.add(EntryImpl.create((long)1L, (long)2L, (ByteBuf)this.createMessage("message2", 2)));
        try {
            persistentDispatcher.readEntriesComplete(entries, (Object)PersistentDispatcherMultipleConsumers.ReadType.Normal);
        }
        catch (Exception e) {
            Assert.fail((String)"Failed to readEntriesComplete.", (Throwable)e);
        }
    }

    @Test
    public void testSkipRedeliverTemporally() {
        Consumer slowConsumerMock = (Consumer)Mockito.mock(Consumer.class);
        ChannelPromise slowChannelMock = (ChannelPromise)Mockito.mock(ChannelPromise.class);
        ArrayList<EntryImpl> redeliverEntries = new ArrayList<EntryImpl>();
        redeliverEntries.add(EntryImpl.create((long)1L, (long)1L, (ByteBuf)this.createMessage("message1", 1, "key1")));
        ArrayList<EntryImpl> readEntries = new ArrayList<EntryImpl>();
        readEntries.add(EntryImpl.create((long)1L, (long)2L, (ByteBuf)this.createMessage("message2", 2, "key1")));
        readEntries.add(EntryImpl.create((long)1L, (long)3L, (ByteBuf)this.createMessage("message3", 3, "key2")));
        try {
            Field totalAvailablePermitsField = PersistentDispatcherMultipleConsumers.class.getDeclaredField("totalAvailablePermits");
            totalAvailablePermitsField.setAccessible(true);
            totalAvailablePermitsField.set(this.persistentDispatcher, 1000);
            ((ManagedCursorImpl)Mockito.doAnswer(invocationOnMock -> {
                ((PersistentStickyKeyDispatcherMultipleConsumers)invocationOnMock.getArgument(2)).readEntriesComplete(readEntries, (Object)PersistentDispatcherMultipleConsumers.ReadType.Normal);
                return null;
            }).when((Object)this.cursorMock)).asyncReadEntriesOrWait(Mockito.anyInt(), Mockito.anyLong(), (AsyncCallbacks.ReadEntriesCallback)ArgumentMatchers.any(PersistentStickyKeyDispatcherMultipleConsumers.class), Mockito.eq((Object)PersistentDispatcherMultipleConsumers.ReadType.Normal), (PositionImpl)ArgumentMatchers.any());
        }
        catch (Exception e) {
            Assert.fail((String)"Failed to set to field", (Throwable)e);
        }
        try {
            ((Consumer)Mockito.doReturn((Object)"consumer2").when((Object)slowConsumerMock)).consumerName();
            Mockito.when((Object)slowConsumerMock.getAvailablePermits()).thenReturn((Object)0).thenReturn((Object)1);
            ((Consumer)Mockito.doReturn((Object)true).when((Object)slowConsumerMock)).isWritable();
            ((Consumer)Mockito.doReturn((Object)slowChannelMock).when((Object)slowConsumerMock)).sendMessages(Mockito.anyList(), (EntryBatchSizes)ArgumentMatchers.any(EntryBatchSizes.class), (EntryBatchIndexesAcks)ArgumentMatchers.any(EntryBatchIndexesAcks.class), Mockito.anyInt(), Mockito.anyLong(), Mockito.anyLong(), (RedeliveryTracker)ArgumentMatchers.any(RedeliveryTracker.class));
            this.persistentDispatcher.addConsumer(this.consumerMock);
            this.persistentDispatcher.addConsumer(slowConsumerMock);
        }
        catch (Exception e) {
            Assert.fail((String)"Failed to add mock consumer", (Throwable)e);
        }
        this.persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal, redeliverEntries);
        ((Consumer)Mockito.verify((Object)this.consumerMock, (VerificationMode)Mockito.times((int)1))).sendMessages((List)Mockito.argThat(arg -> {
            Assert.assertEquals((int)arg.size(), (int)1);
            Entry entry = (Entry)arg.get(0);
            Assert.assertEquals((long)entry.getLedgerId(), (long)1L);
            Assert.assertEquals((long)entry.getEntryId(), (long)3L);
            return true;
        }), (EntryBatchSizes)ArgumentMatchers.any(EntryBatchSizes.class), (EntryBatchIndexesAcks)ArgumentMatchers.any(EntryBatchIndexesAcks.class), Mockito.anyInt(), Mockito.anyLong(), Mockito.anyLong(), (RedeliveryTracker)ArgumentMatchers.any(RedeliveryTracker.class));
        ((Consumer)Mockito.verify((Object)slowConsumerMock, (VerificationMode)Mockito.times((int)0))).sendMessages(Mockito.anyList(), (EntryBatchSizes)ArgumentMatchers.any(EntryBatchSizes.class), (EntryBatchIndexesAcks)ArgumentMatchers.any(EntryBatchIndexesAcks.class), Mockito.anyInt(), Mockito.anyLong(), Mockito.anyLong(), (RedeliveryTracker)ArgumentMatchers.any(RedeliveryTracker.class));
    }

    private ByteBuf createMessage(String message, int sequenceId) {
        return this.createMessage(message, sequenceId, "testKey");
    }

    private ByteBuf createMessage(String message, int sequenceId, String key) {
        MessageMetadata messageMetadata = new MessageMetadata().setSequenceId((long)sequenceId).setProducerName("testProducer").setPartitionKey(key).setPartitionKeyB64Encoded(false).setPublishTime(System.currentTimeMillis());
        return Commands.serializeMetadataAndPayload((Commands.ChecksumType)Commands.ChecksumType.Crc32c, (MessageMetadata)messageMetadata, (ByteBuf)Unpooled.copiedBuffer((byte[])message.getBytes(StandardCharsets.UTF_8)));
    }
}

