package com.hazelcast.client.topic;

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.config.ClientReliableTopicConfig;
import com.hazelcast.config.Config;
import com.hazelcast.config.RingbufferConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.TestThread;
import com.hazelcast.test.annotation.NightlyTest;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastSerialClassRunner.class)
@Category({NightlyTest.class})
/* loaded from: input_file:com/hazelcast/client/topic/ClientReliableTopicStressTest.class */
public class ClientReliableTopicStressTest extends HazelcastTestSupport {
    private ILogger logger;
    private final AtomicBoolean stop = new AtomicBoolean();
    private ITopic<Long> topic;

    /* loaded from: input_file:com/hazelcast/client/topic/ClientReliableTopicStressTest$ProduceThread.class */
    public class ProduceThread extends TestThread {
        private volatile long send = 0;

        public ProduceThread() {
        }

        public void onError(Throwable th) {
            ClientReliableTopicStressTest.this.stop.set(true);
        }

        public void doRun() throws Throwable {
            while (!ClientReliableTopicStressTest.this.stop.get()) {
                ClientReliableTopicStressTest.this.topic.publish(Long.valueOf(this.send));
                this.send++;
                if (this.send % 10000 == 0) {
                    ClientReliableTopicStressTest.this.logger.info("Publishing: " + this.send);
                }
            }
        }
    }

    /* loaded from: input_file:com/hazelcast/client/topic/ClientReliableTopicStressTest$StressMessageListener.class */
    public class StressMessageListener implements MessageListener<Long> {
        private final int id;
        private long received = 0;
        private long failures = 0;

        public StressMessageListener(int i) {
            this.id = i;
        }

        public void onMessage(Message<Long> message) {
            if (!((Long) message.getMessageObject()).equals(Long.valueOf(this.received))) {
                this.failures++;
            }
            if (this.received % 10000 == 0) {
                ClientReliableTopicStressTest.this.logger.info(toString() + " is at: " + this.received);
            }
            this.received++;
        }

        public String toString() {
            return "StressMessageListener-" + this.id;
        }
    }

    @Before
    public void setup() {
        this.logger = Logger.getLogger(getClass());
        Config config = new Config();
        RingbufferConfig ringbufferConfig = new RingbufferConfig("foobar");
        ringbufferConfig.setCapacity(10000000);
        ringbufferConfig.setTimeToLiveSeconds(5);
        config.addRingBufferConfig(ringbufferConfig);
        ClientConfig clientConfig = new ClientConfig();
        ClientReliableTopicConfig clientReliableTopicConfig = new ClientReliableTopicConfig("foobar");
        clientConfig.addReliableTopicConfig(clientReliableTopicConfig);
        Hazelcast.newHazelcastInstance(config);
        this.topic = HazelcastClient.newHazelcastClient(clientConfig).getReliableTopic(clientReliableTopicConfig.getName());
    }

    @After
    public void teardown() {
        HazelcastClient.shutdownAll();
        Hazelcast.shutdownAll();
    }

    @Test
    public void test() throws InterruptedException {
        final StressMessageListener stressMessageListener = new StressMessageListener(1);
        this.topic.addMessageListener(stressMessageListener);
        final StressMessageListener stressMessageListener2 = new StressMessageListener(2);
        this.topic.addMessageListener(stressMessageListener2);
        sleepSeconds(5);
        final ProduceThread produceThread = new ProduceThread();
        produceThread.start();
        this.logger.info("Starting test");
        sleepAndStop(this.stop, TimeUnit.SECONDS.toSeconds(30L));
        this.logger.info("Completed");
        produceThread.assertSucceedsEventually();
        this.logger.info("Number of items produced: " + produceThread.send);
        assertTrueEventually(new AssertTask() { // from class: com.hazelcast.client.topic.ClientReliableTopicStressTest.1
            public void run() throws Exception {
                Assert.assertEquals(produceThread.send, stressMessageListener.received);
                Assert.assertEquals(produceThread.send, stressMessageListener2.received);
                Assert.assertEquals(0L, stressMessageListener.failures);
                Assert.assertEquals(0L, stressMessageListener2.failures);
            }
        });
        this.logger.info("Done");
    }
}
