package org.apache.twill.yarn;

import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.twill.api.Hosts;
import org.apache.twill.api.Racks;
import org.apache.twill.api.ResourceSpecification;
import org.apache.twill.api.TwillApplication;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.api.logging.PrinterLogHandler;
import org.apache.twill.internal.yarn.YarnUtils;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/twill/yarn/PlacementPolicyTestRun.class */
public class PlacementPolicyTestRun extends BaseYarnTest {
    private static final Logger LOG = LoggerFactory.getLogger(PlacementPolicyTestRun.class);
    private static final int RUNNABLE_MEMORY = 512;
    private static final int RUNNABLE_CORES = 1;
    private static List<NodeReport> nodeReports;
    private static ResourceSpecification resource;
    private static ResourceSpecification twoInstancesResource;

    /* loaded from: input_file:org/apache/twill/yarn/PlacementPolicyTestRun$BadApplication.class */
    public static final class BadApplication implements TwillApplication {
        public TwillSpecification configure() {
            return TwillSpecification.Builder.with().setName("BadApplication").withRunnable().add("Hermione", new EchoServer(), PlacementPolicyTestRun.resource).noLocalFiles().add("Harry", new EchoServer(), PlacementPolicyTestRun.resource).noLocalFiles().add("Ron", new EchoServer(), PlacementPolicyTestRun.resource).noLocalFiles().withPlacementPolicy().add(TwillSpecification.PlacementPolicy.Type.DEFAULT, "Hermione", new String[]{"Harry"}).add(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED, "Hermione", new String[]{"Ron"}).anyOrder().build();
        }
    }

    /* loaded from: input_file:org/apache/twill/yarn/PlacementPolicyTestRun$ChangeInstanceApplication.class */
    public static final class ChangeInstanceApplication implements TwillApplication {
        public TwillSpecification configure() {
            return TwillSpecification.Builder.with().setName("DistributedApplication").withRunnable().add("Alice", new EchoServer(), PlacementPolicyTestRun.twoInstancesResource).noLocalFiles().add("Bob", new EchoServer(), PlacementPolicyTestRun.resource).noLocalFiles().add("Eve", new EchoServer(), PlacementPolicyTestRun.resource).noLocalFiles().withPlacementPolicy().add(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED, "Alice", new String[]{"Bob"}).anyOrder().build();
        }
    }

    /* loaded from: input_file:org/apache/twill/yarn/PlacementPolicyTestRun$DistributedApplication.class */
    public static final class DistributedApplication implements TwillApplication {
        public TwillSpecification configure() {
            return TwillSpecification.Builder.with().setName("DistributedApplication").withRunnable().add("Alice", new EchoServer(), PlacementPolicyTestRun.resource).noLocalFiles().add("Bob", new EchoServer(), PlacementPolicyTestRun.resource).noLocalFiles().add("Eve", new EchoServer(), PlacementPolicyTestRun.resource).noLocalFiles().withPlacementPolicy().add(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED, "Alice", new String[]{"Bob"}).anyOrder().build();
        }
    }

    /* loaded from: input_file:org/apache/twill/yarn/PlacementPolicyTestRun$FaultyApplication.class */
    public static final class FaultyApplication implements TwillApplication {
        public TwillSpecification configure() {
            return TwillSpecification.Builder.with().setName("FaultyApplication").withRunnable().add("Hermione", new EchoServer(), PlacementPolicyTestRun.resource).noLocalFiles().add("Harry", new EchoServer(), PlacementPolicyTestRun.resource).noLocalFiles().add("Ron", new EchoServer(), PlacementPolicyTestRun.resource).noLocalFiles().withPlacementPolicy().add(TwillSpecification.PlacementPolicy.Type.DEFAULT, "Hermione", new String[]{"Ron"}).add(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED, "Draco", new String[]{"Harry"}).anyOrder().build();
        }
    }

    /* loaded from: input_file:org/apache/twill/yarn/PlacementPolicyTestRun$PlacementPolicyApplication.class */
    public static final class PlacementPolicyApplication implements TwillApplication {
        public TwillSpecification configure() {
            return TwillSpecification.Builder.with().setName("PlacementPolicyApplication").withRunnable().add("hostRunnable", new EchoServer(), PlacementPolicyTestRun.resource).noLocalFiles().add("hostRackRunnable", new EchoServer(), PlacementPolicyTestRun.resource).noLocalFiles().add("distributedRunnable", new EchoServer(), PlacementPolicyTestRun.twoInstancesResource).noLocalFiles().withPlacementPolicy().add(Hosts.of(((NodeReport) PlacementPolicyTestRun.nodeReports.get(0)).getHttpAddress(), new String[0]), "hostRunnable", new String[0]).add(Hosts.of(((NodeReport) PlacementPolicyTestRun.nodeReports.get(PlacementPolicyTestRun.RUNNABLE_CORES)).getHttpAddress(), new String[0]), Racks.of("/default-rack", new String[0]), "hostRackRunnable", new String[0]).add(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED, "distributedRunnable", new String[0]).anyOrder().build();
        }
    }

