package org.apache.pulsar.broker.admin;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.buffer.ByteBuf;
import java.io.ByteArrayOutputStream;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.util.Utf8;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.namespace.TopicExistsInfo;
import org.apache.pulsar.broker.rest.Topics;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.client.impl.schema.StringSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ProducerAck;
import org.apache.pulsar.websocket.data.ProducerAcks;
import org.apache.pulsar.websocket.data.ProducerMessage;
import org.apache.pulsar.websocket.data.ProducerMessages;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/admin/TopicsTest.class */
public class TopicsTest extends MockedPulsarServiceBaseTest {
    // REST resource under test; setup() replaces it with a fresh Mockito spy before every test method.
    private Topics topics;
    // Cluster name registered in setup() and used as the allowed/replication cluster for the fixtures.
    private final String testLocalCluster = "test";
    // Tenant created in setup().
    private final String testTenant = "my-tenant";
    // Namespace created under the tenant in setup().
    private final String testNamespace = "my-namespace";
    // Base topic name the tests produce to.
    private final String testTopicName = "my-topic";

    /**
     * GPU vendor; used as the type of the {@code gpu} field of {@link PC}.
     *
     * <p>Decompiler note (JADX): the access modifier was changed from {@code private}.
     */
    public enum GPU {
        AMD,
        NVIDIA
    }

    /**
     * Plain data holder describing a computer: brand, model, year, a {@link GPU} and a {@link Seller}.
     *
     * <p>All fields are public and additionally exposed through getters and setters.
     * {@code equals}, {@code hashCode} and {@code toString} take all five fields into account.
     * NOTE(review): no use of this type is visible in this part of the file; presumably the
     * schema-based produce tests further down rely on it - confirm.
     */
    private static class PC {
        public String brand;
        public String model;
        public int year;
        public GPU gpu;
        public Seller seller;

        /** Creates an instance with every field left at its Java default. */
        public PC() {
        }

        /** Creates a fully populated instance. */
        public PC(String brand, String model, int year, GPU gpu, Seller seller) {
            this.brand = brand;
            this.model = model;
            this.year = year;
            this.gpu = gpu;
            this.seller = seller;
        }

        public String getBrand() {
            return brand;
        }

        public String getModel() {
            return model;
        }

        public int getYear() {
            return year;
        }

        public GPU getGpu() {
            return gpu;
        }

        public Seller getSeller() {
            return seller;
        }

        public void setBrand(String brand) {
            this.brand = brand;
        }

        public void setModel(String model) {
            this.model = model;
        }

        public void setYear(int year) {
            this.year = year;
        }

        public void setGpu(GPU gpu) {
            this.gpu = gpu;
        }

        public void setSeller(Seller seller) {
            this.seller = seller;
        }

        /**
         * Two PCs are equal when the other side accepts the comparison (see {@link #canEqual})
         * and all five properties match; null properties only match null.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof PC)) {
                return false;
            }
            PC other = (PC) obj;
            return other.canEqual(this)
                    && getYear() == other.getYear()
                    && java.util.Objects.equals(getBrand(), other.getBrand())
                    && java.util.Objects.equals(getModel(), other.getModel())
                    && java.util.Objects.equals(getGpu(), other.getGpu())
                    && java.util.Objects.equals(getSeller(), other.getSeller());
        }

        /** Hook that lets a subclass refuse equality with instances of this class. */
        protected boolean canEqual(Object obj) {
            return obj instanceof PC;
        }

        /** Combines year, brand, model, gpu and seller (in that order) with multiplier 59. */
        @Override
        public int hashCode() {
            final int multiplier = 59;
            int result = 1;
            result = result * multiplier + getYear();
            result = result * multiplier + hashOrDefault(getBrand());
            result = result * multiplier + hashOrDefault(getModel());
            result = result * multiplier + hashOrDefault(getGpu());
            result = result * multiplier + hashOrDefault(getSeller());
            return result;
        }

        /** Returns the value's hash code, or the fixed constant 43 when the value is null. */
        private static int hashOrDefault(Object value) {
            return value == null ? 43 : value.hashCode();
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("TopicsTest.PC(");
            text.append("brand=").append(getBrand());
            text.append(", model=").append(getModel());
            text.append(", year=").append(getYear());
            text.append(", gpu=").append(getGpu());
            text.append(", seller=").append(getSeller());
            return text.append(")").toString();
        }
    }

    /**
     * Plain data holder for a seller's address: state, street and zip code.
     *
     * <p>All fields are public and additionally exposed through getters and setters.
     * {@code equals}, {@code hashCode} and {@code toString} take all three fields into account.
     * Decompiler note (JADX): the access modifier was changed from {@code private}.
     */
    public static class Seller {
        public String state;
        public String street;
        public long zipCode;

        /** Creates an instance with every field left at its Java default. */
        public Seller() {
        }

        /** Creates a fully populated instance. */
        public Seller(String state, String street, long zipCode) {
            this.state = state;
            this.street = street;
            this.zipCode = zipCode;
        }

        public String getState() {
            return state;
        }

        public String getStreet() {
            return street;
        }

        public long getZipCode() {
            return zipCode;
        }

        public void setState(String state) {
            this.state = state;
        }

        public void setStreet(String street) {
            this.street = street;
        }

        public void setZipCode(long zipCode) {
            this.zipCode = zipCode;
        }

        /**
         * Two sellers are equal when the other side accepts the comparison (see {@link #canEqual})
         * and zip code, state and street all match; null properties only match null.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof Seller)) {
                return false;
            }
            Seller other = (Seller) obj;
            return other.canEqual(this)
                    && getZipCode() == other.getZipCode()
                    && java.util.Objects.equals(getState(), other.getState())
                    && java.util.Objects.equals(getStreet(), other.getStreet());
        }

        /** Hook that lets a subclass refuse equality with instances of this class. */
        protected boolean canEqual(Object obj) {
            return obj instanceof Seller;
        }

        /** Combines zip code, state and street (in that order) with multiplier 59. */
        @Override
        public int hashCode() {
            final int multiplier = 59;
            int result = 1;
            // Long.hashCode folds the upper 32 bits into the lower 32 bits via XOR.
            result = result * multiplier + Long.hashCode(getZipCode());
            result = result * multiplier + hashOrDefault(getState());
            result = result * multiplier + hashOrDefault(getStreet());
            return result;
        }

        /** Returns the value's hash code, or the fixed constant 43 when the value is null. */
        private static int hashOrDefault(Object value) {
            return value == null ? 43 : value.hashCode();
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("TopicsTest.Seller(");
            text.append("state=").append(getState());
            text.append(", street=").append(getStreet());
            text.append(", zipCode=").append(getZipCode());
            return text.append(")").toString();
        }
    }

