/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Broker tests that publish messages whose value (or key/value component) is
 * {@code null} and verify that the consumer observes the same {@code null}s.
 *
 * <p>Every test runs once per row of the {@code "topics"} data provider, i.e.
 * against a partitioned topic with 1 partition and one with 3 partitions.
 */
@Test(groups = {"broker"})
public class NullValueTest extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(NullValueTest.class);

    /** Number of iterations used by each produce/consume loop. */
    private static final int NUM_MESSAGES = 10;

    /** Subscription name shared by all consumers created in this class. */
    private static final String SUBSCRIPTION = "test";

    /** Text carried by the non-null payloads of the bytes-schema test. */
    private static final String NOT_NULL_TEXT = "not null";

    /** Non-null key or value used by the key/value tests. */
    private static final String KV_TEXT = "test";

    /** Starts the test environment by delegating to {@code baseSetup()}. */
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        super.baseSetup();
    }

    /** Tears the test environment down by delegating to {@code internalCleanup()}. */
    @Override
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Supplies the (topic name, partition count) pairs each test is run with.
     *
     * @return rows of {@code {String topic, int partitions}}
     */
    @DataProvider(name = "topics")
    public static Object[][] topics() {
        return new Object[][]{
            {"persistent://prop/ns-abc/null-value-test-0", 1},
            {"persistent://prop/ns-abc/null-value-test-1", 3},
        };
    }

    /**
     * Alternates non-null and {@code null} payloads on a bytes-schema producer and
     * checks them on the consumer, first with blocking send/receive and then with
     * {@code sendAsync}/{@code receiveAsync}.
     *
     * @param topic      topic to create and use
     * @param partitions number of partitions to create the topic with
     * @throws PulsarClientException if a client operation (create, send, receive,
     *                               acknowledge, close) fails
     * @throws PulsarAdminException  if the partitioned topic cannot be created
     */
    @Test(dataProvider = "topics")
    public void nullValueBytesSchemaTest(String topic, int partitions)
            throws PulsarClientException, PulsarAdminException {
        this.admin.topics().createPartitionedTopic(topic, partitions);

        // Resources are closed in reverse order: consumer first, then producer.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                     .topic(topic)
                     .messageRoutingMode(MessageRoutingMode.SinglePartition)
                     .create();
             Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topic(topic)
                     .subscriptionName(SUBSCRIPTION)
                     .subscribe()) {

            // Blocking round trip.
            for (int i = 0; i < NUM_MESSAGES; i++) {
                producer.newMessage().value(payloadFor(i)).send();
            }
            for (int i = 0; i < NUM_MESSAGES; i++) {
                Message<byte[]> message = consumer.receive();
                assertPayload(message, i);
                consumer.acknowledge(message);
            }

            // Asynchronous round trip.
            for (int i = 0; i < NUM_MESSAGES; i++) {
                producer.newMessage().value(payloadFor(i)).sendAsync();
            }
            // Issue every receive up front so several requests are outstanding at once.
            List<CompletableFuture<Message<byte[]>>> pendingReceives = new ArrayList<>(NUM_MESSAGES);
            for (int i = 0; i < NUM_MESSAGES; i++) {
                pendingReceives.add(consumer.receiveAsync());
            }
            // Join on the test thread: assertions made inside an unobserved
            // whenComplete callback cannot fail the test, and the consumer must not
            // be closed before every receive has been checked. As before, the i-th
            // requested receive is expected to yield the i-th sent message.
            for (int i = 0; i < NUM_MESSAGES; i++) {
                Message<byte[]> message = pendingReceives.get(i).join();
                assertPayload(message, i);
                consumer.acknowledge(message);
            }
        }
    }

    /**
     * Sends {@code null} values through a {@code Schema.BOOL} producer and checks
     * that both the value and the raw data of every received message are {@code null}.
     *
     * @param topic      topic to create and use
     * @param partitions number of partitions to create the topic with
     * @throws PulsarClientException if a client operation fails
     * @throws PulsarAdminException  if the partitioned topic cannot be created
     */
    @Test(dataProvider = "topics")
    public void nullValueBooleanSchemaTest(String topic, int partitions)
            throws PulsarClientException, PulsarAdminException {
        this.admin.topics().createPartitionedTopic(topic, partitions);

        try (Producer<Boolean> producer = this.pulsarClient.newProducer(Schema.BOOL)
                     .topic(topic)
                     .messageRoutingMode(MessageRoutingMode.SinglePartition)
                     .create();
             Consumer<Boolean> consumer = this.pulsarClient.newConsumer(Schema.BOOL)
                     .topic(topic)
                     .subscriptionName(SUBSCRIPTION)
                     .subscribe()) {

            for (int i = 0; i < NUM_MESSAGES; i++) {
                producer.newMessage().value(null).sendAsync();
            }
            for (int i = 0; i < NUM_MESSAGES; i++) {
                Message<Boolean> message = consumer.receive();
                Assert.assertNull(message.getValue());
                Assert.assertNull(message.getData());
            }
        }
    }

    /**
     * Round-trips key/value messages with a {@code null} key, a {@code null} value,
     * and both {@code null}, using the two-argument {@code KeyValueSchemaImpl.of}
     * (no explicit encoding type).
     *
     * @param topic      topic to create and use
     * @param partitions number of partitions to create the topic with
     * @throws PulsarClientException if a client operation fails
     * @throws PulsarAdminException  if the partitioned topic cannot be created
     */
    @Test(dataProvider = "topics")
    public void keyValueNullInlineTest(String topic, int partitions)
            throws PulsarClientException, PulsarAdminException {
        this.admin.topics().createPartitionedTopic(topic, partitions);

        try (Producer<KeyValue<String, String>> producer = this.pulsarClient
                     .newProducer(KeyValueSchemaImpl.of(Schema.STRING, Schema.STRING))
                     .topic(topic)
                     .messageRoutingMode(MessageRoutingMode.SinglePartition)
                     .create();
             Consumer<KeyValue<String, String>> consumer = this.pulsarClient
                     .newConsumer(KeyValueSchemaImpl.of(Schema.STRING, Schema.STRING))
                     .topic(topic)
                     .subscriptionName(SUBSCRIPTION)
                     .subscribe()) {
            verifyNullKeyValueRoundTrip(producer, consumer);
        }
    }

    /**
     * Same round trip as {@link #keyValueNullInlineTest(String, int)} but with
     * {@code KeyValueEncodingType.SEPARATED} and a custom router that always
     * chooses partition 0.
     *
     * @param topic      topic to create and use
     * @param partitions number of partitions to create the topic with
     * @throws PulsarClientException if a client operation fails
     * @throws PulsarAdminException  if the partitioned topic cannot be created
     */
    @Test(dataProvider = "topics")
    public void keyValueNullSeparatedTest(String topic, int partitions)
            throws PulsarClientException, PulsarAdminException {
        this.admin.topics().createPartitionedTopic(topic, partitions);

        // Route every message to partition 0 regardless of its content.
        MessageRouter firstPartitionRouter = new MessageRouter() {
            @Override
            public int choosePartition(Message<?> msg, TopicMetadata metadata) {
                return 0;
            }
        };

        try (Producer<KeyValue<String, String>> producer = this.pulsarClient
                     .newProducer(KeyValueSchemaImpl.of(
                             Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
                     .topic(topic)
                     .messageRouter(firstPartitionRouter)
                     .create();
             Consumer<KeyValue<String, String>> consumer = this.pulsarClient
                     .newConsumer(KeyValueSchemaImpl.of(
                             Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
                     .topic(topic)
                     .subscriptionName(SUBSCRIPTION)
                     .subscribe()) {
            verifyNullKeyValueRoundTrip(producer, consumer);
        }
    }

    /** Returns the payload for message {@code index}: text for even indexes, {@code null} for odd. */
    private static byte[] payloadFor(int index) {
        return index % 2 == 0 ? NOT_NULL_TEXT.getBytes(StandardCharsets.UTF_8) : null;
    }

    /** Asserts that {@code message} carries the payload {@link #payloadFor(int)} produces for {@code index}. */
    private static void assertPayload(Message<byte[]> message, int index) {
        if (index % 2 == 0) {
            Assert.assertNotNull(message.getData());
            Assert.assertNotNull(message.getValue());
            Assert.assertEquals(new String(message.getData(), StandardCharsets.UTF_8), NOT_NULL_TEXT);
        } else {
            Assert.assertNull(message.getData());
            Assert.assertNull(message.getValue());
        }
    }

    /**
     * Sends {@link #NUM_MESSAGES} triples of (null key, null value, both null)
     * key/value messages and asserts they are received in that same order.
     */
    private static void verifyNullKeyValueRoundTrip(Producer<KeyValue<String, String>> producer,
                                                    Consumer<KeyValue<String, String>> consumer)
            throws PulsarClientException {
        for (int i = 0; i < NUM_MESSAGES; i++) {
            producer.newMessage().value(new KeyValue<>(null, KV_TEXT)).send();
            producer.newMessage().value(new KeyValue<>(KV_TEXT, null)).send();
            producer.newMessage().value(new KeyValue<>(null, null)).send();
        }
        for (int i = 0; i < NUM_MESSAGES; i++) {
            KeyValue<String, String> nullKey = consumer.receive().getValue();
            Assert.assertNull(nullKey.getKey());
            Assert.assertEquals(nullKey.getValue(), KV_TEXT);

            KeyValue<String, String> nullValue = consumer.receive().getValue();
            Assert.assertEquals(nullValue.getKey(), KV_TEXT);
            Assert.assertNull(nullValue.getValue());

            KeyValue<String, String> bothNull = consumer.receive().getValue();
            Assert.assertNull(bothNull.getKey());
            Assert.assertNull(bothNull.getValue());
        }
    }
}

