package com.hazelcast.client.topic;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.proxy.ClientReliableTopicProxy;
import com.hazelcast.client.properties.ClientProperty;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.test.ClientCommonTestWithRemoteController;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import com.hazelcast.topic.ITopic;
import com.hazelcast.topic.Message;
import com.hazelcast.topic.impl.reliable.DurableMessageListener;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.thrift.TException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Verifies that reliable-topic listeners registered on a client behave as expected when the
 * single cluster member is stopped (or terminated) and a new member is started.
 *
 * <p>Every test starts its own cluster, which is why the inherited {@code @Before} hook is
 * overridden with an empty body.
 */
@Category({QuickTest.class, ParallelJVMTest.class})
public class ClientReliableTopicOnClusterRestartTest extends ClientCommonTestWithRemoteController {

    /** Invocation timeout configured on the listening client; also used as the sleep duration. */
    private static final int SHORT_INVOCATION_TIMEOUT_SECONDS = 2;

    /** Number of messages published up front in the not-loss-tolerant scenario. */
    private static final int PRE_PUBLISHED_MESSAGE_COUNT = 10000;

    /**
     * Intentionally empty: each test starts the cluster itself with the configuration it needs.
     */
    @Override
    @Before
    public void startClusterWithMembers() {
    }

    /**
     * A plain message listener registered before the member is terminated must still receive a
     * message that a second client publishes after a new member has been started.
     */
    @Test
    public void serverRestartWhenReliableTopicListenerRegistered() throws TException {
        final String topicName = "topic";
        startClusterWithSmallInstanceConfig();
        ClientConfig clientConfig = newReconnectingClientConfig();
        HazelcastInstance listenerClient = createClient(clientConfig);
        HazelcastInstance publisherClient = createClient(clientConfig);
        ITopic<Integer> listenerTopic = listenerClient.getReliableTopic(topicName);
        ITopic<Integer> publisherTopic = publisherClient.getReliableTopic(topicName);

        CountDownLatch messageReceived = new CountDownLatch(1);
        listenerTopic.addMessageListener(message -> messageReceived.countDown());

        terminateMember(getMemberByPort(5701).getUuid());
        startMember();
        warmUpPartitions(listenerClient, publisherClient);

        publisherTopic.publish(5);
        assertOpenEventually(messageReceived);
    }

    /**
     * A loss-tolerant listener must keep working, and must not be cancelled, when the member
     * stays down for the length of the client's invocation timeout before a new one is started.
     */
    @Test
    public void shouldContinue_OnClusterRestart_afterInvocationTimeout() throws InterruptedException {
        final String topicName = "topic";
        startClusterWithSmallInstanceConfig();
        ClientConfig clientConfig = newReconnectingClientConfig();
        clientConfig.setProperty(ClientProperty.INVOCATION_TIMEOUT_SECONDS.getName(),
                String.valueOf(SHORT_INVOCATION_TIMEOUT_SECONDS));
        HazelcastInstance listenerClient = createClient(clientConfig);

        CountDownLatch messageReceived = new CountDownLatch(1);
        ITopic<String> topic = listenerClient.getReliableTopic(topicName);
        UUID registrationId = topic.addMessageListener(
                createListener(true, message -> messageReceived.countDown()));

        stopMember();
        // Keep the cluster down for the configured invocation timeout before restarting.
        Thread.sleep(TimeUnit.SECONDS.toMillis(SHORT_INVOCATION_TIMEOUT_SECONDS));
        startMember();

        createClient().getReliableTopic(topicName).publish("message");
        assertOpenEventually(messageReceived);
        Assert.assertFalse(((ClientReliableTopicProxy<String>) topic).isListenerCancelled(registrationId));
    }