    /**
     * Per-test fixture: runs the base class's {@code internalSetup()}, installs a spied
     * {@link Topics} resource wired to the test broker, and creates the cluster, tenant and
     * namespace that the tests produce into.
     *
     * @throws Exception if the base setup or any admin call fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();

        // Spy on a real resource so request-context accessors can be stubbed.
        Topics resource = Mockito.spy(new Topics());
        resource.setPulsar(this.pulsar);
        Mockito.doReturn(TopicDomain.persistent.value()).when(resource).domain();
        Mockito.doReturn("test-app").when(resource).clientAppId();
        Mockito.doReturn(Mockito.mock(AuthenticationDataHttps.class)).when(resource).clientAuthData();
        this.topics = resource;

        Set<String> clusters = Set.of(testLocalCluster);
        this.admin.clusters().createCluster(testLocalCluster, new ClusterDataImpl());
        this.admin.tenants().createTenant(testTenant, new TenantInfoImpl(Set.of("role1", "role2"), clusters));
        this.admin.namespaces().createNamespace(testTenant + "/" + testNamespace, clusters);
    }

    /**
     * Runs after every test method and delegates to the base class's {@code internalCleanup()}.
     *
     * @throws Exception if the base cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Produces three string messages to a non-partitioned topic through the REST resource and
     * expects an OK response carrying three successful acks with schema version 0. The third
     * colon-separated component of every returned message id must be -1 (presumably the
     * partition index, which is unset for a non-partitioned topic).
     */
    @Test
    public void testProduceToNonPartitionedTopic() throws Exception {
        String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;
        this.admin.topics().createNonPartitionedTopic(topicName);

        // Key and value both use the UTF-8 string schema, so its JSON form is serialized once.
        String stringSchemaJson = ObjectMapperFactory.getMapper().getObjectMapper()
                .writeValueAsString(StringSchema.utf8().getSchemaInfo());
        ProducerMessages request = new ProducerMessages();
        request.setKeySchema(stringSchemaJson);
        request.setValueSchema(stringSchemaJson);
        request.setMessages(createMessages(
                "[{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3}]"));

        AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
        this.topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, false, request);