    @BeforeClass
    public static void verifyClusterCapability() throws InterruptedException {
        Assume.assumeTrue(YarnUtils.getHadoopVersion().equals(YarnUtils.HadoopVersions.HADOOP_22));
        resource = ResourceSpecification.Builder.with().setVirtualCores(RUNNABLE_CORES).setMemory(RUNNABLE_MEMORY, ResourceSpecification.SizeUnit.MEGA).build();
        twoInstancesResource = ResourceSpecification.Builder.with().setVirtualCores(RUNNABLE_CORES).setMemory(RUNNABLE_MEMORY, ResourceSpecification.SizeUnit.MEGA).setInstances(2).build();
        int i = 0;
        while (true) {
            int i2 = i;
            i += RUNNABLE_CORES;
            if (i2 >= 20) {
                break;
            }
            try {
                nodeReports = TWILL_TESTER.getNodeReports();
                if (nodeReports != null && nodeReports.size() == 3) {
                    break;
                }
            } catch (Exception e) {
                LOG.error("Failed to get node reports", e);
            }
            LOG.warn("NodeManagers != 3. {}", nodeReports);
            TimeUnit.SECONDS.sleep(1L);
        }
        for (NodeReport nodeReport : nodeReports) {
            Resource capability = nodeReport.getCapability();
            Resource used = nodeReport.getUsed();
            Assert.assertNotNull(capability);
            if (used != null) {
                Assert.assertTrue(2 * resource.getMemorySize() < capability.getMemory() - used.getMemory());
            } else {
                Assert.assertTrue(2 * resource.getMemorySize() < capability.getMemory());
            }
        }
    }

    @Test
    public void testPlacementPolicy() throws Exception {
        Assume.assumeTrue(YarnUtils.getHadoopVersion().equals(YarnUtils.HadoopVersions.HADOOP_22));
        waitNodeManagerCount(0, 10L, TimeUnit.SECONDS);
        TwillController start = getTwillRunner().prepare(new PlacementPolicyApplication()).addLogHandler(new PrinterLogHandler(new PrintWriter((OutputStream) System.out, true))).withApplicationArguments(new String[]{"PlacementPolicyTest"}).withArguments("hostRunnable", new String[]{"host"}).withArguments("hostRackRunnable", new String[]{"hostRack"}).withArguments("distributedRunnable", new String[]{"distributed"}).start();
        try {
            Assert.assertTrue(waitForSize(start.discoverService("PlacementPolicyTest"), 4, 80));
            Assert.assertTrue(getProvisionedNodeManagerCount() >= 2);
            start.terminate().get(120L, TimeUnit.SECONDS);
            TimeUnit.SECONDS.sleep(2L);
        } catch (Throwable th) {
            start.terminate().get(120L, TimeUnit.SECONDS);
            throw th;
        }
    }

    @Test
    public void testDistributedPlacementPolicy() throws Exception {
        Assume.assumeTrue(YarnUtils.getHadoopVersion().equals(YarnUtils.HadoopVersions.HADOOP_22));
        waitNodeManagerCount(0, 10L, TimeUnit.SECONDS);
        TwillController start = getTwillRunner().prepare(new DistributedApplication()).addLogHandler(new PrinterLogHandler(new PrintWriter((OutputStream) System.out, true))).withApplicationArguments(new String[]{"DistributedTest"}).withArguments("Alice", new String[]{"alice"}).withArguments("Bob", new String[]{"bob"}).withArguments("Eve", new String[]{"eve"}).start();
        try {
            Assert.assertTrue(waitForSize(start.discoverService("DistributedTest"), 3, 60));
            Assert.assertTrue(getProvisionedNodeManagerCount() >= 2);
            start.changeInstances("Alice", 2).get(60L, TimeUnit.SECONDS);
            Assert.assertTrue(waitForSize(start.discoverService("DistributedTest"), 4, 60));
            Assert.assertTrue(getProvisionedNodeManagerCount() >= 3);
            start.changeInstances("Eve", 2).get(60L, TimeUnit.SECONDS);
            Assert.assertTrue(waitForSize(start.discoverService("DistributedTest"), 5, 60));
            start.changeInstances("Bob", 2).get(60L, TimeUnit.SECONDS);
            Assert.assertTrue(waitForSize(start.discoverService("DistributedTest"), 6, 60));
            Assert.assertTrue(getProvisionedNodeManagerCount() >= 3);
            start.terminate().get(120L, TimeUnit.SECONDS);
            TimeUnit.SECONDS.sleep(2L);
        } catch (Throwable th) {
            start.terminate().get(120L, TimeUnit.SECONDS);
            throw th;
        }
    }

