package org.apache.twill.yarn;

import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.PrintWriter;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.twill.api.AbstractTwillRunnable;
import org.apache.twill.api.Command;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.TwillApplication;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunResources;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.api.logging.PrinterLogHandler;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/twill/yarn/RestartRunnableTestRun.class */
public class RestartRunnableTestRun extends BaseYarnTest {
    private static final Logger LOG = LoggerFactory.getLogger(RestartRunnableTestRun.class);
    private static final String HANGING_RUNNABLE = HangingRunnable.class.getSimpleName();
    private static final String STOPPING_RUNNABLE = StoppingRunnable.class.getSimpleName();
    private static final String HANGING_RUNNABLE_STOP_SECS = "hanging.runnable.stop.secs";

    /* loaded from: input_file:org/apache/twill/yarn/RestartRunnableTestRun$HangingRunnable.class */
    public static final class HangingRunnable extends AbstractTwillRunnable {
        private volatile Thread runThread;
        private final AtomicInteger sleepTime = new AtomicInteger(1);

        public void run() {
            this.runThread = Thread.currentThread();
            RestartRunnableTestRun.LOG.info("Starting Runnable {}", RestartRunnableTestRun.HANGING_RUNNABLE);
            while (!Thread.interrupted()) {
                try {
                    TimeUnit.MILLISECONDS.sleep(200L);
                } catch (InterruptedException e) {
                }
            }
            RestartRunnableTestRun.LOG.info("Stopping Runnable {}", RestartRunnableTestRun.HANGING_RUNNABLE);
        }

        public void stop() {
            RestartRunnableTestRun.LOG.info("Using sleep time = {} secs", this.sleepTime);
            try {
                TimeUnit.SECONDS.sleep(this.sleepTime.get());
            } catch (InterruptedException e) {
                RestartRunnableTestRun.LOG.error("Got exception: ", e);
            }
            if (this.runThread != null) {
                this.runThread.interrupt();
            }
        }

        public void handleCommand(Command command) throws Exception {
            super.handleCommand(command);
            if (RestartRunnableTestRun.HANGING_RUNNABLE_STOP_SECS.equals(command.getCommand())) {
                int parseInt = Integer.parseInt((String) command.getOptions().get(RestartRunnableTestRun.HANGING_RUNNABLE_STOP_SECS));
                RestartRunnableTestRun.LOG.info("Got sleep time from message = {} secs", Integer.valueOf(parseInt));
                this.sleepTime.set(parseInt);
            }
        }
    }

    /* loaded from: input_file:org/apache/twill/yarn/RestartRunnableTestRun$RestartTestApplication.class */
    public static final class RestartTestApplication implements TwillApplication {
        public TwillSpecification configure() {
            return TwillSpecification.Builder.with().setName(RestartTestApplication.class.getSimpleName()).withRunnable().add(RestartRunnableTestRun.HANGING_RUNNABLE, new HangingRunnable()).noLocalFiles().add(RestartRunnableTestRun.STOPPING_RUNNABLE, new StoppingRunnable()).noLocalFiles().withOrder().begin(RestartRunnableTestRun.HANGING_RUNNABLE, new String[0]).nextWhenStarted(RestartRunnableTestRun.STOPPING_RUNNABLE, new String[0]).build();
        }
    }

    /* loaded from: input_file:org/apache/twill/yarn/RestartRunnableTestRun$SingleRunnableApp.class */
    public static final class SingleRunnableApp implements TwillApplication {
        public TwillSpecification configure() {
            return TwillSpecification.Builder.with().setName(RestartTestApplication.class.getSimpleName()).withRunnable().add(RestartRunnableTestRun.HANGING_RUNNABLE, new HangingRunnable()).noLocalFiles().anyOrder().build();
        }
    }

    /* loaded from: input_file:org/apache/twill/yarn/RestartRunnableTestRun$SleepCommand.class */
    private static class SleepCommand implements Command {
        private final int sleepTime;

        public SleepCommand(int i) {
            this.sleepTime = i;
        }

