/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.time.Clock;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.proto.Test;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.ProtobufSchema;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker-api"})
public class SimpleTypedProducerConsumerTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(SimpleTypedProducerConsumerTest.class);

    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testJsonProducerAndConsumer() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        JSONSchema jsonSchema = JSONSchema.of((SchemaDefinition)SchemaDefinition.builder().withPojo(JsonEncodedPojo.class).build());
        Consumer consumer = this.pulsarClient.newConsumer((Schema)jsonSchema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").subscribe();
        Producer producer = this.pulsarClient.newProducer((Schema)jsonSchema).topic("persistent://my-property/use/my-ns/my-topic1").create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)new JsonEncodedPojo(message));
        }
        Message msg = null;
        HashSet messageSet = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            JsonEncodedPojo receivedMessage = (JsonEncodedPojo)msg.getValue();
            log.debug("Received message: [{}]", (Object)receivedMessage);
            JsonEncodedPojo expectedMessage = new JsonEncodedPojo("my-message-" + i);
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        consumer.acknowledgeCumulative(msg);
        consumer.close();
        SchemaRegistry.SchemaAndMetadata storedSchema = (SchemaRegistry.SchemaAndMetadata)this.pulsar.getSchemaRegistryService().getSchema("my-property/my-ns/my-topic1").get();
        Assert.assertEquals((byte[])storedSchema.schema.getData(), (byte[])jsonSchema.getSchemaInfo().getSchema());
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test
    public void testJsonProducerAndConsumerWithPrestoredSchema() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        JSONSchema jsonSchema = JSONSchema.of((SchemaDefinition)SchemaDefinition.builder().withPojo(JsonEncodedPojo.class).build());
        this.pulsar.getSchemaRegistryService().putSchemaIfAbsent("my-property/my-ns/my-topic1", SchemaData.builder().type(SchemaType.JSON).isDeleted(false).timestamp(Clock.systemUTC().millis()).user("me").data(jsonSchema.getSchemaInfo().getSchema()).props(Collections.emptyMap()).build(), SchemaCompatibilityStrategy.FULL).get();
        Consumer consumer = this.pulsarClient.newConsumer((Schema)jsonSchema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").subscribe();
        Producer producer = this.pulsarClient.newProducer((Schema)jsonSchema).topic("persistent://my-property/use/my-ns/my-topic1").create();
        consumer.close();
        producer.close();
        SchemaRegistry.SchemaAndMetadata storedSchema = (SchemaRegistry.SchemaAndMetadata)this.pulsar.getSchemaRegistryService().getSchema("my-property/my-ns/my-topic1").get();
        Assert.assertEquals((byte[])storedSchema.schema.getData(), (byte[])jsonSchema.getSchemaInfo().getSchema());
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test
    public void testWrongCorruptedSchema() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        byte[] randomSchemaBytes = "hello".getBytes();
        try {
            this.pulsar.getSchemaRegistryService().putSchemaIfAbsent("my-property/my-ns/my-topic1", SchemaData.builder().type(SchemaType.JSON).isDeleted(false).timestamp(Clock.systemUTC().millis()).user("me").data(randomSchemaBytes).props(Collections.emptyMap()).build(), SchemaCompatibilityStrategy.FULL).get();
            Assert.fail((String)"Should fail to add corrupted schema data");
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof InvalidSchemaDataException));
        }
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test
    public void testProtobufProducerAndConsumer() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        ProtobufSchema protobufSchema = ProtobufSchema.of(Test.TestMessage.class);
        Consumer consumer = this.pulsarClient.newConsumer((Schema)protobufSchema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").subscribe();
        Producer producer = this.pulsarClient.newProducer((Schema)protobufSchema).topic("persistent://my-property/use/my-ns/my-topic1").create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)Test.TestMessage.newBuilder().setStringField(message).build());
        }
        Message msg = null;
        HashSet messageSet = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            Test.TestMessage receivedMessage = (Test.TestMessage)msg.getValue();
            log.debug("Received message: [{}]", (Object)receivedMessage);
            Test.TestMessage expectedMessage = Test.TestMessage.newBuilder().setStringField("my-message-" + i).build();
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        consumer.acknowledgeCumulative(msg);
        consumer.close();
        SchemaRegistry.SchemaAndMetadata storedSchema = (SchemaRegistry.SchemaAndMetadata)this.pulsar.getSchemaRegistryService().getSchema("my-property/my-ns/my-topic1").get();
        Assert.assertEquals((byte[])storedSchema.schema.getData(), (byte[])protobufSchema.getSchemaInfo().getSchema());
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(expectedExceptions={PulsarClientException.class})
    public void testProtobufConsumerWithWrongPrestoredSchema() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        ProtobufSchema schema = ProtobufSchema.of(Test.TestMessage.class);
        this.pulsar.getSchemaRegistryService().putSchemaIfAbsent("my-property/my-ns/my-topic1", SchemaData.builder().type(SchemaType.PROTOBUF).isDeleted(false).timestamp(Clock.systemUTC().millis()).user("me").data(schema.getSchemaInfo().getSchema()).props(Collections.emptyMap()).build(), SchemaCompatibilityStrategy.FULL).get();
        Consumer consumer = this.pulsarClient.newConsumer((Schema)AvroSchema.of((SchemaDefinition)SchemaDefinition.builder().withPojo(Test.TestMessageWrong.class).build())).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").subscribe();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test
    public void testAvroProducerAndConsumer() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        AvroSchema avroSchema = AvroSchema.of((SchemaDefinition)SchemaDefinition.builder().withPojo(AvroEncodedPojo.class).build());
        Consumer consumer = this.pulsarClient.newConsumer((Schema)avroSchema).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").subscribe();
        Producer producer = this.pulsarClient.newProducer((Schema)avroSchema).topic("persistent://my-property/use/my-ns/my-topic1").create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)new AvroEncodedPojo(message));
        }
        Message msg = null;
        HashSet messageSet = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            AvroEncodedPojo receivedMessage = (AvroEncodedPojo)msg.getValue();
            log.debug("Received message: [{}]", (Object)receivedMessage);
            AvroEncodedPojo expectedMessage = new AvroEncodedPojo("my-message-" + i);
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        consumer.acknowledgeCumulative(msg);
        consumer.close();
        SchemaRegistry.SchemaAndMetadata storedSchema = (SchemaRegistry.SchemaAndMetadata)this.pulsar.getSchemaRegistryService().getSchema("my-property/my-ns/my-topic1").get();
        Assert.assertEquals((byte[])storedSchema.schema.getData(), (byte[])avroSchema.getSchemaInfo().getSchema());
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(expectedExceptions={PulsarClientException.class})
    public void testAvroConsumerWithWrongRestoredSchema() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        byte[] randomSchemaBytes = "{\n     \"type\": \"record\",\n     \"namespace\": \"com.example\",\n     \"name\": \"FullName\",\n     \"fields\": [\n       { \"name\": \"first\", \"type\": \"string\" },\n       { \"name\": \"last\", \"type\": \"string\" }\n     ]\n} ".getBytes();
        this.pulsar.getSchemaRegistryService().putSchemaIfAbsent("my-property/my-ns/my-topic1", SchemaData.builder().type(SchemaType.AVRO).isDeleted(false).timestamp(Clock.systemUTC().millis()).user("me").data(randomSchemaBytes).props(Collections.emptyMap()).build(), SchemaCompatibilityStrategy.FULL).get();
        Consumer consumer = this.pulsarClient.newConsumer((Schema)AvroSchema.of((SchemaDefinition)SchemaDefinition.builder().withPojo(AvroEncodedPojo.class).withAlwaysAllowNull(false).build())).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").subscribe();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test
    public void testAvroProducerAndAutoSchemaConsumer() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        AvroSchema avroSchema = AvroSchema.of((SchemaDefinition)SchemaDefinition.builder().withPojo(AvroEncodedPojo.class).build());
        Producer producer = this.pulsarClient.newProducer((Schema)avroSchema).topic("persistent://my-property/use/my-ns/my-topic1").create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)new AvroEncodedPojo(message));
        }
        Consumer consumer = this.pulsarClient.newConsumer(Schema.AUTO_CONSUME()).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Message msg = null;
        HashSet messageSet = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            GenericRecord receivedMessage = (GenericRecord)msg.getValue();
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            String actualMessage = (String)receivedMessage.getField("message");
            this.testMessageOrderAndDuplicates(messageSet, actualMessage, expectedMessage);
        }
        consumer.acknowledgeCumulative(msg);
        consumer.close();
        SchemaRegistry.SchemaAndMetadata storedSchema = (SchemaRegistry.SchemaAndMetadata)this.pulsar.getSchemaRegistryService().getSchema("my-property/my-ns/my-topic1").get();
        Assert.assertEquals((byte[])storedSchema.schema.getData(), (byte[])avroSchema.getSchemaInfo().getSchema());
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test
    public void testAvroProducerAndAutoSchemaReader() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        AvroSchema avroSchema = AvroSchema.of((SchemaDefinition)SchemaDefinition.builder().withPojo(AvroEncodedPojo.class).build());
        Producer producer = this.pulsarClient.newProducer((Schema)avroSchema).topic("persistent://my-property/use/my-ns/my-topic1").create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)new AvroEncodedPojo(message));
        }
        Reader reader = this.pulsarClient.newReader(Schema.AUTO_CONSUME()).topic("persistent://my-property/use/my-ns/my-topic1").startMessageId(MessageId.earliest).create();
        Message msg = null;
        HashSet messageSet = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            msg = reader.readNext();
            GenericRecord receivedMessage = (GenericRecord)msg.getValue();
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            String actualMessage = (String)receivedMessage.getField("message");
            this.testMessageOrderAndDuplicates(messageSet, actualMessage, expectedMessage);
        }
        reader.close();
        SchemaRegistry.SchemaAndMetadata storedSchema = (SchemaRegistry.SchemaAndMetadata)this.pulsar.getSchemaRegistryService().getSchema("my-property/my-ns/my-topic1").get();
        Assert.assertEquals((byte[])storedSchema.schema.getData(), (byte[])avroSchema.getSchemaInfo().getSchema());
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test
    public void testAutoBytesProducer() throws Exception {
        String message;
        int i;
        log.info("-- Starting {} test --", (Object)this.methodName);
        AvroSchema avroSchema = AvroSchema.of((SchemaDefinition)SchemaDefinition.builder().withPojo(AvroEncodedPojo.class).build());
        try (Producer producer = this.pulsarClient.newProducer((Schema)avroSchema).topic("persistent://my-property/use/my-ns/my-topic1").create();){
            for (i = 0; i < 10; ++i) {
                message = "my-message-" + i;
                producer.send((Object)new AvroEncodedPojo(message));
            }
        }
        producer = this.pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic("persistent://my-property/use/my-ns/my-topic1").create();
        var3_3 = null;
        try {
            byte[] data;
            for (i = 10; i < 20; ++i) {
                message = "my-message-" + i;
                data = avroSchema.encode((Object)new AvroEncodedPojo(message));
                byte[] junkData = new byte[data.length / 2];
                System.arraycopy(data, 0, junkData, 0, junkData.length);
                try {
                    producer.send((Object)junkData);
                    Assert.fail((String)"Should fail on sending junk data");
                    continue;
                }
                catch (SchemaSerializationException schemaSerializationException) {
                    // empty catch block
                }
            }
            for (i = 10; i < 20; ++i) {
                message = "my-message-" + i;
                data = avroSchema.encode((Object)new AvroEncodedPojo(message));
                producer.send((Object)data);
            }
        }
        catch (Throwable i2) {
            var3_3 = i2;
            throw i2;
        }
        finally {
            if (producer != null) {
                if (var3_3 != null) {
                    try {
                        producer.close();
                    }
                    catch (Throwable i2) {
                        var3_3.addSuppressed(i2);
                    }
                } else {
                    producer.close();
                }
            }
        }
        Consumer consumer = this.pulsarClient.newConsumer(Schema.AUTO_CONSUME()).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Message msg = null;
        HashSet messageSet = Sets.newHashSet();
        for (int i3 = 0; i3 < 20; ++i3) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            GenericRecord receivedMessage = (GenericRecord)msg.getValue();
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i3;
            String actualMessage = (String)receivedMessage.getField("message");
            this.testMessageOrderAndDuplicates(messageSet, actualMessage, expectedMessage);
        }
        consumer.acknowledgeCumulative(msg);
        consumer.close();
        SchemaRegistry.SchemaAndMetadata storedSchema = (SchemaRegistry.SchemaAndMetadata)this.pulsar.getSchemaRegistryService().getSchema("my-property/my-ns/my-topic1").get();
        Assert.assertEquals((byte[])storedSchema.schema.getData(), (byte[])avroSchema.getSchemaInfo().getSchema());
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMessageBuilderLoadConf() throws Exception {
        String topic = BrokerTestUtil.newUniqueName("my-topic");
        Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{topic}).subscriptionName("my-subscriber-name").subscribe();
        try {
            Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(topic).create();
            try {
                HashMap<String, String> properties = new HashMap<String, String>();
                properties.put("a", "1");
                properties.put("b", "2");
                HashMap<String, Object> msgConf = new HashMap<String, Object>();
                msgConf.put("key", "key-1");
                msgConf.put("properties", properties);
                msgConf.put("eventTime", 1234L);
                msgConf.put("sequenceId", 5L);
                msgConf.put("replicationClusters", Lists.newArrayList((Object[])new String[]{"a", "b", "c"}));
                msgConf.put("disableReplication", false);
                producer.newMessage().value((Object)"my-message").loadConf(msgConf).send();
                Message msg = consumer.receive();
                Assert.assertEquals((String)msg.getKey(), (String)"key-1");
                Assert.assertEquals((String)((String)msg.getProperties().get("a")), (String)"1");
                Assert.assertEquals((String)((String)msg.getProperties().get("b")), (String)"2");
                Assert.assertEquals((long)msg.getEventTime(), (long)1234L);
                Assert.assertEquals((long)msg.getSequenceId(), (long)5L);
                consumer.acknowledge(msg);
                msgConf.clear();
                msgConf.put("nonExistingKey", "key-1");
                try {
                    producer.newMessage().value((Object)"my-message").loadConf(msgConf).send();
                    Assert.fail((String)"Should have failed");
                }
                catch (RuntimeException runtimeException) {
                    // empty catch block
                }
                msgConf.clear();
                msgConf.put("eventTime", "hello");
                try {
                    producer.newMessage().value((Object)"my-message").loadConf(msgConf).send();
                    Assert.fail((String)"Should have failed");
                }
                catch (RuntimeException runtimeException) {
                    // empty catch block
                }
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(consumer).get(0) != null) {
                consumer.close();
            }
        }
    }

    public static class JsonEncodedPojo {
        private String message;

        public JsonEncodedPojo() {
        }

        public JsonEncodedPojo(String message) {
            this.message = message;
        }

        public String getMessage() {
            return this.message;
        }

        public void setMessage(String message) {
            this.message = message;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            JsonEncodedPojo that = (JsonEncodedPojo)o;
            return Objects.equals(this.message, that.message);
        }

        public int hashCode() {
            return Objects.hash(this.message);
        }

        public String toString() {
            return MoreObjects.toStringHelper((Object)this).add("message", (Object)this.message).toString();
        }
    }

    public static class AvroEncodedPojo {
        private String message;

        public AvroEncodedPojo() {
        }

        public AvroEncodedPojo(String message) {
            this.message = message;
        }

        public String getMessage() {
            return this.message;
        }

        public void setMessage(String message) {
            this.message = message;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            AvroEncodedPojo that = (AvroEncodedPojo)o;
            return Objects.equals(this.message, that.message);
        }

        public int hashCode() {
            return Objects.hash(this.message);
        }

        public String toString() {
            return MoreObjects.toStringHelper((Object)this).add("message", (Object)this.message).toString();
        }
    }
}

