package org.apache.twill.yarn;

import com.google.common.base.Charsets;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.io.LineReader;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.Writer;
import java.net.Socket;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.ResourceSpecification;
import org.apache.twill.api.TwillApplication;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunResources;
import org.apache.twill.api.TwillRunnerService;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.api.logging.PrinterLogHandler;
import org.apache.twill.common.Threads;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.ServiceDiscovered;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/twill/yarn/ResourceReportTestRun.class */
public final class ResourceReportTestRun extends BaseYarnTest {
    private static final Logger LOG = LoggerFactory.getLogger(ResourceReportTestRun.class);

    /* loaded from: input_file:org/apache/twill/yarn/ResourceReportTestRun$ResourceApplication.class */
    private class ResourceApplication implements TwillApplication {
        private ResourceApplication() {
        }

        public TwillSpecification configure() {
            return TwillSpecification.Builder.with().setName("ResourceApplication").withRunnable().add("echo1", new EchoServer(), ResourceSpecification.Builder.with().setVirtualCores(1).setMemory(256, ResourceSpecification.SizeUnit.MEGA).setInstances(2).build()).noLocalFiles().add("echo2", new EchoServer(), ResourceSpecification.Builder.with().setVirtualCores(2).setMemory(512, ResourceSpecification.SizeUnit.MEGA).setInstances(1).build()).noLocalFiles().anyOrder().build();
        }
    }

