/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.dataflow.integration.test;

import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Predicate;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import net.javacrumbs.jsonunit.assertj.JsonAssertions;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Condition;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.Extension;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.dataflow.core.ApplicationType;
import org.springframework.cloud.dataflow.integration.test.DataFlowOperationsITConfiguration;
import org.springframework.cloud.dataflow.integration.test.IntegrationTestProperties;
import org.springframework.cloud.dataflow.integration.test.tags.DockerCompose;
import org.springframework.cloud.dataflow.integration.test.util.DockerComposeFactory;
import org.springframework.cloud.dataflow.integration.test.util.RuntimeApplicationHelper;
import org.springframework.cloud.dataflow.rest.client.DataFlowClientException;
import org.springframework.cloud.dataflow.rest.client.DataFlowOperations;
import org.springframework.cloud.dataflow.rest.client.DataFlowTemplate;
import org.springframework.cloud.dataflow.rest.client.dsl.DeploymentPropertiesBuilder;
import org.springframework.cloud.dataflow.rest.client.dsl.Stream;
import org.springframework.cloud.dataflow.rest.client.dsl.StreamApplication;
import org.springframework.cloud.dataflow.rest.client.dsl.StreamDefinition;
import org.springframework.cloud.dataflow.rest.client.dsl.task.Task;
import org.springframework.cloud.dataflow.rest.client.dsl.task.TaskBuilder;
import org.springframework.cloud.dataflow.rest.resource.DetailedAppRegistrationResource;
import org.springframework.cloud.dataflow.rest.resource.TaskExecutionResource;
import org.springframework.cloud.dataflow.rest.resource.TaskExecutionStatus;
import org.springframework.cloud.dataflow.rest.resource.about.AboutResource;
import org.springframework.cloud.skipper.domain.SpringCloudDeployerApplicationManifestReader;
import org.springframework.context.annotation.Import;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.hateoas.PagedModel;
import org.springframework.http.HttpMethod;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.util.StreamUtils;
import org.springframework.web.util.UriComponentsBuilder;

@ExtendWith(value={SpringExtension.class})
@EnableConfigurationProperties(value={IntegrationTestProperties.class})
@TestMethodOrder(value=MethodOrderer.OrderAnnotation.class)
@Import(value={DataFlowOperationsITConfiguration.class})
@DockerCompose
public class DataFlowIT {
    private static final Logger logger = LoggerFactory.getLogger(DataFlowIT.class);
    @Autowired
    protected IntegrationTestProperties testProperties;
    @Autowired
    protected DataFlowTemplate dataFlowOperations;
    @Autowired
    protected RuntimeApplicationHelper runtimeApps;
    @TempDir
    static Path tempDockerComposeYamlFolder;
    @RegisterExtension
    public static Extension dockerCompose;
    private static final String SPRING_CLOUD_DATAFLOW_SKIPPER_PLATFORM_NAME = "spring.cloud.dataflow.skipper.platformName";
    public static final String DEPLOYED = "deployed";
    public static final String DELETED = "deleted";
    public static final String UNDEPLOYED = "undeployed";
    public static final String DEPLOYING = "deploying";
    public static final String PARTIAL = "partial";
    public static final int EXIT_CODE_SUCCESS = 0;
    public static final int EXIT_CODE_ERROR = 1;

    @BeforeEach
    public void before() {
        Awaitility.setDefaultPollInterval((Duration)Duration.ofSeconds(5L));
        Awaitility.setDefaultTimeout((Duration)Duration.ofMinutes(15L));
    }

    @AfterEach
    public void after() {
        this.dataFlowOperations.streamOperations().destroyAll();
        this.dataFlowOperations.taskOperations().destroyAll();
    }

    @Test
    @Order(value=-2147483648)
    public void aboutTestInfo() {
        logger.info("Available platforms: " + this.dataFlowOperations.streamOperations().listPlatforms().stream().map(d -> String.format("[name: %s, type: %s]", d.getName(), d.getType())).collect(Collectors.joining()));
        logger.info(String.format("Selected platform: [name: %s, type: %s]", this.runtimeApps.getPlatformName(), this.runtimeApps.getPlatformType()));
        logger.info("Wait until at least 60 apps are registered in SCDF");
        Awaitility.await().until(() -> this.dataFlowOperations.appRegistryOperations().list().getMetadata().getTotalElements() >= 60L);
    }

    @Test
    public void applicationMetadataMavenTests() {
        logger.info("application-metadata-maven-test");
        DetailedAppRegistrationResource mavenAppWithJarMetadata = this.dataFlowOperations.appRegistryOperations().info("file", ApplicationType.sink, false);
        Assertions.assertThat((List)mavenAppWithJarMetadata.getOptions()).hasSize(8);
        this.dataFlowOperations.appRegistryOperations().register("maven-app-without-metadata", ApplicationType.sink, "maven://org.springframework.cloud.stream.app:file-sink-kafka:2.1.1.RELEASE", null, true);
        DetailedAppRegistrationResource mavenAppWithoutMetadata = this.dataFlowOperations.appRegistryOperations().info("maven-app-without-metadata", ApplicationType.sink, false);
        Assertions.assertThat((List)mavenAppWithoutMetadata.getOptions()).hasSize(8);
        this.dataFlowOperations.appRegistryOperations().unregister("maven-app-without-metadata", ApplicationType.sink);
    }

    @Test
    @DisabledIfSystemProperty(named="PLATFORM_TYPE", matches="cloudfoundry")
    public void applicationMetadataDockerTests() {
        logger.info("application-metadata-docker-test");
        this.dataFlowOperations.appRegistryOperations().register("docker-app-with-container-metadata", ApplicationType.source, "docker:springcloudstream/time-source-kafka:2.1.4.RELEASE", null, true);
        DetailedAppRegistrationResource dockerAppWithContainerMetadata = this.dataFlowOperations.appRegistryOperations().info("docker-app-with-container-metadata", ApplicationType.source, false);
        Assertions.assertThat((List)dockerAppWithContainerMetadata.getOptions()).hasSize(6);
        this.dataFlowOperations.appRegistryOperations().register("docker-app-with-container-metadata-escape-chars", ApplicationType.source, "docker:springcloudstream/http-source-rabbit:2.1.3.RELEASE", null, true);
        DetailedAppRegistrationResource dockerAppWithContainerMetadataWithEscapeChars = this.dataFlowOperations.appRegistryOperations().info("docker-app-with-container-metadata-escape-chars", ApplicationType.source, false);
        Assertions.assertThat((List)dockerAppWithContainerMetadataWithEscapeChars.getOptions()).hasSize(6);
        this.dataFlowOperations.appRegistryOperations().register("docker-app-without-metadata", ApplicationType.sink, "docker:springcloudstream/file-sink-kafka:2.1.1.RELEASE", null, true);
        DetailedAppRegistrationResource dockerAppWithoutMetadata = this.dataFlowOperations.appRegistryOperations().info("docker-app-without-metadata", ApplicationType.sink, false);
        Assertions.assertThat((List)dockerAppWithoutMetadata.getOptions()).hasSize(0);
        this.dataFlowOperations.appRegistryOperations().register("docker-app-with-jar-metadata", ApplicationType.sink, "docker:springcloudstream/file-sink-kafka:2.1.1.RELEASE", "maven://org.springframework.cloud.stream.app:file-sink-kafka:jar:metadata:2.1.1.RELEASE", true);
        DetailedAppRegistrationResource dockerAppWithJarMetadata = this.dataFlowOperations.appRegistryOperations().info("docker-app-with-jar-metadata", ApplicationType.sink, false);
        Assertions.assertThat((List)dockerAppWithJarMetadata.getOptions()).hasSize(8);
        this.dataFlowOperations.appRegistryOperations().unregister("docker-app-with-container-metadata", ApplicationType.source);
        this.dataFlowOperations.appRegistryOperations().unregister("docker-app-with-container-metadata-escape-chars", ApplicationType.source);
        this.dataFlowOperations.appRegistryOperations().unregister("docker-app-without-metadata", ApplicationType.sink);
        this.dataFlowOperations.appRegistryOperations().unregister("docker-app-with-jar-metadata", ApplicationType.sink);
    }

