package org.onosproject.store.cluster.messaging.impl;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.junit.TestTools;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterMetadata;
import org.onosproject.cluster.ClusterMetadataEventListener;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.HybridLogicalClockService;
import org.onosproject.core.HybridLogicalTime;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.cluster.messaging.Endpoint;

/* loaded from: input_file:org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.class */
public class NettyMessagingManagerTest {
    NettyMessagingManager netty1;
    NettyMessagingManager netty2;
    private static final String DUMMY_NAME = "node";
    private static final String IP_STRING = "127.0.0.1";
    HybridLogicalClockService testClockService = new HybridLogicalClockService() { // from class: org.onosproject.store.cluster.messaging.impl.NettyMessagingManagerTest.1
        AtomicLong counter = new AtomicLong();

        public HybridLogicalTime timeNow() {
            return new HybridLogicalTime(this.counter.incrementAndGet(), 0L);
        }

        public void recordEventTime(HybridLogicalTime hybridLogicalTime) {
        }
    };
    Endpoint ep1 = new Endpoint(IpAddress.valueOf(IP_STRING), 5001);
    Endpoint ep2 = new Endpoint(IpAddress.valueOf(IP_STRING), 5002);
    Endpoint invalidEndPoint = new Endpoint(IpAddress.valueOf(IP_STRING), 5003);

    @Before
    public void setUp() throws Exception {
        this.ep1 = new Endpoint(IpAddress.valueOf(IP_STRING), TestTools.findAvailablePort(5001));
        this.netty1 = new NettyMessagingManager();
        this.netty1.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, this.ep1);
        this.netty1.clockService = this.testClockService;
        this.netty1.activate();
        this.ep2 = new Endpoint(IpAddress.valueOf(IP_STRING), TestTools.findAvailablePort(5003));
        this.netty2 = new NettyMessagingManager();
        this.netty2.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, this.ep2);
        this.netty2.clockService = this.testClockService;
        this.netty2.activate();
    }

    private String nextSubject() {
        return UUID.randomUUID().toString();
    }

    @After
    public void tearDown() throws Exception {
        if (this.netty1 != null) {
            this.netty1.deactivate();
        }
        if (this.netty2 != null) {
            this.netty2.deactivate();
        }
    }

    @Test
    public void testSendAsync() {
        String nextSubject = nextSubject();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.netty1.sendAsync(this.ep2, nextSubject, "hello world".getBytes()).whenComplete((r3, th) -> {
            Assert.assertNull(th);
            countDownLatch.countDown();
        });
        Uninterruptibles.awaitUninterruptibly(countDownLatch);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        this.netty1.sendAsync(this.invalidEndPoint, nextSubject, "hello world".getBytes()).whenComplete((r32, th2) -> {
            Assert.assertNotNull(th2);
            Assert.assertTrue(th2 instanceof ConnectException);
            countDownLatch2.countDown();
        });
        Uninterruptibles.awaitUninterruptibly(countDownLatch2);
    }

    @Test
    @Ignore
    public void testSendAndReceive() {
        String nextSubject = nextSubject();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        AtomicReference atomicReference = new AtomicReference();
        AtomicReference atomicReference2 = new AtomicReference();
        this.netty2.registerHandler(nextSubject, (endpoint, bArr) -> {
            atomicBoolean.set(true);
            atomicReference2.set(endpoint);
            atomicReference.set(bArr);
            return "hello there".getBytes();
        }, MoreExecutors.directExecutor());
        Assert.assertTrue(Arrays.equals("hello there".getBytes(), (byte[]) this.netty1.sendAndReceive(this.ep2, nextSubject, "hello world".getBytes()).join()));
        Assert.assertTrue(atomicBoolean.get());
        Assert.assertTrue(Arrays.equals((byte[]) atomicReference.get(), "hello world".getBytes()));
        Assert.assertEquals(this.ep1, atomicReference2.get());
    }

    @Test
    @Ignore
    public void testSendAndReceiveWithExecutor() {
        String nextSubject = nextSubject();
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor(runnable -> {
            return new Thread(runnable, "completion-thread");
        });
        ExecutorService newSingleThreadExecutor2 = Executors.newSingleThreadExecutor(runnable2 -> {
            return new Thread(runnable2, "handler-thread");
        });
        AtomicReference atomicReference = new AtomicReference();
        AtomicReference atomicReference2 = new AtomicReference();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.netty2.registerHandler(nextSubject, (endpoint, bArr) -> {
            atomicReference.set(Thread.currentThread().getName());
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                Assert.fail("InterruptedException");
            }
            return "hello there".getBytes();
        }, newSingleThreadExecutor2);
        CompletableFuture sendAndReceive = this.netty1.sendAndReceive(this.ep2, nextSubject, "hello world".getBytes(), newSingleThreadExecutor);
        sendAndReceive.whenComplete((bArr2, th) -> {
            atomicReference2.set(Thread.currentThread().getName());
        });
        countDownLatch.countDown();
        Assert.assertTrue(Arrays.equals("hello there".getBytes(), (byte[]) sendAndReceive.join()));
        Assert.assertEquals("completion-thread", atomicReference2.get());
        Assert.assertEquals("handler-thread", atomicReference.get());
    }

    private ClusterMetadataService dummyMetadataService(final String str, final String str2, final Endpoint endpoint) {
        return new ClusterMetadataService() { // from class: org.onosproject.store.cluster.messaging.impl.NettyMessagingManagerTest.2
            public ClusterMetadata getClusterMetadata() {
                return new ClusterMetadata(new ProviderId(NettyMessagingManagerTest.DUMMY_NAME, NettyMessagingManagerTest.DUMMY_NAME), str, Sets.newHashSet(), Sets.newHashSet());
            }

            public ControllerNode getLocalNode() {
                return new ControllerNode() { // from class: org.onosproject.store.cluster.messaging.impl.NettyMessagingManagerTest.2.1
                    public NodeId id() {
                        return null;
                    }

                    public IpAddress ip() {
                        return IpAddress.valueOf(str2);
                    }

                    public int tcpPort() {
                        return endpoint.port();
                    }
                };
            }

            public void addListener(ClusterMetadataEventListener clusterMetadataEventListener) {
            }

            public void removeListener(ClusterMetadataEventListener clusterMetadataEventListener) {
            }
        };
    }
}