    /**
     * A loss-tolerant listener registered after two messages were published must, following a
     * member restart, still receive at least one newly published message and stay registered.
     */
    @Test
    public void shouldContinue_OnClusterRestart_whenDataLoss_LossTolerant_afterInvocationTimeout() throws InterruptedException {
        final String topicName = "topic";
        ClientConfig clientConfig = newReconnectingClientConfig();
        clientConfig.setProperty(ClientProperty.INVOCATION_TIMEOUT_SECONDS.getName(),
                String.valueOf(SHORT_INVOCATION_TIMEOUT_SECONDS));
        startClusterWithSmallInstanceConfig();
        HazelcastInstance listenerClient = createClient(clientConfig);

        AtomicLong receivedCount = new AtomicLong();
        CountDownLatch messageReceived = new CountDownLatch(1);

        // Publish before the listener exists so the topic already holds items prior to the restart.
        HazelcastInstance publisherClient = createClient();
        publisherClient.getReliableTopic(topicName).publish("message");
        publisherClient.getReliableTopic(topicName).publish("message");

        ITopic<String> topic = listenerClient.getReliableTopic(topicName);
        UUID registrationId = topic.addMessageListener(createListener(true, message -> {
            receivedCount.incrementAndGet();
            messageReceived.countDown();
        }));

        stopMember();
        startMember();
        Thread.sleep(TimeUnit.SECONDS.toMillis(SHORT_INVOCATION_TIMEOUT_SECONDS));

        // Publish a fresh item on every retry until the listener observes one of them.
        assertTrueEventually(() -> {
            publisherClient.getReliableTopic(topicName).publish("newItem " + UUID.randomUUID());
            assertOpenEventually(messageReceived, 5L);
        });
        Assert.assertFalse(((ClientReliableTopicProxy<String>) topic).isListenerCancelled(registrationId));
        Assert.assertTrue(receivedCount.get() >= 1);
    }

    /**
     * A listener that is not loss tolerant must end up cancelled after a member restart and must
     * not have received any message.
     */
    @Test
    public void shouldFail_OnClusterRestart_whenDataLoss_notLossTolerant() {
        // NOTE(review): presumably "capacity10" is configured in the XML below with a small
        // ringbuffer capacity - confirm against hazelcast-ringbuffer-basic-test.xml.
        final String topicName = "capacity10";
        startClusterWithMembers(1, "hazelcast-ringbuffer-basic-test.xml", null);
        HazelcastInstance listenerClient = createClient(newReconnectingClientConfig());
        AtomicLong receivedCount = new AtomicLong();

        HazelcastInstance publisherClient = createClient();
        for (int i = 0; i < PRE_PUBLISHED_MESSAGE_COUNT; i++) {
            publisherClient.getReliableTopic(topicName).publish("message");
        }

        ITopic<String> topic = listenerClient.getReliableTopic(topicName);
        UUID registrationId = topic.addMessageListener(
                createListener(false, message -> receivedCount.incrementAndGet()));

        stopMember();
        startMember();

        // Keep publishing on every retry until the listener is reported as cancelled.
        assertTrueEventually(() -> {
            publisherClient.getReliableTopic(topicName).publish("message");
            Assert.assertTrue(((ClientReliableTopicProxy<String>) topic).isListenerCancelled(registrationId));
        });
        Assert.assertEquals(0L, receivedCount.get());
    }

    /**
     * Supplies the system properties for this test: a small partition count and small thread pools.
     *
     * @return a new mutable map of property names to values
     */
    @Override
    protected Map<String, String> getSystemProperties() {
        Map<String, String> properties = new HashMap<>();
        properties.put("hazelcast.partition.count", "11");
        properties.put("hazelcast.operation.thread.count", "2");
        properties.put("hazelcast.operation.generic.thread.count", "2");
        properties.put("hazelcast.event.thread.count", "1");
        return properties;
    }

    /**
     * Builds a {@link DurableMessageListener} that forwards every message to {@code consumer}
     * and reports the given loss tolerance.
     *
     * @param z        value returned by {@link DurableMessageListener#isLossTolerant()}
     * @param consumer callback invoked for each received message
     * @param <T>      message payload type
     * @return the listener
     */
    private <T> DurableMessageListener<T> createListener(final boolean z, final Consumer<Message<T>> consumer) {
        return new DurableMessageListener<T>() {
            @Override
            public void onMessage(Message<T> message) {
                consumer.accept(message);
            }

            @Override
            public boolean isLossTolerant() {
                return z;
            }
        };
    }

    /**
     * Creates a client configuration whose cluster connect timeout is {@link Long#MAX_VALUE}
     * milliseconds, so the client does not give up reconnecting while the member is down.
     *
     * @return a fresh client configuration
     */
    private static ClientConfig newReconnectingClientConfig() {
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.getConnectionStrategyConfig()
                .getConnectionRetryConfig()
                .setClusterConnectTimeoutMillis(Long.MAX_VALUE);
        return clientConfig;
    }

    /** Starts a one-member cluster using the {@code hazelcast-tiny.xml} configuration. */
    private void startClusterWithSmallInstanceConfig() {
        startClusterWithMembers(1, "hazelcast-tiny.xml", null);
    }
}
