package org.apache.flume.api;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import junit.framework.Assert;
import org.apache.avro.ipc.Server;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.api.RpcTestUtils;
import org.apache.flume.event.EventBuilder;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

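/**
 * Tests for the load-balancing RPC client returned by {@link RpcClientFactory} when
 * {@code client.type} is set to {@code default_loadbalance}.
 *
 * <p>Decompiled source; loaded from: input_file:org/apache/flume/api/TestLoadBalancingRpcClient.class
 */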
public class TestLoadBalancingRpcClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestLoadBalancingRpcClient.class);

    /**
     * Expects {@link RpcClientFactory#getInstance(Properties)} to reject this configuration
     * with a {@link FlumeException}.
     *
     * <p>NOTE(review): the only host address is stored under the key {@code host1} rather than
     * {@code hosts.host1} as the other tests do; whether the failure is caused by the single
     * host or by the key layout cannot be told from this test alone — confirm against the client.
     */
    @Test(expected = FlumeException.class)
    public void testCreatingLbClientSingleHost() {
        Server server = null;
        RpcClient client = null;
        try {
            server = RpcTestUtils.startServer(new RpcTestUtils.OKAvroHandler());
            Properties props = new Properties();
            props.put("host1", "127.0.0.1:" + server.getPort());
            props.put("hosts", "host1");
            props.put("client.type", "default_loadbalance");
            // Keep the reference: if a client is unexpectedly created it must still be released.
            client = RpcClientFactory.getInstance(props);
        } finally {
            if (server != null) {
                server.close();
            }
            if (client != null) {
                client.close();
            }
        }
    }

    /**
     * Sends 100 single events through a two-host load-balancing client while the second
     * handler is marked failed before event 20 and restored before event 40.
     *
     * <p>Asserts that the first handler received 60 appends and the second 40.
     *
     * @throws Exception if server start-up or event delivery fails unexpectedly
     */
    @Test
    public void testTwoHostFailover() throws Exception {
        Server firstServer = null;
        Server secondServer = null;
        RpcClient client = null;
        try {
            RpcTestUtils.LoadBalancedAvroHandler firstHandler = new RpcTestUtils.LoadBalancedAvroHandler();
            RpcTestUtils.LoadBalancedAvroHandler secondHandler = new RpcTestUtils.LoadBalancedAvroHandler();
            firstServer = RpcTestUtils.startServer(firstHandler);
            secondServer = RpcTestUtils.startServer(secondHandler);
            Properties props = new Properties();
            props.put("hosts", "h1 h2");
            props.put("client.type", "default_loadbalance");
            props.put("hosts.h1", "127.0.0.1:" + firstServer.getPort());
            props.put("hosts.h2", "127.0.0.1:" + secondServer.getPort());
            client = RpcClientFactory.getInstance(props);
            Assert.assertTrue(client instanceof LoadBalancingRpcClient);
            for (int i = 0; i < 100; i++) {
                if (i == 20) {
                    secondHandler.setFailed();
                } else if (i == 40) {
                    secondHandler.setOK();
                }
                client.append(getEvent(i));
            }
            Assert.assertEquals(60, firstHandler.getAppendCount());
            Assert.assertEquals(40, secondHandler.getAppendCount());
        } finally {
            // Single cleanup path: runs exactly once whether the test passed or failed.
            if (firstServer != null) {
                firstServer.close();
            }
            if (secondServer != null) {
                secondServer.close();
            }
            if (client != null) {
                client.close();
            }
        }
    }

    /**
     * Runs the same failover scenario as the two-host failover test, then closes the client
     * and appends once more; that final append is expected to raise
     * {@link EventDeliveryException}.
     *
     * @throws Exception the expected {@link EventDeliveryException}, or any unexpected failure
     */
    @Test(expected = EventDeliveryException.class)
    public void testTwoHostFailoverThrowAfterClose() throws Exception {
        Server firstServer = null;
        Server secondServer = null;
        RpcClient client = null;
        // Tracks the explicit close below so the cleanup path does not close the client twice.
        boolean clientClosed = false;
        try {
            RpcTestUtils.LoadBalancedAvroHandler firstHandler = new RpcTestUtils.LoadBalancedAvroHandler();
            RpcTestUtils.LoadBalancedAvroHandler secondHandler = new RpcTestUtils.LoadBalancedAvroHandler();
            firstServer = RpcTestUtils.startServer(firstHandler);
            secondServer = RpcTestUtils.startServer(secondHandler);
            Properties props = new Properties();
            props.put("hosts", "h1 h2");
            props.put("client.type", "default_loadbalance");
            props.put("hosts.h1", "127.0.0.1:" + firstServer.getPort());
            props.put("hosts.h2", "127.0.0.1:" + secondServer.getPort());
            client = RpcClientFactory.getInstance(props);
            Assert.assertTrue(client instanceof LoadBalancingRpcClient);
            for (int i = 0; i < 100; i++) {
                if (i == 20) {
                    secondHandler.setFailed();
                } else if (i == 40) {
                    secondHandler.setOK();
                }
                client.append(getEvent(i));
            }
            Assert.assertEquals(60, firstHandler.getAppendCount());
            Assert.assertEquals(40, secondHandler.getAppendCount());
            client.close();
            clientClosed = true;
            // Appending on a closed client is the behavior under test; it must throw.
            client.append(getEvent(3));
            Assert.fail();
        } finally {
            if (firstServer != null) {
                firstServer.close();
            }
            if (secondServer != null) {
                secondServer.close();
            }
            // Only reached with an open client when the test failed before the explicit close.
            if (client != null && !clientClosed) {
                client.close();
            }
        }
    }

    /**
     * Configures two hosts where {@code h1} points at {@code 127.0.0.1:0} (no server is started
     * for it) and {@code h2} points at a live server, then checks that all traffic reaches the
     * live handler: 10 batches through a first client and 10 single events through a second.
     *
     * @throws Exception if server start-up or event delivery fails unexpectedly
     */
    @Test
    public void testTwoHostsOneDead() throws Exception {
        LOGGER.info("Running testTwoHostsOneDead...");
        Server liveServer = null;
        RpcClient batchClient = null;
        RpcClient singleClient = null;
        try {
            RpcTestUtils.LoadBalancedAvroHandler liveHandler = new RpcTestUtils.LoadBalancedAvroHandler();
            liveServer = RpcTestUtils.startServer(liveHandler);
            Properties props = new Properties();
            props.put("hosts", "h1 h2");
            props.put("client.type", "default_loadbalance");
            props.put("hosts.h1", "127.0.0.1:0");
            props.put("hosts.h2", "127.0.0.1:" + liveServer.getPort());

            batchClient = RpcClientFactory.getInstance(props);
            Assert.assertTrue(batchClient instanceof LoadBalancingRpcClient);
            for (int i = 0; i < 10; i++) {
                batchClient.appendBatch(getBatchedEvent(i));
            }
            Assert.assertEquals(10, liveHandler.getAppendBatchCount());

            singleClient = RpcClientFactory.getInstance(props);
            Assert.assertTrue(singleClient instanceof LoadBalancingRpcClient);
            for (int i = 0; i < 10; i++) {
                singleClient.append(getEvent(i));
            }
            Assert.assertEquals(10, liveHandler.getAppendCount());
        } finally {
            // Single cleanup path: runs exactly once whether the test passed or failed.
            if (liveServer != null) {
                liveServer.close();
            }
            if (batchClient != null) {
                batchClient.close();
            }
            if (singleClient != null) {
                singleClient.close();
            }
        }
    }

    /**
     * Batch variant of the two-host failover scenario: sends 100 single-event batches while the
     * second handler is marked failed before batch 20 and restored before batch 40.
     *
     * <p>Asserts that the first handler received 60 batches and the second 40.
     *
     * @throws Exception if server start-up or event delivery fails unexpectedly
     */
    @Test
    public void testTwoHostFailoverBatch() throws Exception {
        Server firstServer = null;
        Server secondServer = null;
        RpcClient client = null;
        try {
            RpcTestUtils.LoadBalancedAvroHandler firstHandler = new RpcTestUtils.LoadBalancedAvroHandler();
            RpcTestUtils.LoadBalancedAvroHandler secondHandler = new RpcTestUtils.LoadBalancedAvroHandler();
            firstServer = RpcTestUtils.startServer(firstHandler);
            secondServer = RpcTestUtils.startServer(secondHandler);
            Properties props = new Properties();
            props.put("hosts", "h1 h2");
            props.put("client.type", "default_loadbalance");
            props.put("hosts.h1", "127.0.0.1:" + firstServer.getPort());
            props.put("hosts.h2", "127.0.0.1:" + secondServer.getPort());
            client = RpcClientFactory.getInstance(props);
            Assert.assertTrue(client instanceof LoadBalancingRpcClient);
            for (int i = 0; i < 100; i++) {
                if (i == 20) {
                    secondHandler.setFailed();
                } else if (i == 40) {
                    secondHandler.setOK();
                }
                client.appendBatch(getBatchedEvent(i));
            }
            Assert.assertEquals(60, firstHandler.getAppendBatchCount());
            Assert.assertEquals(40, secondHandler.getAppendBatchCount());
        } finally {
            // Single cleanup path: runs exactly once whether the test passed or failed.
            if (firstServer != null) {
                firstServer.close();
            }
            if (secondServer != null) {
                secondServer.close();
            }
            if (client != null) {
                client.close();
            }
        }
    }

    /**
     * Sends 100 single events through a two-host client with no {@code host-selector}
     * configured and asserts an even split of 50 appends per handler.
     *
     * @throws Exception if server start-up or event delivery fails unexpectedly
     */
    @Test
    public void testLbDefaultClientTwoHosts() throws Exception {
        Server firstServer = null;
        Server secondServer = null;
        RpcClient client = null;
        try {
            RpcTestUtils.LoadBalancedAvroHandler firstHandler = new RpcTestUtils.LoadBalancedAvroHandler();
            RpcTestUtils.LoadBalancedAvroHandler secondHandler = new RpcTestUtils.LoadBalancedAvroHandler();
            firstServer = RpcTestUtils.startServer(firstHandler);
            secondServer = RpcTestUtils.startServer(secondHandler);
            Properties props = new Properties();
            props.put("hosts", "h1 h2");
            props.put("client.type", "default_loadbalance");
            props.put("hosts.h1", "127.0.0.1:" + firstServer.getPort());
            props.put("hosts.h2", "127.0.0.1:" + secondServer.getPort());
            client = RpcClientFactory.getInstance(props);
            Assert.assertTrue(client instanceof LoadBalancingRpcClient);
            for (int i = 0; i < 100; i++) {
                client.append(getEvent(i));
            }
            Assert.assertEquals(50, firstHandler.getAppendCount());
            Assert.assertEquals(50, secondHandler.getAppendCount());
        } finally {
            // Single cleanup path: runs exactly once whether the test passed or failed.
            if (firstServer != null) {
                firstServer.close();
            }
            if (secondServer != null) {
                secondServer.close();
            }
            if (client != null) {
                client.close();
            }
        }
    }

    /**
     * Sends 100 single-event batches through a two-host client with no {@code host-selector}
     * configured and asserts an even split of 50 batches per handler.
     *
     * @throws Exception if server start-up or event delivery fails unexpectedly
     */
    @Test
    public void testLbDefaultClientTwoHostsBatch() throws Exception {
        Server firstServer = null;
        Server secondServer = null;
        RpcClient client = null;
        try {
            RpcTestUtils.LoadBalancedAvroHandler firstHandler = new RpcTestUtils.LoadBalancedAvroHandler();
            RpcTestUtils.LoadBalancedAvroHandler secondHandler = new RpcTestUtils.LoadBalancedAvroHandler();
            firstServer = RpcTestUtils.startServer(firstHandler);
            secondServer = RpcTestUtils.startServer(secondHandler);
            Properties props = new Properties();
            props.put("hosts", "h1 h2");
            props.put("client.type", "default_loadbalance");
            props.put("hosts.h1", "127.0.0.1:" + firstServer.getPort());
            props.put("hosts.h2", "127.0.0.1:" + secondServer.getPort());
            client = RpcClientFactory.getInstance(props);
            Assert.assertTrue(client instanceof LoadBalancingRpcClient);
            for (int i = 0; i < 100; i++) {
                client.appendBatch(getBatchedEvent(i));
            }
            Assert.assertEquals(50, firstHandler.getAppendBatchCount());
            Assert.assertEquals(50, secondHandler.getAppendBatchCount());
        } finally {
            // Single cleanup path: runs exactly once whether the test passed or failed.
            if (firstServer != null) {
                firstServer.close();
            }
            if (secondServer != null) {
                secondServer.close();
            }
            if (client != null) {
                client.close();
            }
        }
    }

    /**
     * Sends 1000 single events through a ten-host client using the {@code random} host selector.
     *
     * <p>Asserts that no event is lost (per-handler counts sum to 1000) and that the handlers
     * show more than two distinct append counts, i.e. the load is not spread uniformly.
     *
     * @throws Exception if server start-up or event delivery fails unexpectedly
     */
    @Test
    public void testLbClientTenHostRandomDistribution() throws Exception {
        final int hostCount = 10;
        final int eventCount = 1000;
        Server[] servers = new Server[hostCount];
        RpcTestUtils.LoadBalancedAvroHandler[] handlers = new RpcTestUtils.LoadBalancedAvroHandler[hostCount];
        RpcClient client = null;
        try {
            Properties props = new Properties();
            StringBuilder hostNames = new StringBuilder();
            for (int i = 0; i < hostCount; i++) {
                handlers[i] = new RpcTestUtils.LoadBalancedAvroHandler();
                servers[i] = RpcTestUtils.startServer(handlers[i]);
                String hostName = "h" + i;
                props.put("hosts." + hostName, "127.0.0.1:" + servers[i].getPort());
                hostNames.append(hostName).append(" ");
            }
            props.put("hosts", hostNames.toString().trim());
            props.put("client.type", "default_loadbalance");
            props.put("host-selector", "random");
            client = RpcClientFactory.getInstance(props);
            Assert.assertTrue(client instanceof LoadBalancingRpcClient);
            for (int i = 0; i < eventCount; i++) {
                client.append(getEvent(i));
            }
            HashSet<Integer> distinctCounts = new HashSet<Integer>();
            int delivered = 0;
            for (RpcTestUtils.LoadBalancedAvroHandler handler : handlers) {
                int count = handler.getAppendCount();
                delivered += count;
                distinctCounts.add(Integer.valueOf(count));
            }
            Assert.assertTrue("Very unusual distribution", distinctCounts.size() > 2);
            Assert.assertEquals("Missing events", eventCount, delivered);
        } finally {
            // Entries are null for servers that were never started.
            for (Server server : servers) {
                if (server != null) {
                    server.close();
                }
            }
            if (client != null) {
                client.close();
            }
        }
    }

    /**
     * Sends 1000 single-event batches through a ten-host client using the {@code random}
     * host selector.
     *
     * <p>Asserts that no batch is lost (per-handler batch counts sum to 1000) and that the
     * handlers show more than two distinct batch counts.
     *
     * @throws Exception if server start-up or event delivery fails unexpectedly
     */
    @Test
    public void testLbClientTenHostRandomDistributionBatch() throws Exception {
        final int hostCount = 10;
        final int batchCount = 1000;
        Server[] servers = new Server[hostCount];
        RpcTestUtils.LoadBalancedAvroHandler[] handlers = new RpcTestUtils.LoadBalancedAvroHandler[hostCount];
        RpcClient client = null;
        try {
            Properties props = new Properties();
            StringBuilder hostNames = new StringBuilder();
            for (int i = 0; i < hostCount; i++) {
                handlers[i] = new RpcTestUtils.LoadBalancedAvroHandler();
                servers[i] = RpcTestUtils.startServer(handlers[i]);
                String hostName = "h" + i;
                props.put("hosts." + hostName, "127.0.0.1:" + servers[i].getPort());
                hostNames.append(hostName).append(" ");
            }
            props.put("hosts", hostNames.toString().trim());
            props.put("client.type", "default_loadbalance");
            props.put("host-selector", "random");
            client = RpcClientFactory.getInstance(props);
            Assert.assertTrue(client instanceof LoadBalancingRpcClient);
            for (int i = 0; i < batchCount; i++) {
                client.appendBatch(getBatchedEvent(i));
            }
            HashSet<Integer> distinctCounts = new HashSet<Integer>();
            int delivered = 0;
            for (RpcTestUtils.LoadBalancedAvroHandler handler : handlers) {
                int count = handler.getAppendBatchCount();
                delivered += count;
                distinctCounts.add(Integer.valueOf(count));
            }
            Assert.assertTrue("Very unusual distribution", distinctCounts.size() > 2);
            Assert.assertEquals("Missing events", batchCount, delivered);
        } finally {
            // Entries are null for servers that were never started.
            for (Server server : servers) {
                if (server != null) {
                    server.close();
                }
            }
            if (client != null) {
                client.close();
            }
        }
    }

    /**
     * Sends 1000 single events through a ten-host client using the {@code round_robin}
     * host selector.
     *
     * <p>Asserts that no event is lost (per-handler counts sum to 1000) and that every handler
     * reports the same append count, i.e. the load is spread perfectly evenly.
     *
     * @throws Exception if server start-up or event delivery fails unexpectedly
     */
    @Test
    public void testLbClientTenHostRoundRobinDistribution() throws Exception {
        final int hostCount = 10;
        final int eventCount = 1000;
        Server[] servers = new Server[hostCount];
        RpcTestUtils.LoadBalancedAvroHandler[] handlers = new RpcTestUtils.LoadBalancedAvroHandler[hostCount];
        RpcClient client = null;
        try {
            Properties props = new Properties();
            StringBuilder hostNames = new StringBuilder();
            for (int i = 0; i < hostCount; i++) {
                handlers[i] = new RpcTestUtils.LoadBalancedAvroHandler();
                servers[i] = RpcTestUtils.startServer(handlers[i]);
                String hostName = "h" + i;
                props.put("hosts." + hostName, "127.0.0.1:" + servers[i].getPort());
                hostNames.append(hostName).append(" ");
            }
            props.put("hosts", hostNames.toString().trim());
            props.put("client.type", "default_loadbalance");
            props.put("host-selector", "round_robin");
            client = RpcClientFactory.getInstance(props);
            Assert.assertTrue(client instanceof LoadBalancingRpcClient);
            for (int i = 0; i < eventCount; i++) {
                client.append(getEvent(i));
            }
            HashSet<Integer> distinctCounts = new HashSet<Integer>();
            int delivered = 0;
            for (RpcTestUtils.LoadBalancedAvroHandler handler : handlers) {
                int count = handler.getAppendCount();
                delivered += count;
                distinctCounts.add(Integer.valueOf(count));
            }
            // A single distinct value means every handler received the same number of events.
            Assert.assertEquals("Very unusual distribution", 1, distinctCounts.size());
            Assert.assertEquals("Missing events", eventCount, delivered);
        } finally {
            // Entries are null for servers that were never started.
            for (Server server : servers) {
                if (server != null) {
                    server.close();
                }
            }
            if (client != null) {
                client.close();
            }
        }
    }

    /**
     * Sends 1000 single-event batches through a ten-host client using the {@code round_robin}
     * host selector.
     *
     * <p>Asserts that no batch is lost (per-handler batch counts sum to 1000) and that every
     * handler reports the same batch count.
     *
     * @throws Exception if server start-up or event delivery fails unexpectedly
     */
    @Test
    public void testLbClientTenHostRoundRobinDistributionBatch() throws Exception {
        final int hostCount = 10;
        final int batchCount = 1000;
        Server[] servers = new Server[hostCount];
        RpcTestUtils.LoadBalancedAvroHandler[] handlers = new RpcTestUtils.LoadBalancedAvroHandler[hostCount];
        RpcClient client = null;
        try {
            Properties props = new Properties();
            StringBuilder hostNames = new StringBuilder();
            for (int i = 0; i < hostCount; i++) {
                handlers[i] = new RpcTestUtils.LoadBalancedAvroHandler();
                servers[i] = RpcTestUtils.startServer(handlers[i]);
                String hostName = "h" + i;
                props.put("hosts." + hostName, "127.0.0.1:" + servers[i].getPort());
                hostNames.append(hostName).append(" ");
            }
            props.put("hosts", hostNames.toString().trim());
            props.put("client.type", "default_loadbalance");
            props.put("host-selector", "round_robin");
            client = RpcClientFactory.getInstance(props);
            Assert.assertTrue(client instanceof LoadBalancingRpcClient);
            for (int i = 0; i < batchCount; i++) {
                client.appendBatch(getBatchedEvent(i));
            }
            HashSet<Integer> distinctCounts = new HashSet<Integer>();
            int delivered = 0;
            for (RpcTestUtils.LoadBalancedAvroHandler handler : handlers) {
                int count = handler.getAppendBatchCount();
                delivered += count;
                distinctCounts.add(Integer.valueOf(count));
            }
            // A single distinct value means every handler received the same number of batches.
            Assert.assertEquals("Very unusual distribution", 1, distinctCounts.size());
            Assert.assertEquals("Missing events", batchCount, delivered);
        } finally {
            // Entries are null for servers that were never started.
            for (Server server : servers) {
                if (server != null) {
                    server.close();
                }
            }
            if (client != null) {
                client.close();
            }
        }
    }

    /**
     * Exercises the {@code random} host selector with {@code backoff} enabled across three hosts.
     *
     * <p>Phase 1: handlers 0 and 2 are failed, so all 50 events must land on handler 1.
     * Phase 2: handler 0 is restored and handler 1 is failed; the very next append is expected
     * to throw {@link EventDeliveryException}. Phase 3: after a 2.5 s pause another 50 events
     * are sent and must all land on handler 0.
     *
     * @throws Exception if server start-up, sleeping, or event delivery fails unexpectedly
     */
    @Test
    public void testRandomBackoff() throws Exception {
        Properties props = new Properties();
        List<RpcTestUtils.LoadBalancedAvroHandler> handlers = new ArrayList<RpcTestUtils.LoadBalancedAvroHandler>();
        List<Server> servers = new ArrayList<Server>();
        RpcClient client = null;
        try {
            StringBuilder hostNames = new StringBuilder();
            for (int i = 0; i < 3; i++) {
                RpcTestUtils.LoadBalancedAvroHandler handler = new RpcTestUtils.LoadBalancedAvroHandler();
                handlers.add(handler);
                Server server = RpcTestUtils.startServer(handler);
                servers.add(server);
                String hostName = "h" + i;
                props.put("hosts." + hostName, "127.0.0.1:" + server.getPort());
                hostNames.append(hostName).append(" ");
            }
            props.put("hosts", hostNames.toString().trim());
            props.put("client.type", "default_loadbalance");
            props.put("host-selector", "random");
            props.put("backoff", "true");

            handlers.get(0).setFailed();
            handlers.get(2).setFailed();
            client = RpcClientFactory.getInstance(props);
            Assert.assertTrue(client instanceof LoadBalancingRpcClient);
            for (int i = 0; i < 50; i++) {
                client.append(EventBuilder.withBody(("test" + String.valueOf(i)).getBytes()));
            }
            Assert.assertEquals(50, handlers.get(1).getAppendCount());
            Assert.assertEquals(0, handlers.get(0).getAppendCount());
            Assert.assertEquals(0, handlers.get(2).getAppendCount());

            handlers.get(0).setOK();
            handlers.get(1).setFailed();
            try {
                client.append(EventBuilder.withBody("shouldfail".getBytes()));
                Assert.fail("Expected EventDeliveryException");
            } catch (EventDeliveryException expected) {
                // Expected: presumably hosts 0 and 2 are still backed off while host 1 now
                // fails, leaving no host able to accept the event — TODO confirm.
            }

            // Presumably long enough for the back-off of host 0 to expire — TODO confirm.
            Thread.sleep(2500L);
            for (int i = 0; i < 50; i++) {
                client.append(EventBuilder.withBody(("test" + String.valueOf(i)).getBytes()));
            }
            Assert.assertEquals(50, handlers.get(0).getAppendCount());
            Assert.assertEquals(50, handlers.get(1).getAppendCount());
            Assert.assertEquals(0, handlers.get(2).getAppendCount());
        } finally {
            // Release every server that was started, then the client, even on failure.
            for (Server server : servers) {
                server.close();
            }
            if (client != null) {
                client.close();
            }
        }
    }

    /**
     * Exercises the {@code round_robin} host selector with {@code backoff} enabled across three
     * hosts: three events are sent while all hosts are healthy, three while handler 1 is
     * failed, and three immediately after handler 1 is restored.
     *
     * <p>Asserts final append counts of 4, 1 and 4 for handlers 0, 1 and 2 respectively.
     *
     * @throws EventDeliveryException if an append unexpectedly cannot be delivered
     */
    @Test
    public void testRoundRobinBackoffInitialFailure() throws EventDeliveryException {
        Properties props = new Properties();
        List<RpcTestUtils.LoadBalancedAvroHandler> handlers = new ArrayList<RpcTestUtils.LoadBalancedAvroHandler>();
        List<Server> servers = new ArrayList<Server>();
        RpcClient client = null;
        try {
            StringBuilder hostNames = new StringBuilder();
            for (int i = 0; i < 3; i++) {
                RpcTestUtils.LoadBalancedAvroHandler handler = new RpcTestUtils.LoadBalancedAvroHandler();
                handlers.add(handler);
                Server server = RpcTestUtils.startServer(handler);
                servers.add(server);
                String hostName = "h" + i;
                props.put("hosts." + hostName, "127.0.0.1:" + server.getPort());
                hostNames.append(hostName).append(" ");
            }
            props.put("hosts", hostNames.toString().trim());
            props.put("client.type", "default_loadbalance");
            props.put("host-selector", "round_robin");
            props.put("backoff", "true");
            client = RpcClientFactory.getInstance(props);
            Assert.assertTrue(client instanceof LoadBalancingRpcClient);

            for (int i = 0; i < 3; i++) {
                client.append(EventBuilder.withBody("testing".getBytes()));
            }
            handlers.get(1).setFailed();
            for (int i = 0; i < 3; i++) {
                client.append(EventBuilder.withBody("testing".getBytes()));
            }
            handlers.get(1).setOK();
            // No pause here: the restored host is presumably still backed off — TODO confirm.
            for (int i = 0; i < 3; i++) {
                client.append(EventBuilder.withBody("testing".getBytes()));
            }

            Assert.assertEquals(4, handlers.get(0).getAppendCount());
            Assert.assertEquals(1, handlers.get(1).getAppendCount());
            Assert.assertEquals(4, handlers.get(2).getAppendCount());
        } finally {
            // Release every server that was started, then the client, even on failure.
            for (Server server : servers) {
                server.close();
            }
            if (client != null) {
                client.close();
            }
        }
    }

    /**
     * Exercises the {@code round_robin} host selector with {@code backoff} enabled when
     * handler 1 is failed from the start.
     *
     * <p>Three rounds of three events are sent, separated by 2.1 s pauses, and handler 1 must
     * have received nothing after each round — including the round sent after it was restored.
     * After a further 2.5 s pause, 60 events are sent and the final counts must be 25, 20 and 24
     * for handlers 0, 1 and 2.
     *
     * @throws Exception if server start-up, sleeping, or event delivery fails unexpectedly
     */
    @Test
    public void testRoundRobinBackoffIncreasingBackoffs() throws Exception {
        final int numEvents = 60;
        Properties props = new Properties();
        List<RpcTestUtils.LoadBalancedAvroHandler> handlers = new ArrayList<RpcTestUtils.LoadBalancedAvroHandler>();
        List<Server> servers = new ArrayList<Server>();
        RpcClient client = null;
        try {
            StringBuilder hostNames = new StringBuilder();
            for (int i = 0; i < 3; i++) {
                RpcTestUtils.LoadBalancedAvroHandler handler = new RpcTestUtils.LoadBalancedAvroHandler();
                handlers.add(handler);
                if (i == 1) {
                    handler.setFailed();
                }
                Server server = RpcTestUtils.startServer(handler);
                servers.add(server);
                String hostName = "h" + i;
                props.put("hosts." + hostName, "127.0.0.1:" + server.getPort());
                hostNames.append(hostName).append(" ");
            }
            props.put("hosts", hostNames.toString().trim());
            props.put("client.type", "default_loadbalance");
            props.put("host-selector", "round_robin");
            props.put("backoff", "true");
            client = RpcClientFactory.getInstance(props);
            Assert.assertTrue(client instanceof LoadBalancingRpcClient);

            for (int i = 0; i < 3; i++) {
                client.append(EventBuilder.withBody("testing".getBytes()));
            }
            Assert.assertEquals(0, handlers.get(1).getAppendCount());

            Thread.sleep(2100L);
            for (int i = 0; i < 3; i++) {
                client.append(EventBuilder.withBody("testing".getBytes()));
            }
            Assert.assertEquals(0, handlers.get(1).getAppendCount());

            handlers.get(1).setOK();
            Thread.sleep(2100L);
            // Host 1 is healthy again but must still receive nothing in this round; presumably
            // its back-off period has grown past 2.1 s by now — TODO confirm.
            for (int i = 0; i < 3; i++) {
                client.append(EventBuilder.withBody("testing".getBytes()));
            }
            Assert.assertEquals(0, handlers.get(1).getAppendCount());

            Thread.sleep(2500L);
            for (int i = 0; i < numEvents; i++) {
                client.append(EventBuilder.withBody("testing".getBytes()));
            }
            Assert.assertEquals(5 + (numEvents / 3), handlers.get(0).getAppendCount());
            Assert.assertEquals(numEvents / 3, handlers.get(1).getAppendCount());
            Assert.assertEquals(4 + (numEvents / 3), handlers.get(2).getAppendCount());
        } finally {
            // Release every server that was started, then the client, even on failure.
            for (Server server : servers) {
                server.close();
            }
            if (client != null) {
                client.close();
            }
        }
    }

    /**
     * Exercises recovery with the {@code round_robin} host selector and {@code backoff} enabled:
     * handler 1 is failed from the start, three events are sent, handler 1 is restored, and
     * after a 3 s pause 60 more events are sent.
     *
     * <p>Asserts final append counts of 22, 20 and 21 for handlers 0, 1 and 2.
     *
     * @throws EventDeliveryException if an append unexpectedly cannot be delivered
     * @throws InterruptedException if the pause is interrupted
     */
    @Test
    public void testRoundRobinBackoffFailureRecovery() throws EventDeliveryException, InterruptedException {
        final int numEvents = 60;
        Properties props = new Properties();
        List<RpcTestUtils.LoadBalancedAvroHandler> handlers = new ArrayList<RpcTestUtils.LoadBalancedAvroHandler>();
        List<Server> servers = new ArrayList<Server>();
        RpcClient client = null;
        try {
            StringBuilder hostNames = new StringBuilder();
            for (int i = 0; i < 3; i++) {
                RpcTestUtils.LoadBalancedAvroHandler handler = new RpcTestUtils.LoadBalancedAvroHandler();
                handlers.add(handler);
                if (i == 1) {
                    handler.setFailed();
                }
                Server server = RpcTestUtils.startServer(handler);
                servers.add(server);
                String hostName = "h" + i;
                props.put("hosts." + hostName, "127.0.0.1:" + server.getPort());
                hostNames.append(hostName).append(" ");
            }
            props.put("hosts", hostNames.toString().trim());
            props.put("client.type", "default_loadbalance");
            props.put("host-selector", "round_robin");
            props.put("backoff", "true");
            client = RpcClientFactory.getInstance(props);
            Assert.assertTrue(client instanceof LoadBalancingRpcClient);

            for (int i = 0; i < 3; i++) {
                client.append(EventBuilder.withBody("recovery test".getBytes()));
            }
            handlers.get(1).setOK();
            // Presumably long enough for the back-off of host 1 to expire — TODO confirm.
            Thread.sleep(3000L);
            for (int i = 0; i < numEvents; i++) {
                client.append(EventBuilder.withBody("testing".getBytes()));
            }

            Assert.assertEquals(2 + (numEvents / 3), handlers.get(0).getAppendCount());
            Assert.assertEquals(0 + (numEvents / 3), handlers.get(1).getAppendCount());
            Assert.assertEquals(1 + (numEvents / 3), handlers.get(2).getAppendCount());
        } finally {
            // Release every server that was started, then the client, even on failure.
            for (Server server : servers) {
                server.close();
            }
            if (client != null) {
                client.close();
            }
        }
    }

    /**
     * Builds a batch containing exactly one event, created by {@link #getEvent(int)}.
     *
     * @param i number embedded in the event body
     * @return a new mutable list holding the single event
     */
    private List<Event> getBatchedEvent(int i) {
        List<Event> batch = new ArrayList<Event>();
        batch.add(getEvent(i));
        return batch;
    }

    /**
     * Creates an event whose body is the text {@code "event: "} followed by {@code i},
     * encoded with the platform default charset.
     *
     * @param i number appended to the body text
     * @return the event produced by {@link EventBuilder#withBody(byte[])}
     */
    private Event getEvent(int i) {
        String bodyText = "event: " + i;
        byte[] body = bodyText.getBytes();
        return EventBuilder.withBody(body);
    }
}