    @Test
    @EnabledIfEnvironmentVariable(named="SCDF_CR_TEST", matches="true")
    public void githubContainerRegistryTests() {
        this.containerRegistryTests("github-log-sink", "docker:ghcr.io/tzolov/log-sink-rabbit:3.1.0-SNAPSHOT");
    }

    @Test
    @EnabledIfEnvironmentVariable(named="SCDF_CR_TEST", matches="true")
    public void azureContainerRegistryTests() {
        this.containerRegistryTests("azure-log-sink", "docker:scdftest.azurecr.io/springcloudstream/log-sink-rabbit:3.1.0-SNAPSHOT");
    }

    @Test
    @EnabledIfEnvironmentVariable(named="SCDF_CR_TEST", matches="true")
    public void harborContainerRegistryTests() {
        this.containerRegistryTests("harbor-log-sink", "docker:projects.registry.vmware.com/scdf/scdftest/log-sink-rabbit:3.1.0-SNAPSHOT");
    }

    private void containerRegistryTests(String appName, String appUrl) {
        logger.info("application-metadata-" + appName + "-container-registry-test");
        this.dataFlowOperations.appRegistryOperations().register(appName, ApplicationType.sink, appUrl, null, true);
        DetailedAppRegistrationResource dockerAppWithContainerMetadata = this.dataFlowOperations.appRegistryOperations().info(appName, ApplicationType.sink, false);
        Assertions.assertThat((List)dockerAppWithContainerMetadata.getOptions()).hasSize(3);
        this.dataFlowOperations.appRegistryOperations().unregister(appName, ApplicationType.sink);
    }

    @Test
    public void featureInfo() {
        logger.info("platform-feature-info-test");
        AboutResource about = this.dataFlowOperations.aboutOperation().get();
        Assertions.assertThat((boolean)about.getFeatureInfo().isAnalyticsEnabled()).isTrue();
        Assertions.assertThat((boolean)about.getFeatureInfo().isStreamsEnabled()).isTrue();
        Assertions.assertThat((boolean)about.getFeatureInfo().isTasksEnabled()).isTrue();
    }

    @Test
    public void appsCount() {
        logger.info("platform-apps-count-test");
        Assertions.assertThat((long)this.dataFlowOperations.appRegistryOperations().list().getMetadata().getTotalElements()).isGreaterThanOrEqualTo(60L);
    }

    @Test
    public void streamTransform() {
        logger.info("stream-transform-test");
        try (Stream stream = Stream.builder((DataFlowOperations)this.dataFlowOperations).name("transform-test").definition("http | transform --expression=payload.toUpperCase() | log").create().deploy(this.testDeploymentProperties("http"));){
            Assertions.assertThat((String)stream.getStatus()).is(DataFlowIT.condition(status -> status.equals(DEPLOYING) || status.equals(PARTIAL)));
            Awaitility.await().until(() -> stream.getStatus().equals(DEPLOYED));
            String message = "Unique Test message: " + new Random().nextInt();
            this.runtimeApps.httpPost(stream.getName(), "http", message);
            Awaitility.await().until(() -> stream.logs(this.app("log")).contains(message.toUpperCase()));
        }
    }

    @Test
    public void streamPartitioning() {
        logger.info("stream-partitioning-test (aka. WoodChuckTests)");
        StreamDefinition streamDefinition = Stream.builder((DataFlowOperations)this.dataFlowOperations).name("partitioning-test").definition("http | splitter --expression=payload.split(' ') | log").create();
        try (Stream stream = streamDefinition.deploy(new DeploymentPropertiesBuilder().putAll(this.testDeploymentProperties("http", "log")).put(SPRING_CLOUD_DATAFLOW_SKIPPER_PLATFORM_NAME, this.runtimeApps.getPlatformName()).put("deployer.log.count", "2").put("app.splitter.producer.partitionKeyExpression", "payload").put("app.log.spring.cloud.stream.kafka.bindings.input.consumer.autoRebalanceEnabled", "false").put("app.log.logging.pattern.level", "WOODCHUCK-${INSTANCE_INDEX:${CF_INSTANCE_INDEX:${spring.cloud.stream.instanceIndex:666}}} %5p").build());){
            Awaitility.await().until(() -> stream.getStatus().equals(DEPLOYED));
            String message = "How much wood would a woodchuck chuck if a woodchuck could chuck wood";
            this.runtimeApps.httpPost(stream.getName(), "http", message);
            Awaitility.await().until(() -> {
                Collection<String> logs = this.runtimeApps.applicationInstanceLogs(stream.getName(), "log").values();
                return logs.size() == 2 && logs.stream().map(log -> log.contains("WOODCHUCK-0") ? DataFlowIT.asList("WOODCHUCK-0", "How", "chuck").stream().allMatch(log::contains) : DataFlowIT.asList("WOODCHUCK-1", "much", "wood", "would", "if", "a", "woodchuck", "could").stream().allMatch(log::contains)).reduce(Boolean::logicalAnd).orElse(false) != false;
            });
        }
    }

    @Test
    @Order(value=-2147483638)
    public void streamAppCrossVersion() {
        String VERSION_2_1_5 = "2.1.5.RELEASE";
        String VERSION_3_0_1 = "3.0.1";
        Assumptions.assumeTrue((!this.runtimeApps.getPlatformType().equals("cloudfoundry") || this.runtimeApps.dataflowServerVersionEqualOrGreaterThan("2.7.0") ? 1 : 0) != 0, (String)"stream-app-cross-version-test: SKIP - CloudFoundry 2.6 and below!");
        Assumptions.assumeTrue((this.runtimeApps.isAppRegistered("ver-log", ApplicationType.sink, "3.0.1") && this.runtimeApps.isAppRegistered("ver-log", ApplicationType.sink, "2.1.5.RELEASE") ? 1 : 0) != 0, (String)"stream-app-cross-version-test: SKIP - required ver-log apps not registered!");
        logger.info("stream-app-cross-version-test: DEPLOY");
        int CURRENT_MANIFEST = 0;
        String RANDOM_SUFFIX = DataFlowIT.randomSuffix();
        try (Stream stream = Stream.builder((DataFlowOperations)this.dataFlowOperations).name("app-cross-version-test" + RANDOM_SUFFIX).definition("http | ver-log").create().deploy(new DeploymentPropertiesBuilder().putAll(this.testDeploymentProperties("http")).put("version.ver-log", "3.0.1").build());){
            Awaitility.await().until(() -> stream.getStatus().equals(DEPLOYED));
            Supplier<String> currentVerLogVersion = () -> new SpringCloudDeployerApplicationManifestReader().read(stream.manifest(CURRENT_MANIFEST)).stream().filter(m -> ((String)m.getMetadata().get("name")).equals("ver-log")).map(m -> m.getSpec().getVersion()).findFirst().orElse("none");
            Consumer<String> awaitSendAndReceiveTestMessage = message -> Awaitility.await().until(() -> {
                this.runtimeApps.httpPost(stream.getName(), "http", (String)message);
                return stream.logs(this.app("ver-log")).contains((CharSequence)message);
            });
            awaitSendAndReceiveTestMessage.accept(String.format("TEST MESSAGE 1-%s ", RANDOM_SUFFIX));
            Assertions.assertThat((String)currentVerLogVersion.get()).isEqualTo("3.0.1");
            Assertions.assertThat((int)stream.history().size()).isEqualTo(1L);
            logger.info("stream-app-cross-version-test: UPDATE");
            stream.update(new DeploymentPropertiesBuilder().put("version.ver-log", "2.1.5.RELEASE").build());
            Awaitility.await().until(() -> stream.getStatus().equals(DEPLOYED));
            awaitSendAndReceiveTestMessage.accept(String.format("TEST MESSAGE 2-%s ", RANDOM_SUFFIX));
            Assertions.assertThat((String)currentVerLogVersion.get()).isEqualTo("2.1.5.RELEASE");
            Assertions.assertThat((int)stream.history().size()).isEqualTo(2);
            logger.info("stream-app-cross-version-test: ROLLBACK");
            stream.rollback(0);
            Awaitility.await().until(() -> stream.getStatus().equals(DEPLOYED));
            awaitSendAndReceiveTestMessage.accept(String.format("TEST MESSAGE 3-%s ", RANDOM_SUFFIX));
            Assertions.assertThat((String)currentVerLogVersion.get()).isEqualTo("3.0.1");
            Assertions.assertThat((int)stream.history().size()).isEqualTo(3);
        }
        logger.info("stream-app-cross-version-test: DESTROY");
        Assertions.assertThat((long)Optional.ofNullable(this.dataFlowOperations.streamOperations().list().getMetadata()).orElse(new PagedModel.PageMetadata(0L, 0L, 0L)).getTotalElements()).isEqualTo(0L);
    }

