package org.apache.twill.yarn;

import com.google.common.base.Charsets;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
import com.google.common.io.LineReader;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.Writer;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.ResourceSpecification;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunResources;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.api.TwillRunnerService;
import org.apache.twill.api.logging.PrinterLogHandler;
import org.apache.twill.common.Threads;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.ServiceDiscovered;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.zookeeper.data.Stat;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/twill/yarn/EchoServerTestRun.class */
/**
 * Tests that run the {@link EchoServer} application through the {@link TwillRunner} inherited from
 * {@link BaseYarnTest}: service discovery, echo round trips over a socket, changing the number of
 * instances, restarting all instances, and ZooKeeper node cleanup after termination.
 */
public final class EchoServerTestRun extends BaseYarnTest {
    private static final Logger LOG = LoggerFactory.getLogger(EchoServerTestRun.class);

    /**
     * Starts two {@code EchoServer} instances, verifies each discovered endpoint echoes a line back,
     * then scales the runnable up and down, restarts all instances (expecting new container ids),
     * and finally terminates the application through a second {@link TwillRunnerService}.
     */
    @Test
    public void testEchoServer() throws Exception {
        TwillRunner runner = getTwillRunner();
        ResourceSpecification resourceSpec = ResourceSpecification.Builder.with()
            .setVirtualCores(1)
            .setMemory(1, ResourceSpecification.SizeUnit.GIGA)
            .setInstances(2)
            .build();

        TwillController controller = runner.prepare(new EchoServer(), resourceSpec)
            .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
            .withApplicationArguments(new String[]{"echo"})
            .withArguments("EchoServer", new String[]{"echo2"})
            .start();

        // Block until the controller reports the application as running.
        final CountDownLatch running = new CountDownLatch(1);
        controller.onRunning(new Runnable() {
            @Override
            public void run() {
                running.countDown();
            }
        }, Threads.SAME_THREAD_EXECUTOR);
        Assert.assertTrue(running.await(120L, TimeUnit.SECONDS));

        ServiceDiscovered echoServices = controller.discoverService("echo");
        Assert.assertTrue(waitForSize(echoServices, 2, 120));

        // Every discovered endpoint must echo back exactly the line it was sent.
        for (Discoverable discoverable : echoServices) {
            String msg = "Hello: " + discoverable.getSocketAddress();
            try (Socket socket = new Socket(discoverable.getSocketAddress().getAddress(),
                                            discoverable.getSocketAddress().getPort())) {
                PrintWriter writer =
                    new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true);
                LineReader reader =
                    new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8));

