package org.apache.pulsar.broker.delayed;

import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.bucket.RecoverDelayedDeliveryTrackerException;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactoryTest.class */
public class DelayedDeliveryTrackerFactoryTest extends ProducerConsumerBase {
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    public void setup() throws Exception {
        this.conf.setDelayedDeliveryTrackerFactoryClassName(BucketDelayedDeliveryTrackerFactory.class.getName());
        this.conf.setDelayedDeliveryMaxNumBuckets(10);
        this.conf.setDelayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds(1);
        this.conf.setDelayedDeliveryMaxIndexesPerBucketSnapshotSegment(10);
        this.conf.setDelayedDeliveryMinIndexCountPerBucket(50L);
        this.conf.setDelayedDeliveryTickTimeMillis(1024L);
        this.conf.setDispatcherReadFailureBackoffInitialTimeInMs(1000);
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testFallbackToInMemoryTracker() throws Exception {
        Pair<BrokerService, AbstractPersistentDispatcherMultipleConsumers> mockDelayedDeliveryTrackerFactoryAndDispatcher = mockDelayedDeliveryTrackerFactoryAndDispatcher();
        BrokerService brokerService = (BrokerService) mockDelayedDeliveryTrackerFactoryAndDispatcher.getLeft();
        DelayedDeliveryTracker newTracker = brokerService.getDelayedDeliveryTrackerFactory().newTracker((AbstractPersistentDispatcherMultipleConsumers) mockDelayedDeliveryTrackerFactoryAndDispatcher.getRight());
        try {
            Assert.assertTrue(newTracker instanceof InMemoryDelayedDeliveryTracker);
            Assert.assertTrue(brokerService.getFallbackDelayedDeliveryTrackerFactory() instanceof InMemoryDelayedDeliveryTrackerFactory);
            if (Collections.singletonList(newTracker).get(0) != null) {
                newTracker.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(newTracker).get(0) != null) {
                newTracker.close();
            }
            throw th;
        }
    }

    private Pair<BrokerService, AbstractPersistentDispatcherMultipleConsumers> mockDelayedDeliveryTrackerFactoryAndDispatcher() throws Exception {
        BrokerService brokerService = (BrokerService) Mockito.spy(this.pulsar.getBrokerService());
        AbstractPersistentDispatcherMultipleConsumers abstractPersistentDispatcherMultipleConsumers = (AbstractPersistentDispatcherMultipleConsumers) Mockito.mock(AbstractPersistentDispatcherMultipleConsumers.class);
        ((AbstractPersistentDispatcherMultipleConsumers) Mockito.doReturn("test").when(abstractPersistentDispatcherMultipleConsumers)).getName();
        BucketDelayedDeliveryTrackerFactory bucketDelayedDeliveryTrackerFactory = new BucketDelayedDeliveryTrackerFactory();
        try {
            bucketDelayedDeliveryTrackerFactory = (BucketDelayedDeliveryTrackerFactory) Mockito.spy(bucketDelayedDeliveryTrackerFactory);
            bucketDelayedDeliveryTrackerFactory.initialize(this.pulsar);
            ((BucketDelayedDeliveryTrackerFactory) Mockito.doThrow(new Throwable[]{new RecoverDelayedDeliveryTrackerException(new RuntimeException())}).when(bucketDelayedDeliveryTrackerFactory)).newTracker0((AbstractPersistentDispatcherMultipleConsumers) Mockito.eq(abstractPersistentDispatcherMultipleConsumers));
            ((BrokerService) Mockito.doReturn(bucketDelayedDeliveryTrackerFactory).when(brokerService)).getDelayedDeliveryTrackerFactory();
            PersistentTopic persistentTopic = (PersistentTopic) Mockito.mock(PersistentTopic.class);
            ((PersistentTopic) Mockito.doReturn(brokerService).when(persistentTopic)).getBrokerService();
            Subscription subscription = (Subscription) Mockito.mock(Subscription.class);
            ((PersistentTopic) Mockito.doReturn("topic").when(persistentTopic)).getName();
            ((Subscription) Mockito.doReturn("sub").when(subscription)).getName();
            ((AbstractPersistentDispatcherMultipleConsumers) Mockito.doReturn(persistentTopic).when(abstractPersistentDispatcherMultipleConsumers)).getTopic();
            ((AbstractPersistentDispatcherMultipleConsumers) Mockito.doReturn(subscription).when(abstractPersistentDispatcherMultipleConsumers)).getSubscription();
            Pair<BrokerService, AbstractPersistentDispatcherMultipleConsumers> of = Pair.of(brokerService, abstractPersistentDispatcherMultipleConsumers);
            if (Collections.singletonList(bucketDelayedDeliveryTrackerFactory).get(0) != null) {
                bucketDelayedDeliveryTrackerFactory.close();
            }
            return of;
        } catch (Throwable th) {
            if (Collections.singletonList(bucketDelayedDeliveryTrackerFactory).get(0) != null) {
                bucketDelayedDeliveryTrackerFactory.close();
            }
            throw th;
        }
    }

    @Test
    public void testFallbackToInMemoryTrackerFactoryFailed() throws Exception {
        Pair<BrokerService, AbstractPersistentDispatcherMultipleConsumers> mockDelayedDeliveryTrackerFactoryAndDispatcher = mockDelayedDeliveryTrackerFactoryAndDispatcher();
        BrokerService brokerService = (BrokerService) mockDelayedDeliveryTrackerFactoryAndDispatcher.getLeft();
        AbstractPersistentDispatcherMultipleConsumers abstractPersistentDispatcherMultipleConsumers = (AbstractPersistentDispatcherMultipleConsumers) mockDelayedDeliveryTrackerFactoryAndDispatcher.getRight();
        InMemoryDelayedDeliveryTrackerFactory inMemoryDelayedDeliveryTrackerFactory = new InMemoryDelayedDeliveryTrackerFactory();
        try {
            inMemoryDelayedDeliveryTrackerFactory = (InMemoryDelayedDeliveryTrackerFactory) Mockito.spy(inMemoryDelayedDeliveryTrackerFactory);
            inMemoryDelayedDeliveryTrackerFactory.initialize(this.pulsar);
            ((InMemoryDelayedDeliveryTrackerFactory) Mockito.doThrow(new Throwable[]{new RuntimeException()}).when(inMemoryDelayedDeliveryTrackerFactory)).newTracker0((AbstractPersistentDispatcherMultipleConsumers) Mockito.eq(abstractPersistentDispatcherMultipleConsumers));
            ((BrokerService) Mockito.doAnswer(invocationOnMock -> {
                return null;
            }).when(brokerService)).initializeFallbackDelayedDeliveryTrackerFactory();
            ((BrokerService) Mockito.doReturn(inMemoryDelayedDeliveryTrackerFactory).when(brokerService)).getFallbackDelayedDeliveryTrackerFactory();
            DelayedDeliveryTracker newTracker = brokerService.getDelayedDeliveryTrackerFactory().newTracker(abstractPersistentDispatcherMultipleConsumers);
            try {
                Assert.assertEquals(newTracker, DelayedDeliveryTracker.DISABLE);
                if (Collections.singletonList(newTracker).get(0) != null) {
                    newTracker.close();
                }
                if (Collections.singletonList(inMemoryDelayedDeliveryTrackerFactory).get(0) != null) {
                    inMemoryDelayedDeliveryTrackerFactory.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(newTracker).get(0) != null) {
                    newTracker.close();
                }
                throw th;
            }
        } catch (Throwable th2) {
            if (Collections.singletonList(inMemoryDelayedDeliveryTrackerFactory).get(0) != null) {
                inMemoryDelayedDeliveryTrackerFactory.close();
            }
            throw th2;
        }
    }

    @Test(timeOut = 60000)
    public void testPublishDelayMessagesAndCreateBucketDelayDeliveryTrackerFailed() throws Exception {
        String str = "persistent://public/default/" + UUID.randomUUID();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(str).enableBatching(false).create();
        try {
            PersistentTopic persistentTopic = (PersistentTopic) Mockito.spy((PersistentTopic) this.pulsar.getBrokerService().getTopicReference(str).get());
            BrokerService brokerService = (BrokerService) Mockito.spy(this.pulsar.getBrokerService());
            BucketDelayedDeliveryTrackerFactory bucketDelayedDeliveryTrackerFactory = (BucketDelayedDeliveryTrackerFactory) Mockito.spy(brokerService.getDelayedDeliveryTrackerFactory());
            ((BucketDelayedDeliveryTrackerFactory) Mockito.doThrow(new Throwable[]{new RecoverDelayedDeliveryTrackerException(new RuntimeException())}).when(bucketDelayedDeliveryTrackerFactory)).newTracker0((AbstractPersistentDispatcherMultipleConsumers) Mockito.any());
            ((BrokerService) Mockito.doReturn(bucketDelayedDeliveryTrackerFactory).when(brokerService)).getDelayedDeliveryTrackerFactory();
            ((PersistentTopic) Mockito.doReturn(brokerService).when(persistentTopic)).getBrokerService();
            brokerService.getTopics().put(str, CompletableFuture.completedFuture(Optional.of(persistentTopic)));
            Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionName("sub").subscriptionType(SubscriptionType.Shared).messageListener((consumer, message) -> {
                try {
                    consumer.acknowledge(message);
                } catch (PulsarClientException e) {
                    throw new RuntimeException((Throwable) e);
                }
            }).subscribe();
            PersistentDispatcherMultipleConsumers dispatcher = persistentTopic.getSubscription("sub").getDispatcher();
            Assert.assertTrue(dispatcher instanceof PersistentDispatcherMultipleConsumers);
            create.newMessage().value("test").deliverAfter(10000L, TimeUnit.MILLISECONDS).send();
            PersistentDispatcherMultipleConsumers persistentDispatcherMultipleConsumers = dispatcher;
            Field declaredField = PersistentDispatcherMultipleConsumers.class.getDeclaredField("delayedDeliveryTracker");
            declaredField.setAccessible(true);
            AtomicReference atomicReference = new AtomicReference();
            Awaitility.await().atMost(Duration.ofSeconds(20L)).until(() -> {
                Optional optional = (Optional) declaredField.get(persistentDispatcherMultipleConsumers);
                if (!optional.isPresent()) {
                    return false;
                }
                atomicReference.set(optional);
                return true;
            });
            Optional optional = (Optional) atomicReference.get();
            Assert.assertTrue(optional.get() instanceof InMemoryDelayedDeliveryTracker);
            AtomicInteger atomicInteger = new AtomicInteger(0);
            InMemoryDelayedDeliveryTracker inMemoryDelayedDeliveryTracker = (InMemoryDelayedDeliveryTracker) Mockito.spy((InMemoryDelayedDeliveryTracker) optional.get());
            ((InMemoryDelayedDeliveryTracker) Mockito.doAnswer(invocationOnMock -> {
                atomicInteger.incrementAndGet();
                return invocationOnMock.callRealMethod();
            }).when(inMemoryDelayedDeliveryTracker)).addMessage(Mockito.anyLong(), Mockito.anyLong(), Mockito.anyLong());
            declaredField.set(persistentDispatcherMultipleConsumers, Optional.of(inMemoryDelayedDeliveryTracker));
            for (int i = 0; i < 10; i++) {
                create.newMessage().value("test").deliverAfter(10000L, TimeUnit.MILLISECONDS).send();
            }
            try {
                Awaitility.await().atMost(Duration.ofSeconds(20L)).until(() -> {
                    return Boolean.valueOf(atomicInteger.get() == 10);
                });
                subscribe.close();
            } catch (Throwable th) {
                subscribe.close();
                throw th;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1967299060:
                if (implMethodName.equals("lambda$testPublishDelayMessagesAndCreateBucketDelayDeliveryTrackerFailed$ca54154b$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case SHARED_VALUE:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactoryTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    return (consumer, message) -> {
                        try {
                            consumer.acknowledge(message);
                        } catch (PulsarClientException e) {
                            throw new RuntimeException((Throwable) e);
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