    @Test
    public void streamLifecycle() {
        this.streamLifecycleHelper(1, s -> {});
    }

    @Test
    public void streamLifecycleWithTwoInstance() {
        int numberOfInstancePerApp = 2;
        this.streamLifecycleHelper(2, stream -> {
            Map streamApps = stream.runtimeApps();
            Assertions.assertThat((int)streamApps.size()).isEqualTo(2);
            for (Map instanceMap : streamApps.values()) {
                Assertions.assertThat((int)instanceMap.size()).isEqualTo(2);
            }
        });
    }

    private void streamLifecycleHelper(int appInstanceCount, Consumer<Stream> streamAssertions) {
        logger.info("stream-lifecycle-test: DEPLOY");
        try (Stream stream = Stream.builder((DataFlowOperations)this.dataFlowOperations).name("lifecycle-test" + DataFlowIT.randomSuffix()).definition("time | log --log.name='TEST' --log.expression='TICKTOCK - TIMESTAMP: '.concat(payload)").create().deploy(new DeploymentPropertiesBuilder().putAll(this.testDeploymentProperties(new String[0])).put("deployer.*.count", "" + appInstanceCount).build());){
            Awaitility.await().until(() -> stream.getStatus().equals(DEPLOYED));
            streamAssertions.accept(stream);
            Awaitility.await().until(() -> stream.logs(this.app("log")).contains("TICKTOCK - TIMESTAMP:"));
            Assertions.assertThat((int)stream.history().size()).isEqualTo(1L);
            Awaitility.await().until(() -> ((String)stream.history().get(1)).equals(DEPLOYED));
            Assertions.assertThat((String)stream.logs()).contains(new CharSequence[]{"TICKTOCK - TIMESTAMP:"});
            Assertions.assertThat((String)stream.logs(this.app("log"))).contains(new CharSequence[]{"TICKTOCK - TIMESTAMP:"});
            logger.info("stream-lifecycle-test: UPDATE");
            stream.update(new DeploymentPropertiesBuilder().put("app.log.log.expression", "'Updated TICKTOCK - TIMESTAMP: '.concat(payload)").put("app.*.management.endpoints.web.exposure.include", "*").build());
            Awaitility.await().until(() -> stream.getStatus().equals(DEPLOYED));
            streamAssertions.accept(stream);
            Awaitility.await().until(() -> stream.logs(this.app("log")).contains("Updated TICKTOCK - TIMESTAMP:"));
            Assertions.assertThat((int)stream.history().size()).isEqualTo(2);
            Awaitility.await().until(() -> ((String)stream.history().get(1)).equals(DELETED));
            Awaitility.await().until(() -> ((String)stream.history().get(2)).equals(DEPLOYED));
            logger.info("stream-lifecycle-test: ROLLBACK");
            stream.rollback(0);
            Awaitility.await().until(() -> stream.getStatus().equals(DEPLOYED));
            streamAssertions.accept(stream);
            Awaitility.await().until(() -> stream.logs(this.app("log")).contains("TICKTOCK - TIMESTAMP:"));
            Assertions.assertThat((int)stream.history().size()).isEqualTo(3);
            Awaitility.await().until(() -> ((String)stream.history().get(1)).equals(DELETED));
            Awaitility.await().until(() -> ((String)stream.history().get(2)).equals(DELETED));
            Awaitility.await().until(() -> ((String)stream.history().get(3)).equals(DEPLOYED));
            logger.info("stream-lifecycle-test: UNDEPLOY");
            stream.undeploy();
            Awaitility.await().until(() -> stream.getStatus().equals(UNDEPLOYED));
            Assertions.assertThat((int)stream.history().size()).isEqualTo(3);
            Awaitility.await().until(() -> ((String)stream.history().get(1)).equals(DELETED));
            Awaitility.await().until(() -> ((String)stream.history().get(2)).equals(DELETED));
            Awaitility.await().until(() -> ((String)stream.history().get(3)).equals(DELETED));
            Assertions.assertThat((long)this.dataFlowOperations.streamOperations().list().getMetadata().getTotalElements()).isEqualTo(1L);
        }
        logger.info("stream-lifecycle-test: DESTROY");
        Assertions.assertThat((long)this.dataFlowOperations.streamOperations().list().getMetadata().getTotalElements()).isEqualTo(0L);
    }

    @Test
    public void streamScaling() {
        logger.info("stream-scaling-test");
        try (Stream stream = Stream.builder((DataFlowOperations)this.dataFlowOperations).name("stream-scaling-test").definition("time | log --log.expression='TICKTOCK - TIMESTAMP: '.concat(payload)").create().deploy(this.testDeploymentProperties(new String[0]));){
            Awaitility.await().until(() -> stream.getStatus().equals(DEPLOYED));
            StreamApplication time = this.app("time");
            StreamApplication log = this.app("log");
            Map streamApps = stream.runtimeApps();
            Assertions.assertThat((int)streamApps.size()).isEqualTo(2);
            Assertions.assertThat((int)((Map)streamApps.get(time)).size()).isEqualTo(1);
            Assertions.assertThat((int)((Map)streamApps.get(log)).size()).isEqualTo(1);
            stream.scaleApplicationInstances(log, 2, Collections.emptyMap());
            Awaitility.await().until(() -> stream.getStatus().equals(DEPLOYED));
            Awaitility.await().until(() -> ((Map)stream.runtimeApps().get(log)).size() == 2);
            Assertions.assertThat((String)stream.getStatus()).isEqualTo(DEPLOYED);
            streamApps = stream.runtimeApps();
            Assertions.assertThat((int)streamApps.size()).isEqualTo(2);
            Assertions.assertThat((int)((Map)streamApps.get(time)).size()).isEqualTo(1);
            Assertions.assertThat((int)((Map)streamApps.get(log)).size()).isEqualTo(2);
        }
    }

    @Test
    public void namedChannelDestination() {
        logger.info("stream-named-channel-destination-test");
        try (Stream logStream = Stream.builder((DataFlowOperations)this.dataFlowOperations).name("log-destination-sink").definition(":LOG-DESTINATION > log").create().deploy(this.testDeploymentProperties(new String[0]));
             Stream httpStream = Stream.builder((DataFlowOperations)this.dataFlowOperations).name("http-destination-source").definition("http > :LOG-DESTINATION").create().deploy(this.testDeploymentProperties("http"));){
            Awaitility.await().until(() -> logStream.getStatus().equals(DEPLOYED));
            Awaitility.await().until(() -> httpStream.getStatus().equals(DEPLOYED));
            String message = "Unique Test message: " + new Random().nextInt();
            this.runtimeApps.httpPost(httpStream.getName(), "http", message);
            Awaitility.await().until(() -> logStream.logs(this.app("log")).contains(message));
        }
    }

    @Test
    public void namedChannelTap() {
        logger.info("stream-named-channel-tap-test");
        try (Stream httpLogStream = Stream.builder((DataFlowOperations)this.dataFlowOperations).name("taphttp").definition("http | log").create().deploy(this.testDeploymentProperties("http"));
             Stream tapStream = Stream.builder((DataFlowOperations)this.dataFlowOperations).name("tapstream").definition(":taphttp.http > log").create().deploy(this.testDeploymentProperties(new String[0]));){
            Awaitility.await().until(() -> httpLogStream.getStatus().equals(DEPLOYED));
            Awaitility.await().until(() -> tapStream.getStatus().equals(DEPLOYED));
            String message = "Unique Test message: " + new Random().nextInt();
            this.runtimeApps.httpPost(httpLogStream.getName(), "http", message);
            Awaitility.await().until(() -> tapStream.logs(this.app("log")).contains(message));
        }
    }