        public String getCommand() {
            return RestartRunnableTestRun.HANGING_RUNNABLE_STOP_SECS;
        }

        public Map<String, String> getOptions() {
            return ImmutableMap.of(RestartRunnableTestRun.HANGING_RUNNABLE_STOP_SECS, Integer.toString(this.sleepTime));
        }
    }

    /* loaded from: input_file:org/apache/twill/yarn/RestartRunnableTestRun$StoppingRunnable.class */
    public static final class StoppingRunnable extends AbstractTwillRunnable {
        private volatile Thread runThread;

        public void run() {
            this.runThread = Thread.currentThread();
            RestartRunnableTestRun.LOG.info("Starting Runnable {}", RestartRunnableTestRun.STOPPING_RUNNABLE);
            while (!Thread.interrupted()) {
                try {
                    TimeUnit.MILLISECONDS.sleep(200L);
                } catch (InterruptedException e) {
                }
            }
            RestartRunnableTestRun.LOG.info("Stopping Runnable {}", RestartRunnableTestRun.STOPPING_RUNNABLE);
        }

        public void stop() {
            if (this.runThread != null) {
                this.runThread.interrupt();
            }
        }
    }

    @Test
    public void testRestartSingleRunnable() throws Exception {
        YarnTwillRunnerService twillRunner = getTwillRunner();
        twillRunner.start();
        LOG.info("Starting application {}", SingleRunnableApp.class.getSimpleName());
        TwillController start = twillRunner.prepare(new SingleRunnableApp()).addLogHandler(new PrinterLogHandler(new PrintWriter(System.out))).start();
        waitForInstance(start, HANGING_RUNNABLE, "002", 120L, TimeUnit.SECONDS);
        waitForContainers(start, 2, 60L, TimeUnit.SECONDS);
        LOG.info("Restarting runnable {}", HANGING_RUNNABLE);
        start.restartAllInstances(HANGING_RUNNABLE);
        waitForInstance(start, HANGING_RUNNABLE, "003", 120L, TimeUnit.SECONDS);
        waitForContainers(start, 2, 60L, TimeUnit.SECONDS);
        start.sendCommand(HANGING_RUNNABLE, new SleepCommand(1000)).get();
        LOG.info("Restarting runnable {}", HANGING_RUNNABLE);
        start.restartAllInstances(HANGING_RUNNABLE);
        waitForInstance(start, HANGING_RUNNABLE, "004", 120L, TimeUnit.SECONDS);
        waitForContainers(start, 2, 60L, TimeUnit.SECONDS);
    }