    private void waitNodeManagerCount(int i, long j, TimeUnit timeUnit) throws Exception {
        int provisionedNodeManagerCount = getProvisionedNodeManagerCount();
        long currentTimeMillis = System.currentTimeMillis();
        long j2 = 0;
        while (true) {
            long j3 = j2;
            if (provisionedNodeManagerCount == i || j3 >= timeUnit.toMillis(j)) {
                break;
            }
            LOG.info("Waiting for expected number of node managers. Expected: {}. Actual: {}", Integer.valueOf(i), Integer.valueOf(provisionedNodeManagerCount));
            TimeUnit.SECONDS.sleep(1L);
            provisionedNodeManagerCount = getProvisionedNodeManagerCount();
            j2 = System.currentTimeMillis() - currentTimeMillis;
        }
        if (provisionedNodeManagerCount != i) {
            throw new TimeoutException("Failed to get expected number of node managers. Expected: " + i + ". Actual: " + provisionedNodeManagerCount);
        }
    }

    @Test
    public void testChangeInstance() throws InterruptedException, TimeoutException, ExecutionException {
        Assume.assumeTrue(YarnUtils.getHadoopVersion().equals(YarnUtils.HadoopVersions.HADOOP_22));
        TwillController start = getTwillRunner().prepare(new ChangeInstanceApplication()).addLogHandler(new PrinterLogHandler(new PrintWriter((OutputStream) System.out, true))).withApplicationArguments(new String[]{"DistributedTest"}).withArguments("Alice", new String[]{"alice"}).withArguments("Bob", new String[]{"bob"}).withArguments("Eve", new String[]{"eve"}).start();
        try {
            Assert.assertTrue(waitForSize(start.discoverService("DistributedTest"), 4, 60));
            start.changeInstances("Alice", 4).get(60L, TimeUnit.SECONDS);
            Assert.assertTrue(waitForSize(start.discoverService("DistributedTest"), 6, 60));
            start.changeInstances("Alice", RUNNABLE_CORES).get(60L, TimeUnit.SECONDS);
            Assert.assertTrue(waitForSize(start.discoverService("DistributedTest"), 3, 60));
            start.changeInstances("Bob", 3).get(60L, TimeUnit.SECONDS);
            Assert.assertTrue(waitForSize(start.discoverService("DistributedTest"), 5, 60));
            start.changeInstances("Eve", 3).get(60L, TimeUnit.SECONDS);
            Assert.assertTrue(waitForSize(start.discoverService("DistributedTest"), 7, 60));
            start.terminate().get(120L, TimeUnit.SECONDS);
            TimeUnit.SECONDS.sleep(2L);
        } catch (Throwable th) {
            start.terminate().get(120L, TimeUnit.SECONDS);
            throw th;
        }
    }

    @Test(expected = IllegalArgumentException.class)
    public void testNonExistentRunnable() throws InterruptedException, ExecutionException, TimeoutException {
        getTwillRunner().prepare(new FaultyApplication()).addLogHandler(new PrinterLogHandler(new PrintWriter((OutputStream) System.out, true))).start().terminate().get(120L, TimeUnit.SECONDS);
    }

    @Test(expected = IllegalArgumentException.class)
    public void testPlacementPolicySpecification() throws InterruptedException, ExecutionException, TimeoutException {
        getTwillRunner().prepare(new BadApplication()).addLogHandler(new PrinterLogHandler(new PrintWriter((OutputStream) System.out, true))).start().terminate().get(120L, TimeUnit.SECONDS);
    }

    private int getProvisionedNodeManagerCount() throws Exception {
        int i = 0;
        Iterator<NodeReport> it = getNodeReports().iterator();
        while (it.hasNext()) {
            Resource used = it.next().getUsed();
            if (used != null && used.getMemory() > 0) {
                i += RUNNABLE_CORES;
            }
        }
        return i;
    }
}
