/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.schema;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Sets;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.schema.Schemas;
import org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.testng.internal.junit.ArrayAsserts;

@Test(groups={"schema"})
public class SchemaTest
extends MockedPulsarServiceBaseTest {
    /** Logger used by the tests in this class. */
    private static final Logger log = LoggerFactory.getLogger(SchemaTest.class);
    /**
     * Name of the cluster registered in {@code setup()} and passed as the cluster set when tests
     * create namespaces. Several tests also reuse this value as a subscription name or payload.
     */
    private static final String CLUSTER_NAME = "test";

    /**
     * Runs the base-class setup, then registers the {@link #CLUSTER_NAME} cluster (pointing at the
     * broker service URL) and a {@code public} tenant whose allowed clusters contain only that
     * cluster.
     *
     * @throws Exception if the base setup or any admin call fails
     */
    @Override
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();

        final ClusterData clusterData = new ClusterData(this.pulsar.getBrokerServiceUrl());
        this.admin.clusters().createCluster(CLUSTER_NAME, clusterData);

        final TenantInfo publicTenant = new TenantInfo();
        publicTenant.setAllowedClusters(Collections.singleton(CLUSTER_NAME));
        this.admin.tenants().createTenant("public", publicTenant);
    }

    /**
     * Delegates to the base-class cleanup. Declared with {@code alwaysRun=true} so it is invoked
     * even when the preceding test method failed.
     *
     * @throws Exception if the base cleanup fails
     */
    @Override
    @AfterMethod(alwaysRun=true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Checks that consumers subscribed to two partitioned topics expose the reader schema of a
     * received message, both for a typed AVRO consumer and for an {@code AUTO_CONSUME} consumer.
     *
     * <p>Topic one is given two schema versions ({@code PersonOne}, then {@code PersonTwo}); topic
     * two only {@code PersonTwo}. A single {@code PersonTwo} message is then published to topic
     * one and received by both consumers.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test
    public void testMultiTopicSetSchemaProvider() throws Exception {
        final String tenant = "public";
        final String namespace = "test-namespace-" + SchemaCompatibilityCheckTest.randomName(16);
        final String topicOne = "test-multi-version-schema-one";
        final String topicTwo = "test-multi-version-schema-two";
        final String fqtnOne = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicOne).toString();
        final String fqtnTwo = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicTwo).toString();

        admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME));
        admin.topics().createPartitionedTopic(fqtnOne, 3);
        admin.topics().createPartitionedTopic(fqtnTwo, 3);

        // The definitions are reused; every Schema.AVRO(...) call still yields a fresh Schema instance.
        final SchemaDefinition<Schemas.PersonOne> personOneDefinition = SchemaDefinition.<Schemas.PersonOne>builder()
                .withAlwaysAllowNull(false)
                .withSupportSchemaVersioning(true)
                .withPojo(Schemas.PersonOne.class)
                .build();
        final SchemaDefinition<Schemas.PersonTwo> personTwoDefinition = SchemaDefinition.<Schemas.PersonTwo>builder()
                .withAlwaysAllowNull(false)
                .withSupportSchemaVersioning(true)
                .withPojo(Schemas.PersonTwo.class)
                .build();

        admin.schemas().createSchema(fqtnOne, Schema.AVRO(personOneDefinition).getSchemaInfo());
        admin.schemas().createSchema(fqtnOne, Schema.AVRO(personTwoDefinition).getSchemaInfo());
        final Schema<Schemas.PersonTwo> personTwoSchema = Schema.AVRO(personTwoDefinition);
        admin.schemas().createSchema(fqtnTwo, personTwoSchema.getSchemaInfo());

        // try-with-resources closes all three clients, including consumer2, even when an assertion fails.
        try (Producer<Schemas.PersonTwo> producer = pulsarClient.newProducer(Schema.AVRO(personTwoDefinition))
                     .topic(fqtnOne)
                     .create();
             Consumer<Schemas.PersonTwo> consumer = pulsarClient.newConsumer(Schema.AVRO(personTwoDefinition))
                     .subscriptionName("test")
                     .topic(fqtnOne, fqtnTwo)
                     .subscribe();
             Consumer<org.apache.pulsar.client.api.schema.GenericRecord> consumer2 = pulsarClient
                     .newConsumer(Schema.AUTO_CONSUME())
                     .subscriptionName("test2")
                     .topic(fqtnOne, fqtnTwo)
                     .subscribe()) {
            final Schemas.PersonTwo personTwo = new Schemas.PersonTwo();
            personTwo.setId(1);
            personTwo.setName("Tom");
            producer.send(personTwo);

            final Message<Schemas.PersonTwo> message = consumer.receive();
            final Schemas.PersonTwo personConsume = message.getValue();
            Assert.assertEquals(personConsume.getName(), "Tom");
            Assert.assertEquals(personConsume.getId(), 1);
            final Schema<?> schema = message.getReaderSchema().get();
            log.info("the-schema {}", schema);
            Assert.assertEquals(personTwoSchema.getSchemaInfo(), schema.getSchemaInfo());
            final org.apache.avro.Schema nativeSchema = (org.apache.avro.Schema) schema.getNativeSchema().get();
            log.info("nativeSchema-schema {}", nativeSchema);
            Assert.assertNotNull(nativeSchema);

            final Message<org.apache.pulsar.client.api.schema.GenericRecord> message2 = consumer2.receive();
            final Schema<?> schema2 = message2.getReaderSchema().get();
            log.info("the-schema {}", schema2);
            Assert.assertEquals(personTwoSchema.getSchemaInfo(), schema2.getSchemaInfo());
            // Inspect the AUTO_CONSUME consumer's schema here; the previous code re-read `schema`.
            final org.apache.avro.Schema nativeSchema2 = (org.apache.avro.Schema) schema2.getNativeSchema().get();
            log.info("nativeSchema-schema {}", nativeSchema2);
            Assert.assertNotNull(nativeSchema2);
        }
    }

    /**
     * Key/value variant of the multi-topic reader-schema check: both topics carry
     * {@code KeyValue<String, AVRO>} schemas, one {@code ("foo", PersonTwo)} message is published
     * to topic one, and the reader schema is inspected on a typed consumer and on an
     * {@code AUTO_CONSUME} consumer.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test
    public void testMultiTopicSetSchemaProviderWithKeyValue() throws Exception {
        final String tenant = "public";
        final String namespace = "test-namespace-" + SchemaCompatibilityCheckTest.randomName(16);
        final String topicOne = "test-multi-version-schema-one";
        final String topicTwo = "test-multi-version-schema-two";
        final String fqtnOne = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicOne).toString();
        final String fqtnTwo = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicTwo).toString();

        admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME));
        admin.topics().createPartitionedTopic(fqtnOne, 3);
        admin.topics().createPartitionedTopic(fqtnTwo, 3);

        // The definitions are reused; every Schema.AVRO(...) call still yields a fresh Schema instance.
        final SchemaDefinition<Schemas.PersonOne> personOneDefinition = SchemaDefinition.<Schemas.PersonOne>builder()
                .withAlwaysAllowNull(false)
                .withSupportSchemaVersioning(true)
                .withPojo(Schemas.PersonOne.class)
                .build();
        final SchemaDefinition<Schemas.PersonTwo> personTwoDefinition = SchemaDefinition.<Schemas.PersonTwo>builder()
                .withAlwaysAllowNull(false)
                .withSupportSchemaVersioning(true)
                .withPojo(Schemas.PersonTwo.class)
                .build();

        final Schema<Schemas.PersonOne> schemaOne = Schema.AVRO(personOneDefinition);
        admin.schemas().createSchema(fqtnOne, Schema.KeyValue(Schema.STRING, schemaOne).getSchemaInfo());
        final Schema<Schemas.PersonTwo> schemaTwo = Schema.AVRO(personTwoDefinition);
        admin.schemas().createSchema(fqtnOne, Schema.KeyValue(Schema.STRING, schemaTwo).getSchemaInfo());
        final Schema<Schemas.PersonTwo> personTwoSchema = Schema.AVRO(personTwoDefinition);
        admin.schemas().createSchema(fqtnTwo, Schema.KeyValue(Schema.STRING, schemaTwo).getSchemaInfo());

        final Schema<KeyValue<String, Schemas.PersonTwo>> producerSchema =
                Schema.KeyValue(Schema.STRING, Schema.AVRO(personTwoDefinition));
        final Schema<KeyValue<String, Schemas.PersonTwo>> consumerSchema =
                Schema.KeyValue(Schema.STRING, Schema.AVRO(personTwoDefinition));

        // try-with-resources closes all three clients, including consumer2, even when an assertion fails.
        try (Producer<KeyValue<String, Schemas.PersonTwo>> producer = pulsarClient.newProducer(producerSchema)
                     .topic(fqtnOne)
                     .create();
             Consumer<KeyValue<String, Schemas.PersonTwo>> consumer = pulsarClient.newConsumer(consumerSchema)
                     .subscriptionName("test")
                     .topic(fqtnOne, fqtnTwo)
                     .subscribe();
             Consumer<org.apache.pulsar.client.api.schema.GenericRecord> consumer2 = pulsarClient
                     .newConsumer(Schema.AUTO_CONSUME())
                     .subscriptionName("test2")
                     .topic(fqtnOne, fqtnTwo)
                     .subscribe()) {
            final Schemas.PersonTwo personTwo = new Schemas.PersonTwo();
            personTwo.setId(1);
            personTwo.setName("Tom");
            producer.send(new KeyValue<>("foo", personTwo));

            final Message<KeyValue<String, Schemas.PersonTwo>> message = consumer.receive();
            Assert.assertEquals("foo", message.getValue().getKey());
            final Schemas.PersonTwo personConsume = message.getValue().getValue();
            Assert.assertEquals(personConsume.getName(), "Tom");
            Assert.assertEquals(personConsume.getId(), 1);
            final KeyValueSchema<?, ?> schema = (KeyValueSchema<?, ?>) message.getReaderSchema().get();
            log.info("the-schema {}", schema);
            Assert.assertEquals(personTwoSchema.getSchemaInfo(), schema.getValueSchema().getSchemaInfo());
            final org.apache.avro.Schema nativeSchema =
                    (org.apache.avro.Schema) schema.getValueSchema().getNativeSchema().get();
            log.info("nativeSchema-schema {}", nativeSchema);
            Assert.assertNotNull(nativeSchema);

            final Message<org.apache.pulsar.client.api.schema.GenericRecord> message2 = consumer2.receive();
            final KeyValueSchema<?, ?> schema2 = (KeyValueSchema<?, ?>) message2.getReaderSchema().get();
            log.info("the-schema {}", schema2);
            Assert.assertEquals(personTwoSchema.getSchemaInfo(), schema2.getValueSchema().getSchemaInfo());
            // Inspect the AUTO_CONSUME consumer's value schema; the previous code re-read `schema`.
            final org.apache.avro.Schema nativeSchema2 =
                    (org.apache.avro.Schema) schema2.getValueSchema().getNativeSchema().get();
            log.info("nativeSchema-schema {}", nativeSchema2);
            Assert.assertNotNull(nativeSchema2);
        }
    }

    /**
     * Publishes one JSON-encoded {@code BytesRecord} and checks that the {@code address} field
     * read through an {@code AUTO_CONSUME} consumer has the same runtime class as the one read
     * through a typed JSON consumer, and that both consumers report equal reader schema info.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test
    public void testJSONSchemaDeserialize() throws Exception {
        final String tenant = "public";
        final String namespace = "test-namespace-" + SchemaCompatibilityCheckTest.randomName(16);
        final String topicName = "test-bytes-schema";
        final String topic = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicName).toString();

        admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME));
        admin.topics().createPartitionedTopic(topic, 2);
        admin.schemas().createSchema(topic, Schema.JSON(Schemas.BytesRecord.class).getSchemaInfo());

        // try-with-resources releases the clients even when an assertion fails.
        try (Producer<Schemas.BytesRecord> producer = pulsarClient.newProducer(Schema.JSON(Schemas.BytesRecord.class))
                     .topic(topic)
                     .create();
             Consumer<org.apache.pulsar.client.api.schema.GenericRecord> consumer = pulsarClient
                     .newConsumer(Schema.AUTO_CONSUME())
                     .subscriptionName("test-sub")
                     .topic(topic)
                     .subscribe();
             Consumer<Schemas.BytesRecord> consumer1 = pulsarClient.newConsumer(Schema.JSON(Schemas.BytesRecord.class))
                     .subscriptionName("test-sub1")
                     .topic(topic)
                     .subscribe()) {
            final Schemas.BytesRecord bytesRecord = new Schemas.BytesRecord();
            bytesRecord.setId(1);
            bytesRecord.setName("Tom");
            // Explicit charset: the no-arg getBytes() depends on the platform default encoding.
            bytesRecord.setAddress("test".getBytes(StandardCharsets.UTF_8));
            producer.send(bytesRecord);

            final Message<org.apache.pulsar.client.api.schema.GenericRecord> message = consumer.receive();
            final Message<Schemas.BytesRecord> message1 = consumer1.receive();
            Assert.assertEquals(
                    message.getValue().getField("address").getClass(),
                    message1.getValue().getAddress().getClass());

            final Schema<?> schema = message.getReaderSchema().get();
            final Schema<?> schema1 = message1.getReaderSchema().get();
            log.info("schema {}", schema);
            log.info("schema1 {}", schema1);
            Assert.assertEquals(schema.getSchemaInfo(), schema1.getSchemaInfo());
        }
    }

    /**
     * Sends the string {@code "foo"} to a partitioned topic and verifies what a typed
     * {@code STRING} consumer and an {@code AUTO_CONSUME} consumer observe: a {@code STRING}
     * reader schema on both, the plain value on the typed consumer, and a
     * {@code GenericObjectWrapper} carrying the same native object on the auto consumer.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test
    public void testStringSchema() throws Exception {
        final String tenant = "public";
        final String namespace = "test-namespace-" + SchemaCompatibilityCheckTest.randomName(16);
        final String topicName = "test-string-schema";
        final String topic = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicName).toString();

        admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME));
        admin.topics().createPartitionedTopic(topic, 2);

        Producer<String> stringProducer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
        Consumer<String> typedConsumer = pulsarClient.newConsumer(Schema.STRING)
                .subscriptionName("test-sub")
                .topic(topic)
                .subscribe();
        Consumer<org.apache.pulsar.client.api.schema.GenericRecord> autoConsumer = pulsarClient
                .newConsumer(Schema.AUTO_CONSUME())
                .subscriptionName("test-sub3")
                .topic(topic)
                .subscribe();

        stringProducer.send("foo");
        Message<String> typedMessage = typedConsumer.receive();
        Message<org.apache.pulsar.client.api.schema.GenericRecord> autoMessage = autoConsumer.receive();

        // Reader schema type seen by each consumer.
        Assert.assertEquals(SchemaType.STRING, typedMessage.getReaderSchema().get().getSchemaInfo().getType());
        Assert.assertEquals(SchemaType.STRING, autoMessage.getReaderSchema().get().getSchemaInfo().getType());

        // Payload seen by each consumer.
        Assert.assertEquals("foo", typedMessage.getValue());
        Assert.assertEquals(
                autoMessage.getValue().getClass().getName(),
                "org.apache.pulsar.client.impl.schema.GenericObjectWrapper");
        Assert.assertEquals(SchemaType.STRING, autoMessage.getValue().getSchemaType());
        Assert.assertEquals((Object) "foo", autoMessage.getValue().getNativeObject());

        stringProducer.close();
        typedConsumer.close();
        autoConsumer.close();
    }

    /** Runs the schemaless-topic AUTO_CONSUME scenario with a {@code BYTES} schema registered on the topic. */
    @Test
    public void testUseAutoConsumeWithBytesSchemaTopic() throws Exception {
        this.testUseAutoConsumeWithSchemalessTopic(SchemaType.BYTES);
    }

    /** Runs the schemaless-topic AUTO_CONSUME scenario with a {@code NONE} schema registered on the topic. */
    @Test
    public void testUseAutoConsumeWithNoneSchemaTopic() throws Exception {
        this.testUseAutoConsumeWithSchemalessTopic(SchemaType.NONE);
    }

    /**
     * Shared scenario for topics registered with a {@code BYTES} or {@code NONE} schema: a raw
     * byte payload is produced and read back through a default (bytes) consumer and an
     * {@code AUTO_CONSUME} consumer. For both accepted schema types the reader schema is expected
     * to be {@code BYTES}; any other type fails the test.
     *
     * @param schema schema type to register on the topic before producing
     * @throws Exception if any admin or client call fails
     */
    private void testUseAutoConsumeWithSchemalessTopic(SchemaType schema) throws Exception {
        final String tenant = "public";
        final String namespace = "test-namespace-" + SchemaCompatibilityCheckTest.randomName(16);
        final String topicName = "test-schemaless";
        final String topic = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicName).toString();
        final byte[] payload = "foo".getBytes(StandardCharsets.UTF_8);

        admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME));
        admin.topics().createPartitionedTopic(topic, 2);
        SchemaInfo registeredSchema = SchemaInfo.builder()
                .schema(new byte[0])
                .name("dummySchema")
                .type(schema)
                .build();
        admin.schemas().createSchema(topic, registeredSchema);

        Producer<byte[]> rawProducer = pulsarClient.newProducer().topic(topic).create();
        Consumer<byte[]> rawConsumer = pulsarClient.newConsumer()
                .subscriptionName("test-sub")
                .topic(topic)
                .subscribe();
        Consumer<org.apache.pulsar.client.api.schema.GenericRecord> autoConsumer = pulsarClient
                .newConsumer(Schema.AUTO_CONSUME())
                .subscriptionName("test-sub3")
                .topic(topic)
                .subscribe();

        rawProducer.send(payload);
        Message<byte[]> rawMessage = rawConsumer.receive();
        Message<org.apache.pulsar.client.api.schema.GenericRecord> autoMessage = autoConsumer.receive();

        // Only BYTES and NONE are valid inputs for this scenario.
        if (schema != SchemaType.BYTES && schema != SchemaType.NONE) {
            Assert.fail();
        }
        // In both valid cases the consumers are expected to report a BYTES reader schema.
        Assert.assertEquals(SchemaType.BYTES, rawMessage.getReaderSchema().get().getSchemaInfo().getType());
        Assert.assertEquals(SchemaType.BYTES, autoMessage.getReaderSchema().get().getSchemaInfo().getType());

        Assert.assertEquals(payload, rawMessage.getValue());
        Assert.assertEquals(
                autoMessage.getValue().getClass().getName(),
                "org.apache.pulsar.client.impl.schema.GenericObjectWrapper");
        Assert.assertEquals(SchemaType.BYTES, autoMessage.getValue().getSchemaType());
        Assert.assertEquals((Object) payload, autoMessage.getValue().getNativeObject());

        rawProducer.close();
        rawConsumer.close();
        autoConsumer.close();
    }

    /** Runs the primitive key/value schema scenario with {@code INLINE} encoding. */
    @Test
    public void testKeyValueSchemaINLINE() throws Exception {
        this.testKeyValueSchema(KeyValueEncodingType.INLINE);
    }

    /** Runs the primitive key/value schema scenario with {@code SEPARATED} encoding. */
    @Test
    public void testKeyValueSchemaSEPARATED() throws Exception {
        this.testKeyValueSchema(KeyValueEncodingType.SEPARATED);
    }

    /**
     * Shared scenario for a {@code KeyValue<String, Integer>} schema: one {@code ("foo", 123)}
     * pair is produced and read by a typed consumer and an {@code AUTO_CONSUME} consumer, which
     * must see equal values. With {@code SEPARATED} encoding the message key bytes must equal the
     * UTF-8 bytes of {@code "foo"}; otherwise the message key must be {@code null}.
     *
     * @param keyValueEncodingType encoding used by the producer and the typed consumer
     * @throws Exception if any admin or client call fails
     */
    private void testKeyValueSchema(KeyValueEncodingType keyValueEncodingType) throws Exception {
        final String tenant = "public";
        final String namespace = "test-namespace-" + SchemaCompatibilityCheckTest.randomName(16);
        final String topicName = "test-kv-schema-" + SchemaCompatibilityCheckTest.randomName(16);
        final String topic = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicName).toString();

        admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME));
        admin.topics().createPartitionedTopic(topic, 2);

        Producer<KeyValue<String, Integer>> kvProducer = pulsarClient
                .newProducer(Schema.KeyValue(Schema.STRING, Schema.INT32, keyValueEncodingType))
                .topic(topic)
                .create();
        Consumer<KeyValue<String, Integer>> kvConsumer = pulsarClient
                .newConsumer(Schema.KeyValue(Schema.STRING, Schema.INT32, keyValueEncodingType))
                .subscriptionName("test-sub")
                .topic(topic)
                .subscribe();
        Consumer<org.apache.pulsar.client.api.schema.GenericRecord> autoConsumer = pulsarClient
                .newConsumer(Schema.AUTO_CONSUME())
                .subscriptionName("test-sub2")
                .topic(topic)
                .subscribe();

        kvProducer.send(new KeyValue<>("foo", 123));
        Message<KeyValue<String, Integer>> typedMessage = kvConsumer.receive();
        Message<org.apache.pulsar.client.api.schema.GenericRecord> autoMessage = autoConsumer.receive();
        Assert.assertEquals((Object) typedMessage.getValue(), autoMessage.getValue().getNativeObject());

        if (keyValueEncodingType != KeyValueEncodingType.SEPARATED) {
            // Non-separated encoding: no message key is expected.
            Assert.assertNull(typedMessage.getKey());
            Assert.assertNull(autoMessage.getKey());
        } else {
            // Separated encoding: the key is expected in the message key bytes.
            ArrayAsserts.assertArrayEquals("foo".getBytes(StandardCharsets.UTF_8), typedMessage.getKeyBytes());
            ArrayAsserts.assertArrayEquals("foo".getBytes(StandardCharsets.UTF_8), autoMessage.getKeyBytes());
        }

        kvProducer.close();
        kvConsumer.close();
        autoConsumer.close();
    }

    /** Runs the struct key/value schema scenario with {@code INLINE} encoding. */
    @Test
    public void testKeyValueSchemaWithStructsINLINE() throws Exception {
        this.testKeyValueSchemaWithStructs(KeyValueEncodingType.INLINE);
    }

    /** Runs the struct key/value schema scenario with {@code SEPARATED} encoding. */
    @Test
    public void testKeyValueSchemaWithStructsSEPARATED() throws Exception {
        this.testKeyValueSchemaWithStructs(KeyValueEncodingType.SEPARATED);
    }

    /**
     * Shared scenario for a {@code KeyValue<PersonOne, PersonTwo>} AVRO schema: one pair is
     * produced and read by a typed consumer and an {@code AUTO_CONSUME} consumer, and the fields
     * seen through the generic records must match the typed values. With {@code SEPARATED}
     * encoding the message key bytes must be present; otherwise the message key must be
     * {@code null}.
     *
     * @param keyValueEncodingType encoding used by the producer and the typed consumer
     * @throws Exception if any admin or client call fails
     */
    private void testKeyValueSchemaWithStructs(KeyValueEncodingType keyValueEncodingType) throws Exception {
        final String tenant = "public";
        final String namespace = "test-namespace-" + SchemaCompatibilityCheckTest.randomName(16);
        final String topicName = "test-kv-schema-" + SchemaCompatibilityCheckTest.randomName(16);
        final String topic = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicName).toString();

        admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME));
        admin.topics().createPartitionedTopic(topic, 2);

        // try-with-resources releases the clients even when an assertion fails.
        try (Producer<KeyValue<Schemas.PersonOne, Schemas.PersonTwo>> producer = pulsarClient
                     .newProducer(Schema.KeyValue(
                             Schema.AVRO(Schemas.PersonOne.class),
                             Schema.AVRO(Schemas.PersonTwo.class),
                             keyValueEncodingType))
                     .topic(topic)
                     .create();
             Consumer<KeyValue<Schemas.PersonOne, Schemas.PersonTwo>> consumer = pulsarClient
                     .newConsumer(Schema.KeyValue(
                             Schema.AVRO(Schemas.PersonOne.class),
                             Schema.AVRO(Schemas.PersonTwo.class),
                             keyValueEncodingType))
                     .subscriptionName("test-sub")
                     .topic(topic)
                     .subscribe();
             Consumer<org.apache.pulsar.client.api.schema.GenericRecord> consumer2 = pulsarClient
                     .newConsumer(Schema.AUTO_CONSUME())
                     .subscriptionName("test-sub2")
                     .topic(topic)
                     .subscribe()) {
            final Schemas.PersonOne key = new Schemas.PersonOne(8787);
            final Schemas.PersonTwo value = new Schemas.PersonTwo(323, "foo");
            producer.send(new KeyValue<>(key, value));

            final Message<KeyValue<Schemas.PersonOne, Schemas.PersonTwo>> message = consumer.receive();
            final Message<org.apache.pulsar.client.api.schema.GenericRecord> message2 = consumer2.receive();
            final KeyValue<Schemas.PersonOne, Schemas.PersonTwo> typedPair = message.getValue();
            final Object nativeObject = message2.getValue().getNativeObject();

            // Two placeholders: the class argument was silently dropped with a single "{}".
            log.info("message: {} ({})", typedPair, typedPair.getClass());
            log.info("message2: {} ({})", nativeObject, nativeObject.getClass());

            final KeyValue<?, ?> keyValue2 = (KeyValue<?, ?>) nativeObject;
            final org.apache.pulsar.client.api.schema.GenericRecord genericKey =
                    (org.apache.pulsar.client.api.schema.GenericRecord) keyValue2.getKey();
            final org.apache.pulsar.client.api.schema.GenericRecord genericValue =
                    (org.apache.pulsar.client.api.schema.GenericRecord) keyValue2.getValue();
            Assert.assertEquals((Object) typedPair.getKey().id, genericKey.getField("id"));
            Assert.assertEquals((Object) typedPair.getValue().id, genericValue.getField("id"));
            Assert.assertEquals((Object) typedPair.getValue().name, genericValue.getField("name"));

            final Schema<?> schema = message.getReaderSchema().get();
            // NOTE(review): this reads the reader schema from `message` again, so the comparison
            // below compares the typed consumer's schema with itself. It was probably meant to use
            // `message2`; confirm the two schema infos are equal before switching.
            final Schema<?> schemaFromGenericRecord = message.getReaderSchema().get();
            final KeyValueSchema<?, ?> keyValueSchema = (KeyValueSchema<?, ?>) schema;
            final KeyValueSchema<?, ?> keyValueSchemaFromGenericRecord = (KeyValueSchema<?, ?>) schemaFromGenericRecord;
            Assert.assertEquals(keyValueSchema.getSchemaInfo(), keyValueSchemaFromGenericRecord.getSchemaInfo());

            if (keyValueEncodingType == KeyValueEncodingType.SEPARATED) {
                Assert.assertNotNull(message.getKeyBytes());
                Assert.assertNotNull(message2.getKeyBytes());
            } else {
                Assert.assertNull(message.getKey());
                Assert.assertNull(message2.getKey());
            }
        }
    }

    /**
     * Walks every {@link SchemaType} and checks that
     * {@code SchemaRegistryServiceImpl.isUsingAvroSchemaParser} answers {@code true} exactly for
     * {@code AVRO}, {@code JSON} and {@code PROTOBUF}.
     */
    @Test
    public void testIsUsingAvroSchemaParser() {
        for (SchemaType type : SchemaType.values()) {
            switch (type) {
                case AVRO:
                case JSON:
                case PROTOBUF:
                    Assert.assertTrue(SchemaRegistryServiceImpl.isUsingAvroSchemaParser(type));
                    break;
                default:
                    Assert.assertFalse(SchemaRegistryServiceImpl.isUsingAvroSchemaParser(type));
                    break;
            }
        }
    }

    /**
     * Subscribes with an {@code INT32} schema whose properties contain a {@code null} value and a
     * {@code null} key; the test passes when subscribing and closing complete without throwing.
     *
     * <p>{@code Schema.INT32} is a shared instance, so its original properties are restored
     * afterwards to keep the null entries from leaking into other tests.
     *
     * @throws PulsarAdminException if the namespace cannot be created
     * @throws PulsarClientException if subscribing or closing the consumer fails
     */
    @Test
    public void testNullKeyValueProperty() throws PulsarAdminException, PulsarClientException {
        final String tenant = "public";
        final String namespace = "test-namespace-" + SchemaCompatibilityCheckTest.randomName(16);
        final String topicName = "test";
        final String topic = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicName).toString();
        admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME));

        final Map<String, String> map = new HashMap<>();
        map.put("key", null);
        map.put(null, "value");

        final SchemaInfo int32Info = Schema.INT32.getSchemaInfo();
        final Map<String, String> originalProperties = int32Info.getProperties();
        int32Info.setProperties(map);
        try {
            Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
                    .topic(topic)
                    .subscriptionName("sub")
                    .subscribe();
            consumer.close();
        } finally {
            // Undo the mutation of the shared INT32 schema info.
            int32Info.setProperties(originalProperties);
        }
    }

    /**
     * Creates two JSON producers with different POJO schemas on one topic, verifies that two
     * schema versions and two schema ledgers exist, then force-deletes the topic together with its
     * schema and checks that no schema versions remain and that every former schema ledger can no
     * longer be opened.
     *
     * @throws Exception if any admin, client, registry or BookKeeper call fails unexpectedly
     */
    @Test
    public void testDeleteTopicAndSchema() throws Exception {
        final String tenant = "public";
        final String namespace = "test-namespace-" + SchemaCompatibilityCheckTest.randomName(16);
        final String topicName = "test-delete-topic-and-schema";
        final String topic = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicName).toString();
        final String schemaId = TopicName.get(topic).getSchemaName();
        admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME));

        // try-with-resources keeps a close() failure from masking an assertion failure in the body.
        try (Producer<Schemas.PersonOne> p1 = pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
                     .topic(topic)
                     .create();
             Producer<Schemas.PersonThree> p2 = pulsarClient.newProducer(Schema.JSON(Schemas.PersonThree.class))
                     .topic(topic)
                     .create()) {
            final List<CompletableFuture<SchemaRegistry.SchemaAndMetadata>> schemaFutures =
                    getPulsar().getSchemaRegistryService().getAllSchemas(schemaId).get();
            FutureUtil.waitForAll(schemaFutures).get();
            // All futures completed successfully above, so join() cannot throw here.
            final List<SchemaRegistry.SchemaAndMetadata> schemas = schemaFutures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
            Assert.assertEquals(schemas.size(), 2);
            for (SchemaRegistry.SchemaAndMetadata schema : schemas) {
                Assert.assertNotNull(schema);
            }

            final List<Long> ledgers =
                    ((BookkeeperSchemaStorage) getPulsar().getSchemaStorage()).getSchemaLedgerList(schemaId);
            Assert.assertEquals(ledgers.size(), 2);

            admin.topics().delete(topic, true, true);
            Assert.assertEquals(
                    getPulsar().getSchemaRegistryService().trimDeletedSchemaAndGetList(schemaId).get().size(), 0);

            for (Long ledger : ledgers) {
                try {
                    getPulsar().getBookKeeperClient().openLedger(ledger, BookKeeper.DigestType.CRC32, new byte[0]);
                    Assert.fail("schema ledger " + ledger + " should have been deleted");
                } catch (BKException.BKNoSuchLedgerExistsException expected) {
                    // Expected: the ledger was removed together with the schema.
                }
            }
        }
    }

    /**
     * Uses one {@code AUTO_PRODUCE_BYTES} producer to send messages with several per-message
     * schemas on a namespace set to {@code ALWAYS_COMPATIBLE}, then checks that the topic ends up
     * with exactly five schema versions in first-use order: STRING, JSON(PersonThree),
     * AVRO(PersonThree), AVRO(PersonOne), BOOL. The repeated JSON schema and the BYTES messages
     * are expected not to add versions.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test
    public void testProducerMultipleSchemaMessages() throws Exception {
        final String tenant = "public";
        final String namespace = "test-namespace-" + SchemaCompatibilityCheckTest.randomName(16);
        final String topicName = "auto_schema_test";
        final String ns = tenant + "/" + namespace;
        admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
        admin.namespaces().setSchemaCompatibilityStrategy(ns, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
        final String topic = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicName).toString();

        // try-with-resources keeps a close() failure from masking an assertion failure in the body.
        try (Producer<byte[]> producer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
                .topic(topic)
                .create()) {
            producer.newMessage(Schema.STRING).value("test").send();
            producer.newMessage(Schema.JSON(Schemas.PersonThree.class)).value(new Schemas.PersonThree(0, "ran")).send();
            producer.newMessage(Schema.AVRO(Schemas.PersonThree.class)).value(new Schemas.PersonThree(0, "ran")).send();
            producer.newMessage(Schema.AVRO(Schemas.PersonOne.class)).value(new Schemas.PersonOne(0)).send();
            producer.newMessage(Schema.JSON(Schemas.PersonThree.class)).value(new Schemas.PersonThree(1, "tang")).send();
            producer.newMessage(Schema.BYTES).value("test".getBytes(StandardCharsets.UTF_8)).send();
            producer.newMessage(Schema.BYTES).value("test".getBytes(StandardCharsets.UTF_8)).send();
            producer.newMessage(Schema.BOOL).value(true).send();

            final List<SchemaInfo> allSchemas = admin.schemas().getAllSchemas(topic);
            Assert.assertEquals(allSchemas.size(), 5);
            Assert.assertEquals(allSchemas.get(0), Schema.STRING.getSchemaInfo());
            Assert.assertEquals(allSchemas.get(1), Schema.JSON(Schemas.PersonThree.class).getSchemaInfo());
            Assert.assertEquals(allSchemas.get(2), Schema.AVRO(Schemas.PersonThree.class).getSchemaInfo());
            Assert.assertEquals(allSchemas.get(3), Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
            Assert.assertEquals(allSchemas.get(4), Schema.BOOL.getSchemaInfo());
        }
    }

    /**
     * Sends a {@code STRING} message without a key and checks that the received message reports a
     * {@code null} key and the original value.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test
    public void testNullKey() throws Exception {
        final String tenant = "public";
        final String namespace = "test-namespace-" + SchemaCompatibilityCheckTest.randomName(16);
        final String topicName = "test-schema-" + SchemaCompatibilityCheckTest.randomName(16);
        final String topic = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicName).toString();

        admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME));
        admin.topics().createPartitionedTopic(topic, 2);

        // The producer and consumer were previously never closed; try-with-resources releases them.
        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
             Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                     .subscriptionName("test-sub")
                     .topic(topic)
                     .subscribe()) {
            producer.send("foo");
            final Message<String> message = consumer.receive();
            Assert.assertNull(message.getKey());
            Assert.assertEquals("foo", message.getValue());
        }
    }

    /**
     * Produces one message per schema flavour (bytes, string, bool, JSON, Avro
     * and several key/value encodings) onto dedicated topics, has each of them
     * forwarded to a single AUTO_PRODUCE_BYTES topic, and then reads that topic
     * back with an AUTO_CONSUME consumer, validating every decoded value.
     *
     * @throws Exception if namespace setup or any client operation fails
     */
    @Test
    public void testConsumeMultipleSchemaMessages() throws Exception {
        String namespace = "test-namespace-" + SchemaCompatibilityCheckTest.randomName(16);
        String ns = "public/" + namespace;
        this.admin.namespaces().createNamespace(ns, (Set)Sets.newHashSet((Object[])new String[]{CLUSTER_NAME}));
        this.admin.namespaces().setSchemaCompatibilityStrategy(ns, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
        String autoProducerTopic = this.getTopicName(ns, "auto_produce_topic");
        // try-with-resources: the shared producer is released even if a helper or an assertion throws.
        try (Producer autoProducer = this.pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(autoProducerTopic).create()) {
            // Counts how many messages the helper calls forwarded to the auto-produce topic.
            AtomicInteger totalMsgCnt = new AtomicInteger(0);
            // Explicit charset so the payload bytes do not depend on the platform default.
            this.generateDataByDifferentSchema(ns, "bytes_schema", Schema.BYTES, "bytes value".getBytes(StandardCharsets.UTF_8), autoProducer, totalMsgCnt);
            this.generateDataByDifferentSchema(ns, "string_schema", Schema.STRING, "string value", autoProducer, totalMsgCnt);
            this.generateDataByDifferentSchema(ns, "bool_schema", Schema.BOOL, true, autoProducer, totalMsgCnt);
            this.generateDataByDifferentSchema(ns, "json_one_schema", Schema.JSON(Schemas.PersonOne.class), new Schemas.PersonOne(1), autoProducer, totalMsgCnt);
            this.generateDataByDifferentSchema(ns, "json_three_schema", Schema.JSON(Schemas.PersonThree.class), new Schemas.PersonThree(3, "ran"), autoProducer, totalMsgCnt);
            this.generateDataByDifferentSchema(ns, "json_four_schema", Schema.JSON(Schemas.PersonFour.class), new Schemas.PersonFour(4, "tang", 18), autoProducer, totalMsgCnt);
            this.generateDataByDifferentSchema(ns, "avro_one_schema", Schema.AVRO(Schemas.PersonOne.class), new Schemas.PersonOne(10), autoProducer, totalMsgCnt);
            this.generateDataByDifferentSchema(ns, "k_one_v_three_schema_separate", Schema.KeyValue((Schema)Schema.JSON(Schemas.PersonOne.class), (Schema)Schema.JSON(Schemas.PersonThree.class), (KeyValueEncodingType)KeyValueEncodingType.SEPARATED), new KeyValue((Object)new Schemas.PersonOne(1), (Object)new Schemas.PersonThree(3, "kv-separate")), autoProducer, totalMsgCnt);
            this.generateDataByDifferentSchema(ns, "k_one_v_four_schema_inline", Schema.KeyValue((Schema)Schema.JSON(Schemas.PersonOne.class), (Schema)Schema.JSON(Schemas.PersonFour.class), (KeyValueEncodingType)KeyValueEncodingType.INLINE), new KeyValue((Object)new Schemas.PersonOne(10), (Object)new Schemas.PersonFour(30, "kv-inline", 20)), autoProducer, totalMsgCnt);
            this.generateDataByDifferentSchema(ns, "k_int_v_three_schema_separate", Schema.KeyValue((Schema)Schema.INT32, (Schema)Schema.JSON(Schemas.PersonThree.class), (KeyValueEncodingType)KeyValueEncodingType.SEPARATED), new KeyValue((Object)100, (Object)new Schemas.PersonThree(40, "kv-separate")), autoProducer, totalMsgCnt);
            // Subscribe from the earliest position because every message was already published above.
            try (Consumer autoConsumer = this.pulsarClient.newConsumer(Schema.AUTO_CONSUME()).topic(new String[]{autoProducerTopic}).subscriptionName(CLUSTER_NAME).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) {
                for (int i = 0; i < totalMsgCnt.get(); ++i) {
                    Message message = autoConsumer.receive(5, TimeUnit.SECONDS);
                    if (message == null) {
                        Assert.fail((String)"Failed to receive multiple schema message.");
                    }
                    Object nativeObject = ((org.apache.pulsar.client.api.schema.GenericRecord)message.getValue()).getNativeObject();
                    log.info("auto consumer get native object class: {}, value: {}", nativeObject.getClass(), nativeObject);
                    this.checkSchemaForAutoSchema((Message<org.apache.pulsar.client.api.schema.GenericRecord>)message);
                }
            }
        }
    }

    /**
     * Joins a namespace and a base topic name with a single {@code /}.
     *
     * @param ns        namespace part placed before the slash
     * @param baseTopic topic part placed after the slash
     * @return {@code ns + "/" + baseTopic}
     */
    private String getTopicName(String ns, String baseTopic) {
        StringBuilder joined = new StringBuilder();
        joined.append(ns);
        joined.append("/");
        joined.append(baseTopic);
        return joined.toString();
    }

    /**
     * Sends {@code data} with {@code schema} to the topic {@code ns/baseTopic},
     * reads it back through an AUTO_CONSUME consumer, and forwards the raw
     * payload (with the message's reader schema and properties) through
     * {@code autoProducer}. The forwarded message carries a {@code baseTopic}
     * property naming its origin.
     *
     * @param ns           namespace the per-schema topic lives in
     * @param baseTopic    topic name inside {@code ns}; also stored as the {@code baseTopic} property
     * @param schema       schema used by the temporary producer
     * @param data         value to publish
     * @param autoProducer producer that receives the forwarded raw bytes; not closed here
     * @param totalMsgCnt  incremented once after the initial message is sent
     * @throws PulsarClientException if creating, using or closing a client fails
     */
    private void generateDataByDifferentSchema(String ns, String baseTopic, Schema schema, Object data, Producer<?> autoProducer, AtomicInteger totalMsgCnt) throws PulsarClientException {
        String topic = this.getTopicName(ns, baseTopic);
        // try-with-resources: the temporary clients are closed even when a check below fails.
        try (Producer producer = this.pulsarClient.newProducer(schema).topic(topic).create()) {
            producer.newMessage().value(data).property("baseTopic", baseTopic).send();
            totalMsgCnt.incrementAndGet();
            try (Consumer consumer = this.pulsarClient.newConsumer(Schema.AUTO_CONSUME()).topic(new String[]{topic}).subscriptionName(CLUSTER_NAME).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) {
                Message message = consumer.receive(5, TimeUnit.SECONDS);
                if (message == null) {
                    Assert.fail((String)("Failed to receive message for topic " + topic));
                }
                if (!message.getReaderSchema().isPresent()) {
                    Assert.fail((String)("Failed to get reader schema for topic " + topic));
                }
                // Result intentionally unused; presumably forces the payload to be decoded
                // with the reader schema before it is forwarded -- TODO confirm.
                message.getValue();
                Schema readerSchema = (Schema)message.getReaderSchema().get();
                boolean separatedKeyValue = readerSchema instanceof KeyValueSchema
                        && ((KeyValueSchema)readerSchema).getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED;
                if (separatedKeyValue) {
                    // The key bytes are forwarded explicitly for SEPARATED key/value schemas.
                    autoProducer.newMessage(Schema.AUTO_PRODUCE_BYTES((Schema)readerSchema)).keyBytes(message.getKeyBytes()).value((Object)message.getData()).properties(message.getProperties()).send();
                } else {
                    autoProducer.newMessage(Schema.AUTO_PRODUCE_BYTES((Schema)readerSchema)).properties(message.getProperties()).value((Object)message.getData()).send();
                }
            }
        }
    }

    /**
     * Validates a message read with AUTO_CONSUME: it must carry a reader schema,
     * and its native object must match the value that was published for the
     * topic named by the message's {@code baseTopic} property.
     *
     * @param message message whose decoded value is checked; fails the test when the
     *                reader schema is absent, the {@code baseTopic} property is missing
     *                or unrecognised, or the decoded content differs from the expected value
     */
    private void checkSchemaForAutoSchema(Message<org.apache.pulsar.client.api.schema.GenericRecord> message) {
        if (!message.getReaderSchema().isPresent()) {
            Assert.fail((String)"Failed to get reader schema for auto consume multiple schema topic.");
        }
        Object nativeObject = ((org.apache.pulsar.client.api.schema.GenericRecord)message.getValue()).getNativeObject();
        String baseTopic = message.getProperty("baseTopic");
        if (baseTopic == null) {
            // Switching on a null string would raise a bare NullPointerException; fail with context instead.
            Assert.fail((String)"Message is missing the baseTopic property.");
        }
        switch (baseTopic) {
            case "bytes_schema": {
                // Decode with an explicit charset rather than the platform default.
                Assert.assertEquals((String)new String((byte[])nativeObject, StandardCharsets.UTF_8), (String)"bytes value");
                break;
            }
            case "string_schema": {
                Assert.assertEquals((String)((String)nativeObject), (String)"string value");
                break;
            }
            case "bool_schema": {
                Assert.assertEquals((Object)nativeObject, (Object)Boolean.TRUE);
                break;
            }
            case "json_one_schema": {
                JsonNode jsonNode = (JsonNode)nativeObject;
                Assert.assertEquals((int)jsonNode.get("id").intValue(), (int)1);
                break;
            }
            case "json_three_schema": {
                JsonNode jsonNode = (JsonNode)nativeObject;
                Assert.assertEquals((int)jsonNode.get("id").intValue(), (int)3);
                Assert.assertEquals((String)jsonNode.get("name").textValue(), (String)"ran");
                break;
            }
            case "json_four_schema": {
                JsonNode jsonNode = (JsonNode)nativeObject;
                Assert.assertEquals((int)jsonNode.get("id").intValue(), (int)4);
                Assert.assertEquals((String)jsonNode.get("name").textValue(), (String)"tang");
                Assert.assertEquals((int)jsonNode.get("age").intValue(), (int)18);
                break;
            }
            case "avro_one_schema": {
                GenericRecord genericRecord = (GenericRecord)nativeObject;
                Assert.assertEquals((Object)genericRecord.get("id"), (Object)10);
                break;
            }
            case "k_one_v_three_schema_separate": {
                KeyValue kv = (KeyValue)nativeObject;
                JsonNode jsonNode = ((GenericJsonRecord)kv.getKey()).getJsonNode();
                Assert.assertEquals((int)jsonNode.get("id").intValue(), (int)1);
                jsonNode = ((GenericJsonRecord)kv.getValue()).getJsonNode();
                Assert.assertEquals((int)jsonNode.get("id").intValue(), (int)3);
                Assert.assertEquals((String)jsonNode.get("name").textValue(), (String)"kv-separate");
                break;
            }
            case "k_one_v_four_schema_inline": {
                KeyValue kv = (KeyValue)nativeObject;
                JsonNode jsonNode = ((GenericJsonRecord)kv.getKey()).getJsonNode();
                Assert.assertEquals((int)jsonNode.get("id").intValue(), (int)10);
                jsonNode = ((GenericJsonRecord)kv.getValue()).getJsonNode();
                Assert.assertEquals((int)jsonNode.get("id").intValue(), (int)30);
                Assert.assertEquals((String)jsonNode.get("name").textValue(), (String)"kv-inline");
                Assert.assertEquals((int)jsonNode.get("age").intValue(), (int)20);
                break;
            }
            case "k_int_v_three_schema_separate": {
                KeyValue kv = (KeyValue)nativeObject;
                Assert.assertEquals((Object)kv.getKey(), (Object)100);
                JsonNode jsonNode = ((GenericJsonRecord)kv.getValue()).getJsonNode();
                Assert.assertEquals((int)jsonNode.get("id").intValue(), (int)40);
                Assert.assertEquals((String)jsonNode.get("name").textValue(), (String)"kv-separate");
                break;
            }
            default: {
                // A topic without a matching case would otherwise pass without any check.
                Assert.fail((String)("Unexpected baseTopic property: " + baseTopic));
            }
        }
    }
}

