package org.apache.pulsar.broker.service.persistent;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.schema.Schemas;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Broker tests for {@link ShadowReplicator}.
 *
 * <p>Each test creates a source topic in {@code prop1/ns-source}, attaches one or more shadow
 * topics in {@code prop1/ns-shadow}, publishes to the source topic and then checks the state of
 * the shadow replicator(s) the broker holds for the source topic.
 */
@Test(groups = {"broker-replication"})
public class ShadowReplicatorTest extends BrokerTestBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(ShadowReplicatorTest.class);

    /**
     * Runs the base setup, then creates the {@code prop1} tenant and the two namespaces
     * ({@code prop1/ns-source} and {@code prop1/ns-shadow}) used by the tests in this class.
     *
     * @throws Exception if the base setup or any admin call fails
     */
    @Override
    @BeforeClass(alwaysRun = true)
    protected void setup() throws Exception {
        super.baseSetup();
        admin.tenants().createTenant("prop1",
                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
        admin.namespaces().createNamespace("prop1/ns-source");
        admin.namespaces().createNamespace("prop1/ns-shadow");
    }

    /**
     * Delegates to the base class cleanup.
     *
     * @throws Exception if the base cleanup fails
     */
    @Override
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Publishes one message to a source topic that has two shadow topics and verifies that the
     * message read from the first shadow topic carries the same payload, metadata and message id
     * as the one read from the source topic, differing only in {@code replicatedFrom}.
     *
     * @throws Exception if any admin, client or broker call fails
     */
    @Test
    public void testShadowReplication() throws Exception {
        String sourceTopicName = "persistent://prop1/ns-source/source-topic";
        String shadowTopicName = "persistent://prop1/ns-shadow/shadow-topic";
        String shadowTopicName2 = "persistent://prop1/ns-shadow/shadow-topic-2";

        admin.topics().createNonPartitionedTopic(sourceTopicName);
        admin.topics().createShadowTopic(shadowTopicName, sourceTopicName);
        admin.topics().createShadowTopic(shadowTopicName2, sourceTopicName);
        admin.topics().setShadowTopics(sourceTopicName,
                Lists.newArrayList(shadowTopicName, shadowTopicName2));

        // Resources are closed in reverse declaration order: the source consumer (inner try)
        // first, then the second shadow consumer, the first shadow consumer and the producer.
        try (Producer<byte[]> producer = pulsarClient.newProducer().topic(sourceTopicName).create();
             Consumer<byte[]> shadowConsumer = pulsarClient.newConsumer()
                     .topic(shadowTopicName)
                     .subscriptionName("shadow-sub")
                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                     .subscribe();
             // Only subscribed, never read from; it is kept open for the duration of the test.
             Consumer<byte[]> shadowConsumer2 = pulsarClient.newConsumer()
                     .topic(shadowTopicName2)
                     .subscriptionName("shadow-sub")
                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                     .subscribe()) {

            PersistentTopic sourceTopic = (PersistentTopic) pulsar.getBrokerService()
                    .getTopicIfExists(sourceTopicName).get().get();

            // Wait until the source topic holds one shadow replicator per shadow topic.
            Awaitility.await().untilAsserted(() ->
                    Assert.assertEquals(sourceTopic.getShadowReplicators().size(), 2));

            ShadowReplicator replicator =
                    (ShadowReplicator) sourceTopic.getShadowReplicators().get(shadowTopicName);
            Awaitility.await().untilAsserted(() ->
                    Assert.assertEquals(String.valueOf(replicator.getState()), "Started"));

            try (Consumer<byte[]> sourceConsumer = pulsarClient.newConsumer()
                    .topic(sourceTopicName)
                    .subscriptionName("source-sub")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe()) {

                MessageId sentId = producer.newMessage()
                        .sequenceId(1L)
                        .key("K")
                        .property("PK", "PV")
                        .eventTime(123L)
                        .value("test-shadow-topic".getBytes(StandardCharsets.UTF_8))
                        .send();

                Message<byte[]> sourceMessage = sourceConsumer.receive();
                Assert.assertEquals(sourceMessage.getMessageId(), sentId);

                // Wait until the replicator has counted at least one outgoing message ...
                Awaitility.await().until(() -> {
                    replicator.msgOut.calculateRate();
                    return replicator.msgOut.getCount() >= 1;
                });
                // ... and its pending-message counter has dropped back to zero.
                Awaitility.await().until(() ->
                        PersistentReplicator.PENDING_MESSAGES_UPDATER.get(replicator) == 0);

                PersistentTopic shadowTopic = (PersistentTopic) pulsar.getBrokerService()
                        .getTopicIfExists(shadowTopicName).get().get();
                Assert.assertNotNull(shadowTopic);

                Message<byte[]> shadowMessage = shadowConsumer.receive(5, TimeUnit.SECONDS);
                // A timed receive yields null when nothing arrives in time (per the Pulsar
                // client API); fail with a clear message instead of an NPE on the next line.
                Assert.assertNotNull(shadowMessage, "no message received from shadow topic");

                Assert.assertEquals(shadowMessage.getData(), sourceMessage.getData());
                Assert.assertEquals(shadowMessage.getSequenceId(), sourceMessage.getSequenceId());
                Assert.assertEquals(shadowMessage.getEventTime(), sourceMessage.getEventTime());
                Assert.assertEquals(shadowMessage.getProperties(), sourceMessage.getProperties());
                Assert.assertEquals(shadowMessage.getKey(), sourceMessage.getKey());
                Assert.assertEquals(shadowMessage.getOrderingKey(), sourceMessage.getOrderingKey());
                Assert.assertEquals(shadowMessage.getSchemaVersion(), sourceMessage.getSchemaVersion());
                Assert.assertEquals(shadowMessage.getPublishTime(), sourceMessage.getPublishTime());
                Assert.assertEquals(shadowMessage.getBrokerPublishTime(),
                        sourceMessage.getBrokerPublishTime());
                Assert.assertEquals(shadowMessage.getIndex(), sourceMessage.getIndex());
                // replicatedFrom is the one field expected to differ between the two copies.
                Assert.assertNotEquals(shadowMessage.getReplicatedFrom(),
                        sourceMessage.getReplicatedFrom());
                Assert.assertEquals(shadowMessage.getMessageId(), sourceMessage.getMessageId());
            }
        }
    }

    /**
     * Looks up the given topic on the broker, waits until it has at least one shadow replicator
     * and returns the first one found.
     *
     * @param topicName     source topic whose shadow replicators are inspected
     * @param pulsarService broker service used for the topic lookup
     * @return one of the topic's shadow replicators
     */
    private static PersistentReplicator getAnyShadowReplicator(TopicName topicName,
                                                               PulsarService pulsarService) {
        PersistentTopic persistentTopic = (PersistentTopic) pulsarService.getBrokerService()
                .getTopic(topicName.toString(), false).join().get();
        Awaitility.await().until(() -> !persistentTopic.getShadowReplicators().isEmpty());
        return (PersistentReplicator) persistentTopic.getShadowReplicators().values().iterator().next();
    }

    /**
     * Waits until every replicator entry in the topic stats reports a replication backlog of 0.
     *
     * @param topicName   topic whose stats are polled
     * @param pulsarAdmin admin client used to fetch the stats
     */
    private static void waitReplicateFinish(TopicName topicName, PulsarAdmin pulsarAdmin) {
        Awaitility.await().untilAsserted(() -> {
            for (Map.Entry<String, ? extends ReplicatorStats> replicatorStats
                    : pulsarAdmin.topics().getStats(topicName.toString(), true, false, false)
                            .getReplication().entrySet()) {
                Assert.assertTrue(replicatorStats.getValue().getReplicationBacklog() == 0,
                        "replication task finished");
            }
        });
    }

    /**
     * Publishes 20 non-batched Avro messages to a source topic with one shadow topic and verifies
     * that, once the replication backlog is drained, the replicator's {@code pendingMessages}
     * field is back to 0.
     *
     * @throws Exception if any admin, client or broker call fails
     */
    @Test
    public void testCounterOfPengdingMessagesCorrect() throws Exception {
        TopicName sourceTopicName =
                TopicName.get(BrokerTestUtil.newUniqueName("persistent://prop1/ns-source/source-topic"));
        TopicName shadowTopicName =
                TopicName.get(BrokerTestUtil.newUniqueName("persistent://prop1/ns-shadow/shadow-topic"));

        admin.topics().createNonPartitionedTopic(sourceTopicName.toString());
        admin.topics().createShadowTopic(shadowTopicName.toString(), sourceTopicName.toString());
        admin.topics().setShadowTopics(sourceTopicName.toString(),
                Lists.newArrayList(shadowTopicName.toString()));

        // try-with-resources closes the producer and then the consumer on every exit path,
        // including a failure in one of the waits below.
        try (Consumer<?> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
                     .topic(sourceTopicName.toString())
                     .subscriptionName("my-sub")
                     .receiverQueueSize(10)
                     .subscribe();
             Producer<Schemas.PersonOne> producer = pulsarClient
                     .newProducer(Schema.AVRO(Schemas.PersonOne.class))
                     .topic(sourceTopicName.toString())
                     .enableBatching(false)
                     .create()) {

            for (int i = 0; i < 20; i++) {
                producer.send(new Schemas.PersonOne(i));
            }

            PersistentReplicator replicator = getAnyShadowReplicator(sourceTopicName, pulsar);
            waitReplicateFinish(sourceTopicName, admin);

            Awaitility.await().untilAsserted(() -> {
                Integer pendingMessages = WhiteboxImpl.getInternalState(replicator, "pendingMessages");
                Assert.assertEquals(pendingMessages.intValue(), 0);
            });
        }
    }
}
