/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerInterceptor;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.schema.SchemaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"broker-api"})
public class InterceptorsTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(InterceptorsTest.class);

    /**
     * Prepares the fixture before each test method by running the inherited
     * {@code internalSetup()} step followed by {@code producerBaseSetup()}.
     *
     * @throws Exception if either inherited setup step fails
     */
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        // Neither hook is overridden in this class, so the unqualified calls
        // resolve to the inherited implementations.
        internalSetup();
        producerBaseSetup();
    }

    /**
     * Tears the fixture down after each test method (also when the test failed,
     * because of {@code alwaysRun}) by running the inherited
     * {@code internalCleanup()} step.
     *
     * @throws Exception if the inherited cleanup step fails
     */
    @Override
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        // Not overridden in this class, so this resolves to the inherited implementation.
        internalCleanup();
    }

    /**
     * Supplies the receiver queue sizes that parameterized tests run with:
     * {@code 0} and {@code 1000}.
     *
     * @return one single-element row per queue size
     */
    @DataProvider(name = "receiverQueueSize")
    public Object[][] getReceiverQueueSize() {
        final Object[] zeroSizedQueue = {0};
        final Object[] largeQueue = {1000};
        return new Object[][] {zeroSizedQueue, largeQueue};
    }

    /**
     * Checks which producer interceptors end up handling a message, depending on
     * what each interceptor reports from {@code eligible}.
     *
     * <p>Three interceptors are registered on one producer: {@code int1} is
     * eligible for every message, {@code int2} only for messages whose schema type
     * is {@code STRING}, and {@code int3} only for {@code INT32}. Each interceptor
     * tags the outgoing message with its own name in {@code beforeSend} and, when
     * the acknowledgement arrives for a message carrying that tag, records the tag
     * under the message id. The test expects {@code [int1, int2]} for a string
     * message and {@code [int1, int3]} for an int message.
     *
     * @throws Exception if namespace setup, producer creation, or a send fails
     */
    @Test
    public void testProducerInterceptor() throws Exception {
        // Tags recorded per acknowledged message id, in interceptor invocation order.
        final Map<MessageId, Collection<String>> ackCallback = new HashMap<>();

        // Randomized namespace suffix, with a permissive schema compatibility
        // strategy so the same topic can take both STRING and INT32 messages.
        String ns = "my-property/my-ns" + RandomUtils.nextInt(999, 1999);
        this.admin.namespaces().createNamespace(ns, Sets.newHashSet("test"));
        this.admin.namespaces().setSchemaCompatibilityStrategy(ns, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);

        // Local class so it can capture ackCallback directly; subclasses only
        // decide eligibility.
        abstract class BaseInterceptor implements ProducerInterceptor {
            private static final String set = "set";
            private final String tag;

            BaseInterceptor(String tag) {
                this.tag = tag;
            }

            @Override
            public void close() {
            }

            @Override
            public Message beforeSend(Producer producer, Message message) {
                MessageImpl msg = (MessageImpl) message;
                msg.getMessageBuilder().addProperty().setKey(this.tag).setValue(set);
                return message;
            }

            @Override
            public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId,
                                              Throwable exception) {
                // Only record messages this interceptor actually tagged in beforeSend.
                if (!set.equals(message.getProperties().get(this.tag))) {
                    return;
                }
                ackCallback.computeIfAbsent(msgId, k -> new ArrayList<>()).add(this.tag);
            }
        }

        BaseInterceptor interceptor1 = new BaseInterceptor("int1") {
            @Override
            public boolean eligible(Message message) {
                return true;
            }
        };
        BaseInterceptor interceptor2 = new BaseInterceptor("int2") {
            @Override
            public boolean eligible(Message message) {
                return SchemaType.STRING.equals(
                        ((MessageImpl) message).getSchemaInternal().getSchemaInfo().getType());
            }
        };
        BaseInterceptor interceptor3 = new BaseInterceptor("int3") {
            @Override
            public boolean eligible(Message message) {
                return SchemaType.INT32.equals(
                        ((MessageImpl) message).getSchemaInternal().getSchemaInfo().getType());
            }
        };

        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                .topic("persistent://" + ns + "/my-topic")
                .intercept(interceptor1, interceptor2, interceptor3)
                .create();

        MessageId messageId = producer.newMessage().property("STR", "Y").value("Hello Pulsar!").send();
        Assert.assertEquals(ackCallback.get(messageId), Arrays.asList(interceptor1.tag, interceptor2.tag));
        log.info("Send result messageId: {}", messageId);

        MessageId messageId2 = producer.newMessage(Schema.INT32).property("INT", "Y").value(18).send();
        Assert.assertEquals(ackCallback.get(messageId2), Arrays.asList(interceptor1.tag, interceptor3.tag));
        log.info("Send result messageId: {}", messageId2);

        producer.close();
    }

    /**
     * Checks that a send still yields a message id when the producer's
     * interceptor throws {@link NullPointerException} from both
     * {@code beforeSend} and {@code onSendAcknowledgement}.
     *
     * @throws PulsarClientException if creating the producer, sending, or closing fails
     */
    @Test
    public void testProducerInterceptorsWithExceptions() throws PulsarClientException {
        final org.apache.pulsar.client.api.ProducerInterceptor<String> throwingInterceptor =
                new org.apache.pulsar.client.api.ProducerInterceptor<String>() {
                    @Override
                    public void close() {
                        // nothing to release
                    }

                    @Override
                    public Message<String> beforeSend(Producer<String> producer, Message<String> message) {
                        throw new NullPointerException();
                    }

                    @Override
                    public void onSendAcknowledgement(Producer<String> producer, Message<String> message,
                                                      MessageId msgId, Throwable exception) {
                        throw new NullPointerException();
                    }
                };

        final Producer<String> producer = this.pulsarClient
                .newProducer(Schema.STRING)
                .topic("persistent://my-property/my-ns/my-topic")
                .intercept(throwingInterceptor)
                .create();

        final MessageId sentId = producer.newMessage().value("Hello Pulsar!").send();
        Assert.assertNotNull(sentId);

        producer.close();
    }

    /**
     * Checks that a send still yields a message id when the producer's
     * interceptor throws an {@link Error} ({@link AbstractMethodError}) rather
     * than an exception from both {@code beforeSend} and
     * {@code onSendAcknowledgement}.
     *
     * @throws PulsarClientException if creating the producer, sending, or closing fails
     */
    @Test
    public void testProducerInterceptorsWithErrors() throws PulsarClientException {
        final org.apache.pulsar.client.api.ProducerInterceptor<String> erroringInterceptor =
                new org.apache.pulsar.client.api.ProducerInterceptor<String>() {
                    @Override
                    public void close() {
                        // nothing to release
                    }

                    @Override
                    public Message<String> beforeSend(Producer<String> producer, Message<String> message) {
                        throw new AbstractMethodError();
                    }

                    @Override
                    public void onSendAcknowledgement(Producer<String> producer, Message<String> message,
                                                      MessageId msgId, Throwable exception) {
                        throw new AbstractMethodError();
                    }
                };

        final Producer<String> producer = this.pulsarClient
                .newProducer(Schema.STRING)
                .topic("persistent://my-property/my-ns/my-topic")
                .intercept(erroringInterceptor)
                .create();

        final MessageId sentId = producer.newMessage().value("Hello Pulsar!").send();
        Assert.assertNotNull(sentId);

        producer.close();
    }

    /**
     * Checks that consumers keep working when every callback of their
     * interceptor throws {@link AbstractMethodError}.
     *
     * <p>Two shared subscriptions on the same topic are exercised. The first is
     * configured with a 3 second ack timeout and receives the same payload twice
     * before acknowledging (the first delivery is deliberately left
     * unacknowledged). The second negatively acknowledges the first delivery and
     * acknowledges the next one.
     *
     * @throws PulsarClientException if any client operation fails
     */
    @Test
    public void testConsumerInterceptorWithErrors() throws PulsarClientException {
        final ConsumerInterceptor<String> erroringInterceptor = new ConsumerInterceptor<String>() {
            @Override
            public void close() {
                throw new AbstractMethodError();
            }

            @Override
            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
                throw new AbstractMethodError();
            }

            @Override
            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable exception) {
                throw new AbstractMethodError();
            }

            @Override
            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId,
                                                Throwable exception) {
                throw new AbstractMethodError();
            }

            @Override
            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
                throw new AbstractMethodError();
            }

            @Override
            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
                throw new AbstractMethodError();
            }
        };

        final Consumer<String> ackTimeoutConsumer = this.pulsarClient
                .newConsumer(Schema.STRING)
                .topic("persistent://my-property/my-ns/my-topic-exception")
                .subscriptionType(SubscriptionType.Shared)
                .intercept(erroringInterceptor)
                .subscriptionName("my-subscription-ack-timeout")
                .ackTimeout(3L, TimeUnit.SECONDS)
                .subscribe();
        final Consumer<String> negativeAckConsumer = this.pulsarClient
                .newConsumer(Schema.STRING)
                .topic("persistent://my-property/my-ns/my-topic-exception")
                .subscriptionType(SubscriptionType.Shared)
                .intercept(erroringInterceptor)
                .subscriptionName("my-subscription-negative")
                .subscribe();
        final Producer<String> producer = this.pulsarClient
                .newProducer(Schema.STRING)
                .topic("persistent://my-property/my-ns/my-topic-exception")
                .create();

        producer.newMessage().value("Hello Pulsar!").send();

        // Ack-timeout subscription: first delivery is left unacknowledged, the
        // second delivery of the same payload is acknowledged.
        Message<String> delivery = ackTimeoutConsumer.receive();
        Assert.assertEquals(delivery.getValue(), "Hello Pulsar!");
        Message<String> redelivery = ackTimeoutConsumer.receive();
        Assert.assertEquals(redelivery.getValue(), "Hello Pulsar!");
        ackTimeoutConsumer.acknowledge(redelivery);

        // Negative-ack subscription: first delivery is nacked, the next one acknowledged.
        delivery = negativeAckConsumer.receive();
        Assert.assertEquals(delivery.getValue(), "Hello Pulsar!");
        negativeAckConsumer.negativeAcknowledge(delivery);
        redelivery = negativeAckConsumer.receive();
        Assert.assertEquals(redelivery.getValue(), "Hello Pulsar!");
        negativeAckConsumer.acknowledge(redelivery);

        producer.close();
        ackTimeoutConsumer.close();
        negativeAckConsumer.close();
    }

    /**
     * Checks that a consumer interceptor's {@code beforeConsume} hook is applied
     * on a single-topic subscription for each way of obtaining a message:
     * blocking {@code receive()}, {@code receiveAsync()}, and a
     * {@link MessageListener}.
     *
     * <p>The interceptor adds a {@code "beforeConsumer"} property to every
     * message; each delivery path is then checked for that property.
     *
     * @param receiverQueueSize receiver queue size configured on the consumers,
     *                          supplied by the {@code receiverQueueSize} data provider
     * @throws Exception if a client operation fails, or if the listener could not
     *                   acknowledge the message it was handed
     */
    @Test(dataProvider = "receiverQueueSize")
    public void testConsumerInterceptorWithSingleTopicSubscribe(Integer receiverQueueSize) throws Exception {
        ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
            @Override
            public void close() {
            }

            @Override
            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
                MessageImpl<String> msg = (MessageImpl<String>) message;
                msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1");
                return msg;
            }

            @Override
            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
                log.info("onAcknowledge messageId: {}", messageId, cause);
            }

            @Override
            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
                log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
            }

            @Override
            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
            }

            @Override
            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
            }
        };

        Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                .topic("persistent://my-property/my-ns/my-topic")
                .subscriptionType(SubscriptionType.Shared)
                .intercept(interceptor)
                .subscriptionName("my-subscription")
                .receiverQueueSize(receiverQueueSize)
                .subscribe();
        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                .topic("persistent://my-property/my-ns/my-topic")
                .enableBatching(false)
                .create();

        // Path 1: blocking receive.
        producer.newMessage().value("Hello Pulsar!").send();
        Message<String> received = consumer.receive();
        Assert.assertTrue(hasBeforeConsumerProperty(received));
        consumer.acknowledge(received);

        // Path 2: asynchronous receive.
        producer.newMessage().value("Hello Pulsar!").send();
        received = consumer.receiveAsync().get();
        Assert.assertTrue(hasBeforeConsumerProperty(received));
        consumer.acknowledge(received);
        consumer.close();

        // Path 3: message listener. The listener runs off the test thread, so an
        // exception thrown there cannot fail the test by itself; the outcome is
        // handed back through the future instead. A failed acknowledge completes
        // the future exceptionally so that get() below fails instead of blocking
        // forever on a future nobody completes.
        CompletableFuture<Message<String>> future = new CompletableFuture<>();
        consumer = this.pulsarClient.newConsumer(Schema.STRING)
                .topic("persistent://my-property/my-ns/my-topic")
                .subscriptionType(SubscriptionType.Shared)
                .intercept(interceptor)
                .subscriptionName("my-subscription")
                .receiverQueueSize(receiverQueueSize)
                .messageListener((c, m) -> {
                    try {
                        c.acknowledge(m);
                        future.complete(m);
                    } catch (Exception e) {
                        future.completeExceptionally(new AssertionError("Failed to acknowledge", e));
                    }
                })
                .subscribe();
        producer.newMessage().value("Hello Pulsar!").send();
        received = future.get();
        Assert.assertTrue(hasBeforeConsumerProperty(received));

        producer.close();
        consumer.close();
    }

    /**
     * Tells whether the metadata of {@code message} contains a property whose key
     * is {@code "beforeConsumer"}, i.e. the marker added by the test interceptor.
     *
     * @param message a received message; must be a {@link MessageImpl}
     * @return {@code true} if the marker property is present
     */
    private static boolean hasBeforeConsumerProperty(Message<String> message) {
        MessageImpl<String> impl = (MessageImpl<String>) message;
        for (KeyValue keyValue : impl.getMessageBuilder().getPropertiesList()) {
            if ("beforeConsumer".equals(keyValue.getKey())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Checks that a consumer interceptor's {@code beforeConsume} hook is applied
     * to messages from each topic of a multi-topic subscription.
     *
     * <p>One message is published to each of two topics. The interceptor adds a
     * {@code "beforeConsumer"} property to every message, and the test counts that
     * property across the two received messages.
     *
     * @throws PulsarClientException if any client operation fails
     */
    @Test
    public void testConsumerInterceptorWithMultiTopicSubscribe() throws PulsarClientException {
        ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
            @Override
            public void close() {
            }

            @Override
            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
                MessageImpl<String> msg = (MessageImpl<String>) message;
                msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1");
                return msg;
            }

            @Override
            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
                log.info("onAcknowledge messageId: {}", messageId, cause);
            }

            @Override
            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
                log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
            }

            @Override
            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
            }

            @Override
            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
            }
        };

        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                .topic("persistent://my-property/my-ns/my-topic")
                .create();
        Producer<String> producer1 = this.pulsarClient.newProducer(Schema.STRING)
                .topic("persistent://my-property/my-ns/my-topic1")
                .create();
        Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                .topic("persistent://my-property/my-ns/my-topic", "persistent://my-property/my-ns/my-topic1")
                .subscriptionType(SubscriptionType.Shared)
                .intercept(interceptor)
                .subscriptionName("my-subscription")
                .subscribe();

        producer.newMessage().value("Hello Pulsar!").send();
        producer1.newMessage().value("Hello Pulsar!").send();

        int keyCount = 0;
        for (int i = 0; i < 2; ++i) {
            Message<String> received = consumer.receive();
            // The multi-topic consumer hands out a wrapper; the property list
            // lives on the wrapped message.
            MessageImpl<String> msg = (MessageImpl<String>) ((TopicMessageImpl<String>) received).getMessage();
            for (KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) {
                if ("beforeConsumer".equals(keyValue.getKey())) {
                    ++keyCount;
                }
            }
            consumer.acknowledge(received);
        }
        // TestNG's order is (actual, expected).
        Assert.assertEquals(keyCount, 2);

        producer.close();
        producer1.close();
        consumer.close();
    }

    /**
     * Checks that a consumer interceptor's {@code beforeConsume} hook is applied
     * to messages received through a topics-pattern subscription.
     *
     * <p>Two topics are created by their producers before the consumer subscribes
     * with the pattern {@code persistent://my-property/my-ns/my-.*}. One message
     * is published to each topic, and the test counts the
     * {@code "beforeConsumer"} property that the interceptor adds across the two
     * received messages.
     *
     * @throws PulsarClientException if any client operation fails
     */
    @Test
    public void testConsumerInterceptorWithPatternTopicSubscribe() throws PulsarClientException {
        ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
            @Override
            public void close() {
            }

            @Override
            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
                MessageImpl<String> msg = (MessageImpl<String>) message;
                msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1");
                return msg;
            }

            @Override
            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
                log.info("onAcknowledge messageId: {}", messageId, cause);
            }

            @Override
            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
                log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
            }

            @Override
            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
            }

            @Override
            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
            }
        };

        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                .topic("persistent://my-property/my-ns/my-topic")
                .create();
        Producer<String> producer1 = this.pulsarClient.newProducer(Schema.STRING)
                .topic("persistent://my-property/my-ns/my-topic1")
                .create();
        Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                .topicsPattern("persistent://my-property/my-ns/my-.*")
                .subscriptionType(SubscriptionType.Shared)
                .intercept(interceptor)
                .subscriptionName("my-subscription")
                .subscribe();

        producer.newMessage().value("Hello Pulsar!").send();
        producer1.newMessage().value("Hello Pulsar!").send();

        int keyCount = 0;
        for (int i = 0; i < 2; ++i) {
            Message<String> received = consumer.receive();
            // The pattern consumer hands out a wrapper; the property list lives
            // on the wrapped message.
            MessageImpl<String> msg = (MessageImpl<String>) ((TopicMessageImpl<String>) received).getMessage();
            for (KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) {
                if ("beforeConsumer".equals(keyValue.getKey())) {
                    ++keyCount;
                }
            }
            consumer.acknowledge(received);
        }
        // TestNG's order is (actual, expected).
        Assert.assertEquals(keyCount, 2);

        producer.close();
        producer1.close();
        consumer.close();
    }

    /**
     * Checks that {@code onAcknowledgeCumulative} is invoked with a message id
     * that covers every message received before the cumulative acknowledgement.
     *
     * <p>100 messages are published and received on a failover subscription, and
     * the id of each received message is remembered. Only the last message is
     * acknowledged, cumulatively. The interceptor counts how many remembered ids
     * are less than or equal to the acknowledged id and hands that count to the
     * test thread, where it is asserted to be 100. The test also checks that
     * {@code beforeConsume} tagged every received message.
     *
     * @throws Exception if a client operation fails or the interceptor callback is
     *                   not observed within the wait bound
     */
    @Test
    public void testConsumerInterceptorForAcknowledgeCumulative() throws Exception {
        final int totalNumOfMessages = 100;
        final Collection<MessageId> ackHolder = new ArrayList<>();
        // The callback's result is asserted on the test thread. The client is
        // expected to tolerate throwables raised by interceptor callbacks (see
        // testConsumerInterceptorWithErrors), so an assertion made inside the
        // callback could never fail this test.
        final CompletableFuture<Long> acknowledgedCount = new CompletableFuture<>();

        ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
            @Override
            public void close() {
            }

            @Override
            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
                MessageImpl<String> msg = (MessageImpl<String>) message;
                msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1");
                return msg;
            }

            @Override
            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
                log.info("onAcknowledge messageId: {}", messageId, cause);
            }

            @Override
            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
                long acknowledged = ackHolder.stream().filter(m -> m.compareTo(messageId) <= 0).count();
                ackHolder.clear();
                acknowledgedCount.complete(acknowledged);
                log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
            }

            @Override
            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
            }

            @Override
            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
            }
        };

        Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                .topic("persistent://my-property/my-ns/my-topic")
                .subscriptionType(SubscriptionType.Failover)
                .intercept(interceptor)
                .subscriptionName("my-subscription")
                .subscribe();
        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                .topic("persistent://my-property/my-ns/my-topic")
                .create();

        for (int i = 0; i < totalNumOfMessages; ++i) {
            producer.newMessage().value("Hello Pulsar!").send();
        }

        int keyCount = 0;
        for (int i = 0; i < totalNumOfMessages; ++i) {
            Message<String> received = consumer.receive();
            MessageImpl<String> msg = (MessageImpl<String>) received;
            for (KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) {
                if ("beforeConsumer".equals(keyValue.getKey())) {
                    ++keyCount;
                }
            }
            ackHolder.add(received.getMessageId());
            // Acknowledge only once, on the last message, covering all earlier ones.
            if (i == totalNumOfMessages - 1) {
                consumer.acknowledgeCumulative(received);
            }
        }

        // TestNG's order is (actual, expected).
        Assert.assertEquals(keyCount, totalNumOfMessages);
        // Bounded wait: fail rather than hang if the callback is never invoked.
        Assert.assertEquals(acknowledgedCount.get(30L, TimeUnit.SECONDS).longValue(), (long) totalNumOfMessages);

        producer.close();
        consumer.close();
    }

    /**
     * Checks that {@code onNegativeAcksSend} is invoked for every negatively
     * acknowledged message.
     *
     * <p>100 messages are published and received; every even-indexed one is
     * negatively acknowledged and the rest are acknowledged. The interceptor
     * counts a latch down once per message id it is told about, and the test
     * waits (bounded) for all 50 to be reported.
     *
     * @throws PulsarClientException if any client operation fails
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void testConsumerInterceptorForNegativeAcksSend() throws PulsarClientException, InterruptedException {
        final int totalNumOfMessages = 100;
        // Half of the messages (the even-indexed ones) are negatively acknowledged.
        final CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2);

        ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
            @Override
            public void close() {
            }

            @Override
            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
                return message;
            }

            @Override
            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
            }

            @Override
            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
            }

            @Override
            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
                messageIds.forEach(messageId -> latch.countDown());
            }

            @Override
            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
            }
        };

        Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                .topic("persistent://my-property/my-ns/my-topic")
                .subscriptionType(SubscriptionType.Failover)
                .intercept(interceptor)
                .negativeAckRedeliveryDelay(100L, TimeUnit.MILLISECONDS)
                .subscriptionName("my-subscription")
                .subscribe();
        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                .topic("persistent://my-property/my-ns/my-topic")
                .create();

        for (int i = 0; i < totalNumOfMessages; ++i) {
            producer.send("Mock message");
        }
        for (int i = 0; i < totalNumOfMessages; ++i) {
            Message<String> message = consumer.receive();
            if (i % 2 == 0) {
                consumer.negativeAcknowledge(message);
            } else {
                consumer.acknowledge(message);
            }
        }

        // Bounded wait: an untimed await() would hang the run if the interceptor
        // were never invoked, and would make any later count check vacuous.
        Assert.assertTrue(latch.await(30L, TimeUnit.SECONDS),
                "onNegativeAcksSend was not invoked for all negatively acknowledged messages; remaining: "
                        + latch.getCount());

        producer.close();
        consumer.close();
    }

    /**
     * Checks that {@code onAckTimeoutSend} is invoked for every message whose
     * acknowledgement timed out.
     *
     * <p>100 messages are published and received by a consumer configured with a
     * 2 second ack timeout; only the even-indexed ones are acknowledged. The
     * interceptor counts a latch down once per message id it is told about, and
     * the test waits (bounded) for the 50 unacknowledged messages to be reported.
     *
     * @throws PulsarClientException if any client operation fails
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void testConsumerInterceptorForAckTimeoutSend() throws PulsarClientException, InterruptedException {
        final int totalNumOfMessages = 100;
        // Half of the messages (the odd-indexed ones) are never acknowledged.
        final CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2);

        ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
            @Override
            public void close() {
            }

            @Override
            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
                return message;
            }

            @Override
            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
            }

            @Override
            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
            }

            @Override
            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
            }

            @Override
            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
                messageIds.forEach(messageId -> latch.countDown());
            }
        };

        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                .topic("persistent://my-property/my-ns/my-topic")
                .create();
        Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                .topic("persistent://my-property/my-ns/my-topic")
                .subscriptionName("foo")
                .intercept(interceptor)
                .ackTimeout(2L, TimeUnit.SECONDS)
                .subscribe();

        for (int i = 0; i < totalNumOfMessages; ++i) {
            producer.send("Mock message");
        }
        for (int i = 0; i < totalNumOfMessages; ++i) {
            Message<String> message = consumer.receive();
            if (i % 2 == 0) {
                consumer.acknowledge(message);
            }
        }

        // Bounded wait: an untimed await() would hang the run if the interceptor
        // were never invoked, and would make any later count check vacuous.
        Assert.assertTrue(latch.await(30L, TimeUnit.SECONDS),
                "onAckTimeoutSend was not invoked for all unacknowledged messages; remaining: "
                        + latch.getCount());

        producer.close();
        consumer.close();
    }
}