    @Test
    public void namedChannelManyToOne() {
        logger.info("stream-named-channel-many-to-one-test");
        try (Stream logStream = Stream.builder((DataFlowOperations)this.dataFlowOperations).name("many-to-one").definition(":MANY-TO-ONE-DESTINATION > log").create().deploy(this.testDeploymentProperties(new String[0]));
             Stream httpStreamOne = Stream.builder((DataFlowOperations)this.dataFlowOperations).name("http-source-1").definition("http > :MANY-TO-ONE-DESTINATION").create().deploy(this.testDeploymentProperties("http"));
             Stream httpStreamTwo = Stream.builder((DataFlowOperations)this.dataFlowOperations).name("http-source-2").definition("http > :MANY-TO-ONE-DESTINATION").create().deploy(this.testDeploymentProperties("http"));){
            Awaitility.await().until(() -> logStream.getStatus().equals(DEPLOYED));
            Awaitility.await().until(() -> httpStreamOne.getStatus().equals(DEPLOYED));
            Awaitility.await().until(() -> httpStreamTwo.getStatus().equals(DEPLOYED));
            String messageOne = "Unique Test message: " + new Random().nextInt();
            this.runtimeApps.httpPost(httpStreamOne.getName(), "http", messageOne);
            Awaitility.await().until(() -> logStream.logs(this.app("log")).contains(messageOne));
            String messageTwo = "Unique Test message: " + new Random().nextInt();
            this.runtimeApps.httpPost(httpStreamTwo.getName(), "http", messageTwo);
            Awaitility.await().until(() -> logStream.logs(this.app("log")).contains(messageTwo));
        }
    }

    @Test
    public void namedChannelDirectedGraph() {
        logger.info("stream-named-channel-directed-graph-test");
        try (Stream fooLogStream = Stream.builder((DataFlowOperations)this.dataFlowOperations).name("directed-graph-destination1").definition(":foo > transform --expression=payload+'-foo' | log").create().deploy(this.testDeploymentProperties(new String[0]));
             Stream barLogStream = Stream.builder((DataFlowOperations)this.dataFlowOperations).name("directed-graph-destination2").definition(":bar > transform --expression=payload+'-bar' | log").create().deploy(this.testDeploymentProperties(new String[0]));
             Stream httpStream = Stream.builder((DataFlowOperations)this.dataFlowOperations).name("directed-graph-http-source").definition("http | router --expression=payload.contains('a')?'foo':'bar'").create().deploy(this.testDeploymentProperties("http"));){
            Awaitility.await().until(() -> fooLogStream.getStatus().equals(DEPLOYED));
            Awaitility.await().until(() -> barLogStream.getStatus().equals(DEPLOYED));
            Awaitility.await().until(() -> httpStream.getStatus().equals(DEPLOYED));
            String httpAppUrl = this.runtimeApps.getApplicationInstanceUrl(httpStream.getName(), "http");
            this.runtimeApps.httpPost(httpAppUrl, "abcd");
            this.runtimeApps.httpPost(httpAppUrl, "defg");
            Awaitility.await().until(() -> fooLogStream.logs(this.app("log")).contains("abcd-foo"));
            Awaitility.await().until(() -> barLogStream.logs(this.app("log")).contains("defg-bar"));
        }
    }

    @Test
    public void analyticsCounterInflux() {
        if (!this.influxPresent()) {
            logger.info("stream-analytics-test: SKIP - no InfluxDB metrics configured!");
        }
        Assumptions.assumeTrue((boolean)this.influxPresent());
        if (!this.runtimeApps.isAppRegistered("analytics", ApplicationType.sink)) {
            logger.info("stream-analytics-influx-test: SKIP - no analytics app registered!");
        }
        Assumptions.assumeTrue((boolean)this.runtimeApps.isAppRegistered("analytics", ApplicationType.sink), (String)"stream-analytics-test: SKIP - no analytics app registered!");
        logger.info("stream-analytics-influx-test");
        try (Stream stream = Stream.builder((DataFlowOperations)this.dataFlowOperations).name("httpAnalyticsInflux").definition("http | analytics --analytics.name=my_http_analytics --analytics.tag.expression.msgSize=payload.length()").create().deploy(this.testDeploymentProperties("http"));){
            Awaitility.await().until(() -> stream.getStatus().equals(DEPLOYED));
            String message1 = "Test message 1";
            String message2 = "Test message 2 with extension";
            String message3 = "Test message 2 with double extension";
            String httpAppUrl = this.runtimeApps.getApplicationInstanceUrl(stream.getName(), "http");
            this.runtimeApps.httpPost(httpAppUrl, message1);
            this.runtimeApps.httpPost(httpAppUrl, message2);
            this.runtimeApps.httpPost(httpAppUrl, message3);
            Awaitility.await().until(() -> !JsonPath.parse((String)this.runtimeApps.httpGet(this.testProperties.getPlatform().getConnection().getInfluxUrl() + "/query?db=myinfluxdb&q=SELECT * FROM \"my_http_analytics\"")).read("$.results[0][?(@.series)].length()", new Predicate[0]).toString().equals("[]"));
            JsonAssertions.assertThatJson((Object)this.runtimeApps.httpGet(this.testProperties.getPlatform().getConnection().getInfluxUrl() + "/query?q=SHOW DATABASES")).inPath("$.results[0].series[0].values[1][0]").isEqualTo((Object)"myinfluxdb");
            List messageLengths = java.util.stream.Stream.of(message1, message2, message3).map(s -> String.format("\"%s\"", s.length())).collect(Collectors.toList());
            String myHttpCounter = this.runtimeApps.httpGet(this.testProperties.getPlatform().getConnection().getInfluxUrl() + "/query?db=myinfluxdb&q=SELECT * FROM \"my_http_analytics\"");
            JsonAssertions.assertThatJson((Object)myHttpCounter).inPath("$.results[0].series[0].values[0][7]").isIn(messageLengths);
            JsonAssertions.assertThatJson((Object)myHttpCounter).inPath("$.results[0].series[0].values[1][7]").isIn(messageLengths);
            JsonAssertions.assertThatJson((Object)myHttpCounter).inPath("$.results[0].series[0].values[2][7]").isIn(messageLengths);
        }
    }

    @Test
    public void analyticsCounterPrometheus() throws IOException {
        if (!this.runtimeApps.isAppRegistered("analytics", ApplicationType.sink)) {
            logger.info("stream-analytics-prometheus-test: SKIP - no analytics app registered!");
        }
        Assumptions.assumeTrue((boolean)this.runtimeApps.isAppRegistered("analytics", ApplicationType.sink), (String)"stream-analytics-test: SKIP - no analytics app registered!");
        if (!this.prometheusPresent()) {
            logger.info("stream-analytics-prometheus-test: SKIP - no Prometheus configured!");
        }
        Assumptions.assumeTrue((boolean)this.prometheusPresent());
        logger.info("stream-analytics-prometheus-test");
        try (Stream stream = Stream.builder((DataFlowOperations)this.dataFlowOperations).name("httpAnalyticsPrometheus").definition("http | analytics --analytics.name=my_http_analytics --analytics.tag.expression.msgSize=payload.length()").create().deploy(this.testDeploymentProperties("http"));){
            Awaitility.await().until(() -> stream.getStatus().equals(DEPLOYED));
            String message1 = "Test message 1";
            String message2 = "Test message 2 with extension";
            String message3 = "Test message 2 with double extension";
            String httpAppUrl = this.runtimeApps.getApplicationInstanceUrl(stream.getName(), "http");
            this.runtimeApps.httpPost(httpAppUrl, message1);
            this.runtimeApps.httpPost(httpAppUrl, message2);
            this.runtimeApps.httpPost(httpAppUrl, message3);
            Awaitility.await().until(() -> (Integer)JsonPath.parse((String)this.runtimeApps.httpGet(this.testProperties.getPlatform().getConnection().getPrometheusUrl() + "/api/v1/query?query=my_http_analytics_total")).read("$.data.result.length()", new Predicate[0]) > 0);
            JsonAssertions.assertThatJson((Object)this.runtimeApps.httpGet(this.testProperties.getPlatform().getConnection().getPrometheusUrl() + "/api/v1/query?query=my_http_analytics_total")).isEqualTo((Object)DataFlowIT.resourceToString("classpath:/my_http_analytics_total.json"));
        }
    }