        // The resource answers asynchronously; wait up to 5 seconds for exactly one resume().
        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
        Response response = responseCaptor.getValue();
        Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());

        Object entity = response.getEntity();
        Assert.assertTrue(entity instanceof ProducerAcks);
        ProducerAcks acks = (ProducerAcks) entity;
        List<ProducerAck> results = acks.getMessagePublishResults();
        Assert.assertEquals(results.size(), 3);
        Assert.assertEquals(acks.getSchemaVersion(), 0L);
        for (ProducerAck ack : results) {
            Assert.assertEquals(Integer.parseInt(ack.getMessageId().split(":")[2]), -1);
            Assert.assertEquals(ack.getErrorCode(), 0);
            Assert.assertTrue(ack.getMessageId().length() > 0);
        }
    }

    /**
     * Parses a JSON array into the list of {@link ProducerMessage} objects used as a request payload.
     *
     * @param str JSON array text, one object per message
     * @return the deserialized messages
     * @throws JsonProcessingException if the text cannot be deserialized
     */
    private static List<ProducerMessage> createMessages(String str) throws JsonProcessingException {
        // Anonymous subclass so the element type survives erasure for the deserializer.
        TypeReference<List<ProducerMessage>> messageListType = new TypeReference<List<ProducerMessage>>() {
        };
        List<ProducerMessage> parsed = ObjectMapperFactory.getMapper().reader().forType(messageListType).readValue(str);
        return parsed;
    }

    /**
     * Produces ten string messages to a five-partition topic without naming a partition and
     * expects an OK response with ten successful acks, schema version 0, and no more than two
     * messages reported for any single partition. The partition is read from the third
     * colon-separated component of each returned message id.
     */
    @Test
    public void testProduceToPartitionedTopic() throws Exception {
        final int partitionCount = 5;
        String partitionedTopic = testTopicName + "-p";
        this.admin.topics().createPartitionedTopic(
                "persistent://" + testTenant + "/" + testNamespace + "/" + partitionedTopic, partitionCount);

        // Key and value both use the UTF-8 string schema, so its JSON form is serialized once.
        String stringSchemaJson = ObjectMapperFactory.getMapper().getObjectMapper()
                .writeValueAsString(StringSchema.utf8().getSchemaInfo());
        ProducerMessages request = new ProducerMessages();
        request.setKeySchema(stringSchemaJson);
        request.setValueSchema(stringSchemaJson);
        request.setMessages(createMessages(
                "[{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:4\",\"eventTime\":1603045262772,\"sequenceId\":4},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:5\",\"eventTime\":1603045262772,\"sequenceId\":5},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:6\",\"eventTime\":1603045262772,\"sequenceId\":6},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:7\",\"eventTime\":1603045262772,\"sequenceId\":7},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:8\",\"eventTime\":1603045262772,\"sequenceId\":8},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:9\",\"eventTime\":1603045262772,\"sequenceId\":9},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:10\",\"eventTime\":1603045262772,\"sequenceId\":10}]"));

        AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
        this.topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, partitionedTopic, false, request);

        // The resource answers asynchronously; wait up to 5 seconds for exactly one resume().
        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
        Response response = responseCaptor.getValue();
        Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());

        Object entity = response.getEntity();
        Assert.assertTrue(entity instanceof ProducerAcks);
        ProducerAcks acks = (ProducerAcks) entity;
        List<ProducerAck> results = acks.getMessagePublishResults();
        Assert.assertEquals(results.size(), 10);
        Assert.assertEquals(acks.getSchemaVersion(), 0L);

        // Tally how many acks landed on each partition.
        int[] messagesPerPartition = new int[partitionCount];
        for (ProducerAck ack : results) {
            int partition = Integer.parseInt(ack.getMessageId().split(":")[2]);
            messagesPerPartition[partition]++;
            Assert.assertEquals(ack.getErrorCode(), 0);
            Assert.assertTrue(ack.getMessageId().length() > 0);
        }
        for (int count : messagesPerPartition) {
            Assert.assertTrue(count <= 2);
        }
    }

    /**
     * Produces four string messages to partition 2 of a five-partition topic and expects an OK
     * response with four successful acks, schema version 0, and every returned message id
     * carrying 2 as its third colon-separated component.
     */
    @Test
    public void testProduceToPartitionedTopicSpecificPartition() throws Exception {
        final int targetPartition = 2;
        this.admin.topics().createPartitionedTopic(
                "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName, 5);

        // Key and value both use the UTF-8 string schema, so its JSON form is serialized once.
        String stringSchemaJson = ObjectMapperFactory.getMapper().getObjectMapper()
                .writeValueAsString(StringSchema.utf8().getSchemaInfo());
        ProducerMessages request = new ProducerMessages();
        request.setKeySchema(stringSchemaJson);
        request.setValueSchema(stringSchemaJson);
        request.setMessages(createMessages(
                "[{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3},"
                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:4\",\"eventTime\":1603045262772,\"sequenceId\":4}]"));

        AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
        this.topics.produceOnPersistentTopicPartition(
                asyncResponse, testTenant, testNamespace, testTopicName, targetPartition, false, request);

        // The resource answers asynchronously; wait up to 5 seconds for exactly one resume().
        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
        Response response = responseCaptor.getValue();
        Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());

        Object entity = response.getEntity();
        Assert.assertTrue(entity instanceof ProducerAcks);
        ProducerAcks acks = (ProducerAcks) entity;
        List<ProducerAck> results = acks.getMessagePublishResults();
        Assert.assertEquals(results.size(), 4);
        Assert.assertEquals(acks.getSchemaVersion(), 0L);
        for (ProducerAck ack : results) {
            Assert.assertEquals(Integer.parseInt(ack.getMessageId().split(":")[2]), targetPartition);
            Assert.assertEquals(ack.getErrorCode(), 0);
            Assert.assertTrue(ack.getMessageId().length() > 0);
        }
    }

    /**
     * Verifies per-message error reporting when some publishes fail.
     *
     * <p>The topic is replaced by a spy whose {@code publishMessage} completes the first two
     * publishes successfully and every later one with a {@code TopicFencedException}. Four
     * messages are produced; the response must still be OK and contain four acks, two of them
     * with error code 2 and the fenced-exception text as error message.
     *
     * <p>The test body runs inside the {@code getTopic(...)} continuation. Any failure in there is
     * rethrown through {@code Assert.fail(message, cause)} so the original stack trace is kept;
     * it then surfaces from the trailing {@code get()}.
     */
    @Test
    public void testProduceFailed() throws Exception {
        final String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;
        this.admin.topics().createNonPartitionedTopic(topicName);
        this.pulsar.getBrokerService().getTopic(topicName, false).thenAccept(optionalTopic -> {
            try {
                PersistentTopic failingTopic = Mockito.spy((PersistentTopic) optionalTopic.get());
                final AtomicInteger publishCount = new AtomicInteger();
                // First two publishes succeed, all following ones report a fenced topic.
                Answer<Object> publishAnswer = (InvocationOnMock invocation) -> {
                    Topic.PublishContext publishContext = invocation.getArgument(1);
                    if (publishCount.getAndIncrement() < 2) {
                        publishContext.completed((Exception) null, -1L, -1L);
                    } else {
                        publishContext.completed(
                                new BrokerServiceException.TopicFencedException("Fake exception"), -1L, -1L);
                    }
                    return null;
                };
                Mockito.doAnswer(publishAnswer).when(failingTopic).publishMessage(
                        ArgumentMatchers.<ByteBuf>any(), ArgumentMatchers.<Topic.PublishContext>any());

                // Route every topic lookup of the broker to the failing spy.
                BrokerService brokerService = Mockito.spy(this.pulsar.getBrokerService());
                Mockito.doReturn(CompletableFuture.completedFuture(Optional.of(failingTopic)))
                        .when(brokerService).getTopic(ArgumentMatchers.anyString(), ArgumentMatchers.anyBoolean());
                Mockito.doReturn(brokerService).when(this.pulsar).getBrokerService();

                String stringSchemaJson = ObjectMapperFactory.getMapper().getObjectMapper()
                        .writeValueAsString(StringSchema.utf8().getSchemaInfo());
                ProducerMessages request = new ProducerMessages();
                request.setKeySchema(stringSchemaJson);
                request.setValueSchema(stringSchemaJson);
                request.setMessages(createMessages(
                        "[{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1},"
                        + "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2},"
                        + "{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3},"
                        + "{\"key\":\"my-key\",\"payload\":\"RestProducer:4\",\"eventTime\":1603045262772,\"sequenceId\":4}]"));

                AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
                this.topics.produceOnPersistentTopic(
                        asyncResponse, testTenant, testNamespace, testTopicName, false, request);

                // The resource answers asynchronously; wait up to 5 seconds for exactly one resume().
                ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
                Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
                Response response = responseCaptor.getValue();
                Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());

                Object entity = response.getEntity();
                Assert.assertTrue(entity instanceof ProducerAcks);
                List<ProducerAck> results = ((ProducerAcks) entity).getMessagePublishResults();
                Assert.assertEquals(results.size(), 4);

                int failedCount = 0;
                for (ProducerAck ack : results) {
                    int errorCode = ack.getErrorCode();
                    if (errorCode == 0) {
                        Assert.assertEquals(Integer.parseInt(ack.getMessageId().split(":")[2]), -1);
                        Assert.assertTrue(ack.getMessageId().length() > 0);
                    } else {
                        failedCount++;
                        Assert.assertEquals(errorCode, 2);
                        Assert.assertEquals(ack.getErrorMsg(),
                                "org.apache.pulsar.broker.service.BrokerServiceException$TopicFencedException: Fake exception");
                    }
                }
                // assertEquals reports the actual count on failure, unlike assertTrue(count == 2).
                Assert.assertEquals(failedCount, 2);
            } catch (Throwable th) {
                // Keep the cause: the message alone may be null and loses the stack trace.
                Assert.fail(th.getMessage(), th);
            }
        }).get();
    }

    /**
     * Verifies that a produce request is answered with a redirect when the resource is served by
     * a different broker.
     *
     * <p>The topic is created through the primary broker, then a second broker context is started
     * (ephemeral ports, TLS material from the base class) and the {@link Topics} resource is
     * pointed at that second broker's {@code PulsarService}. The response must be
     * {@code TEMPORARY_REDIRECT} and its location must equal the stubbed request URI.
     * NOTE(review): presumably the redirect happens because the topic is owned by the first
     * broker - confirm against the resource implementation.
     *
     * <p>The additional context is closed exactly once in a {@code finally} block, whether the
     * assertions pass or fail.
     */
    @Test
    public void testLookUpWithRedirect() throws Exception {
        URI requestUri = URI.create(this.pulsar.getWebServiceAddress()
                + "/topics/" + testTenant + "/" + testNamespace + "/" + testTopicName);
        this.admin.topics().createNonPartitionedTopic(
                "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName);

        // Port 0 lets the second broker pick free ports.
        this.conf.setBrokerServicePort(Optional.of(0));
        this.conf.setBrokerServicePortTls(Optional.of(0));
        this.conf.setWebServicePort(Optional.of(0));
        this.conf.setWebServicePortTls(Optional.of(0));
        this.conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
        this.conf.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
        this.conf.setTlsKeyFilePath(BROKER_KEY_FILE_PATH);

        PulsarTestContext additionalContext = createAdditionalPulsarTestContext(this.conf);
        try {
            PulsarService secondBroker = additionalContext.getPulsarService();
            Mockito.doReturn(false).when(this.topics).isRequestHttps();

            // Inject the request URI the resource would normally receive from the container.
            UriInfo uriInfo = Mockito.mock(UriInfo.class);
            Mockito.doReturn(requestUri).when(uriInfo).getRequestUri();
            FieldUtils.writeField(this.topics, "uri", uriInfo, true);
            this.topics.setPulsar(secondBroker);

            ProducerMessages request = new ProducerMessages();
            request.setValueSchema(ObjectMapperFactory.getMapper().getObjectMapper()
                    .writeValueAsString(Schema.INT64.getSchemaInfo()));
            request.setMessages(createMessages("[]"));

            AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
            this.topics.produceOnPersistentTopic(
                    asyncResponse, testTenant, testNamespace, testTopicName, false, request);

            // The resource answers asynchronously; wait up to 5 seconds for exactly one resume().
            ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
            Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
            Response response = responseCaptor.getValue();
            Assert.assertEquals(response.getStatusInfo(), Response.Status.TEMPORARY_REDIRECT);
            Assert.assertEquals(response.getLocation().toString(), requestUri.toString());
        } finally {
            if (additionalContext != null) {
                additionalContext.close();
            }
        }
    }

    /**
     * Verifies the error reported when the broker lookup itself fails.
     *
     * <p>The namespace service is replaced by a mock that reports the topic as an existing
     * non-partitioned topic from {@code checkTopicExists}, returns {@code false} from
     * {@code checkNonPartitionedTopicExists}, and fails {@code getBrokerServiceUrlAsync} with a
     * {@code BrokerServiceException}. The resource must resume the response with a
     * {@link RestException} whose message contains "&lt;topic&gt; not found".
     */
    @Test
    public void testLookUpWithException() throws Exception {
        String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;
        this.admin.topics().createNonPartitionedTopic(topicName);

        CompletableFuture<Object> failedLookup = new CompletableFuture<>();
        failedLookup.completeExceptionally(new BrokerServiceException("Fake Exception"));

        NamespaceService namespaceService = Mockito.mock(NamespaceService.class);
        Mockito.doReturn(failedLookup).when(namespaceService)
                .getBrokerServiceUrlAsync(ArgumentMatchers.<TopicName>any(), ArgumentMatchers.<LookupOptions>any());
        Mockito.doReturn(CompletableFuture.completedFuture(TopicExistsInfo.newNonPartitionedTopicExists()))
                .when(namespaceService).checkTopicExists(ArgumentMatchers.<TopicName>any());
        Mockito.doReturn(CompletableFuture.completedFuture(false))
                .when(namespaceService).checkNonPartitionedTopicExists(ArgumentMatchers.<TopicName>any());
        Mockito.doReturn(namespaceService).when(this.pulsar).getNamespaceService();

        ProducerMessages request = new ProducerMessages();
        request.setValueSchema(ObjectMapperFactory.getMapper().getObjectMapper()
                .writeValueAsString(Schema.INT64.getSchemaInfo()));
        request.setMessages(createMessages("[]"));

        AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
        this.topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, false, request);

        // The failure is delivered through the Throwable overload of resume().
        ArgumentCaptor<RestException> errorCaptor = ArgumentCaptor.forClass(RestException.class);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(errorCaptor.capture());
        String errorMessage = errorCaptor.getValue().getMessage();
        Assert.assertTrue(errorMessage.contains(topicName + " not found"));
    }

    /**
     * Verifies the error reported when the target topic does not exist.
     *
     * <p>No topic is created. The namespace service is replaced by a mock whose
     * {@code checkTopicExists} reports "topic not exists" and whose
     * {@code checkNonPartitionedTopicExists} returns {@code false}. The resource must resume the
     * response with a {@link RestException} whose message contains "Topic &lt;topic&gt; not found".
     * The captured message is attached to the assertion so a mismatch is visible in the test
     * report instead of being printed to standard output.
     */
    @Test
    public void testLookUpTopicNotExist() throws Exception {
        String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;

        NamespaceService namespaceService = Mockito.mock(NamespaceService.class);
        Mockito.doReturn(CompletableFuture.completedFuture(TopicExistsInfo.newTopicNotExists()))
                .when(namespaceService).checkTopicExists(ArgumentMatchers.<TopicName>any());
        Mockito.doReturn(CompletableFuture.completedFuture(false))
                .when(namespaceService).checkNonPartitionedTopicExists(ArgumentMatchers.<TopicName>any());
        Mockito.doReturn(namespaceService).when(this.pulsar).getNamespaceService();

        ProducerMessages request = new ProducerMessages();
        request.setValueSchema(ObjectMapperFactory.getMapper().getObjectMapper()
                .writeValueAsString(Schema.INT64.getSchemaInfo()));
        request.setMessages(createMessages("[]"));

        AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
        this.topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, false, request);

        // The failure is delivered through the Throwable overload of resume().
        ArgumentCaptor<RestException> errorCaptor = ArgumentCaptor.forClass(RestException.class);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(errorCaptor.capture());
        String errorMessage = errorCaptor.getValue().getMessage();
        Assert.assertTrue(errorMessage.contains(String.format("Topic %s not found", topicName)),
                "Unexpected error message: " + errorMessage);
    }

    /**
     * Produces five messages with an INT64 value schema through the REST resource and checks both
     * the REST acknowledgement and what a regular consumer receives.
     *
     * <p>The response must be OK with five successful acks and schema version 0. An exclusive
     * consumer subscribed from the earliest position must then receive the five payloads in
     * order, decoded as longs, each with key "my-key". Every receive waits at most two seconds;
     * a timeout is reported as an explicit assertion failure rather than a NullPointerException.
     */
    @Test
    public void testProduceWithLongSchema() throws Exception {
        String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;
        this.admin.topics().createNonPartitionedTopic(topicName);

        // Subscribe before producing so the subscription exists when the messages are published.
        Consumer<Long> consumer = this.pulsarClient.newConsumer(Schema.INT64)
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        ProducerMessages request = new ProducerMessages();
        request.setValueSchema(ObjectMapperFactory.getMapper().getObjectMapper()
                .writeValueAsString(Schema.INT64.getSchemaInfo()));
        request.setMessages(createMessages(
                "[{\"key\":\"my-key\",\"payload\":\"111111111111\",\"eventTime\":1603045262772,\"sequenceId\":1},"
                + "{\"key\":\"my-key\",\"payload\":\"222222222222\",\"eventTime\":1603045262772,\"sequenceId\":2},"
                + "{\"key\":\"my-key\",\"payload\":\"333333333333\",\"eventTime\":1603045262772,\"sequenceId\":3},"
                + "{\"key\":\"my-key\",\"payload\":\"444444444444\",\"eventTime\":1603045262772,\"sequenceId\":4},"
                + "{\"key\":\"my-key\",\"payload\":\"555555555555\",\"eventTime\":1603045262772,\"sequenceId\":5}]"));

        AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
        this.topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, false, request);

        // The resource answers asynchronously; wait up to 5 seconds for exactly one resume().
        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
        Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
        Response response = responseCaptor.getValue();
        Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());

        Object entity = response.getEntity();
        Assert.assertTrue(entity instanceof ProducerAcks);
        ProducerAcks acks = (ProducerAcks) entity;
        List<ProducerAck> results = acks.getMessagePublishResults();
        Assert.assertEquals(results.size(), 5);
        Assert.assertEquals(acks.getSchemaVersion(), 0L);
        for (ProducerAck ack : results) {
            Assert.assertEquals(Integer.parseInt(ack.getMessageId().split(":")[2]), -1);
            Assert.assertEquals(ack.getErrorCode(), 0);
            Assert.assertTrue(ack.getMessageId().length() > 0);
        }

        List<Long> expectedPayloads =
                Arrays.asList(111111111111L, 222222222222L, 333333333333L, 444444444444L, 555555555555L);
        for (int i = 0; i < expectedPayloads.size(); i++) {
            Message<Long> received = consumer.receive(2, TimeUnit.SECONDS);
            // receive() with a timeout yields null when nothing arrives in time.
            Assert.assertNotNull(received, "No message received within 2 seconds for index " + i);
            Long actualPayload = Schema.INT64.decode(received.getData());
            Long expectedPayload = expectedPayloads.get(i);
            // TestNG convention: actual value first, expected value second.
            Assert.assertEquals(actualPayload, expectedPayload);
            Assert.assertEquals(received.getKey(), "my-key");
        }
    }

    /**
     * Publishes five messages through the REST produce endpoint without declaring any schema and
     * checks that they are acknowledged without error and readable as UTF-8 strings by a client consumer.
     *
     * @throws Exception if topic creation, publishing or consuming fails
     */
    @Test
    public void testProduceNoSchema() throws Exception {
        final String topicName = "persistent://my-tenant/my-namespace/my-topic";
        this.admin.topics().createNonPartitionedTopic(topicName);
        AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
        // try-with-resources releases the subscription even when an assertion below fails.
        try (Consumer<String> consumer = this.pulsarClient.newConsumer(StringSchema.utf8())
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()) {
            // No key/value schema is set on the request on purpose.
            ProducerMessages producerMessages = new ProducerMessages();
            producerMessages.setMessages(createMessages("[{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1},{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2},{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3},{\"key\":\"my-key\",\"payload\":\"RestProducer:4\",\"eventTime\":1603045262772,\"sequenceId\":4},{\"key\":\"my-key\",\"payload\":\"RestProducer:5\",\"eventTime\":1603045262772,\"sequenceId\":5}]"));
            this.topics.produceOnPersistentTopic(asyncResponse, "my-tenant", "my-namespace", "my-topic", false, producerMessages);

            // The endpoint answers asynchronously; wait (up to 5s) for exactly one resume() call.
            ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
            Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
            Response response = responseCaptor.getValue();
            Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
            Object entity = response.getEntity();
            Assert.assertTrue(entity instanceof ProducerAcks);
            ProducerAcks producerAcks = (ProducerAcks) entity;
            Assert.assertEquals(producerAcks.getMessagePublishResults().size(), 5);
            Assert.assertEquals(producerAcks.getSchemaVersion(), 0L);
            for (ProducerAck ack : producerAcks.getMessagePublishResults()) {
                // Third ':'-separated field of the message id; presumably the partition index,
                // -1 for a non-partitioned topic - TODO confirm.
                Assert.assertEquals(Integer.parseInt(ack.getMessageId().split(":")[2]), -1);
                Assert.assertEquals(ack.getErrorCode(), 0);
                Assert.assertTrue(ack.getMessageId().length() > 0);
            }

            List<String> expectedPayloads = Arrays.asList("RestProducer:1", "RestProducer:2", "RestProducer:3", "RestProducer:4", "RestProducer:5");
            for (String expectedPayload : expectedPayloads) {
                Message<String> message = consumer.receive(2, TimeUnit.SECONDS);
                // A timed receive can come back empty; fail with an assertion rather than an NPE.
                Assert.assertNotNull(message);
                // Actual value first, consistent with the other assertions in this test.
                Assert.assertEquals(StringSchema.utf8().decode(message.getData()), expectedPayload);
                Assert.assertEquals(message.getKey(), "my-key");
            }
        }
    }

    /**
     * Publishes two {@code PC} objects as JSON-schema messages through the REST produce endpoint and
     * checks that they are acknowledged without error and decode, field by field, to the same values
     * on a client consumer using the generic JSON schema.
     *
     * @throws Exception if topic creation, serialization, publishing or consuming fails
     */
    @Test
    public void testProduceWithJsonSchema() throws Exception {
        final String topicName = "persistent://my-tenant/my-namespace/my-topic";
        this.admin.topics().createNonPartitionedTopic(topicName);
        AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
        GenericSchemaImpl genericSchema = GenericJsonSchema.of(JSONSchema.of(SchemaDefinition.builder().withPojo(PC.class).build()).getSchemaInfo());
        PC firstPc = new PC("dell", "alienware", 2021, GPU.AMD, new Seller("WA", "main street", 98004L));
        PC secondPc = new PC("asus", "rog", 2020, GPU.NVIDIA, new Seller("CA", "back street", 90232L));
        // try-with-resources releases the subscription even when an assertion below fails.
        try (Consumer<?> consumer = this.pulsarClient.newConsumer(genericSchema)
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()) {
            ProducerMessages producerMessages = new ProducerMessages();
            producerMessages.setValueSchema(ObjectMapperFactory.getMapper().getObjectMapper().writeValueAsString(genericSchema.getSchemaInfo()));
            // Each payload is itself a JSON document embedded in a JSON string, hence the quote escaping.
            String firstPayload = ObjectMapperFactory.getMapper().writer().writeValueAsString(firstPc).replace("\"", "\\\"");
            String secondPayload = ObjectMapperFactory.getMapper().writer().writeValueAsString(secondPc).replace("\"", "\\\"");
            producerMessages.setMessages(createMessages("[{\"key\":\"my-key\",\"payload\":\"" + firstPayload + "\",\"eventTime\":1603045262772,\"sequenceId\":1},{\"key\":\"my-key\",\"payload\":\"" + secondPayload + "\",\"eventTime\":1603045262772,\"sequenceId\":2}]"));
            this.topics.produceOnPersistentTopic(asyncResponse, "my-tenant", "my-namespace", "my-topic", false, producerMessages);

            // The endpoint answers asynchronously; wait (up to 5s) for exactly one resume() call.
            ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
            Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
            Response response = responseCaptor.getValue();
            Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
            Object entity = response.getEntity();
            Assert.assertTrue(entity instanceof ProducerAcks);
            ProducerAcks producerAcks = (ProducerAcks) entity;
            Assert.assertEquals(producerAcks.getMessagePublishResults().size(), 2);
            Assert.assertEquals(producerAcks.getSchemaVersion(), 0L);
            for (ProducerAck ack : producerAcks.getMessagePublishResults()) {
                // Third ':'-separated field of the message id; presumably the partition index,
                // -1 for a non-partitioned topic - TODO confirm.
                Assert.assertEquals(Integer.parseInt(ack.getMessageId().split(":")[2]), -1);
                Assert.assertEquals(ack.getErrorCode(), 0);
                Assert.assertTrue(ack.getMessageId().length() > 0);
            }

            List<PC> expectedPcs = Arrays.asList(firstPc, secondPc);
            for (PC expected : expectedPcs) {
                Message<?> message = consumer.receive(2, TimeUnit.SECONDS);
                // A timed receive can come back empty; fail with an assertion rather than an NPE.
                Assert.assertNotNull(message);
                GenericJsonRecord jsonRecord = (GenericJsonRecord) genericSchema.decode(message.getData());
                PC actual = ObjectMapperFactory.getMapper().getObjectMapper().treeToValue(jsonRecord.getJsonNode(), PC.class);
                Assert.assertEquals(actual.brand, expected.brand);
                Assert.assertEquals(actual.model, expected.model);
                Assert.assertEquals(actual.year, expected.year);
                Assert.assertEquals(actual.gpu, expected.gpu);
                Assert.assertEquals(actual.seller.state, expected.seller.state);
                Assert.assertEquals(actual.seller.street, expected.seller.street);
                Assert.assertEquals(actual.seller.zipCode, expected.seller.zipCode);
                Assert.assertEquals(message.getKey(), "my-key");
            }
        }
    }

    /**
     * Publishes two {@code PC} objects as Avro-schema messages (sent in Avro's JSON encoding) through
     * the REST produce endpoint and checks that they are acknowledged without error and decode, field
     * by field, to the same values on a client consumer using the generic Avro schema.
     *
     * @throws Exception if topic creation, Avro encoding, publishing or consuming fails
     */
    @Test
    public void testProduceWithAvroSchema() throws Exception {
        final String topicName = "persistent://my-tenant/my-namespace/my-topic";
        this.admin.topics().createNonPartitionedTopic(topicName);
        AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
        GenericSchemaImpl genericSchema = GenericAvroSchema.of(AvroSchema.of(SchemaDefinition.builder().withPojo(PC.class).build()).getSchemaInfo());
        PC firstPc = new PC("dell", "alienware", 2021, GPU.AMD, new Seller("WA", "main street", 98004L));
        PC secondPc = new PC("asus", "rog", 2020, GPU.NVIDIA, new Seller("CA", "back street", 90232L));
        // try-with-resources releases the subscription even when an assertion below fails.
        try (Consumer<?> consumer = this.pulsarClient.newConsumer(genericSchema)
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()) {
            ProducerMessages producerMessages = new ProducerMessages();
            producerMessages.setValueSchema(ObjectMapperFactory.getMapper().getObjectMapper().writeValueAsString(genericSchema.getSchemaInfo()));

            // Render both objects in Avro's JSON encoding, one output buffer per object.
            ReflectDatumWriter<PC> datumWriter = new ReflectDatumWriter<>(genericSchema.getAvroSchema());
            ByteArrayOutputStream firstOut = new ByteArrayOutputStream();
            ByteArrayOutputStream secondOut = new ByteArrayOutputStream();
            JsonEncoder firstEncoder = EncoderFactory.get().jsonEncoder(genericSchema.getAvroSchema(), firstOut);
            JsonEncoder secondEncoder = EncoderFactory.get().jsonEncoder(genericSchema.getAvroSchema(), secondOut);
            datumWriter.write(firstPc, firstEncoder);
            firstEncoder.flush();
            datumWriter.write(secondPc, secondEncoder);
            secondEncoder.flush();
            // Decode the encoder output explicitly as UTF-8 instead of relying on the platform default charset.
            String firstPayload = firstOut.toString("UTF-8").replace("\"", "\\\"");
            String secondPayload = secondOut.toString("UTF-8").replace("\"", "\\\"");
            producerMessages.setMessages(createMessages("[{\"key\":\"my-key\",\"payload\":\"" + firstPayload + "\",\"eventTime\":1603045262772,\"sequenceId\":1},{\"key\":\"my-key\",\"payload\":\"" + secondPayload + "\",\"eventTime\":1603045262772,\"sequenceId\":2}]"));
            this.topics.produceOnPersistentTopic(asyncResponse, "my-tenant", "my-namespace", "my-topic", false, producerMessages);

            // The endpoint answers asynchronously; wait (up to 5s) for exactly one resume() call.
            ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
            Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
            Response response = responseCaptor.getValue();
            Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
            Object entity = response.getEntity();
            Assert.assertTrue(entity instanceof ProducerAcks);
            ProducerAcks producerAcks = (ProducerAcks) entity;
            Assert.assertEquals(producerAcks.getMessagePublishResults().size(), 2);
            Assert.assertEquals(producerAcks.getSchemaVersion(), 0L);
            for (ProducerAck ack : producerAcks.getMessagePublishResults()) {
                // Third ':'-separated field of the message id; presumably the partition index,
                // -1 for a non-partitioned topic - TODO confirm.
                Assert.assertEquals(Integer.parseInt(ack.getMessageId().split(":")[2]), -1);
                Assert.assertEquals(ack.getErrorCode(), 0);
                Assert.assertTrue(ack.getMessageId().length() > 0);
            }

            List<PC> expectedPcs = Arrays.asList(firstPc, secondPc);
            for (PC expected : expectedPcs) {
                Message<?> message = consumer.receive(2, TimeUnit.SECONDS);
                // A timed receive can come back empty; fail with an assertion rather than an NPE.
                Assert.assertNotNull(message);
                GenericRecord avroRecord = ((GenericAvroRecord) genericSchema.decode(message.getData())).getAvroRecord();
                GenericRecord seller = (GenericRecord) avroRecord.get("seller");
                Assert.assertEquals(((Utf8) avroRecord.get("brand")).toString(), expected.brand);
                Assert.assertEquals(((Utf8) avroRecord.get("model")).toString(), expected.model);
                Assert.assertEquals(((Integer) avroRecord.get("year")).intValue(), expected.year);
                Assert.assertEquals(((GenericData.EnumSymbol) avroRecord.get("gpu")).toString(), expected.gpu.toString());
                Assert.assertEquals(((Utf8) seller.get("state")).toString(), expected.seller.state);
                Assert.assertEquals(((Utf8) seller.get("street")).toString(), expected.seller.street);
                Assert.assertEquals(seller.get("zipCode"), Long.valueOf(expected.seller.zipCode));
                Assert.assertEquals(message.getKey(), "my-key");
            }
        }
    }

    /**
     * Publishes three key/value messages with a client producer, then three more through the REST
     * produce endpoint using the same key and value schemas, and checks that a client consumer
     * receives all six in publish order.
     *
     * @throws Exception if topic creation, publishing or consuming fails
     */
    @Test
    public void testProduceWithRestAndClientThenConsumeWithClient() throws Exception {
        final String topicName = "persistent://my-tenant/my-namespace/my-topic";
        this.admin.topics().createNonPartitionedTopic(topicName);
        AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
        Schema<KeyValue<String, String>> keyValueSchema = KeyValueSchemaImpl.of(StringSchema.utf8(), StringSchema.utf8(), KeyValueEncodingType.SEPARATED);
        // try-with-resources closes the producer and the consumer even when an assertion below fails.
        try (Producer<KeyValue<String, String>> producer = this.pulsarClient.newProducer(keyValueSchema)
                     .topic(topicName)
                     .create();
             Consumer<KeyValue<String, String>> consumer = this.pulsarClient.newConsumer(keyValueSchema)
                     .topic(topicName)
                     .subscriptionName("my-sub")
                     .subscriptionType(SubscriptionType.Exclusive)
                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                     .subscribe()) {
            for (int i = 0; i < 3; i++) {
                producer.newMessage(keyValueSchema).value(new KeyValue<>("my-key", "ClientProducer:" + i)).send();
            }

            ProducerMessages producerMessages = new ProducerMessages();
            producerMessages.setKeySchema(ObjectMapperFactory.getMapper().getObjectMapper().writeValueAsString(StringSchema.utf8().getSchemaInfo()));
            producerMessages.setValueSchema(ObjectMapperFactory.getMapper().getObjectMapper().writeValueAsString(StringSchema.utf8().getSchemaInfo()));
            producerMessages.setMessages(createMessages("[{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1},{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2},{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3}]"));
            this.topics.produceOnPersistentTopic(asyncResponse, "my-tenant", "my-namespace", "my-topic", false, producerMessages);

            // The endpoint answers asynchronously; wait (up to 5s) for exactly one resume() call.
            ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
            Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume(responseCaptor.capture());
            Response response = responseCaptor.getValue();
            Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
            Object entity = response.getEntity();
            Assert.assertTrue(entity instanceof ProducerAcks);
            ProducerAcks producerAcks = (ProducerAcks) entity;
            Assert.assertEquals(producerAcks.getMessagePublishResults().size(), 3);
            Assert.assertEquals(producerAcks.getSchemaVersion(), 0L);
            for (ProducerAck ack : producerAcks.getMessagePublishResults()) {
                // Third ':'-separated field of the message id; presumably the partition index,
                // -1 for a non-partitioned topic - TODO confirm.
                Assert.assertEquals(Integer.parseInt(ack.getMessageId().split(":")[2]), -1);
                Assert.assertEquals(ack.getErrorCode(), 0);
                Assert.assertTrue(ack.getMessageId().length() > 0);
            }

            List<String> expectedPayloads = Arrays.asList("ClientProducer:0", "ClientProducer:1", "ClientProducer:2", "RestProducer:1", "RestProducer:2", "RestProducer:3");
            for (String expectedPayload : expectedPayloads) {
                Message<KeyValue<String, String>> message = consumer.receive(2, TimeUnit.SECONDS);
                // A timed receive can come back empty; fail with an assertion rather than an NPE.
                Assert.assertNotNull(message);
                // Actual value first, consistent with the other assertions in this test.
                Assert.assertEquals(StringSchema.utf8().decode(message.getData()), expectedPayload);
                // "bXkta2V5" is the Base64 form of "my-key".
                Assert.assertEquals(message.getKey(), "bXkta2V5");
            }
        }
    }

    /**
     * Publishes two batches of five key/value messages through the REST produce endpoint - the first
     * declaring key and value schemas, the second referring to the schema version returned by the
     * first - and checks that both batches are acknowledged and that a client consumer receives all
     * ten messages in order.
     *
     * @throws Exception if topic creation, publishing or consuming fails
     */
    @Test
    public void testProduceWithRestThenConsumeWithClient() throws Exception {
        final String topicName = "persistent://my-tenant/my-namespace/my-topic";
        this.admin.topics().createNonPartitionedTopic(topicName);
        // try-with-resources releases the subscription even when an assertion below fails.
        try (Consumer<KeyValue<String, String>> consumer = this.pulsarClient
                .newConsumer(KeyValueSchemaImpl.of(StringSchema.utf8(), StringSchema.utf8(), KeyValueEncodingType.SEPARATED))
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()) {
            // ---- First batch: schemas supplied inline. ----
            AsyncResponse firstAsyncResponse = Mockito.mock(AsyncResponse.class);
            ProducerMessages firstBatch = new ProducerMessages();
            firstBatch.setKeySchema(ObjectMapperFactory.getMapper().getObjectMapper().writeValueAsString(StringSchema.utf8().getSchemaInfo()));
            firstBatch.setValueSchema(ObjectMapperFactory.getMapper().getObjectMapper().writeValueAsString(StringSchema.utf8().getSchemaInfo()));
            firstBatch.setMessages(createMessages("[{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1},{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2},{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3},{\"key\":\"my-key\",\"payload\":\"RestProducer:4\",\"eventTime\":1603045262772,\"sequenceId\":4},{\"key\":\"my-key\",\"payload\":\"RestProducer:5\",\"eventTime\":1603045262772,\"sequenceId\":5}]"));
            this.topics.produceOnPersistentTopic(firstAsyncResponse, "my-tenant", "my-namespace", "my-topic", false, firstBatch);

            ArgumentCaptor<Response> firstCaptor = ArgumentCaptor.forClass(Response.class);
            Mockito.verify(firstAsyncResponse, Mockito.timeout(5000L).times(1)).resume(firstCaptor.capture());
            Response firstResponse = firstCaptor.getValue();
            Assert.assertEquals(firstResponse.getStatus(), Response.Status.OK.getStatusCode());
            Object firstEntity = firstResponse.getEntity();
            Assert.assertTrue(firstEntity instanceof ProducerAcks);
            ProducerAcks firstAcks = (ProducerAcks) firstEntity;
            Assert.assertEquals(firstAcks.getMessagePublishResults().size(), 5);
            Assert.assertEquals(firstAcks.getSchemaVersion(), 0L);
            for (ProducerAck ack : firstAcks.getMessagePublishResults()) {
                // Third ':'-separated field of the message id; presumably the partition index,
                // -1 for a non-partitioned topic - TODO confirm.
                Assert.assertEquals(Integer.parseInt(ack.getMessageId().split(":")[2]), -1);
                Assert.assertEquals(ack.getErrorCode(), 0);
                Assert.assertTrue(ack.getMessageId().length() > 0);
            }

            // ---- Second batch: no inline schema, only the version returned by the first batch. ----
            // A dedicated mock and captor are required here: the first mock has already recorded one
            // resume() call, so times(1) on it would be satisfied by the FIRST response and the
            // assertions below would never look at the second one.
            AsyncResponse secondAsyncResponse = Mockito.mock(AsyncResponse.class);
            ProducerMessages secondBatch = new ProducerMessages();
            secondBatch.setSchemaVersion(firstAcks.getSchemaVersion());
            secondBatch.setMessages(createMessages("[{\"key\":\"my-key\",\"payload\":\"RestProducer:6\",\"eventTime\":1603045262772,\"sequenceId\":1},{\"key\":\"my-key\",\"payload\":\"RestProducer:7\",\"eventTime\":1603045262772,\"sequenceId\":2},{\"key\":\"my-key\",\"payload\":\"RestProducer:8\",\"eventTime\":1603045262772,\"sequenceId\":3},{\"key\":\"my-key\",\"payload\":\"RestProducer:9\",\"eventTime\":1603045262772,\"sequenceId\":4},{\"key\":\"my-key\",\"payload\":\"RestProducer:10\",\"eventTime\":1603045262772,\"sequenceId\":5}]"));
            this.topics.produceOnPersistentTopic(secondAsyncResponse, "my-tenant", "my-namespace", "my-topic", false, secondBatch);

            ArgumentCaptor<Response> secondCaptor = ArgumentCaptor.forClass(Response.class);
            Mockito.verify(secondAsyncResponse, Mockito.timeout(5000L).times(1)).resume(secondCaptor.capture());
            Response secondResponse = secondCaptor.getValue();
            Assert.assertEquals(secondResponse.getStatus(), Response.Status.OK.getStatusCode());
            Object secondEntity = secondResponse.getEntity();
            Assert.assertTrue(secondEntity instanceof ProducerAcks);
            ProducerAcks secondAcks = (ProducerAcks) secondEntity;
            Assert.assertEquals(secondAcks.getMessagePublishResults().size(), 5);
            Assert.assertEquals(secondAcks.getSchemaVersion(), 0L);
            for (ProducerAck ack : secondAcks.getMessagePublishResults()) {
                Assert.assertEquals(Integer.parseInt(ack.getMessageId().split(":")[2]), -1);
                Assert.assertEquals(ack.getErrorCode(), 0);
                Assert.assertTrue(ack.getMessageId().length() > 0);
            }

            // ---- Both batches must arrive, in publish order. ----
            List<String> expectedPayloads = Arrays.asList("RestProducer:1", "RestProducer:2", "RestProducer:3", "RestProducer:4", "RestProducer:5", "RestProducer:6", "RestProducer:7", "RestProducer:8", "RestProducer:9", "RestProducer:10");
            for (String expectedPayload : expectedPayloads) {
                Message<KeyValue<String, String>> message = consumer.receive(2, TimeUnit.SECONDS);
                // A timed receive can come back empty; fail with an assertion rather than an NPE.
                Assert.assertNotNull(message);
                // Actual value first, consistent with the other assertions in this test.
                Assert.assertEquals(StringSchema.utf8().decode(message.getData()), expectedPayload);
                // "bXkta2V5" is the Base64 form of "my-key".
                Assert.assertEquals(message.getKey(), "bXkta2V5");
            }
        }
    }

    /**
     * Registers a plain string schema on the topic via a client producer, then attempts a REST
     * produce that declares key and value schemas, and checks that the request is resumed with a
     * {@code RestException} reporting that the KEY_VALUE schema could not be added to the topic.
     *
     * @throws Exception if topic creation or the client-side publishing fails
     */
    @Test
    public void testProduceWithInCompatibleSchema() throws Exception {
        final String topicName = "persistent://my-tenant/my-namespace/my-topic";
        this.admin.topics().createNonPartitionedTopic(topicName);
        AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
        // try-with-resources closes the producer even when an assertion below fails.
        try (Producer<String> producer = this.pulsarClient.newProducer(StringSchema.utf8()).topic(topicName).create()) {
            for (int i = 0; i < 3; i++) {
                producer.send("message");
            }

            // Declaring both a key and a value schema is what produces the KEY_VALUE schema
            // named in the expected error message below.
            ProducerMessages producerMessages = new ProducerMessages();
            producerMessages.setKeySchema(ObjectMapperFactory.getMapper().getObjectMapper().writeValueAsString(StringSchema.utf8().getSchemaInfo()));
            producerMessages.setValueSchema(ObjectMapperFactory.getMapper().getObjectMapper().writeValueAsString(StringSchema.utf8().getSchemaInfo()));
            producerMessages.setMessages(createMessages("[{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1},{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2},{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3}]"));
            this.topics.produceOnPersistentTopic(asyncResponse, "my-tenant", "my-namespace", "my-topic", false, producerMessages);

            // The failure is delivered asynchronously through resume(Throwable); wait up to 5s for it.
            ArgumentCaptor<RestException> errorCaptor = ArgumentCaptor.forClass(RestException.class);
            Mockito.verify(asyncResponse, Mockito.timeout(5000L).times(1)).resume((Throwable) errorCaptor.capture());
            String errorMessage = errorCaptor.getValue().getMessage();
            // The message embeds a timestamp, so only its fixed prefix and suffix are compared.
            Assert.assertTrue(errorMessage.startsWith("Fail to publish message:java.util.concurrent.ExecutionException: org.apache.pulsar.broker.service.schema.exceptions.SchemaException: Unable to add schema SchemaData(type=KEY_VALUE, isDeleted=false, timestamp="));
            Assert.assertTrue(errorMessage.endsWith("user=Rest Producer, data=[0, 0, 0, 0, 0, 0, 0, 0], props={key.schema.properties={\"__charset\":\"UTF-8\"}, value.schema.properties={\"__charset\":\"UTF-8\"}, value.schema.type=STRING, key.schema.name=String, value.schema.name=String, kv.encoding.type=SEPARATED, key.schema.type=STRING}) to topic persistent://my-tenant/my-namespace/my-topic"));
        }
    }
}
