package org.apache.pulsar.broker.service.persistent;

import com.google.common.collect.Lists;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/ShadowTopicTest.class */
public class ShadowTopicTest extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(ShadowTopicTest.class);

    /* loaded from: input_file:org/apache/pulsar/broker/service/persistent/ShadowTopicTest$Point.class */
    private static class Point {
        int x;
        int y;

        public Point(int i, int i2) {
            this.x = i;
            this.y = i2;
        }

        public Point() {
        }

        public int getX() {
            return this.x;
        }

        public int getY() {
            return this.y;
        }

        public void setX(int i) {
            this.x = i;
        }

        public void setY(int i) {
            this.y = i;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof Point)) {
                return false;
            }
            Point point = (Point) obj;
            return point.canEqual(this) && getX() == point.getX() && getY() == point.getY();
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof Point;
        }

        public int hashCode() {
            return (((1 * 59) + getX()) * 59) + getY();
        }

        public String toString() {
            return "ShadowTopicTest.Point(x=" + getX() + ", y=" + getY() + ")";
        }
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass(alwaysRun = true)
    protected void setup() throws Exception {
        baseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        internalCleanup();
    }

    private String newShadowSourceTopicName() {
        return "persistent://" + newTopicName();
    }

    @Test
    public void testNonPartitionedShadowTopicSetup() throws Exception {
        String newShadowSourceTopicName = newShadowSourceTopicName();
        String str = newShadowSourceTopicName + "-shadow";
        this.admin.topics().createNonPartitionedTopic(newShadowSourceTopicName);
        this.admin.topics().createShadowTopic(str, newShadowSourceTopicName);
        PersistentTopic persistentTopic = (PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopicIfExists(str).get()).get();
        Assert.assertTrue(persistentTopic.getManagedLedger() instanceof ShadowManagedLedgerImpl);
        Assert.assertEquals(((TopicName) persistentTopic.getShadowSourceTopic().get()).toString(), newShadowSourceTopicName);
        Assert.assertEquals(this.admin.topics().getShadowSource(str), newShadowSourceTopicName);
        this.admin.namespaces().unload("prop/ns-abc");
        Assert.assertTrue(this.pulsar.getBrokerService().getTopicReference(str).isEmpty());
        Assert.assertEquals(this.admin.topics().getShadowSource(str), newShadowSourceTopicName);
        PersistentTopic persistentTopic2 = (PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopicIfExists(str).get()).get();
        Assert.assertTrue(persistentTopic2.getManagedLedger() instanceof ShadowManagedLedgerImpl);
        Assert.assertEquals(((TopicName) persistentTopic2.getShadowSourceTopic().get()).toString(), newShadowSourceTopicName);
    }

    @Test
    public void testPartitionedShadowTopicSetup() throws Exception {
        String newShadowSourceTopicName = newShadowSourceTopicName();
        String str = newShadowSourceTopicName + "-shadow";
        String topicName = TopicName.get(newShadowSourceTopicName).getPartition(0).toString();
        String topicName2 = TopicName.get(str).getPartition(0).toString();
        this.admin.topics().createPartitionedTopic(newShadowSourceTopicName, 2);
        this.admin.topics().createShadowTopic(str, newShadowSourceTopicName);
        this.pulsarClient.newProducer().topic(str).create().close();
        PersistentTopic persistentTopic = (PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopicIfExists(topicName2).get()).get();
        Assert.assertTrue(persistentTopic.getManagedLedger() instanceof ShadowManagedLedgerImpl);
        Assert.assertEquals(((TopicName) persistentTopic.getShadowSourceTopic().get()).toString(), topicName);
        Assert.assertEquals(this.admin.topics().getShadowSource(str), newShadowSourceTopicName);
        this.admin.namespaces().unload("prop/ns-abc");
        Assert.assertTrue(this.pulsar.getBrokerService().getTopicReference(str).isEmpty());
        Assert.assertEquals(this.admin.topics().getShadowSource(str), newShadowSourceTopicName);
        PersistentTopic persistentTopic2 = (PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopicIfExists(topicName2).get()).get();
        Assert.assertTrue(persistentTopic2.getManagedLedger() instanceof ShadowManagedLedgerImpl);
        Assert.assertEquals(((TopicName) persistentTopic2.getShadowSourceTopic().get()).toString(), topicName);
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void testPartitionedShadowTopicProduceAndConsume() throws Exception {
        String newShadowSourceTopicName = newShadowSourceTopicName();
        String str = newShadowSourceTopicName + "-shadow";
        this.admin.topics().createPartitionedTopic(newShadowSourceTopicName, 3);
        this.admin.topics().createShadowTopic(str, newShadowSourceTopicName);
        this.admin.topics().setShadowTopics(newShadowSourceTopicName, Lists.newArrayList(new String[]{str}));
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(newShadowSourceTopicName).create();
        try {
            Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionName("test").subscribe();
            for (int i = 0; i < 10; i++) {
                try {
                    create.send("msg-" + i);
                } catch (Throwable th) {
                    if (Collections.singletonList(subscribe).get(0) != null) {
                        subscribe.close();
                    }
                    throw th;
                }
            }
            HashSet hashSet = new HashSet();
            for (int i2 = 0; i2 < 10; i2++) {
                hashSet.add((String) subscribe.receive().getValue());
            }
            for (int i3 = 0; i3 < 10; i3++) {
                Assert.assertTrue(hashSet.contains("msg-" + i3));
            }
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    @Test
    public void testShadowTopicNotWritable() throws Exception {
        String newShadowSourceTopicName = newShadowSourceTopicName();
        String str = newShadowSourceTopicName + "-shadow";
        this.admin.topics().createNonPartitionedTopic(newShadowSourceTopicName);
        this.admin.topics().createShadowTopic(str, newShadowSourceTopicName);
        Producer create = this.pulsarClient.newProducer().topic(str).create();
        try {
            Assert.expectThrows(PulsarClientException.NotAllowedException.class, () -> {
                create.send(new byte[]{1, 2, 3});
            });
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    private void awaitUntilShadowReplicatorReady(String str, String str2) {
        Awaitility.await().untilAsserted(() -> {
            ShadowReplicator shadowReplicator = (ShadowReplicator) ((PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopicIfExists(str).get()).get()).getShadowReplicators().get(str2);
            Assert.assertNotNull(shadowReplicator);
            Assert.assertEquals(String.valueOf(shadowReplicator.getState()), "Started");
        });
    }

    @Test
    public void testShadowTopicConsuming() throws Exception {
        String newShadowSourceTopicName = newShadowSourceTopicName();
        String str = newShadowSourceTopicName + "-shadow";
        this.admin.topics().createNonPartitionedTopic(newShadowSourceTopicName);
        this.admin.topics().createShadowTopic(str, newShadowSourceTopicName);
        this.admin.topics().setShadowTopics(newShadowSourceTopicName, Lists.newArrayList(new String[]{str}));
        awaitUntilShadowReplicatorReady(newShadowSourceTopicName, str);
        Producer create = this.pulsarClient.newProducer().topic(newShadowSourceTopicName).create();
        try {
            Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{str}).subscriptionName("sub").subscribe();
            try {
                byte[] bytes = "Hello Shadow Topic".getBytes(StandardCharsets.UTF_8);
                MessageId send = create.send(bytes);
                log.info("msg send to source topic, id={}", send);
                Message receive = subscribe.receive(5, TimeUnit.SECONDS);
                Assert.assertEquals(receive.getMessageId(), send);
                Assert.assertEquals((byte[]) receive.getValue(), bytes);
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void testShadowTopicConsumingWithStringSchema() throws Exception {
        String newShadowSourceTopicName = newShadowSourceTopicName();
        String str = newShadowSourceTopicName + "-shadow";
        this.admin.topics().createNonPartitionedTopic(newShadowSourceTopicName);
        this.admin.topics().createShadowTopic(str, newShadowSourceTopicName);
        this.admin.topics().setShadowTopics(newShadowSourceTopicName, Lists.newArrayList(new String[]{str}));
        awaitUntilShadowReplicatorReady(newShadowSourceTopicName, str);
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(newShadowSourceTopicName).create();
        try {
            Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionName("sub").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
            try {
                MessageId send = create.send("Hello Shadow Topic");
                Message receive = subscribe.receive();
                Assert.assertEquals(receive.getMessageId(), send);
                Assert.assertEquals((String) receive.getValue(), "Hello Shadow Topic");
                for (int i = 0; i < 10; i++) {
                    create.send("Hello Shadow Topic" + i);
                }
                for (int i2 = 0; i2 < 10; i2++) {
                    Assert.assertEquals((String) subscribe.receive().getValue(), "Hello Shadow Topic" + i2);
                }
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    @Test
    public void testShadowTopicConsumingWithJsonSchema() throws Exception {
        String newShadowSourceTopicName = newShadowSourceTopicName();
        String str = newShadowSourceTopicName + "-shadow";
        this.admin.topics().createNonPartitionedTopic(newShadowSourceTopicName);
        this.admin.topics().createShadowTopic(str, newShadowSourceTopicName);
        this.admin.topics().setShadowTopics(newShadowSourceTopicName, Lists.newArrayList(new String[]{str}));
        awaitUntilShadowReplicatorReady(newShadowSourceTopicName, str);
        Producer create = this.pulsarClient.newProducer(Schema.JSON(Point.class)).topic(newShadowSourceTopicName).create();
        try {
            Consumer subscribe = this.pulsarClient.newConsumer(Schema.JSON(Point.class)).topic(new String[]{str}).subscriptionName("sub").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
            try {
                Point point = new Point(1, 2);
                MessageId send = create.send(point);
                Message receive = subscribe.receive();
                Assert.assertEquals(receive.getMessageId(), send);
                Assert.assertEquals(receive.getValue(), point);
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void testConsumeShadowMessageWithoutCache() throws Exception {
        String newShadowSourceTopicName = newShadowSourceTopicName();
        String str = newShadowSourceTopicName + "-shadow";
        this.admin.topics().createNonPartitionedTopic(newShadowSourceTopicName);
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(newShadowSourceTopicName).create();
        try {
            MessageId send = create.send("Hello Shadow Topic");
            for (int i = 0; i < 10; i++) {
                create.send("Hello Shadow Topic" + i);
            }
            this.admin.topics().createShadowTopic(str, newShadowSourceTopicName);
            Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionName("sub").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
            try {
                Message receive = subscribe.receive();
                Assert.assertEquals(receive.getMessageId(), send);
                Assert.assertEquals((String) receive.getValue(), "Hello Shadow Topic");
                for (int i2 = 0; i2 < 10; i2++) {
                    Assert.assertEquals((String) subscribe.receive().getValue(), "Hello Shadow Topic" + i2);
                }
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }
}