    protected Map<String, String> testDeploymentProperties(String ... externallyAccessibleApps) {
        DeploymentPropertiesBuilder propertiesBuilder = new DeploymentPropertiesBuilder().put(SPRING_CLOUD_DATAFLOW_SKIPPER_PLATFORM_NAME, this.runtimeApps.getPlatformName()).put("app.*.logging.file", "/tmp/${PID}-test.log").put("app.*.logging.file.name", "/tmp/${PID}-test.log").put("app.*.endpoints.logfile.sensitive", "false").put("app.*.endpoints.logfile.enabled", "true").put("app.*.management.endpoints.web.exposure.include", "*").put("app.*.spring.cloud.streamapp.security.enabled", "false");
        if (this.runtimeApps.getPlatformType().equalsIgnoreCase("kubernetes")) {
            propertiesBuilder.put("app.*.server.port", "8080");
            for (String appName : externallyAccessibleApps) {
                propertiesBuilder.put("deployer." + appName + ".kubernetes.createLoadBalancer", "true");
            }
        }
        return propertiesBuilder.build();
    }

    public static String resourceToString(String resourcePath) throws IOException {
        return StreamUtils.copyToString((InputStream)new DefaultResourceLoader().getResource(resourcePath).getInputStream(), (Charset)StandardCharsets.UTF_8);
    }

    protected boolean prometheusPresent() {
        return this.runtimeApps.isServicePresent(this.testProperties.getPlatform().getConnection().getPrometheusUrl() + "/api/v1/query?query=up");
    }

    protected boolean influxPresent() {
        return this.runtimeApps.isServicePresent(this.testProperties.getPlatform().getConnection().getInfluxUrl() + "/ping");
    }

    public static Condition<String> condition(java.util.function.Predicate predicate) {
        return new Condition(predicate, "", new Object[0]);
    }

    protected StreamApplication app(String appName) {
        return new StreamApplication(appName);
    }

    private List<String> composedTaskLaunchArguments(String ... additionalArguments) {
        ArrayList<String> commonTaskArguments = new ArrayList<String>();
        commonTaskArguments.addAll(Arrays.asList("--dataflow-server-use-user-access-token=true"));
        commonTaskArguments.addAll(Arrays.asList(additionalArguments));
        return commonTaskArguments;
    }