    @Test
    public void testRunnablesGetAllowedResourcesInEnv() throws InterruptedException, IOException, TimeoutException, ExecutionException {
        TwillController start = getTwillRunner().prepare(new EnvironmentEchoServer(), ResourceSpecification.Builder.with().setVirtualCores(1).setMemory(2048, ResourceSpecification.SizeUnit.MEGA).setInstances(1).build()).addLogHandler(new PrinterLogHandler(new PrintWriter((OutputStream) System.out, true))).withApplicationArguments(new String[]{"envecho"}).withArguments("EnvironmentEchoServer", new String[]{"echo2"}).start();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        start.onRunning(new Runnable() { // from class: org.apache.twill.yarn.ResourceReportTestRun.1
            @Override // java.lang.Runnable
            public void run() {
                countDownLatch.countDown();
            }
        }, Threads.SAME_THREAD_EXECUTOR);
        Assert.assertTrue(countDownLatch.await(120L, TimeUnit.SECONDS));
        ServiceDiscovered discoverService = start.discoverService("envecho");
        Assert.assertTrue(waitForSize(discoverService, 1, 120));
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("YARN_CONTAINER_MEMORY_MB", "2048");
        newHashMap.put("TWILL_INSTANCE_COUNT", "1");
        Discoverable discoverable = (Discoverable) discoverService.iterator().next();
        for (Map.Entry entry : newHashMap.entrySet()) {
            Socket socket = new Socket(discoverable.getSocketAddress().getHostName(), discoverable.getSocketAddress().getPort());
            Throwable th = null;
            try {
                try {
                    PrintWriter printWriter = new PrintWriter((Writer) new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true);
                    LineReader lineReader = new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8));
                    printWriter.println((String) entry.getKey());
                    Assert.assertEquals(entry.getValue(), lineReader.readLine());
                    if (socket != null) {
                        if (0 != 0) {
                            try {
                                socket.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            socket.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th3) {
                if (socket != null) {
                    if (th != null) {
                        try {
                            socket.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        socket.close();
                    }
                }
                throw th3;
            }
        }
        start.terminate().get(120L, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(2L);
    }

    @Test
    public void testResourceReportWithFailingContainers() throws InterruptedException, IOException, TimeoutException, ExecutionException {
        TwillController start = getTwillRunner().prepare(new BuggyServer(), ResourceSpecification.Builder.with().setVirtualCores(1).setMemory(256, ResourceSpecification.SizeUnit.MEGA).setInstances(2).build()).addLogHandler(new PrinterLogHandler(new PrintWriter((OutputStream) System.out, true))).withApplicationArguments(new String[]{"echo"}).withArguments("BuggyServer", new String[]{"echo2"}).start();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        start.onRunning(new Runnable() { // from class: org.apache.twill.yarn.ResourceReportTestRun.2
            @Override // java.lang.Runnable
            public void run() {
                countDownLatch.countDown();
            }
        }, Threads.SAME_THREAD_EXECUTOR);
        Assert.assertTrue(countDownLatch.await(120L, TimeUnit.SECONDS));
        ServiceDiscovered discoverService = start.discoverService("echo");
        Assert.assertTrue(waitForSize(discoverService, 2, 120));
        Assert.assertEquals(2L, getResourceReport(start, 10000L).getRunnableResources("BuggyServer").size());
        Discoverable discoverable = (Discoverable) discoverService.iterator().next();
        Socket socket = new Socket(discoverable.getSocketAddress().getAddress(), discoverable.getSocketAddress().getPort());
        Throwable th = null;
        try {
            new PrintWriter((Writer) new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true).println("0");
            if (socket != null) {
                if (0 != 0) {
                    try {
                        socket.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    socket.close();
                }
            }
            int i = 0;
            while (i < 100 && getResourceReport(start, 10000L).getRunnableResources("BuggyServer").size() != 1) {
                LOG.info("Wait for BuggyServer to have 1 instance left. Trial {}.", Integer.valueOf(i));
                i++;
                TimeUnit.SECONDS.sleep(1L);
            }
            Assert.assertTrue("Still has 2 contains running after 100 seconds", i < 100);
            start.terminate().get(100L, TimeUnit.SECONDS);
            TimeUnit.SECONDS.sleep(2L);
        } catch (Throwable th3) {
            if (socket != null) {
                if (0 != 0) {
                    try {
                        socket.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    socket.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testResourceReport() throws InterruptedException, ExecutionException, IOException, URISyntaxException, TimeoutException {
        final TwillController start = getTwillRunner().prepare(new ResourceApplication()).addLogHandler(new PrinterLogHandler(new PrintWriter((OutputStream) System.out, true))).withApplicationArguments(new String[]{"echo"}).withArguments("echo1", new String[]{"echo1"}).withArguments("echo2", new String[]{"echo2"}).start();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        start.onRunning(new Runnable() { // from class: org.apache.twill.yarn.ResourceReportTestRun.3
            @Override // java.lang.Runnable
            public void run() {
                countDownLatch.countDown();
            }
        }, Threads.SAME_THREAD_EXECUTOR);
        Assert.assertTrue(countDownLatch.await(120L, TimeUnit.SECONDS));
        Assert.assertTrue(waitForSize(start.discoverService("echo"), 3, 120));
        Map resources = getResourceReport(start, 10000L).getResources();
        Assert.assertEquals(2L, resources.keySet().size());
        Assert.assertTrue(resources.containsKey("echo1"));
        Assert.assertTrue(resources.containsKey("echo2"));
        waitForSize(new Iterable<String>() { // from class: org.apache.twill.yarn.ResourceReportTestRun.4
            @Override // java.lang.Iterable
            public Iterator<String> iterator() {
                return ResourceReportTestRun.this.getResourceReport(start, 10000L).getServices().iterator();
            }
        }, 3, 120);
        Assert.assertEquals(ImmutableSet.of("echo", "echo1", "echo2"), ImmutableSet.copyOf(getResourceReport(start, 10000L).getServices()));
        Collection collection = (Collection) resources.get("echo1");
        Assert.assertEquals(2L, collection.size());
        Iterator it = collection.iterator();
        while (it.hasNext()) {
            Assert.assertEquals(256L, ((TwillRunResources) it.next()).getMemoryMB());
        }
        Collection collection2 = (Collection) resources.get("echo2");
        Assert.assertEquals(1L, collection2.size());
        Iterator it2 = collection2.iterator();
        while (it2.hasNext()) {
            Assert.assertEquals(512L, ((TwillRunResources) it2.next()).getMemoryMB());
        }
        start.changeInstances("echo1", 1).get(60L, TimeUnit.SECONDS);
        Assert.assertTrue(waitForSize(start.discoverService("echo1"), 1, 60));
        ResourceReport resourceReport = getResourceReport(start, 10000L);
        Map resources2 = resourceReport.getResources();
        Assert.assertEquals(2L, resources2.keySet().size());
        Assert.assertTrue(resources2.containsKey("echo1"));
        Assert.assertTrue(resources2.containsKey("echo2"));
        Collection collection3 = (Collection) resources2.get("echo1");
        Assert.assertEquals(1L, collection3.size());
        Iterator it3 = collection3.iterator();
        while (it3.hasNext()) {
            Assert.assertEquals(256L, ((TwillRunResources) it3.next()).getMemoryMB());
        }
        Collection collection4 = (Collection) resources2.get("echo2");
        Assert.assertEquals(1L, collection4.size());
        Iterator it4 = collection4.iterator();
        while (it4.hasNext()) {
            Assert.assertEquals(512L, ((TwillRunResources) it4.next()).getMemoryMB());
        }
        TwillRunnerService createTwillRunnerService = TWILL_TESTER.createTwillRunnerService();
        createTwillRunnerService.start();
        try {
            TwillController lookup = createTwillRunnerService.lookup("ResourceApplication", start.getRunId());
            int i = 60;
            while (lookup == null) {
                int i2 = i;
                i--;
                if (i2 <= 0) {
                    break;
                }
                TimeUnit.SECONDS.sleep(1L);
                lookup = createTwillRunnerService.lookup("ResourceApplication", start.getRunId());
            }
            Assert.assertNotNull(lookup);
            Assert.assertEquals(resourceReport.getResources(), getResourceReport(lookup, 10000L).getResources());
            createTwillRunnerService.stop();
            start.terminate().get(120L, TimeUnit.SECONDS);
            TimeUnit.SECONDS.sleep(2L);
        } catch (Throwable th) {
            createTwillRunnerService.stop();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ResourceReport getResourceReport(TwillController twillController, long j) {
        ResourceReport resourceReport = twillController.getResourceReport();
        Stopwatch stopwatch = new Stopwatch();
        while (resourceReport == null && stopwatch.elapsedMillis() < j) {
            Uninterruptibles.sleepUninterruptibly(200L, TimeUnit.MILLISECONDS);
            resourceReport = twillController.getResourceReport();
        }
        Assert.assertNotNull(resourceReport);
        return resourceReport;
    }
}