                writer.println(msg);
                Assert.assertEquals(msg, reader.readLine());
            }
        }

        // Scale up to 3 instances and observe it through the "echo" service.
        controller.changeInstances("EchoServer", 3).get(60L, TimeUnit.SECONDS);
        Assert.assertTrue(waitForSize(echoServices, 3, 120));

        // Scale down to 1 and back up to 2, observed through the "echo2" service.
        ServiceDiscovered echo2Services = controller.discoverService("echo2");
        controller.changeInstances("EchoServer", 1).get(60L, TimeUnit.SECONDS);
        Assert.assertTrue(waitForSize(echo2Services, 1, 120));
        controller.changeInstances("EchoServer", 2).get(60L, TimeUnit.SECONDS);
        Assert.assertTrue(waitForSize(echo2Services, 2, 120));

        // Remember which container hosts each instance so a restart can be detected afterwards.
        Map<Integer, String> instanceIdToContainerId = Maps.newHashMap();
        ResourceReport reportBeforeRestart =
            waitForAfterRestartResourceReport(controller, "EchoServer", 15L, TimeUnit.MINUTES, 2, null);
        Assert.assertNotNull(reportBeforeRestart);
        for (TwillRunResources resources : reportBeforeRestart.getRunnableResources("EchoServer")) {
            instanceIdToContainerId.put(resources.getInstanceId(), resources.getContainerId());
        }

        // After restarting, every instance is expected to report a different container id.
        controller.restartAllInstances("EchoServer").get(60L, TimeUnit.SECONDS);
        Assert.assertTrue(waitForSize(echo2Services, 2, 120));
        Assert.assertNotNull(
            waitForAfterRestartResourceReport(controller, "EchoServer", 15L, TimeUnit.MINUTES, 2,
                                              instanceIdToContainerId));

        Iterable<?> liveApps = runner.lookupLive();
        Assert.assertTrue(waitForSize(liveApps, 1, 120));

        // Look the application up from a separately created runner service and stop it from there.
        TwillRunnerService runnerService = TWILL_TESTER.createTwillRunnerService();
        runnerService.start();
        try {
            Iterable<TwillController> controllers = runnerService.lookup("EchoServer");
            Assert.assertTrue(waitForSize(controllers, 1, 120));

            for (TwillController found : controllers) {
                LOG.info("Stopping application: {}", found.getRunId());
                found.terminate().get(30L, TimeUnit.SECONDS);
            }

            Assert.assertTrue(waitForSize(liveApps, 0, 120));
        } finally {
            runnerService.stop();
        }

        // Sleep a bit before exiting.
        TimeUnit.SECONDS.sleep(2L);
    }

    /**
     * Starts and terminates {@code EchoServer} applications while checking the {@code /EchoServer}
     * node through a ZooKeeper client rooted at {@code <zk-connection-string>/twill}: the node must
     * still exist while one of two concurrently started applications is running.
     */
    @Test
    public void testZKCleanup() throws Exception {
        final ZKClientService zkClient = ZKClientService.Builder.of(getZKConnectionString() + "/twill").build();
        zkClient.startAndWait();
        try {
            TwillRunner runner = getTwillRunner();

            // Start a single application and stop it.
            TwillController controller = startEchoServer(runner);
            Assert.assertTrue(waitForSize(controller.discoverService("echo"), 1, 120));
            controller.terminate().get();
            awaitEchoServerNodeState(zkClient);

            // Start two applications and stop the first one only.
            List<TwillController> controllers = new ArrayList<>();
            for (int i = 0; i < 2; i++) {
                controllers.add(startEchoServer(runner));
            }
            TwillController first = controllers.get(0);
            TwillController second = controllers.get(1);

            Assert.assertTrue(waitForSize(second.discoverService("echo"), 2, 120));
            first.terminate().get();

            // The node must survive while the second application is still alive.
            Assert.assertNotNull(zkClient.exists("/EchoServer").get());
            Assert.assertTrue(waitForSize(second.discoverService("echo"), 1, 120));

            second.terminate().get();
            awaitEchoServerNodeState(zkClient);
        } finally {
            zkClient.stopAndWait();
        }
    }

    /**
     * Launches an {@link EchoServer} with default resources, a stdout log handler, application
     * argument {@code "echo"} and runnable argument {@code "echo2"}.
     *
     * @param runner the runner used to prepare and start the application
     * @return the controller of the started application
     */
    private static TwillController startEchoServer(TwillRunner runner) throws Exception {
        return runner.prepare(new EchoServer())
            .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
            .withApplicationArguments(new String[]{"echo"})
            .withArguments("EchoServer", new String[]{"echo2"})
            .start();
    }

    /**
     * Calls the inherited {@code waitFor} with an expected value of {@code null} against the
     * existence check of the {@code /EchoServer} node (10000 ms timeout, 100 ms delay arguments).
     *
     * <p>NOTE(review): presumably {@code waitFor} polls until the node is gone; its result is not
     * asserted here because its return type is not visible from this file — confirm and assert it.
     *
     * @param zkClient the started ZooKeeper client to query
     */
    private void awaitEchoServerNodeState(final ZKClientService zkClient) throws Exception {
        waitFor(null, new Callable<Stat>() {
            @Override
            public Stat call() throws Exception {
                return zkClient.exists("/EchoServer").get();
            }
        }, 10000L, 100L, TimeUnit.MILLISECONDS);
    }

    /**
     * Polls the controller's resource report every 100 ms until the given runnable reports exactly
     * {@code numInstances} instances and, when {@code previousContainerIds} is supplied, none of
     * those instances still has the container id recorded for its instance id.
     *
     * @param controller controller to fetch the resource report from
     * @param runnable name of the runnable whose resources are inspected
     * @param timeout maximum time to keep polling
     * @param timeoutUnit unit of {@code timeout}
     * @param numInstances number of instances the report must contain
     * @param previousContainerIds instance id to container id before the restart, or {@code null}
     *                             to skip the container id comparison
     * @return the first report satisfying the conditions, or {@code null} if the timeout elapsed
     */
    @Nullable
    private ResourceReport waitForAfterRestartResourceReport(TwillController controller, String runnable,
                                                             long timeout, TimeUnit timeoutUnit,
                                                             int numInstances,
                                                             @Nullable Map<Integer, String> previousContainerIds) {
        Stopwatch stopwatch = new Stopwatch();
        stopwatch.start();
        do {
            ResourceReport report = controller.getResourceReport();
            // Read the runnable's resources once per poll so all checks see the same collection.
            Collection<TwillRunResources> resources =
                (report == null) ? null : report.getRunnableResources(runnable);

            if (resources != null && resources.size() == numInstances) {
                if (previousContainerIds == null) {
                    LOG.info("Return resource report without comparing container ids.");
                    return report;
                }
                if (!hasReusedContainer(resources, previousContainerIds)) {
                    LOG.info("Get set of different container ids for restart.");
                    return report;
                }
            }
            Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
        } while (stopwatch.elapsedTime(timeoutUnit) < timeout);

        LOG.error("Unable to get different container ids for restart.");
        return null;
    }

    /**
     * Tells whether any instance still reports the container id previously recorded for it.
     *
     * @param resources current resources of the runnable
     * @param previousContainerIds instance id to container id recorded earlier
     * @return {@code true} on the first instance whose container id is unchanged
     */
    private static boolean hasReusedContainer(Collection<TwillRunResources> resources,
                                              Map<Integer, String> previousContainerIds) {
        for (TwillRunResources resource : resources) {
            int instanceId = resource.getInstanceId();
            String containerId = resource.getContainerId();
            if (containerId.equals(previousContainerIds.get(instanceId))) {
                LOG.warn("Found an instance id {} with same container id {} for restart all, let's wait for a while.",
                         instanceId, containerId);
                return true;
            }
        }
        return false;
    }
}
