package org.apache.twill.yarn;

import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.twill.api.AbstractTwillRunnable;
import org.apache.twill.api.Command;
import org.apache.twill.api.TwillApplication;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.api.logging.PrinterLogHandler;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.ServiceDiscovered;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/twill/yarn/ServiceDiscoveryTestRun.class */
public final class ServiceDiscoveryTestRun extends BaseYarnTest {

    /* loaded from: input_file:org/apache/twill/yarn/ServiceDiscoveryTestRun$ServiceApplication.class */
    public static final class ServiceApplication implements TwillApplication {
        public TwillSpecification configure() {
            return TwillSpecification.Builder.with().setName("ServiceApp").withRunnable().add("r1", new ServiceRunnable()).noLocalFiles().add("r2", new ServiceRunnable()).noLocalFiles().anyOrder().build();
        }
    }

    /* loaded from: input_file:org/apache/twill/yarn/ServiceDiscoveryTestRun$ServiceRunnable.class */
    public static final class ServiceRunnable extends AbstractTwillRunnable {
        private static final Logger LOG = LoggerFactory.getLogger(ServiceRunnable.class);
        private static final String SERVICE_NAME = "service";
        private final CountDownLatch stopLatch = new CountDownLatch(1);

        public void run() {
            final int parseInt = Integer.parseInt(getContext().getArguments()[0]);
            Cancellable announce = getContext().announce(SERVICE_NAME, parseInt);
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            getContext().discover(SERVICE_NAME).watchChanges(new ServiceDiscovered.ChangeListener() { // from class: org.apache.twill.yarn.ServiceDiscoveryTestRun.ServiceRunnable.1
                public void onChange(ServiceDiscovered serviceDiscovered) {
                    Iterator it = serviceDiscovered.iterator();
                    while (it.hasNext()) {
                        Discoverable discoverable = (Discoverable) it.next();
                        int port = discoverable.getSocketAddress().getPort();
                        if (ServiceRunnable.SERVICE_NAME.equals(discoverable.getName()) && port != parseInt) {
                            ServiceRunnable.LOG.info("{}: Service discovered at {}", ServiceRunnable.this.getContext().getSpecification().getName(), Integer.valueOf(port));
                            countDownLatch.countDown();
                        }
                    }
                }
            }, Threads.SAME_THREAD_EXECUTOR);
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                LOG.warn("Interrupted.", e);
            }
            Cancellable announce2 = getContext().announce("completed", parseInt);
            try {
                this.stopLatch.await();
                announce.cancel();
                announce2.cancel();
            } catch (InterruptedException e2) {
                LOG.warn("Interrupted.", e2);
            }
        }

        public void handleCommand(Command command) throws Exception {
            if ("done".equals(command.getCommand())) {
                this.stopLatch.countDown();
            }
        }
    }

    @Test
    public void testServiceDiscovery() throws InterruptedException, ExecutionException, TimeoutException {
        TwillController start = getTwillRunner().prepare(new ServiceApplication()).addLogHandler(new PrinterLogHandler(new PrintWriter((OutputStream) System.out, true))).withArguments("r1", new String[]{"12345"}).withArguments("r2", new String[]{"45678"}).start();
        Assert.assertTrue(waitForSize(start.discoverService("completed"), 2, 120));
        start.sendCommand(Command.Builder.of("done").build());
        start.awaitTerminated(120L, TimeUnit.SECONDS);
    }
}