    @Test
    @EnabledIfSystemProperty(named="PLATFORM_TYPE", matches="local")
    public void runBatchRemotePartitionJobLocal() {
        logger.info("runBatchRemotePartitionJob - local");
        TaskBuilder taskBuilder = Task.builder((DataFlowOperations)this.dataFlowOperations);
        try (Task task = taskBuilder.name(DataFlowIT.randomTaskName()).definition("batch-remote-partition").description("runBatchRemotePartitionJob - local").build();){
            long launchId = task.launch(Collections.EMPTY_MAP, this.composedTaskLaunchArguments("--platform=local"));
            Awaitility.await().until(() -> task.executionStatus(launchId) == TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((int)task.executions().size()).isEqualTo(1);
            Assertions.assertThat((boolean)task.execution(launchId).isPresent()).isTrue();
            Assertions.assertThat((Integer)((TaskExecutionResource)task.execution(launchId).get()).getExitCode()).isEqualTo(0);
        }
    }

    @Test
    public void timestampTask() {
        logger.info("task-timestamp-test");
        try (Task task = Task.builder((DataFlowOperations)this.dataFlowOperations).name(DataFlowIT.randomTaskName()).definition("timestamp").description("Test timestamp task").build();){
            long launchId1 = task.launch();
            Awaitility.await().until(() -> task.executionStatus(launchId1) == TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((int)task.executions().size()).isEqualTo(1);
            Assertions.assertThat((boolean)task.execution(launchId1).isPresent()).isTrue();
            Assertions.assertThat((Integer)((TaskExecutionResource)task.execution(launchId1).get()).getExitCode()).isEqualTo(0);
            long launchId2 = task.launch();
            Awaitility.await().until(() -> task.executionStatus(launchId2) == TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((int)task.executions().size()).isEqualTo(2);
            Assertions.assertThat((boolean)task.execution(launchId2).isPresent()).isTrue();
            Assertions.assertThat((Integer)((TaskExecutionResource)task.execution(launchId2).get()).getExitCode()).isEqualTo(0);
            task.executions().forEach(execution -> Assertions.assertThat((Integer)execution.getExitCode()).isEqualTo(0));
        }
    }

    @Test
    public void taskMetricsPrometheus() throws IOException {
        if (!this.prometheusPresent()) {
            logger.info("task-metrics-test: SKIP - no metrics configured!");
        }
        Assumptions.assumeTrue((boolean)this.prometheusPresent());
        logger.info("task-metrics-test: Prometheus");
        try (Task task = Task.builder((DataFlowOperations)this.dataFlowOperations).name(DataFlowIT.randomTaskName()).definition("task-demo-metrics-prometheus --task.demo.delay.fixed=0s").description("Test task metrics").build();){
            long launchId = task.launch(Arrays.asList("--spring.cloud.task.closecontext_enabled=false"));
            Awaitility.await().until(() -> task.executionStatus(launchId) == TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((int)task.executions().size()).isEqualTo(1);
            Assertions.assertThat((boolean)task.execution(launchId).isPresent()).isTrue();
            Assertions.assertThat((Integer)((TaskExecutionResource)task.execution(launchId).get()).getExitCode()).isEqualTo(0);
            task.executions().forEach(execution -> Assertions.assertThat((Integer)execution.getExitCode()).isEqualTo(0));
            URI qplUri = UriComponentsBuilder.fromHttpUrl((String)(this.testProperties.getPlatform().getConnection().getPrometheusUrl() + String.format("/api/v1/query?query=system_cpu_usage{service=\"task-application\",application=\"%s-%s\"}", task.getTaskName(), launchId))).build().toUri();
            Supplier<String> pqlTaskMetricsQuery = () -> (String)this.dataFlowOperations.getRestTemplate().exchange(qplUri, HttpMethod.GET, null, String.class).getBody();
            Awaitility.await().until(() -> (Integer)JsonPath.parse((String)((String)pqlTaskMetricsQuery.get())).read("$.data.result.length()", new Predicate[0]) > 0);
            JsonAssertions.assertThatJson((Object)pqlTaskMetricsQuery.get()).isEqualTo((Object)DataFlowIT.resourceToString("classpath:/task_metrics_system_cpu_usage.json"));
        }
    }

    @Test
    public void composedTask() {
        logger.info("task-composed-task-runner-test");
        TaskBuilder taskBuilder = Task.builder((DataFlowOperations)this.dataFlowOperations);
        try (Task task = taskBuilder.name(DataFlowIT.randomTaskName()).definition("a: timestamp && b:timestamp").description("Test composedTask").build();){
            Assertions.assertThat((int)task.composedTaskChildTasks().size()).isEqualTo(2);
            long launchId1 = task.launch(this.composedTaskLaunchArguments(new String[0]));
            Awaitility.await().until(() -> task.executionStatus(launchId1) == TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((int)task.executions().size()).isEqualTo(1);
            Assertions.assertThat((Comparable)task.executionStatus(launchId1)).isEqualTo((Object)TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((Integer)((TaskExecutionResource)task.execution(launchId1).get()).getExitCode()).isEqualTo(0);
            task.composedTaskChildTasks().forEach(childTask -> {
                Assertions.assertThat((int)childTask.executions().size()).isEqualTo(1);
                Assertions.assertThat((Integer)((TaskExecutionResource)childTask.executionByParentExecutionId(launchId1).get()).getExitCode()).isEqualTo(0);
            });
            task.executions().forEach(execution -> Assertions.assertThat((Integer)execution.getExitCode()).isEqualTo(0));
            long launchId2 = task.launch(this.composedTaskLaunchArguments(new String[0]));
            Awaitility.await().until(() -> task.executionStatus(launchId2) == TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((int)task.executions().size()).isEqualTo(2);
            Assertions.assertThat((Comparable)task.executionStatus(launchId2)).isEqualTo((Object)TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((Integer)((TaskExecutionResource)task.execution(launchId2).get()).getExitCode()).isEqualTo(0);
            task.composedTaskChildTasks().forEach(childTask -> {
                Assertions.assertThat((int)childTask.executions().size()).isEqualTo(2);
                Assertions.assertThat((Integer)((TaskExecutionResource)childTask.executionByParentExecutionId(launchId2).get()).getExitCode()).isEqualTo(0);
            });
            Assertions.assertThat((int)taskBuilder.allTasks().size()).isEqualTo(3);
        }
        Assertions.assertThat((int)taskBuilder.allTasks().size()).isEqualTo(0);
    }

    @Test
    public void multipleComposedTaskWithArguments() {
        logger.info("task-multiple-composed-task-with-arguments-test");
        TaskBuilder taskBuilder = Task.builder((DataFlowOperations)this.dataFlowOperations);
        try (Task task = taskBuilder.name(DataFlowIT.randomTaskName()).definition("a: timestamp && b:timestamp").description("Test multipleComposedTaskWithArguments").build();){
            Assertions.assertThat((int)task.composedTaskChildTasks().size()).isEqualTo(2);
            long launchId1 = task.launch(this.composedTaskLaunchArguments("--increment-instance-enabled=true"));
            Awaitility.await().until(() -> task.executionStatus(launchId1) == TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((int)task.executions().size()).isEqualTo(1);
            Assertions.assertThat((Comparable)task.executionStatus(launchId1)).isEqualTo((Object)TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((Integer)((TaskExecutionResource)task.execution(launchId1).get()).getExitCode()).isEqualTo(0);
            task.composedTaskChildTasks().forEach(childTask -> {
                Assertions.assertThat((int)childTask.executions().size()).isEqualTo(1);
                Assertions.assertThat((Integer)((TaskExecutionResource)childTask.executionByParentExecutionId(launchId1).get()).getExitCode()).isEqualTo(0);
            });
            task.executions().forEach(execution -> Assertions.assertThat((Integer)execution.getExitCode()).isEqualTo(0));
            long launchId2 = task.launch(this.composedTaskLaunchArguments("--increment-instance-enabled=true"));
            Awaitility.await().until(() -> task.executionStatus(launchId2) == TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((int)task.executions().size()).isEqualTo(2);
            Assertions.assertThat((Comparable)task.executionStatus(launchId2)).isEqualTo((Object)TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((Integer)((TaskExecutionResource)task.execution(launchId2).get()).getExitCode()).isEqualTo(0);
            task.composedTaskChildTasks().forEach(childTask -> {
                Assertions.assertThat((int)childTask.executions().size()).isEqualTo(2);
                Assertions.assertThat((Integer)((TaskExecutionResource)childTask.executionByParentExecutionId(launchId2).get()).getExitCode()).isEqualTo(0);
            });
            Assertions.assertThat((int)task.jobExecutionResources().size()).isEqualTo(2);
            Assertions.assertThat((int)taskBuilder.allTasks().size()).isEqualTo(3);
        }
        Assertions.assertThat((int)taskBuilder.allTasks().size()).isEqualTo(0);
    }

    @Test
    public void ctrLaunchTest() {
        logger.info("composed-task-ctrLaunch-test");
        TaskBuilder taskBuilder = Task.builder((DataFlowOperations)this.dataFlowOperations);
        try (Task task = taskBuilder.name(DataFlowIT.randomTaskName()).definition("a: timestamp && b:timestamp").description("ctrLaunchTest").build();){
            Assertions.assertThat(task.composedTaskChildTasks().stream().map(Task::getTaskName).collect(Collectors.toList())).hasSameElementsAs(this.fullTaskNames(task, "a", "b"));
            long launchId = task.launch(this.composedTaskLaunchArguments(new String[0]));
            Awaitility.await().until(() -> task.executionStatus(launchId) == TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((int)task.executions().size()).isEqualTo(1);
            Assertions.assertThat((Comparable)task.executionStatus(launchId)).isEqualTo((Object)TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((Integer)((TaskExecutionResource)task.execution(launchId).get()).getExitCode()).isEqualTo(0);
            task.executions().forEach(execution -> Assertions.assertThat((Integer)execution.getExitCode()).isEqualTo(0));
            task.composedTaskChildTasks().forEach(childTask -> {
                Assertions.assertThat((int)childTask.executions().size()).isEqualTo(1);
                Assertions.assertThat((Integer)((TaskExecutionResource)childTask.executionByParentExecutionId(launchId).get()).getExitCode()).isEqualTo(0);
            });
            Assertions.assertThat((int)task.executions().size()).isEqualTo(1);
            List jobExecutionIds = ((TaskExecutionResource)task.executions().stream().findFirst().get()).getJobExecutionIds();
            Assertions.assertThat((int)jobExecutionIds.size()).isEqualTo(1);
            Assumptions.assumingThat((boolean)this.runtimeApps.dataflowServerVersionEqualOrGreaterThan("2.7.0"), () -> {
                Exception exception = (Exception)org.junit.jupiter.api.Assertions.assertThrows(DataFlowClientException.class, () -> this.dataFlowOperations.jobOperations().executionRestart(((Long)jobExecutionIds.get(0)).longValue()));
                org.junit.jupiter.api.Assertions.assertTrue((boolean)exception.getMessage().contains(" and state 'COMPLETED' is not restartable"));
            });
        }
        Assertions.assertThat((int)taskBuilder.allTasks().size()).isEqualTo(0);
    }

    @Test
    public void ctrFailedGraph() {
        logger.info("composed-task-ctrFailedGraph-test");
        this.mixedSuccessfulFailedAndUnknownExecutions("ctrFailedGraph", "scenario --io.spring.fail-task=true --io.spring.launch-batch-job=false && timestamp", TaskExecutionStatus.ERROR, DataFlowIT.emptyList(), DataFlowIT.asList("scenario"), DataFlowIT.asList("timestamp"));
    }

    @Test
    public void ctrSplit() {
        logger.info("composed-task-split-test");
        this.allSuccessfulExecutions("ComposedTask Split Test", "<t1:timestamp || t2:timestamp || t3:timestamp>", "t1", "t2", "t3");
    }

    @Test
    public void ctrSequential() {
        logger.info("composed-task-sequential-test");
        this.allSuccessfulExecutions("ComposedTask Sequential Test", "t1:timestamp && t2:timestamp && t3:timestamp", "t1", "t2", "t3");
    }

    @Test
    public void ctrSequentialTransitionAndSplitWithScenarioFailed() {
        logger.info("composed-task-SequentialTransitionAndSplitWithScenarioFailed-test");
        this.mixedSuccessfulFailedAndUnknownExecutions("ComposedTask Sequential Transition And Split With Scenario Failed Test", "t1: timestamp && scenario --io.spring.fail-task=true --io.spring.launch-batch-job=false 'FAILED'->t3: timestamp && <t4: timestamp || t5: timestamp> && t6: timestamp", TaskExecutionStatus.COMPLETE, DataFlowIT.asList("t1", "t3"), DataFlowIT.asList("scenario"), DataFlowIT.asList("t4", "t5", "t6"));
    }

    @Test
    public void ctrSequentialTransitionAndSplitWithScenarioOk() {
        logger.info("composed-task-SequentialTransitionAndSplitWithScenarioOk-test");
        this.mixedSuccessfulFailedAndUnknownExecutions("ComposedTask Sequential Transition And Split With Scenario Ok Test", "t1: timestamp && t2: scenario 'FAILED'->t3: timestamp && <t4: timestamp || t5: timestamp> && t6: timestamp", TaskExecutionStatus.COMPLETE, DataFlowIT.asList("t1", "t2", "t4", "t5", "t6"), DataFlowIT.emptyList(), DataFlowIT.asList("t3"));
    }

    @Test
    public void ctrNestedSplit() {
        logger.info("composed-task-NestedSplit");
        this.allSuccessfulExecutions("ctrNestedSplit", "<<t1: timestamp || t2: timestamp > && t3: timestamp || t4: timestamp>", "t1", "t2", "t3", "t4");
    }

    @Test
    public void testEmbeddedFailedGraph() {
        logger.info("composed-task-EmbeddedFailedGraph-test");
        this.mixedSuccessfulFailedAndUnknownExecutions("ComposedTask Embedded Failed Graph Test", String.format("a: timestamp && b:scenario  --io.spring.fail-batch=true --io.spring.jobName=%s --spring.cloud.task.batch.fail-on-job-failure=true && c:timestamp", DataFlowIT.randomJobName()), TaskExecutionStatus.ERROR, DataFlowIT.asList("a"), DataFlowIT.asList("b"), DataFlowIT.asList("c"));
    }

    @Test
    public void twoSplitTest() {
        logger.info("composed-task-twoSplit-test");
        this.allSuccessfulExecutions("twoSplitTest", "<t1: timestamp ||t2: timestamp||t3: timestamp> && <t4: timestamp||t5: timestamp>", "t1", "t2", "t3", "t4", "t5");
    }

    @Test
    public void sequentialAndSplitTest() {
        logger.info("composed-task-sequentialAndSplit-test");
        this.allSuccessfulExecutions("sequentialAndSplitTest", "<t1: timestamp && <t2: timestamp || t3: timestamp || t4: timestamp> && t5: timestamp>", "t1", "t2", "t3", "t4", "t5");
    }

    @Test
    public void sequentialTransitionAndSplitFailedInvalidTest() {
        logger.info("composed-task-sequentialTransitionAndSplitFailedInvalid-test");
        this.mixedSuccessfulFailedAndUnknownExecutions("ComposedTask Sequential Transition And Split Failed Invalid Test", "t1: timestamp && b:scenario --io.spring.fail-task=true --io.spring.launch-batch-job=false 'FAILED' -> t2: timestamp && t3: timestamp && t4: timestamp && <t5:timestamp || t6: timestamp> && t7: timestamp", TaskExecutionStatus.COMPLETE, DataFlowIT.asList("t1", "t2"), DataFlowIT.asList("b"), DataFlowIT.asList("t3", "t4", "t5", "t6", "t7"));
    }

    @Test
    public void sequentialAndSplitWithFlowTest() {
        logger.info("composed-task-sequentialAndSplitWithFlow-test");
        this.allSuccessfulExecutions("sequentialAndSplitWithFlowTest", "t1: timestamp && <t2: timestamp && t3: timestamp || t4: timestamp ||t5: timestamp> && t6: timestamp", "t1", "t2", "t3", "t4", "t5", "t6");
    }

    @Test
    public void sequentialAndFailedSplitTest() {
        logger.info("composed-task-sequentialAndFailedSplit-test");
        TaskBuilder taskBuilder = Task.builder((DataFlowOperations)this.dataFlowOperations);
        try (Task task = taskBuilder.name(DataFlowIT.randomTaskName()).definition(String.format("t1: timestamp && <t2: timestamp ||b:scenario --io.spring.fail-batch=true --io.spring.jobName=%s --spring.cloud.task.batch.fail-on-job-failure=true || t3: timestamp> && t4: timestamp", DataFlowIT.randomJobName())).description("sequentialAndFailedSplitTest").build();){
            Assertions.assertThat((int)task.composedTaskChildTasks().size()).isEqualTo(5);
            Assertions.assertThat(task.composedTaskChildTasks().stream().map(Task::getTaskName).collect(Collectors.toList())).hasSameElementsAs(this.fullTaskNames(task, "b", "t1", "t2", "t3", "t4"));
            long launchId = task.launch(this.composedTaskLaunchArguments(new String[0]));
            if (this.runtimeApps.dataflowServerVersionLowerThan("2.8.0-SNAPSHOT")) {
                Awaitility.await().until(() -> task.executionStatus(launchId) == TaskExecutionStatus.COMPLETE);
            } else {
                Awaitility.await().until(() -> task.executionStatus(launchId) == TaskExecutionStatus.ERROR);
            }
            Assertions.assertThat((int)task.executions().size()).isEqualTo(1);
            Assertions.assertThat((Integer)((TaskExecutionResource)task.execution(launchId).get()).getExitCode()).isEqualTo(0);
            task.executions().forEach(execution -> Assertions.assertThat((Integer)execution.getExitCode()).isEqualTo(0));
            this.childTasksBySuffix(task, "t1", "t2", "t3").forEach(childTask -> {
                Assertions.assertThat((int)childTask.executions().size()).isEqualTo(1);
                Assertions.assertThat((Integer)((TaskExecutionResource)childTask.executionByParentExecutionId(launchId).get()).getExitCode()).isEqualTo(0);
            });
            this.childTasksBySuffix(task, "b").forEach(childTask -> {
                Assertions.assertThat((int)childTask.executions().size()).isEqualTo(1);
                Assertions.assertThat((Integer)((TaskExecutionResource)childTask.executionByParentExecutionId(launchId).get()).getExitCode()).isEqualTo(1);
            });
            this.childTasksBySuffix(task, "t4").forEach(childTask -> Assertions.assertThat((int)childTask.executions().size()).isEqualTo(0));
            Assertions.assertThat((int)taskBuilder.allTasks().size()).isEqualTo(task.composedTaskChildTasks().size() + 1);
            Assertions.assertThat((int)task.executions().size()).isEqualTo(1);
            List jobExecutionIds = ((TaskExecutionResource)task.executions().stream().findFirst().get()).getJobExecutionIds();
            Assertions.assertThat((int)jobExecutionIds.size()).isEqualTo(1);
            this.dataFlowOperations.jobOperations().executionRestart(((Long)jobExecutionIds.get(0)).longValue());
            long launchId2 = task.executions().stream().mapToLong(TaskExecutionResource::getExecutionId).max().getAsLong();
            Awaitility.await().until(() -> task.executionStatus(launchId2) == TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((int)task.executions().size()).isEqualTo(2);
            Assertions.assertThat((Comparable)task.executionStatus(launchId2)).isEqualTo((Object)TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((Integer)((TaskExecutionResource)task.execution(launchId2).get()).getExitCode()).isEqualTo(0);
            this.childTasksBySuffix(task, "b").forEach(childTask -> {
                Assertions.assertThat((int)childTask.executions().size()).isEqualTo(2);
                Assertions.assertThat((Integer)((TaskExecutionResource)childTask.executionByParentExecutionId(launchId2).get()).getExitCode()).isEqualTo(0);
            });
            this.childTasksBySuffix(task, "t4").forEach(childTask -> {
                Assertions.assertThat((int)childTask.executions().size()).isEqualTo(1);
                Assertions.assertThat((Integer)((TaskExecutionResource)childTask.executionByParentExecutionId(launchId2).get()).getExitCode()).isEqualTo(0);
            });
            Assertions.assertThat((int)task.jobExecutionResources().size()).isEqualTo(2);
        }
        Assertions.assertThat((int)taskBuilder.allTasks().size()).isEqualTo(0);
    }

    @Test
    public void failedBasicTransitionTest() {
        logger.info("composed-task-failedBasicTransition-test");
        this.mixedSuccessfulFailedAndUnknownExecutions("ComposedTask Sequential Failed Basic Transition Test", "b: scenario --io.spring.fail-task=true --io.spring.launch-batch-job=false 'FAILED' -> t1: timestamp * ->t2: timestamp", TaskExecutionStatus.COMPLETE, DataFlowIT.asList("t1"), DataFlowIT.asList("b"), DataFlowIT.asList("t2"));
    }

    @Test
    public void successBasicTransitionTest() {
        logger.info("composed-task-successBasicTransition-test");
        this.mixedSuccessfulFailedAndUnknownExecutions("ComposedTask Success Basic Transition Test", "b: scenario --io.spring.launch-batch-job=false 'FAILED' -> t1: timestamp * ->t2: timestamp", TaskExecutionStatus.COMPLETE, DataFlowIT.asList("b", "t2"), DataFlowIT.emptyList(), DataFlowIT.asList("t1"));
    }

    @Test
    public void basicTransitionWithTransitionTest() {
        logger.info("composed-task-basicTransitionWithTransition-test");
        this.mixedSuccessfulFailedAndUnknownExecutions("basicTransitionWithTransitionTest", "b1: scenario  --io.spring.launch-batch-job=false 'FAILED' -> t1: timestamp  && b2: scenario --io.spring.launch-batch-job=false 'FAILED' -> t2: timestamp * ->t3: timestamp ", TaskExecutionStatus.COMPLETE, DataFlowIT.asList("b1", "b2", "t3"), DataFlowIT.emptyList(), DataFlowIT.asList("t1", "t2"));
    }

    @Test
    public void wildCardOnlyInLastPositionTest() {
        logger.info("composed-task-wildCardOnlyInLastPosition-test");
        this.mixedSuccessfulFailedAndUnknownExecutions("wildCardOnlyInLastPositionTest", "b1: scenario --io.spring.launch-batch-job=false 'FAILED' -> t1: timestamp  && b2: scenario --io.spring.launch-batch-job=false * ->t3: timestamp ", TaskExecutionStatus.COMPLETE, DataFlowIT.asList("b1", "b2", "t3"), DataFlowIT.emptyList(), DataFlowIT.asList("t1"));
    }

    @Test
    public void failedCTRRetryTest() {
        logger.info("composed-task-failedCTRRetry-test");
        TaskBuilder taskBuilder = Task.builder((DataFlowOperations)this.dataFlowOperations);
        try (Task task = taskBuilder.name(DataFlowIT.randomTaskName()).definition(String.format("b1:scenario --io.spring.fail-batch=true --io.spring.jobName=%s --spring.cloud.task.batch.fail-on-job-failure=true && t1:timestamp", DataFlowIT.randomJobName())).description("failedCTRRetryTest").build();){
            Assertions.assertThat((int)task.composedTaskChildTasks().size()).isEqualTo(2);
            Assertions.assertThat(task.composedTaskChildTasks().stream().map(Task::getTaskName).collect(Collectors.toList())).hasSameElementsAs(this.fullTaskNames(task, "b1", "t1"));
            long launchId = task.launch(this.composedTaskLaunchArguments(new String[0]));
            if (this.runtimeApps.dataflowServerVersionLowerThan("2.8.0-SNAPSHOT")) {
                Awaitility.await().until(() -> task.executionStatus(launchId) == TaskExecutionStatus.COMPLETE);
            } else {
                Awaitility.await().until(() -> task.executionStatus(launchId) == TaskExecutionStatus.ERROR);
            }
            Assertions.assertThat((int)task.executions().size()).isEqualTo(1);
            Assertions.assertThat((Integer)((TaskExecutionResource)task.execution(launchId).get()).getExitCode()).isEqualTo(0);
            task.executions().forEach(execution -> Assertions.assertThat((Integer)execution.getExitCode()).isEqualTo(0));
            this.childTasksBySuffix(task, "b1").forEach(childTask -> {
                Assertions.assertThat((int)childTask.executions().size()).isEqualTo(1);
                Assertions.assertThat((Integer)((TaskExecutionResource)childTask.executionByParentExecutionId(launchId).get()).getExitCode()).isEqualTo(1);
            });
            this.childTasksBySuffix(task, "t1").forEach(childTask -> Assertions.assertThat((int)childTask.executions().size()).isEqualTo(0));
            Assertions.assertThat((int)taskBuilder.allTasks().size()).isEqualTo(task.composedTaskChildTasks().size() + 1);
            Assertions.assertThat((int)task.executions().size()).isEqualTo(1);
            List jobExecutionIds = ((TaskExecutionResource)task.executions().stream().findFirst().get()).getJobExecutionIds();
            Assertions.assertThat((int)jobExecutionIds.size()).isEqualTo(1);
            this.dataFlowOperations.jobOperations().executionRestart(((Long)jobExecutionIds.get(0)).longValue());
            long launchId2 = task.executions().stream().mapToLong(TaskExecutionResource::getExecutionId).max().getAsLong();
            Awaitility.await().until(() -> task.executionStatus(launchId2) == TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((int)task.executions().size()).isEqualTo(2);
            Assertions.assertThat((Integer)((TaskExecutionResource)task.execution(launchId2).get()).getExitCode()).isEqualTo(0);
            this.childTasksBySuffix(task, "b1").forEach(childTask -> {
                Assertions.assertThat((int)childTask.executions().size()).isEqualTo(2);
                Assertions.assertThat((Integer)((TaskExecutionResource)childTask.executionByParentExecutionId(launchId2).get()).getExitCode()).isEqualTo(0);
            });
            this.childTasksBySuffix(task, "t1").forEach(childTask -> {
                Assertions.assertThat((int)childTask.executions().size()).isEqualTo(1);
                Assertions.assertThat((Integer)((TaskExecutionResource)childTask.executionByParentExecutionId(launchId2).get()).getExitCode()).isEqualTo(0);
            });
            Assertions.assertThat((int)task.jobExecutionResources().size()).isEqualTo(2);
        }
        Assertions.assertThat((int)taskBuilder.allTasks().size()).isEqualTo(0);
    }

    private void allSuccessfulExecutions(String taskDescription, String taskDefinition, String ... childLabels) {
        this.mixedSuccessfulFailedAndUnknownExecutions(taskDescription, taskDefinition, TaskExecutionStatus.COMPLETE, DataFlowIT.asList(childLabels), DataFlowIT.emptyList(), DataFlowIT.emptyList());
    }

    private void mixedSuccessfulFailedAndUnknownExecutions(String taskDescription, String taskDefinition, TaskExecutionStatus parentTaskExecutionStatus, List<String> successfulTasks, List<String> failedTasks, List<String> unknownTasks) {
        TaskBuilder taskBuilder = Task.builder((DataFlowOperations)this.dataFlowOperations);
        try (Task task = taskBuilder.name(DataFlowIT.randomTaskName()).definition(taskDefinition).description(taskDescription).build();){
            ArrayList<String> allTasks = new ArrayList<String>(successfulTasks);
            allTasks.addAll(failedTasks);
            allTasks.addAll(unknownTasks);
            Assertions.assertThat((int)task.composedTaskChildTasks().size()).isEqualTo(allTasks.size());
            Assertions.assertThat(task.composedTaskChildTasks().stream().map(Task::getTaskName).collect(Collectors.toList())).hasSameElementsAs(this.fullTaskNames(task, allTasks.toArray(new String[0])));
            long launchId = task.launch(this.composedTaskLaunchArguments(new String[0]));
            if (this.runtimeApps.dataflowServerVersionLowerThan("2.8.0-SNAPSHOT")) {
                Awaitility.await().until(() -> task.executionStatus(launchId) == TaskExecutionStatus.COMPLETE);
            } else {
                Awaitility.await().until(() -> task.executionStatus(launchId) == parentTaskExecutionStatus);
            }
            Assertions.assertThat((int)task.executions().size()).isEqualTo(1);
            Assertions.assertThat((Integer)((TaskExecutionResource)task.execution(launchId).get()).getExitCode()).isEqualTo(0);
            task.executions().forEach(execution -> Assertions.assertThat((Integer)execution.getExitCode()).isEqualTo(0));
            this.childTasksBySuffix(task, successfulTasks.toArray(new String[0])).forEach(childTask -> {
                Assertions.assertThat((int)childTask.executions().size()).isEqualTo(1);
                Assertions.assertThat((Integer)((TaskExecutionResource)childTask.executionByParentExecutionId(launchId).get()).getExitCode()).isEqualTo(0);
            });
            this.childTasksBySuffix(task, failedTasks.toArray(new String[0])).forEach(childTask -> {
                Assertions.assertThat((int)childTask.executions().size()).isEqualTo(1);
                Assertions.assertThat((Integer)((TaskExecutionResource)childTask.executionByParentExecutionId(launchId).get()).getExitCode()).isEqualTo(1);
            });
            this.childTasksBySuffix(task, unknownTasks.toArray(new String[0])).forEach(childTask -> Assertions.assertThat((int)childTask.executions().size()).isEqualTo(0));
            Assertions.assertThat((int)taskBuilder.allTasks().size()).isEqualTo(task.composedTaskChildTasks().size() + 1);
        }
        Assertions.assertThat((int)taskBuilder.allTasks().size()).isEqualTo(0);
    }

    private List<String> fullTaskNames(Task task, String ... childTaskNames) {
        return java.util.stream.Stream.of(childTaskNames).map(cn -> task.getTaskName() + "-" + cn.trim()).collect(Collectors.toList());
    }

    private List<Task> childTasksBySuffix(Task task, String ... suffixes) {
        return java.util.stream.Stream.of(suffixes).map(suffix -> (Task)task.composedTaskChildTaskByLabel(suffix).get()).collect(Collectors.toList());
    }

    private static String randomTaskName() {
        return "task-" + DataFlowIT.randomSuffix();
    }

    private static String randomJobName() {
        return "job-" + DataFlowIT.randomSuffix();
    }

    private static String randomSuffix() {
        return UUID.randomUUID().toString().substring(0, 10);
    }

    private static List<String> asList(String ... names) {
        return Arrays.asList(names);
    }

    private static List<String> emptyList() {
        return Collections.emptyList();
    }

    static {
        dockerCompose = DockerComposeFactory.startDockerCompose(tempDockerComposeYamlFolder);
    }
}