    @Test
    public void testRestartRunnable() throws Exception {
        YarnTwillRunnerService twillRunner = getTwillRunner();
        twillRunner.start();
        LOG.info("Starting application {}", RestartTestApplication.class.getSimpleName());
        TwillController start = twillRunner.prepare(new RestartTestApplication()).addLogHandler(new PrinterLogHandler(new PrintWriter(System.out))).start();
        waitForInstance(start, HANGING_RUNNABLE, "002", 120L, TimeUnit.SECONDS);
        waitForInstance(start, STOPPING_RUNNABLE, "003", 120L, TimeUnit.SECONDS);
        waitForContainers(start, 3, 60L, TimeUnit.SECONDS);
        start.sendCommand(HANGING_RUNNABLE, new SleepCommand(1000)).get();
        LOG.info("Increasing instances of both runnables");
        allAsList(start.changeInstances(HANGING_RUNNABLE, 3), start.changeInstances(STOPPING_RUNNABLE, 2)).get(120L, TimeUnit.SECONDS);
        waitForInstance(start, HANGING_RUNNABLE, "005", 120L, TimeUnit.SECONDS);
        waitForInstance(start, STOPPING_RUNNABLE, "006", 120L, TimeUnit.SECONDS);
        waitForContainers(start, 6, 60L, TimeUnit.SECONDS);
        LOG.info("Restarting all instances of runnables {} and {}", HANGING_RUNNABLE, STOPPING_RUNNABLE);
        allAsList(start.restartAllInstances(HANGING_RUNNABLE), start.restartAllInstances(STOPPING_RUNNABLE)).get(120L, TimeUnit.SECONDS);
        waitForInstance(start, HANGING_RUNNABLE, "009", 120L, TimeUnit.SECONDS);
        waitForInstance(start, STOPPING_RUNNABLE, "011", 120L, TimeUnit.SECONDS);
        waitForContainers(start, 6, 60L, TimeUnit.SECONDS);
        LOG.info("Restarting a single runnable of both");
        allAsList(start.restartInstances(HANGING_RUNNABLE, 1, new int[0]), start.restartInstances(ImmutableMap.of(STOPPING_RUNNABLE, Collections.singleton(0)))).get(120L, TimeUnit.SECONDS);
        waitForInstance(start, HANGING_RUNNABLE, "012", 120L, TimeUnit.SECONDS);
        waitForInstance(start, STOPPING_RUNNABLE, "013", 120L, TimeUnit.SECONDS);
        waitForContainers(start, 6, 60L, TimeUnit.SECONDS);
        start.sendCommand(HANGING_RUNNABLE, new SleepCommand(10)).get();
        LOG.info("Decreasing instances of both runnables");
        allAsList(start.changeInstances(HANGING_RUNNABLE, 1), start.changeInstances(STOPPING_RUNNABLE, 1)).get(120L, TimeUnit.SECONDS);
        waitForInstance(start, HANGING_RUNNABLE, "007", 120L, TimeUnit.SECONDS);
        waitForInstance(start, STOPPING_RUNNABLE, "013", 120L, TimeUnit.SECONDS);
        waitForContainers(start, 3, 60L, TimeUnit.SECONDS);
        LOG.info("Stopping application {}", RestartTestApplication.class.getSimpleName());
        start.terminate().get(120L, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(2L);
    }

    private void waitForContainers(TwillController twillController, int i, long j, TimeUnit timeUnit) throws Exception {
        Stopwatch stopwatch = new Stopwatch();
        stopwatch.start();
        int i2 = 0;
        int i3 = 0;
        do {
            if (twillController.getResourceReport() != null) {
                i2 = getApplicationResourceReport(twillController.getResourceReport().getApplicationId()).getNumUsedContainers();
                i3 = getTwillContainersUsed(twillController);
                if (i2 == i && i3 == i) {
                    return;
                }
            }
            TimeUnit.SECONDS.sleep(1L);
        } while (stopwatch.elapsedTime(timeUnit) < j);
        throw new TimeoutException("Timeout reached while waiting for num containers to be " + i + ". Yarn containers = " + i2 + ", Twill containers = " + i3);
    }

    private void waitForInstance(TwillController twillController, String str, String str2, long j, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
        Stopwatch stopwatch = new Stopwatch();
        stopwatch.start();
        do {
            ResourceReport resourceReport = twillController.getResourceReport();
            if (resourceReport != null && resourceReport.getRunnableResources(str) != null) {
                Iterator it = resourceReport.getRunnableResources(str).iterator();
                while (it.hasNext()) {
                    if (((TwillRunResources) it.next()).getContainerId().endsWith(str2)) {
                        return;
                    }
                }
            }
            TimeUnit.SECONDS.sleep(1L);
        } while (stopwatch.elapsedTime(timeUnit) < j);
        throw new TimeoutException("Timeout reached while waiting for runnable " + str + " instance " + str2);
    }

    private int getTwillContainersUsed(TwillController twillController) {
        if (twillController.getResourceReport() == null) {
            return 0;
        }
        int i = 1;
        Iterator it = twillController.getResourceReport().getResources().values().iterator();
        while (it.hasNext()) {
            i += ((Collection) it.next()).size();
        }
        return i;
    }

    @SafeVarargs
    private final <V> ListenableFuture<List<V>> allAsList(Future<? extends V>... futureArr) {
        ImmutableList.Builder builder = ImmutableList.builder();
        for (Future<? extends V> future : futureArr) {
            builder.add(JdkFutureAdapters.listenInPoolThread(future));
        }
        return Futures.allAsList(builder.build());
    }
}
