/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.dataflow.integration.test;

import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Predicate;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import net.javacrumbs.jsonunit.assertj.JsonAssertions;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Condition;
import org.assertj.core.api.ListAssert;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.Extension;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.dataflow.core.ApplicationType;
import org.springframework.cloud.dataflow.integration.test.DataFlowOperationsITConfiguration;
import org.springframework.cloud.dataflow.integration.test.IntegrationTestProperties;
import org.springframework.cloud.dataflow.integration.test.tags.DockerCompose;
import org.springframework.cloud.dataflow.integration.test.util.AwaitUtils;
import org.springframework.cloud.dataflow.integration.test.util.DockerComposeFactory;
import org.springframework.cloud.dataflow.integration.test.util.RuntimeApplicationHelper;
import org.springframework.cloud.dataflow.rest.client.AppRegistryOperations;
import org.springframework.cloud.dataflow.rest.client.DataFlowClientException;
import org.springframework.cloud.dataflow.rest.client.DataFlowOperations;
import org.springframework.cloud.dataflow.rest.client.DataFlowTemplate;
import org.springframework.cloud.dataflow.rest.client.config.DataFlowClientProperties;
import org.springframework.cloud.dataflow.rest.client.dsl.DeploymentPropertiesBuilder;
import org.springframework.cloud.dataflow.rest.client.dsl.Stream;
import org.springframework.cloud.dataflow.rest.client.dsl.StreamApplication;
import org.springframework.cloud.dataflow.rest.client.dsl.StreamDefinition;
import org.springframework.cloud.dataflow.rest.client.dsl.task.Task;
import org.springframework.cloud.dataflow.rest.client.dsl.task.TaskBuilder;
import org.springframework.cloud.dataflow.rest.resource.DetailedAppRegistrationResource;
import org.springframework.cloud.dataflow.rest.resource.JobInstanceResource;
import org.springframework.cloud.dataflow.rest.resource.TaskExecutionResource;
import org.springframework.cloud.dataflow.rest.resource.TaskExecutionStatus;
import org.springframework.cloud.dataflow.rest.resource.about.AboutResource;
import org.springframework.cloud.skipper.domain.SpringCloudDeployerApplicationManifestReader;
import org.springframework.context.annotation.Import;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.hateoas.PagedModel;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.util.StreamUtils;
import org.springframework.web.util.UriComponentsBuilder;

@ExtendWith(value={SpringExtension.class})
@EnableConfigurationProperties(value={IntegrationTestProperties.class})
@TestMethodOrder(value=MethodOrderer.OrderAnnotation.class)
@Import(value={DataFlowOperationsITConfiguration.class})
@DockerCompose
public class DataFlowIT {
    // Logger shared by all tests of this class.
    private static final Logger logger = LoggerFactory.getLogger(DataFlowIT.class);
    // Injected integration-test settings (not referenced in the part of the class reviewed here).
    @Autowired
    protected IntegrationTestProperties testProperties;
    // Data Flow REST client; the tests use it to manage app registrations, streams and tasks.
    @Autowired
    protected DataFlowTemplate dataFlowOperations;
    // Helper used by the tests for platform information, HTTP posts to deployed apps and app logs.
    @Autowired
    protected RuntimeApplicationHelper runtimeApps;
    // Injected client properties (not referenced in the part of the class reviewed here).
    @Autowired
    protected DataFlowClientProperties dataFlowClientProperties;
    // Temporary directory provided by JUnit; presumably receives the docker-compose YAML files - TODO confirm.
    @TempDir
    static Path tempDockerComposeYamlFolder;
    // JUnit extension registered for the whole class. NOTE(review): it is not assigned in this
    // declaration; presumably it is initialised in a static initializer elsewhere in the class - confirm.
    @RegisterExtension
    public static Extension dockerCompose;
    // Deployment property key under which the target platform name is passed when deploying a stream.
    private static final String SPRING_CLOUD_DATAFLOW_SKIPPER_PLATFORM_NAME = "spring.cloud.dataflow.skipper.platformName";
    // Status strings compared against Stream#getStatus() and the entries of Stream#history().
    public static final String DEPLOYED = "deployed";
    public static final String DELETED = "deleted";
    public static final String UNDEPLOYED = "undeployed";
    public static final String DEPLOYING = "deploying";
    public static final String PARTIAL = "partial";
    // Exit codes; presumably compared against task execution results further down - TODO confirm.
    public static final int EXIT_CODE_SUCCESS = 0;
    public static final int EXIT_CODE_ERROR = 1;
    // Application version numbers; presumably used by the task version tests further down - TODO confirm.
    public static final String TEST_VERSION_NUMBER = "2.0.2";
    public static final String CURRENT_VERSION_NUMBER = "2.0.1";

    /**
     * Per-test setup: configures the Awaitility defaults (poll every 5 seconds, time out after
     * 15 minutes) and then delegates to {@code registerTasks()} and {@code resetTimestampVersion()},
     * both defined elsewhere in this class.
     */
    @BeforeEach
    public void before() {
        Duration pollInterval = Duration.ofSeconds(5L);
        Duration timeout = Duration.ofMinutes(15L);
        Awaitility.setDefaultPollInterval(pollInterval);
        Awaitility.setDefaultTimeout(timeout);
        this.registerTasks();
        this.resetTimestampVersion();
    }

    /**
     * Best-effort cleanup executed after every test: destroys all streams and then every task
     * definition together with its execution history.
     *
     * <p>Cleanup failures are logged and never rethrown. Each failure is logged with a message
     * naming the step that failed and with the exception itself, so the stack trace is kept.
     */
    @AfterEach
    public void after() {
        try {
            this.dataFlowOperations.streamOperations().destroyAll();
            logger.info("Destroyed all streams");
        }
        catch (Exception e) {
            // Pass the exception as the last argument so SLF4J records the stack trace.
            logger.error("after: failed to destroy streams: " + e.getMessage(), e);
        }
        finally {
            // Task cleanup is attempted even when the stream cleanup above failed.
            try {
                this.dataFlowOperations.taskOperations().list().forEach(taskDefinitionResource -> {
                    String taskName = taskDefinitionResource.getName();
                    logger.info("Destroying task {} and execution history", taskName);
                    // The second argument (true) is what the log line above calls removing the execution history.
                    this.dataFlowOperations.taskOperations().destroy(taskName, true);
                });
                logger.info("Destroyed all tasks and execution history");
            }
            catch (Exception e) {
                logger.error("after: failed to destroy tasks: " + e.getMessage(), e);
            }
        }
    }

    /**
     * Ordered with the lowest possible {@code @Order} value: logs the platforms reported by the
     * server and the platform selected for this run, then waits until at least 60 applications
     * are registered.
     */
    @Test
    @Order(value=Integer.MIN_VALUE)
    public void aboutTestInfo() {
        String availablePlatforms = this.dataFlowOperations.streamOperations().listPlatforms().stream()
                .map(platform -> String.format("[name: %s, type: %s]", platform.getName(), platform.getType()))
                .collect(Collectors.joining());
        logger.info("Available platforms: " + availablePlatforms);

        String selectedPlatform = String.format("Selected platform: [name: %s, type: %s]",
                this.runtimeApps.getPlatformName(), this.runtimeApps.getPlatformType());
        logger.info(selectedPlatform);

        logger.info("Wait until at least 60 apps are registered in SCDF");
        Awaitility.await().until(() -> {
            long registeredApps = this.dataFlowOperations.appRegistryOperations().list().getMetadata().getTotalElements();
            return registeredApps >= 60L;
        });
    }

    /**
     * Checks the number of configuration options reported for Maven-based applications: the
     * already registered "file" sink and a sink registered by this test without a metadata URI
     * are both expected to expose 8 options.
     *
     * <p>The temporary registration is removed in a {@code finally} block so that a failed
     * assertion does not leave it behind on the server ({@code after()} only cleans up streams
     * and tasks).
     */
    @Test
    public void applicationMetadataMavenTests() {
        logger.info("application-metadata-maven-test");
        AppRegistryOperations registry = this.dataFlowOperations.appRegistryOperations();

        // Application that is expected to be registered already.
        DetailedAppRegistrationResource mavenAppWithJarMetadata = registry.info("file", ApplicationType.sink, false);
        Assertions.assertThat(mavenAppWithJarMetadata.getOptions()).describedAs("mavenAppWithJarMetadata").hasSize(8);

        // Application registered without a metadata URI (the null argument).
        String appName = "maven-app-without-metadata";
        registry.register(appName, ApplicationType.sink, "maven://org.springframework.cloud.stream.app:file-sink-kafka:3.0.1", null, true);
        try {
            DetailedAppRegistrationResource mavenAppWithoutMetadata = registry.info(appName, ApplicationType.sink, false);
            Assertions.assertThat(mavenAppWithoutMetadata.getOptions()).describedAs("mavenAppWithoutMetadata").hasSize(8);
        }
        finally {
            registry.unregister(appName, ApplicationType.sink);
        }
    }

    /**
     * Checks the number of configuration options reported for four Docker-based registrations
     * (expected: 6, 6, 0 and 8 options). Disabled when the PLATFORM_TYPE system property matches
     * "cloudfoundry".
     *
     * <p>Every successful registration records its own undo action. The actions run in a
     * {@code finally} block, in registration order, so a failed assertion no longer leaves the
     * temporary registrations behind on the server. On the success path the unregistration order
     * is the same as before.
     */
    @Test
    @DisabledIfSystemProperty(named="PLATFORM_TYPE", matches="cloudfoundry")
    public void applicationMetadataDockerTests() {
        logger.info("application-metadata-docker-test");
        AppRegistryOperations registry = this.dataFlowOperations.appRegistryOperations();
        // Undo actions for the registrations that succeeded so far.
        List<Runnable> unregistrations = new ArrayList<>();
        try {
            registry.register("docker-app-with-container-metadata", ApplicationType.source, "docker:springcloudstream/time-source-kafka:2.1.4.RELEASE", null, true);
            unregistrations.add(() -> registry.unregister("docker-app-with-container-metadata", ApplicationType.source));
            DetailedAppRegistrationResource dockerAppWithContainerMetadata = registry.info("docker-app-with-container-metadata", ApplicationType.source, false);
            Assertions.assertThat(dockerAppWithContainerMetadata.getOptions()).hasSize(6);

            registry.register("docker-app-with-container-metadata-escape-chars", ApplicationType.source, "docker:springcloudstream/http-source-rabbit:2.1.3.RELEASE", null, true);
            unregistrations.add(() -> registry.unregister("docker-app-with-container-metadata-escape-chars", ApplicationType.source));
            DetailedAppRegistrationResource dockerAppWithContainerMetadataWithEscapeChars = registry.info("docker-app-with-container-metadata-escape-chars", ApplicationType.source, false);
            Assertions.assertThat(dockerAppWithContainerMetadataWithEscapeChars.getOptions()).hasSize(6);

            registry.register("docker-app-without-metadata", ApplicationType.sink, "docker:springcloudstream/file-sink-kafka:2.1.1.RELEASE", null, true);
            unregistrations.add(() -> registry.unregister("docker-app-without-metadata", ApplicationType.sink));
            DetailedAppRegistrationResource dockerAppWithoutMetadata = registry.info("docker-app-without-metadata", ApplicationType.sink, false);
            Assertions.assertThat(dockerAppWithoutMetadata.getOptions()).hasSize(0);

            // Same image as above, but registered with an explicit Maven metadata artifact URI.
            registry.register("docker-app-with-jar-metadata", ApplicationType.sink, "docker:springcloudstream/file-sink-kafka:2.1.1.RELEASE", "maven://org.springframework.cloud.stream.app:file-sink-kafka:jar:metadata:2.1.1.RELEASE", true);
            unregistrations.add(() -> registry.unregister("docker-app-with-jar-metadata", ApplicationType.sink));
            DetailedAppRegistrationResource dockerAppWithJarMetadata = registry.info("docker-app-with-jar-metadata", ApplicationType.sink, false);
            Assertions.assertThat(dockerAppWithJarMetadata.getOptions()).hasSize(8);
        }
        finally {
            unregistrations.forEach(Runnable::run);
        }
    }

    /**
     * Runs the shared container-registry check against a log sink image hosted on ghcr.io.
     * Only enabled when the SCDF_CR_TEST environment variable matches "true".
     */
    @Test
    @EnabledIfEnvironmentVariable(named="SCDF_CR_TEST", matches="true")
    public void githubContainerRegistryTests() {
        String appName = "github-log-sink";
        String imageUri = "docker:ghcr.io/tzolov/log-sink-rabbit:3.1.0-SNAPSHOT";
        this.containerRegistryTests(appName, imageUri);
    }

    /**
     * Runs the shared container-registry check against a log sink image hosted on azurecr.io.
     * Only enabled when the SCDF_CR_TEST environment variable matches "true".
     */
    @Test
    @EnabledIfEnvironmentVariable(named="SCDF_CR_TEST", matches="true")
    public void azureContainerRegistryTests() {
        String appName = "azure-log-sink";
        String imageUri = "docker:scdftest.azurecr.io/springcloudstream/log-sink-rabbit:3.1.0-SNAPSHOT";
        this.containerRegistryTests(appName, imageUri);
    }

    /**
     * Runs the shared container-registry check against a log sink image hosted on
     * projects.registry.vmware.com. Only enabled when the SCDF_CR_TEST environment variable
     * matches "true".
     */
    @Test
    @EnabledIfEnvironmentVariable(named="SCDF_CR_TEST", matches="true")
    public void harborContainerRegistryTests() {
        String appName = "harbor-log-sink";
        String imageUri = "docker:projects.registry.vmware.com/scdf/scdftest/log-sink-rabbit:3.1.0-SNAPSHOT";
        this.containerRegistryTests(appName, imageUri);
    }

    /**
     * Registers {@code appUrl} as a sink named {@code appName}, verifies that the registration
     * reports exactly 3 configuration options and unregisters the app again.
     *
     * <p>The unregistration runs in a {@code finally} block so that a failed assertion does not
     * leave the registration behind on the server.
     *
     * @param appName name under which the sink is registered
     * @param appUrl  URI of the application image to register
     */
    private void containerRegistryTests(String appName, String appUrl) {
        logger.info("application-metadata-{}-container-registry-test", appName);
        AppRegistryOperations registry = this.dataFlowOperations.appRegistryOperations();
        registry.register(appName, ApplicationType.sink, appUrl, null, true);
        try {
            DetailedAppRegistrationResource dockerAppWithContainerMetadata = registry.info(appName, ApplicationType.sink, false);
            Assertions.assertThat(dockerAppWithContainerMetadata.getOptions()).hasSize(3);
        }
        finally {
            registry.unregister(appName, ApplicationType.sink);
        }
    }

    /**
     * Verifies that the server's "about" resource reports the analytics, streams and tasks
     * features as enabled.
     */
    @Test
    public void featureInfo() {
        logger.info("platform-feature-info-test");
        AboutResource serverInfo = this.dataFlowOperations.aboutOperation().get();
        // Each flag is read from a freshly obtained feature info, in the order analytics, streams, tasks.
        Assertions.assertThat(serverInfo.getFeatureInfo().isAnalyticsEnabled()).isTrue();
        Assertions.assertThat(serverInfo.getFeatureInfo().isStreamsEnabled()).isTrue();
        Assertions.assertThat(serverInfo.getFeatureInfo().isTasksEnabled()).isTrue();
    }

    /**
     * Verifies that at least 60 applications are registered with the server.
     */
    @Test
    public void appsCount() {
        logger.info("platform-apps-count-test");
        long registeredApps = this.dataFlowOperations.appRegistryOperations().list().getMetadata().getTotalElements();
        Assertions.assertThat(registeredApps).isGreaterThanOrEqualTo(60L);
    }

    /**
     * Deploys "http | transform | log" with an upper-casing transform expression, posts a unique
     * message to the http app and waits until the upper-cased message shows up in the log app's
     * logs.
     */
    @Test
    public void streamTransform() {
        logger.info("stream-transform-test");
        try (Stream stream = Stream.builder(this.dataFlowOperations)
                .name("transform-test")
                .definition("http | transform --expression=payload.toUpperCase() | log")
                .create()
                .deploy(this.testDeploymentProperties("http"))) {
            AwaitUtils.StreamLog deploymentLog = AwaitUtils.logOffset(stream);

            // Right after deploy() the stream is expected to still be starting up.
            Assertions.assertThat(stream.getStatus()).is(DataFlowIT.condition(status -> status.equals(DEPLOYING) || status.equals(PARTIAL)));

            // Wait for the deployment to complete; abort early if an error is detected in the log.
            Awaitility.await()
                    .failFast(() -> AwaitUtils.hasErrorInLog(deploymentLog))
                    .until(() -> stream.getStatus().equals(DEPLOYED));

            String message = "Unique Test message: " + new Random().nextInt();
            this.runtimeApps.httpPost(stream.getName(), "http", message);

            AwaitUtils.StreamLog sinkLog = AwaitUtils.logOffset(stream, "log");
            Awaitility.await().until(() -> sinkLog.logs().contains(message.toUpperCase()));
        }
    }

    /**
     * Deploys "http | splitter | log" with two log instances and the payload as partition key,
     * posts the woodchuck sentence and waits until each of the two log instances contains the
     * set of words expected for it.
     */
    @Test
    public void streamPartitioning() {
        logger.info("stream-partitioning-test (aka. WoodChuckTests)");
        StreamDefinition streamDefinition = Stream.builder(this.dataFlowOperations)
                .name("partitioning-test")
                .definition("http | splitter --expression=payload.split(' ') | log")
                .create();
        try (Stream stream = streamDefinition.deploy(new DeploymentPropertiesBuilder()
                .putAll(this.testDeploymentProperties("http", "log"))
                .put(SPRING_CLOUD_DATAFLOW_SKIPPER_PLATFORM_NAME, this.runtimeApps.getPlatformName())
                // Two log instances, partitioned by payload.
                .put("deployer.log.count", "2")
                .put("app.splitter.producer.partitionKeyExpression", "payload")
                .put("app.log.spring.cloud.stream.kafka.bindings.input.consumer.autoRebalanceEnabled", "false")
                // Tags every log line with a WOODCHUCK-<n> prefix so the instances can be told apart.
                .put("app.log.logging.pattern.level", "WOODCHUCK-${INSTANCE_INDEX:${CF_INSTANCE_INDEX:${spring.cloud.stream.instanceIndex:666}}} %5p")
                .build())) {
            AwaitUtils.StreamLog deploymentLog = AwaitUtils.logOffset(stream);
            Awaitility.await()
                    .failFast(() -> AwaitUtils.hasErrorInLog(deploymentLog))
                    .until(() -> stream.getStatus().equals(DEPLOYED));

            String message = "How much wood would a woodchuck chuck if a woodchuck could chuck wood";
            this.runtimeApps.httpPost(stream.getName(), "http", message);

            Awaitility.await().until(() -> {
                Collection<String> instanceLogs = this.runtimeApps.applicationInstanceLogs(stream.getName(), "log").values();
                if (instanceLogs.size() != 2) {
                    return false;
                }
                // The order of the two logs is not fixed, so each log is classified by its prefix.
                return instanceLogs.stream().allMatch(instanceLog -> instanceLog.contains("WOODCHUCK-0")
                        ? DataFlowIT.asList("WOODCHUCK-0", "How", "chuck").stream().allMatch(instanceLog::contains)
                        : DataFlowIT.asList("WOODCHUCK-1", "much", "wood", "would", "if", "a", "woodchuck", "could").stream().allMatch(instanceLog::contains));
            });
        }
    }

    /**
     * Deploys "http | ver-log" with ver-log at version 3.0.1, updates it to 2.1.5.RELEASE and
     * rolls back again. After every step it checks that messages still flow, that the manifest
     * reports the expected ver-log version and that the release history has grown by one entry.
     *
     * <p>Skipped on Cloud Foundry with a server older than 2.7.0, and when the two required
     * ver-log versions are not registered.
     */
    @Test
    @Order(value=Integer.MIN_VALUE + 10)
    public void streamAppCrossVersion() {
        final String initialVersion = "3.0.1";
        final String updatedVersion = "2.1.5.RELEASE";

        Assumptions.assumeTrue(
                !this.runtimeApps.getPlatformType().equals("cloudfoundry") || this.runtimeApps.dataflowServerVersionEqualOrGreaterThan("2.7.0"),
                "stream-app-cross-version-test: SKIP - CloudFoundry 2.6 and below!");
        Assumptions.assumeTrue(
                this.runtimeApps.isAppRegistered("ver-log", ApplicationType.sink, initialVersion)
                        && this.runtimeApps.isAppRegistered("ver-log", ApplicationType.sink, updatedVersion),
                "stream-app-cross-version-test: SKIP - required ver-log apps not registered!");

        logger.info("stream-app-cross-version-test: DEPLOY");
        final int currentManifest = 0;
        final String suffix = DataFlowIT.randomSuffix();
        try (Stream stream = Stream.builder(this.dataFlowOperations)
                .name("app-cross-version-test" + suffix)
                .definition("http | ver-log")
                .create()
                .deploy(new DeploymentPropertiesBuilder()
                        .putAll(this.testDeploymentProperties("http"))
                        .put("version.ver-log", initialVersion)
                        .build())) {
            AwaitUtils.StreamLog offset = AwaitUtils.logOffset(stream);
            Awaitility.await().failFast(() -> AwaitUtils.hasErrorInLog(offset)).until(() -> stream.getStatus().equals(DEPLOYED));

            // Reads the ver-log version from the stream manifest; "none" when no ver-log entry is found.
            // The literal-first equals keeps a manifest without a "name" entry from raising an NPE.
            Supplier<String> currentVerLogVersion = () -> new SpringCloudDeployerApplicationManifestReader()
                    .read(stream.manifest(currentManifest)).stream()
                    .filter(m -> "ver-log".equals(m.getMetadata().get("name")))
                    .map(m -> m.getSpec().getVersion())
                    .findFirst()
                    .orElse("none");

            // Re-posts the same message on every poll until a copy shows up in the ver-log logs.
            Consumer<String> awaitSendAndReceiveTestMessage = message -> Awaitility.await().until(() -> {
                this.runtimeApps.httpPost(stream.getName(), "http", message);
                return stream.logs(this.app("ver-log")).contains(message);
            });

            awaitSendAndReceiveTestMessage.accept(String.format("TEST MESSAGE 1-%s ", suffix));
            Assertions.assertThat(currentVerLogVersion.get()).isEqualTo(initialVersion);
            Assertions.assertThat(stream.history().size()).isEqualTo(1);

            logger.info("stream-app-cross-version-test: UPDATE");
            stream.update(new DeploymentPropertiesBuilder().put("version.ver-log", updatedVersion).build());
            Awaitility.await().failFast(() -> AwaitUtils.hasErrorInLog(offset)).until(() -> stream.getStatus().equals(DEPLOYED));
            awaitSendAndReceiveTestMessage.accept(String.format("TEST MESSAGE 2-%s ", suffix));
            Assertions.assertThat(currentVerLogVersion.get()).isEqualTo(updatedVersion);
            Assertions.assertThat(stream.history().size()).isEqualTo(2);

            logger.info("stream-app-cross-version-test: ROLLBACK");
            stream.rollback(0);
            Awaitility.await().failFast(() -> AwaitUtils.hasErrorInLog(offset)).until(() -> stream.getStatus().equals(DEPLOYED));
            awaitSendAndReceiveTestMessage.accept(String.format("TEST MESSAGE 3-%s ", suffix));
            Assertions.assertThat(currentVerLogVersion.get()).isEqualTo(initialVersion);
            Assertions.assertThat(stream.history().size()).isEqualTo(3);
        }

        logger.info("stream-app-cross-version-test: DESTROY");
        // After the try block has closed the stream, no stream definition is expected to remain.
        // Missing page metadata is treated as zero streams.
        Assertions.assertThat(Optional.ofNullable(this.dataFlowOperations.streamOperations().list().getMetadata())
                .orElse(new PagedModel.PageMetadata(0L, 0L, 0L))
                .getTotalElements()).isEqualTo(0L);
    }

    /**
     * Runs the stream lifecycle scenario with one instance per app and no additional assertions.
     */
    @Test
    public void streamLifecycle() {
        Consumer<Stream> noExtraAssertions = deployedStream -> {};
        this.streamLifecycleHelper(1, noExtraAssertions);
    }

    /**
     * Runs the stream lifecycle scenario with two instances per app. Each time the helper invokes
     * the callback, it verifies that the stream has two apps and that every app runs exactly two
     * instances.
     */
    @Test
    public void streamLifecycleWithTwoInstance() {
        final int numberOfInstancePerApp = 2;
        this.streamLifecycleHelper(numberOfInstancePerApp, stream -> {
            // Typed map: iterating the values of a raw Map yields Object and does not compile.
            Map<StreamApplication, Map<String, String>> streamApps = stream.runtimeApps();
            Assertions.assertThat(streamApps.size()).isEqualTo(2);
            for (Map<String, String> instanceMap : streamApps.values()) {
                Assertions.assertThat(instanceMap.size()).isEqualTo(numberOfInstancePerApp);
            }
        });
    }

    /**
     * Drives a "time | log" stream through deploy, update, rollback and undeploy, checking the
     * stream status, the log output and the release history after each step. After the try block
     * has closed the stream it asserts that no stream definition is left.
     *
     * @param appInstanceCount value passed as "deployer.*.count" when deploying the stream
     * @param streamAssertions additional checks executed each time the stream has reached the
     *                         "deployed" status (after deploy, update and rollback)
     */
    private void streamLifecycleHelper(int appInstanceCount, Consumer<Stream> streamAssertions) {
        logger.info("stream-lifecycle-test: DEPLOY");
        try (Stream stream = Stream.builder(this.dataFlowOperations)
                .name("lifecycle-test" + DataFlowIT.randomSuffix())
                .definition("time | log --log.name='TEST' --log.expression='TICKTOCK - TIMESTAMP: '.concat(payload)")
                .create()
                .deploy(new DeploymentPropertiesBuilder()
                        .putAll(this.testDeploymentProperties(new String[0]))
                        .put("deployer.*.count", String.valueOf(appInstanceCount))
                        .build())) {
            AwaitUtils.StreamLog deploymentLog = AwaitUtils.logOffset(stream);
            Awaitility.await()
                    .failFast(() -> AwaitUtils.hasErrorInLog(deploymentLog))
                    .until(() -> stream.getStatus().equals(DEPLOYED));
            streamAssertions.accept(stream);
            Awaitility.await().until(() -> stream.logs(this.app("log")).contains("TICKTOCK - TIMESTAMP:"));

            // History after the initial deployment: a single entry, version 1, deployed.
            Assertions.assertThat(stream.history().size()).isEqualTo(1L);
            Awaitility.await().until(() -> stream.history().get(1).equals(DEPLOYED));
            Assertions.assertThat(stream.logs()).contains("TICKTOCK - TIMESTAMP:");
            Assertions.assertThat(stream.logs(this.app("log"))).contains("TICKTOCK - TIMESTAMP:");

            logger.info("stream-lifecycle-test: UPDATE");
            stream.update(new DeploymentPropertiesBuilder()
                    .put("app.log.log.expression", "'Updated TICKTOCK - TIMESTAMP: '.concat(payload)")
                    .put("app.*.management.endpoints.web.exposure.include", "*")
                    .build());
            Awaitility.await()
                    .failFast(() -> AwaitUtils.hasErrorInLog(deploymentLog))
                    .until(() -> stream.getStatus().equals(DEPLOYED));
            streamAssertions.accept(stream);
            AwaitUtils.StreamLog sinkLog = AwaitUtils.logOffset(stream, "log");
            Awaitility.await().until(() -> sinkLog.logs().contains("Updated TICKTOCK - TIMESTAMP:"));

            // History after the update: version 1 deleted, version 2 deployed.
            Assertions.assertThat(stream.history().size()).isEqualTo(2);
            Awaitility.await().until(() -> stream.history().get(1).equals(DELETED));
            Awaitility.await().until(() -> stream.history().get(2).equals(DEPLOYED));

            logger.info("stream-lifecycle-test: ROLLBACK");
            stream.rollback(0);
            Awaitility.await()
                    .failFast(() -> AwaitUtils.hasErrorInLog(deploymentLog))
                    .until(() -> stream.getStatus().equals(DEPLOYED));
            streamAssertions.accept(stream);
            Awaitility.await().until(() -> sinkLog.logs().contains("TICKTOCK - TIMESTAMP:"));

            // History after the rollback: versions 1 and 2 deleted, version 3 deployed.
            Assertions.assertThat(stream.history().size()).isEqualTo(3);
            Awaitility.await().until(() -> stream.history().get(1).equals(DELETED));
            Awaitility.await().until(() -> stream.history().get(2).equals(DELETED));
            Awaitility.await().until(() -> stream.history().get(3).equals(DEPLOYED));

            logger.info("stream-lifecycle-test: UNDEPLOY");
            stream.undeploy();
            Awaitility.await().until(() -> stream.getStatus().equals(UNDEPLOYED));

            // History after the undeploy: still three entries, all deleted.
            Assertions.assertThat(stream.history().size()).isEqualTo(3);
            Awaitility.await().until(() -> stream.history().get(1).equals(DELETED));
            Awaitility.await().until(() -> stream.history().get(2).equals(DELETED));
            Awaitility.await().until(() -> stream.history().get(3).equals(DELETED));

            // The undeployed stream definition is still listed.
            long definitionsWhileOpen = this.dataFlowOperations.streamOperations().list().getMetadata().getTotalElements();
            Assertions.assertThat(definitionsWhileOpen).isEqualTo(1L);
        }
        logger.info("stream-lifecycle-test: DESTROY");
        long definitionsAfterClose = this.dataFlowOperations.streamOperations().list().getMetadata().getTotalElements();
        Assertions.assertThat(definitionsAfterClose).isEqualTo(0L);
    }

    /**
     * Deploys "time | log" with one instance per app, scales the log app to two instances and
     * verifies the instance count of both apps before and after scaling.
     */
    @Test
    public void streamScaling() {
        logger.info("stream-scaling-test");
        try (Stream stream = Stream.builder(this.dataFlowOperations)
                .name("stream-scaling-test")
                .definition("time | log --log.expression='TICKTOCK - TIMESTAMP: '.concat(payload)")
                .create()
                .deploy(this.testDeploymentProperties(new String[0]))) {
            AwaitUtils.StreamLog deploymentLog = AwaitUtils.logOffset(stream);
            Awaitility.await()
                    .failFast(() -> AwaitUtils.hasErrorInLog(deploymentLog))
                    .until(() -> stream.getStatus().equals(DEPLOYED));

            StreamApplication timeApp = this.app("time");
            StreamApplication logApp = this.app("log");

            // Before scaling: two apps, one instance each.
            Map<StreamApplication, Map<String, String>> appsBeforeScaling = stream.runtimeApps();
            Assertions.assertThat(appsBeforeScaling.size()).isEqualTo(2);
            Assertions.assertThat(appsBeforeScaling.get(timeApp).size()).isEqualTo(1);
            Assertions.assertThat(appsBeforeScaling.get(logApp).size()).isEqualTo(1);

            stream.scaleApplicationInstances(logApp, 2, Collections.emptyMap());
            Awaitility.await()
                    .failFast(() -> AwaitUtils.hasErrorInLog(deploymentLog))
                    .until(() -> stream.getStatus().equals(DEPLOYED));
            Awaitility.await().until(() -> stream.runtimeApps().get(logApp).size() == 2);

            // After scaling: still two apps, the log app now with two instances.
            Assertions.assertThat(stream.getStatus()).isEqualTo(DEPLOYED);
            Map<StreamApplication, Map<String, String>> appsAfterScaling = stream.runtimeApps();
            Assertions.assertThat(appsAfterScaling.size()).isEqualTo(2);
            Assertions.assertThat(appsAfterScaling.get(timeApp).size()).isEqualTo(1);
            Assertions.assertThat(appsAfterScaling.get(logApp).size()).isEqualTo(2);
        }
    }

    /**
     * Connects two streams through the named destination LOG-DESTINATION: a message posted to the
     * http app of the source stream must show up in the log app of the sink stream.
     */
    @Test
    public void namedChannelDestination() {
        logger.info("stream-named-channel-destination-test");
        try (Stream sinkStream = Stream.builder(this.dataFlowOperations)
                     .name("log-destination-sink")
                     .definition(":LOG-DESTINATION > log")
                     .create()
                     .deploy(this.testDeploymentProperties(new String[0]));
             Stream sourceStream = Stream.builder(this.dataFlowOperations)
                     .name("http-destination-source")
                     .definition("http > :LOG-DESTINATION")
                     .create()
                     .deploy(this.testDeploymentProperties("http"))) {
            AwaitUtils.StreamLog sinkDeploymentLog = AwaitUtils.logOffset(sinkStream);
            Awaitility.await()
                    .failFast(() -> AwaitUtils.hasErrorInLog(sinkDeploymentLog))
                    .until(() -> sinkStream.getStatus().equals(DEPLOYED));

            AwaitUtils.StreamLog sourceDeploymentLog = AwaitUtils.logOffset(sourceStream);
            Awaitility.await()
                    .failFast(() -> AwaitUtils.hasErrorInLog(sourceDeploymentLog))
                    .until(() -> sourceStream.getStatus().equals(DEPLOYED));

            String message = "Unique Test message: " + new Random().nextInt();
            this.runtimeApps.httpPost(sourceStream.getName(), "http", message);

            AwaitUtils.StreamLog sinkAppLog = AwaitUtils.logOffset(sinkStream, "log");
            Awaitility.await().until(() -> sinkAppLog.logs().contains(message));
        }
    }

    /**
     * Deploys "http | log" as stream "taphttp" plus a second stream defined as
     * ":taphttp.http > log": a message posted to the first stream's http app must show up in the
     * log app of the second stream.
     */
    @Test
    public void namedChannelTap() {
        logger.info("stream-named-channel-tap-test");
        try (Stream mainStream = Stream.builder(this.dataFlowOperations)
                     .name("taphttp")
                     .definition("http | log")
                     .create()
                     .deploy(this.testDeploymentProperties("http"));
             Stream tapStream = Stream.builder(this.dataFlowOperations)
                     .name("tapstream")
                     .definition(":taphttp.http > log")
                     .create()
                     .deploy(this.testDeploymentProperties(new String[0]))) {
            AwaitUtils.StreamLog mainDeploymentLog = AwaitUtils.logOffset(mainStream);
            AwaitUtils.StreamLog tapDeploymentLog = AwaitUtils.logOffset(tapStream);
            Awaitility.await()
                    .failFast(() -> AwaitUtils.hasErrorInLog(mainDeploymentLog))
                    .until(() -> mainStream.getStatus().equals(DEPLOYED));
            Awaitility.await()
                    .failFast(() -> AwaitUtils.hasErrorInLog(tapDeploymentLog))
                    .until(() -> tapStream.getStatus().equals(DEPLOYED));

            String message = "Unique Test message: " + new Random().nextInt();
            this.runtimeApps.httpPost(mainStream.getName(), "http", message);

            Awaitility.await().until(() -> tapStream.logs(this.app("log")).contains(message));
        }
    }

    /**
     * Feeds one log sink stream from two http source streams through the shared named destination
     * MANY-TO-ONE-DESTINATION: a message posted to either source must show up in the sink's log.
     */
    @Test
    public void namedChannelManyToOne() {
        logger.info("stream-named-channel-many-to-one-test");
        try (Stream sinkStream = Stream.builder(this.dataFlowOperations)
                     .name("many-to-one")
                     .definition(":MANY-TO-ONE-DESTINATION > log")
                     .create()
                     .deploy(this.testDeploymentProperties(new String[0]));
             Stream firstSource = Stream.builder(this.dataFlowOperations)
                     .name("http-source-1")
                     .definition("http > :MANY-TO-ONE-DESTINATION")
                     .create()
                     .deploy(this.testDeploymentProperties("http"));
             Stream secondSource = Stream.builder(this.dataFlowOperations)
                     .name("http-source-2")
                     .definition("http > :MANY-TO-ONE-DESTINATION")
                     .create()
                     .deploy(this.testDeploymentProperties("http"))) {
            AwaitUtils.StreamLog sinkDeploymentLog = AwaitUtils.logOffset(sinkStream);
            AwaitUtils.StreamLog firstDeploymentLog = AwaitUtils.logOffset(firstSource);
            AwaitUtils.StreamLog secondDeploymentLog = AwaitUtils.logOffset(secondSource);
            Awaitility.await()
                    .failFast(() -> AwaitUtils.hasErrorInLog(sinkDeploymentLog))
                    .until(() -> sinkStream.getStatus().equals(DEPLOYED));
            Awaitility.await()
                    .failFast(() -> AwaitUtils.hasErrorInLog(firstDeploymentLog))
                    .until(() -> firstSource.getStatus().equals(DEPLOYED));
            Awaitility.await()
                    .failFast(() -> AwaitUtils.hasErrorInLog(secondDeploymentLog))
                    .until(() -> secondSource.getStatus().equals(DEPLOYED));

            // First source -> sink.
            String firstMessage = "Unique Test message: " + new Random().nextInt();
            this.runtimeApps.httpPost(firstSource.getName(), "http", firstMessage);
            Awaitility.await().until(() -> sinkStream.logs(this.app("log")).contains(firstMessage));

            // Second source -> same sink.
            String secondMessage = "Unique Test message: " + new Random().nextInt();
            this.runtimeApps.httpPost(secondSource.getName(), "http", secondMessage);
            Awaitility.await().until(() -> sinkStream.logs(this.app("log")).contains(secondMessage));
        }
    }

    /**
     * Routes messages from an "http | router" stream to the named destinations "foo" or "bar",
     * depending on whether the payload contains the letter 'a'. Each destination is consumed by a
     * stream that appends "-foo" or "-bar" before logging; "abcd" is expected in the foo log and
     * "defg" in the bar log.
     */
    @Test
    public void namedChannelDirectedGraph() {
        logger.info("stream-named-channel-directed-graph-test");
        try (Stream fooStream = Stream.builder(this.dataFlowOperations)
                     .name("directed-graph-destination1")
                     .definition(":foo > transform --expression=payload+'-foo' | log")
                     .create()
                     .deploy(this.testDeploymentProperties(new String[0]));
             Stream barStream = Stream.builder(this.dataFlowOperations)
                     .name("directed-graph-destination2")
                     .definition(":bar > transform --expression=payload+'-bar' | log")
                     .create()
                     .deploy(this.testDeploymentProperties(new String[0]));
             Stream routerStream = Stream.builder(this.dataFlowOperations)
                     .name("directed-graph-http-source")
                     .definition("http | router --expression=payload.contains('a')?'foo':'bar'")
                     .create()
                     .deploy(this.testDeploymentProperties("http"))) {
            AwaitUtils.StreamLog fooDeploymentLog = AwaitUtils.logOffset(fooStream);
            AwaitUtils.StreamLog barDeploymentLog = AwaitUtils.logOffset(barStream);
            AwaitUtils.StreamLog routerDeploymentLog = AwaitUtils.logOffset(routerStream);
            Awaitility.await()
                    .failFast(() -> AwaitUtils.hasErrorInLog(fooDeploymentLog))
                    .until(() -> fooStream.getStatus().equals(DEPLOYED));
            Awaitility.await()
                    .failFast(() -> AwaitUtils.hasErrorInLog(barDeploymentLog))
                    .until(() -> barStream.getStatus().equals(DEPLOYED));
            Awaitility.await()
                    .failFast(() -> AwaitUtils.hasErrorInLog(routerDeploymentLog))
                    .until(() -> routerStream.getStatus().equals(DEPLOYED));

            String httpAppUrl = this.runtimeApps.getApplicationInstanceUrl(routerStream.getName(), "http");
            this.runtimeApps.httpPost(httpAppUrl, "abcd"); // contains 'a' -> foo
            this.runtimeApps.httpPost(httpAppUrl, "defg"); // no 'a'       -> bar

            AwaitUtils.StreamLog fooAppLog = AwaitUtils.logOffset(fooStream, "log");
            AwaitUtils.StreamLog barAppLog = AwaitUtils.logOffset(barStream, "log");
            Awaitility.await().until(() -> fooAppLog.logs().contains("abcd-foo"));
            Awaitility.await().until(() -> barAppLog.logs().contains("defg-bar"));
        }
    }

    /**
     * Posts a task launch request through an {@code http | dataflow-tasklauncher} stream
     * and verifies that exactly one execution of the requested task completes with exit
     * code 0.
     *
     * <p>The test is reported as skipped (JUnit assumption) when the platform type is
     * {@code local}, when the Data Flow server is older than {@code 2.9.1-SNAPSHOT}, or
     * when no {@code dataflow-tasklauncher} sink is registered.
     */
    @Test
    public void dataflowTaskLauncherSink() {
        // Abort via an assumption so a local run shows up as skipped rather than passed.
        String skipOnLocalPlatform = "Skipping since it doesn't work local";
        boolean localPlatform = this.runtimeApps.getPlatformType().equalsIgnoreCase("local");
        if (localPlatform) {
            logger.warn(skipOnLocalPlatform);
        }
        Assumptions.assumeFalse(localPlatform, skipOnLocalPlatform);

        String dataflowTaskLauncherAppName = "dataflow-tasklauncher";

        // Evaluate the version check once so the warning and the assumption cannot disagree.
        String skipOnIncompatibleDataFlowVersion = dataflowTaskLauncherAppName + "-sink-test: SKIP - Dataflow version:" + this.runtimeApps.getDataflowServerVersion() + " is older than 2.9.1-SNAPSHOT!";
        boolean serverVersionSupported = this.runtimeApps.dataflowServerVersionEqualOrGreaterThan("2.9.1-SNAPSHOT");
        if (!serverVersionSupported) {
            logger.warn(skipOnIncompatibleDataFlowVersion);
        }
        Assumptions.assumeTrue(serverVersionSupported, skipOnIncompatibleDataFlowVersion);

        String skipOnMissingAppRegistration = dataflowTaskLauncherAppName + "-sink-test: SKIP - no " + dataflowTaskLauncherAppName + " app registered!";
        boolean isDataflowTaskLauncherAppRegistered = this.runtimeApps.isAppRegistered(dataflowTaskLauncherAppName, ApplicationType.sink);
        if (!isDataflowTaskLauncherAppRegistered) {
            logger.info(skipOnMissingAppRegistration);
        }
        Assumptions.assumeTrue(isDataflowTaskLauncherAppRegistered, skipOnMissingAppRegistration);

        DetailedAppRegistrationResource dataflowTaskLauncherRegistration = this.dataFlowOperations.appRegistryOperations().info(dataflowTaskLauncherAppName, ApplicationType.sink, false);
        logger.info("{}-sink-test: {} [{}], DataFlow [{}]", new Object[]{dataflowTaskLauncherAppName, dataflowTaskLauncherAppName, dataflowTaskLauncherRegistration.getVersion(), this.runtimeApps.getDataflowServerVersion()});

        String taskName = DataFlowIT.randomTaskName();
        try (Task task = Task.builder((DataFlowOperations)this.dataFlowOperations).name(taskName).definition("testtimestamp").description("Test timestamp task").build()) {
            logger.info("dataflowTaskLauncherSink:deploying:{}", (Object)dataflowTaskLauncherAppName);
            String streamDefinition = "http | " + dataflowTaskLauncherAppName + " --trigger.initialDelay=100 --trigger.maxPeriod=1000 --spring.cloud.dataflow.client.serverUri=" + this.dataFlowClientProperties.getServerUri();
            try (Stream stream = Stream.builder((DataFlowOperations)this.dataFlowOperations).name("tasklauncher-test").definition(streamDefinition).create().deploy(this.testDeploymentProperties(new String[0]))) {
                AwaitUtils.StreamLog offset = AwaitUtils.logOffset(stream);
                Awaitility.await().failFast(() -> AwaitUtils.hasErrorInLog(offset)).until(() -> stream.getStatus().equals(DEPLOYED));

                // The JSON launch request names the task definition created above.
                HttpHeaders headers = new HttpHeaders();
                headers.setContentType(MediaType.APPLICATION_JSON);
                headers.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
                byte[] data = ("{\"name\" : \"" + taskName + "\"}").getBytes(StandardCharsets.UTF_8);
                this.runtimeApps.httpPost(stream.getName(), "http", data, headers);

                // Poll until a COMPLETE execution of the task exists and remember its id.
                AtomicLong launchId = new AtomicLong();
                Awaitility.await().until(() -> task.executions().stream().filter(t -> t.getTaskName().equals(taskName) && t.getTaskExecutionStatus() == TaskExecutionStatus.COMPLETE).findFirst().map(t -> launchId.getAndSet(t.getExecutionId())).isPresent());
                long id = launchId.get();
                Assertions.assertThat((int)task.executions().size()).isEqualTo(1);
                Assertions.assertThat((boolean)task.execution(id).isPresent()).isTrue();
                Assertions.assertThat((Integer)((TaskExecutionResource)task.execution(id).get()).getExitCode()).isEqualTo(0);
            }
        }
    }

    /**
     * Deploys an {@code http | analytics} stream, posts three messages and verifies
     * that InfluxDB exposes a {@code my_http_analytics} series whose first three rows
     * carry, at column index 7, one of the posted message lengths.
     *
     * <p>Skipped (JUnit assumption) when InfluxDB is not reachable or no
     * {@code analytics} sink is registered.
     */
    @Test
    public void analyticsCounterInflux() {
        // Probe each precondition exactly once so the log line and the assumption agree.
        boolean influxAvailable = this.influxPresent();
        if (!influxAvailable) {
            logger.info("stream-analytics-test: SKIP - no InfluxDB metrics configured!");
        }
        Assumptions.assumeTrue(influxAvailable);
        boolean analyticsRegistered = this.runtimeApps.isAppRegistered("analytics", ApplicationType.sink);
        if (!analyticsRegistered) {
            logger.info("stream-analytics-influx-test: SKIP - no analytics app registered!");
        }
        Assumptions.assumeTrue(analyticsRegistered, "stream-analytics-test: SKIP - no analytics app registered!");
        logger.info("stream-analytics-influx-test");
        try (Stream stream = Stream.builder((DataFlowOperations)this.dataFlowOperations).name("httpAnalyticsInflux").definition("http | analytics --analytics.name=my_http_analytics --analytics.tag.expression.msgSize=payload.length()").create().deploy(this.testDeploymentProperties("http"))) {
            AwaitUtils.StreamLog offset = AwaitUtils.logOffset(stream);
            Awaitility.await().failFast(() -> AwaitUtils.hasErrorInLog(offset)).until(() -> stream.getStatus().equals(DEPLOYED));
            String message1 = "Test message 1";
            String message2 = "Test message 2 with extension";
            String message3 = "Test message 2 with double extension";
            String httpAppUrl = this.runtimeApps.getApplicationInstanceUrl(stream.getName(), "http");
            this.runtimeApps.httpPost(httpAppUrl, message1);
            this.runtimeApps.httpPost(httpAppUrl, message2);
            this.runtimeApps.httpPost(httpAppUrl, message3);

            // Wait until the query result contains a series for the counter.
            Awaitility.await().until(() -> !JsonPath.parse((String)this.runtimeApps.httpGet(this.testProperties.getPlatform().getConnection().getInfluxUrl() + "/query?db=myinfluxdb&q=SELECT * FROM \"my_http_analytics\"")).read("$.results[0][?(@.series)].length()", new Predicate[0]).toString().equals("[]"));
            JsonAssertions.assertThatJson((Object)this.runtimeApps.httpGet(this.testProperties.getPlatform().getConnection().getInfluxUrl() + "/query?q=SHOW DATABASES")).inPath("$.results[0].series[0].values[1][0]").isEqualTo((Object)"myinfluxdb");

            // Expected values are the message lengths rendered as quoted JSON strings.
            List<String> messageLengths = java.util.stream.Stream.of(message1, message2, message3).map(s -> String.format("\"%s\"", s.length())).collect(Collectors.toList());
            String myHttpCounter = this.runtimeApps.httpGet(this.testProperties.getPlatform().getConnection().getInfluxUrl() + "/query?db=myinfluxdb&q=SELECT * FROM \"my_http_analytics\"");
            JsonAssertions.assertThatJson((Object)myHttpCounter).inPath("$.results[0].series[0].values[0][7]").isIn(messageLengths);
            JsonAssertions.assertThatJson((Object)myHttpCounter).inPath("$.results[0].series[0].values[1][7]").isIn(messageLengths);
            JsonAssertions.assertThatJson((Object)myHttpCounter).inPath("$.results[0].series[0].values[2][7]").isIn(messageLengths);
        }
    }

    /**
     * Deploys an {@code http | analytics} stream, posts three messages and verifies
     * that the Prometheus query for {@code my_http_analytics_total} eventually returns
     * results matching the classpath fixture {@code my_http_analytics_total.json}.
     *
     * <p>Skipped (JUnit assumption) when no {@code analytics} sink is registered or
     * Prometheus is not reachable.
     *
     * @throws IOException if the expected-result fixture cannot be read
     */
    @Test
    public void analyticsCounterPrometheus() throws IOException {
        // Probe each precondition exactly once so the log line and the assumption agree.
        boolean analyticsRegistered = this.runtimeApps.isAppRegistered("analytics", ApplicationType.sink);
        if (!analyticsRegistered) {
            logger.info("stream-analytics-prometheus-test: SKIP - no analytics app registered!");
        }
        Assumptions.assumeTrue(analyticsRegistered, "stream-analytics-test: SKIP - no analytics app registered!");
        boolean prometheusAvailable = this.prometheusPresent();
        if (!prometheusAvailable) {
            logger.info("stream-analytics-prometheus-test: SKIP - no Prometheus configured!");
        }
        Assumptions.assumeTrue(prometheusAvailable);
        logger.info("stream-analytics-prometheus-test");
        try (Stream stream = Stream.builder((DataFlowOperations)this.dataFlowOperations).name("httpAnalyticsPrometheus").definition("http | analytics --analytics.name=my_http_analytics --analytics.tag.expression.msgSize=payload.length()").create().deploy(this.testDeploymentProperties("http"))) {
            AwaitUtils.StreamLog offset = AwaitUtils.logOffset(stream);
            Awaitility.await().failFast(() -> AwaitUtils.hasErrorInLog(offset)).until(() -> stream.getStatus().equals(DEPLOYED));
            String message1 = "Test message 1";
            String message2 = "Test message 2 with extension";
            String message3 = "Test message 2 with double extension";
            String httpAppUrl = this.runtimeApps.getApplicationInstanceUrl(stream.getName(), "http");
            this.runtimeApps.httpPost(httpAppUrl, message1);
            this.runtimeApps.httpPost(httpAppUrl, message2);
            this.runtimeApps.httpPost(httpAppUrl, message3);

            // Same query is used for polling and for the final comparison.
            String counterQueryUrl = this.testProperties.getPlatform().getConnection().getPrometheusUrl() + "/api/v1/query?query=my_http_analytics_total";
            Awaitility.await().until(() -> (Integer)JsonPath.parse((String)this.runtimeApps.httpGet(counterQueryUrl)).read("$.data.result.length()", new Predicate[0]) > 0);
            JsonAssertions.assertThatJson((Object)this.runtimeApps.httpGet(counterQueryUrl)).isEqualTo((Object)DataFlowIT.resourceToString("classpath:/my_http_analytics_total.json"));
        }
    }

    /**
     * Builds the deployment properties shared by the stream tests: target platform
     * name, log-file location, logfile/management endpoint exposure and disabled
     * stream-app security.
     *
     * <p>On a {@code kubernetes} platform the server port is additionally fixed to 8080
     * and a load balancer is requested for every listed application.
     *
     * @param externallyAccessibleApps names of the apps that get
     *        {@code deployer.<app>.kubernetes.createLoadBalancer=true} on Kubernetes
     * @return the assembled deployment property map
     */
    protected Map<String, String> testDeploymentProperties(String ... externallyAccessibleApps) {
        DeploymentPropertiesBuilder builder = new DeploymentPropertiesBuilder()
                .put(SPRING_CLOUD_DATAFLOW_SKIPPER_PLATFORM_NAME, this.runtimeApps.getPlatformName())
                .put("app.*.logging.file", "/tmp/${PID}-test.log")
                .put("app.*.logging.file.name", "/tmp/${PID}-test.log")
                .put("app.*.endpoints.logfile.sensitive", "false")
                .put("app.*.endpoints.logfile.enabled", "true")
                .put("app.*.management.endpoints.web.exposure.include", "*")
                .put("app.*.spring.cloud.streamapp.security.enabled", "false");

        boolean onKubernetes = this.runtimeApps.getPlatformType().equalsIgnoreCase("kubernetes");
        if (!onKubernetes) {
            return builder.build();
        }

        builder.put("app.*.server.port", "8080");
        for (int i = 0; i < externallyAccessibleApps.length; i++) {
            builder.put("deployer." + externallyAccessibleApps[i] + ".kubernetes.createLoadBalancer", "true");
        }
        return builder.build();
    }

    /**
     * Reads the resource at the given location into a UTF-8 string.
     *
     * <p>The resource stream is closed before returning;
     * {@code StreamUtils.copyToString} itself leaves the stream open.
     *
     * @param resourcePath resource location resolved by a {@code DefaultResourceLoader},
     *        for example {@code classpath:/file.json}
     * @return the complete resource content decoded as UTF-8
     * @throws IOException if the resource cannot be opened or read
     */
    public static String resourceToString(String resourcePath) throws IOException {
        try (InputStream resourceStream = new DefaultResourceLoader().getResource(resourcePath).getInputStream()) {
            return StreamUtils.copyToString(resourceStream, StandardCharsets.UTF_8);
        }
    }

    /**
     * Checks whether the configured Prometheus instance answers the {@code up} query.
     *
     * @return the result of {@code runtimeApps.isServicePresent} for the probe URL
     */
    protected boolean prometheusPresent() {
        final String probeUrl = this.testProperties.getPlatform().getConnection().getPrometheusUrl() + "/api/v1/query?query=up";
        return this.runtimeApps.isServicePresent(probeUrl);
    }

    /**
     * Checks whether the configured InfluxDB instance answers on its {@code /ping} path.
     *
     * @return the result of {@code runtimeApps.isServicePresent} for the probe URL
     */
    protected boolean influxPresent() {
        final String probeUrl = this.testProperties.getPlatform().getConnection().getInfluxUrl() + "/ping";
        return this.runtimeApps.isServicePresent(probeUrl);
    }

    /**
     * Wraps a predicate in an AssertJ {@code Condition} with an empty description.
     *
     * @param predicate the predicate the condition delegates to
     * @return a new condition backed by {@code predicate}
     */
    public static Condition<String> condition(java.util.function.Predicate predicate) {
        final String emptyDescription = "";
        final Object[] noDescriptionArgs = new Object[0];
        return new Condition(predicate, emptyDescription, noDescriptionArgs);
    }

    /**
     * Creates a {@code StreamApplication} for the given application name.
     *
     * @param appName name passed to the {@code StreamApplication} constructor
     * @return the newly created instance
     */
    protected StreamApplication app(String appName) {
        final StreamApplication application = new StreamApplication(appName);
        return application;
    }

    /**
     * Builds the launch arguments for a composed task: the fixed
     * {@code --dataflow-server-use-user-access-token=true} flag followed by the given
     * extra arguments in their original order.
     *
     * @param additionalArguments arguments appended after the fixed flag
     * @return a new mutable list holding all launch arguments
     */
    private List<String> composedTaskLaunchArguments(String ... additionalArguments) {
        List<String> launchArguments = new ArrayList<>();
        launchArguments.add("--dataflow-server-use-user-access-token=true");
        Collections.addAll(launchArguments, additionalArguments);
        return launchArguments;
    }

    /**
     * Launches the {@code batch-remote-partition} task with {@code --platform=local}
     * and verifies that its single execution completes with exit code 0. Only enabled
     * when the system property {@code PLATFORM_TYPE} matches {@code local}.
     */
    @Test
    @EnabledIfSystemProperty(named="PLATFORM_TYPE", matches="local")
    public void runBatchRemotePartitionJobLocal() {
        logger.info("runBatchRemotePartitionJob - local");
        TaskBuilder builder = Task.builder((DataFlowOperations)this.dataFlowOperations);
        try (Task partitionTask = builder
                .name(DataFlowIT.randomTaskName())
                .definition("batch-remote-partition")
                .description("runBatchRemotePartitionJob - local")
                .build()) {
            long executionId = partitionTask.launch(Collections.EMPTY_MAP, this.composedTaskLaunchArguments("--platform=local"));
            Awaitility.await().until(() -> partitionTask.executionStatus(executionId) == TaskExecutionStatus.COMPLETE);

            int executionCount = partitionTask.executions().size();
            Assertions.assertThat(executionCount).isEqualTo(1);

            boolean executionFound = partitionTask.execution(executionId).isPresent();
            Assertions.assertThat(executionFound).isTrue();

            Assertions.assertThat((Integer)((TaskExecutionResource)partitionTask.execution(executionId).get()).getExitCode()).isEqualTo(0);
        }
    }

    /**
     * Launches the {@code testtimestamp} task twice and verifies that both executions
     * are recorded and finish with exit code 0.
     */
    @Test
    public void timestampTask() {
        logger.info("task-timestamp-test");
        this.assertTaskRegistration("testtimestamp");
        try (Task timestamp = Task.builder((DataFlowOperations)this.dataFlowOperations)
                .name(DataFlowIT.randomTaskName())
                .definition("testtimestamp")
                .description("Test timestamp task")
                .build()) {
            // First launch is checked by the shared helper.
            long firstLaunchId = timestamp.launch();
            this.validateSuccessfulTaskLaunch(timestamp, firstLaunchId);

            // Second launch is checked inline.
            long secondLaunchId = timestamp.launch();
            Awaitility.await().until(() -> timestamp.executionStatus(secondLaunchId) == TaskExecutionStatus.COMPLETE);

            int executionCount = timestamp.executions().size();
            Assertions.assertThat(executionCount).isEqualTo(2);

            boolean secondExecutionFound = timestamp.execution(secondLaunchId).isPresent();
            Assertions.assertThat(secondExecutionFound).isTrue();

            Assertions.assertThat((Integer)((TaskExecutionResource)timestamp.execution(secondLaunchId).get()).getExitCode()).isEqualTo(0);
            timestamp.executions().forEach(recorded -> Assertions.assertThat((Integer)recorded.getExitCode()).isEqualTo(0));
        }
    }

    /**
     * Launches the {@code task-demo-metrics-prometheus} task, waits for completion and
     * verifies that Prometheus eventually returns a {@code system_cpu_usage} sample
     * for that execution matching the classpath fixture
     * {@code task_metrics_system_cpu_usage.json}.
     *
     * <p>Skipped (JUnit assumption) when Prometheus is not reachable.
     *
     * @throws IOException if the expected-result fixture cannot be read
     */
    @Test
    public void taskMetricsPrometheus() throws IOException {
        // Probe once so the log line and the assumption cannot disagree.
        boolean prometheusAvailable = this.prometheusPresent();
        if (!prometheusAvailable) {
            logger.info("task-metrics-test: SKIP - no metrics configured!");
        }
        Assumptions.assumeTrue(prometheusAvailable);
        logger.info("task-metrics-test: Prometheus");
        try (Task task = Task.builder((DataFlowOperations)this.dataFlowOperations).name(DataFlowIT.randomTaskName()).definition("task-demo-metrics-prometheus --task.demo.delay.fixed=0s").description("Test task metrics").build()) {
            long launchId = task.launch(Arrays.asList("--spring.cloud.task.closecontext_enabled=false"));
            Awaitility.await().until(() -> task.executionStatus(launchId) == TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((int)task.executions().size()).isEqualTo(1);
            Assertions.assertThat((boolean)task.execution(launchId).isPresent()).isTrue();
            Assertions.assertThat((Integer)((TaskExecutionResource)task.execution(launchId).get()).getExitCode()).isEqualTo(0);
            task.executions().forEach(execution -> Assertions.assertThat((Integer)execution.getExitCode()).isEqualTo(0));

            // The query selects the sample labelled "<taskName>-<launchId>".
            URI pqlUri = UriComponentsBuilder.fromHttpUrl((String)(this.testProperties.getPlatform().getConnection().getPrometheusUrl() + String.format("/api/v1/query?query=system_cpu_usage{service=\"task-application\",application=\"%s-%s\"}", task.getTaskName(), launchId))).build().toUri();
            Supplier<String> pqlTaskMetricsQuery = () -> (String)this.dataFlowOperations.getRestTemplate().exchange(pqlUri, HttpMethod.GET, null, String.class).getBody();
            Awaitility.await().until(() -> (Integer)JsonPath.parse((String)((String)pqlTaskMetricsQuery.get())).read("$.data.result.length()", new Predicate[0]) > 0);
            JsonAssertions.assertThatJson((Object)pqlTaskMetricsQuery.get()).isEqualTo((Object)DataFlowIT.resourceToString("classpath:/task_metrics_system_cpu_usage.json"));
        }
    }

    /**
     * Launches the composed task {@code a: testtimestamp && b: testtimestamp} twice and
     * checks, after each launch, the execution counts and exit codes of the parent and
     * of both child tasks. Finally verifies that the task builder reports no remaining
     * tasks once the try-with-resources block has closed the task.
     */
    @Test
    public void composedTask() {
        logger.info("task-composed-task-runner-test");
        TaskBuilder taskBuilder = Task.builder((DataFlowOperations)this.dataFlowOperations);
        try (Task task = taskBuilder.name(DataFlowIT.randomTaskName()).definition("a: testtimestamp && b: testtimestamp").description("Test composedTask").build();){
            Assertions.assertThat((int)task.composedTaskChildTasks().size()).isEqualTo(2);
            // First launch: validated by the shared helper, then each child must have one successful run.
            long launchId1 = task.launch(this.composedTaskLaunchArguments(new String[0]));
            this.validateSuccessfulTaskLaunch(task, launchId1);
            task.composedTaskChildTasks().forEach(childTask -> {
                Assertions.assertThat((int)childTask.executions().size()).isEqualTo(1);
                Assertions.assertThat((Integer)((TaskExecutionResource)childTask.executionByParentExecutionId(launchId1).get()).getExitCode()).isEqualTo(0);
            });
            task.executions().forEach(execution -> Assertions.assertThat((Integer)execution.getExitCode()).isEqualTo(0));
            // Second launch: parent and children must now each show two executions.
            long launchId2 = task.launch(this.composedTaskLaunchArguments(new String[0]));
            Awaitility.await().until(() -> task.executionStatus(launchId2) == TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((int)task.executions().size()).isEqualTo(2);
            Assertions.assertThat((Comparable)task.executionStatus(launchId2)).isEqualTo((Object)TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((Integer)((TaskExecutionResource)task.execution(launchId2).get()).getExitCode()).isEqualTo(0);
            task.composedTaskChildTasks().forEach(childTask -> {
                Assertions.assertThat((int)childTask.executions().size()).isEqualTo(2);
                Assertions.assertThat((Integer)((TaskExecutionResource)childTask.executionByParentExecutionId(launchId2).get()).getExitCode()).isEqualTo(0);
            });
            // Parent plus two children.
            Assertions.assertThat((int)taskBuilder.allTasks().size()).isEqualTo(3);
        }
        // After the task resource is closed no task definitions may remain.
        Assertions.assertThat((int)taskBuilder.allTasks().size()).isEqualTo(0);
    }

    /**
     * Launches the composed task {@code a: testtimestamp && b: testtimestamp} twice with
     * {@code --increment-instance-enabled=true} and checks, after each launch, the
     * execution counts and exit codes of the parent and of both child tasks, plus the
     * total number of job execution resources. Finally verifies that the task builder
     * reports no remaining tasks once the task has been closed.
     */
    @Test
    public void multipleComposedTaskWithArguments() {
        logger.info("task-multiple-composed-task-with-arguments-test");
        TaskBuilder taskBuilder = Task.builder((DataFlowOperations)this.dataFlowOperations);
        try (Task task = taskBuilder.name(DataFlowIT.randomTaskName()).definition("a: testtimestamp && b: testtimestamp").description("Test multipleComposedTaskWithArguments").build();){
            Assertions.assertThat((int)task.composedTaskChildTasks().size()).isEqualTo(2);
            // First launch.
            long launchId1 = task.launch(this.composedTaskLaunchArguments("--increment-instance-enabled=true"));
            Awaitility.await().until(() -> task.executionStatus(launchId1) == TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((int)task.executions().size()).isEqualTo(1);
            Assertions.assertThat((Comparable)task.executionStatus(launchId1)).isEqualTo((Object)TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((Integer)((TaskExecutionResource)task.execution(launchId1).get()).getExitCode()).isEqualTo(0);
            task.composedTaskChildTasks().forEach(childTask -> {
                Assertions.assertThat((int)childTask.executions().size()).isEqualTo(1);
                Assertions.assertThat((Integer)((TaskExecutionResource)childTask.executionByParentExecutionId(launchId1).get()).getExitCode()).isEqualTo(0);
            });
            task.executions().forEach(execution -> Assertions.assertThat((Integer)execution.getExitCode()).isEqualTo(0));
            // Second launch with the same arguments.
            long launchId2 = task.launch(this.composedTaskLaunchArguments("--increment-instance-enabled=true"));
            Awaitility.await().until(() -> task.executionStatus(launchId2) == TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((int)task.executions().size()).isEqualTo(2);
            Assertions.assertThat((Comparable)task.executionStatus(launchId2)).isEqualTo((Object)TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((Integer)((TaskExecutionResource)task.execution(launchId2).get()).getExitCode()).isEqualTo(0);
            task.composedTaskChildTasks().forEach(childTask -> {
                Assertions.assertThat((int)childTask.executions().size()).isEqualTo(2);
                Assertions.assertThat((Integer)((TaskExecutionResource)childTask.executionByParentExecutionId(launchId2).get()).getExitCode()).isEqualTo(0);
            });
            // One job execution resource is expected per launch.
            Assertions.assertThat((int)task.jobExecutionResources().size()).isEqualTo(2);
            // Parent plus two children.
            Assertions.assertThat((int)taskBuilder.allTasks().size()).isEqualTo(3);
        }
        // After the task resource is closed no task definitions may remain.
        Assertions.assertThat((int)taskBuilder.allTasks().size()).isEqualTo(0);
    }

    /**
     * Launches the composed task {@code a: testtimestamp && b: testtimestamp} once,
     * verifies parent and child executions, and then, on Data Flow servers of version
     * 2.7.0 or newer, checks that restarting the job execution is rejected with a
     * {@code DataFlowClientException} whose message says a COMPLETED execution is not
     * restartable.
     */
    @Test
    public void ctrLaunchTest() {
        logger.info("composed-task-ctrLaunch-test");
        TaskBuilder taskBuilder = Task.builder((DataFlowOperations)this.dataFlowOperations);
        try (Task task = taskBuilder.name(DataFlowIT.randomTaskName()).definition("a: testtimestamp && b: testtimestamp").description("ctrLaunchTest").build();){
            // Child task names must match the names derived from the labels "a" and "b".
            Assertions.assertThat(task.composedTaskChildTasks().stream().map(Task::getTaskName).collect(Collectors.toList())).hasSameElementsAs(this.fullTaskNames(task, "a", "b"));
            long launchId = task.launch(this.composedTaskLaunchArguments(new String[0]));
            Awaitility.await().until(() -> task.executionStatus(launchId) == TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((int)task.executions().size()).isEqualTo(1);
            Assertions.assertThat((Comparable)task.executionStatus(launchId)).isEqualTo((Object)TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((Integer)((TaskExecutionResource)task.execution(launchId).get()).getExitCode()).isEqualTo(0);
            task.executions().forEach(execution -> Assertions.assertThat((Integer)execution.getExitCode()).isEqualTo(0));
            task.composedTaskChildTasks().forEach(childTask -> {
                Assertions.assertThat((int)childTask.executions().size()).isEqualTo(1);
                Assertions.assertThat((Integer)((TaskExecutionResource)childTask.executionByParentExecutionId(launchId).get()).getExitCode()).isEqualTo(0);
            });
            Assertions.assertThat((int)task.executions().size()).isEqualTo(1);
            // The single parent execution must reference exactly one job execution.
            List jobExecutionIds = ((TaskExecutionResource)task.executions().stream().findFirst().get()).getJobExecutionIds();
            Assertions.assertThat((int)jobExecutionIds.size()).isEqualTo(1);
            // The restart check only runs when the server version is at least 2.7.0.
            Assumptions.assumingThat((boolean)this.runtimeApps.dataflowServerVersionEqualOrGreaterThan("2.7.0"), () -> {
                Exception exception = (Exception)org.junit.jupiter.api.Assertions.assertThrows(DataFlowClientException.class, () -> this.dataFlowOperations.jobOperations().executionRestart(((Long)jobExecutionIds.get(0)).longValue()));
                org.junit.jupiter.api.Assertions.assertTrue((boolean)exception.getMessage().contains(" and state 'COMPLETED' is not restartable"));
            });
        }
        // After the task resource is closed no task definitions may remain.
        Assertions.assertThat((int)taskBuilder.allTasks().size()).isEqualTo(0);
    }

    /**
     * Runs a two-step graph whose first step ({@code scenario}) is configured to fail
     * and expects the parent execution to end in {@code ERROR}. The three lists passed
     * to the helper are presumably the successful, failed and never-run child tasks, in
     * that order (helper not visible here).
     */
    @Test
    public void ctrFailedGraph() {
        logger.info("composed-task-ctrFailedGraph-test");
        final String description = "ctrFailedGraph";
        final String definition = "scenario --io.spring.fail-task=true --io.spring.launch-batch-job=false && testtimestamp";
        this.mixedSuccessfulFailedAndUnknownExecutions(description, definition, TaskExecutionStatus.ERROR, DataFlowIT.emptyList(), DataFlowIT.asList("scenario"), DataFlowIT.asList("testtimestamp"));
    }

    /**
     * Runs a three-way split of {@code timestamp} tasks through the
     * {@code allSuccessfulExecutions} helper with child labels t1, t2 and t3.
     */
    @Test
    public void ctrSplit() {
        logger.info("composed-task-split-test");
        final String description = "ComposedTask Split Test";
        final String definition = "<t1:timestamp || t2:timestamp || t3:timestamp>";
        this.allSuccessfulExecutions(description, definition, "t1", "t2", "t3");
    }

    /**
     * Runs three {@code testtimestamp} tasks in sequence through the
     * {@code allSuccessfulExecutions} helper with child labels t1, t2 and t3.
     */
    @Test
    public void ctrSequential() {
        logger.info("composed-task-sequential-test");
        final String description = "ComposedTask Sequential Test";
        final String definition = "t1: testtimestamp && t2: testtimestamp && t3: testtimestamp";
        this.allSuccessfulExecutions(description, definition, "t1", "t2", "t3");
    }

    /**
     * Runs a sequence with a failing {@code scenario} step that transitions to t3 on
     * {@code 'FAILED'}, followed by a split and a final step, and expects the parent to
     * end in {@code COMPLETE}. The three lists are presumably the successful, failed and
     * never-run child tasks, in that order (helper not visible here).
     */
    @Test
    public void ctrSequentialTransitionAndSplitWithScenarioFailed() {
        logger.info("composed-task-SequentialTransitionAndSplitWithScenarioFailed-test");
        final String description = "ComposedTask Sequential Transition And Split With Scenario Failed Test";
        final String definition = "t1: testtimestamp && scenario --io.spring.fail-task=true --io.spring.launch-batch-job=false 'FAILED'->t3: testtimestamp && <t4: testtimestamp || t5: testtimestamp> && t6: testtimestamp";
        this.mixedSuccessfulFailedAndUnknownExecutions(description, definition, TaskExecutionStatus.COMPLETE, DataFlowIT.asList("t1", "t3"), DataFlowIT.asList("scenario"), DataFlowIT.asList("t4", "t5", "t6"));
    }

    /**
     * Runs the same graph shape as the failed-scenario variant but with a
     * {@code scenario} step (t2) that is not configured to fail, and expects the parent
     * to end in {@code COMPLETE}. The three lists are presumably the successful, failed
     * and never-run child tasks, in that order (helper not visible here).
     */
    @Test
    public void ctrSequentialTransitionAndSplitWithScenarioOk() {
        logger.info("composed-task-SequentialTransitionAndSplitWithScenarioOk-test");
        final String description = "ComposedTask Sequential Transition And Split With Scenario Ok Test";
        final String definition = "t1: testtimestamp && t2: scenario 'FAILED'->t3: testtimestamp && <t4: testtimestamp || t5: testtimestamp> && t6: testtimestamp";
        this.mixedSuccessfulFailedAndUnknownExecutions(description, definition, TaskExecutionStatus.COMPLETE, DataFlowIT.asList("t1", "t2", "t4", "t5", "t6"), DataFlowIT.emptyList(), DataFlowIT.asList("t3"));
    }

    /**
     * Runs a split nested inside another split through the
     * {@code allSuccessfulExecutions} helper with child labels t1 to t4.
     */
    @Test
    public void ctrNestedSplit() {
        logger.info("composed-task-NestedSplit");
        final String description = "ctrNestedSplit";
        final String definition = "<<t1: testtimestamp || t2: testtimestamp > && t3: testtimestamp || t4: testtimestamp>";
        this.allSuccessfulExecutions(description, definition, "t1", "t2", "t3", "t4");
    }

    /**
     * Runs a three-step sequence whose middle step (b) is a {@code scenario} configured
     * to fail its batch job under a random job name, and expects the parent to end in
     * {@code ERROR}. The three lists are presumably the successful, failed and never-run
     * child tasks, in that order (helper not visible here).
     */
    @Test
    public void testEmbeddedFailedGraph() {
        logger.info("composed-task-EmbeddedFailedGraph-test");
        final String description = "ComposedTask Embedded Failed Graph Test";
        final String definition = String.format("a: testtimestamp && b:scenario  --io.spring.fail-batch=true --io.spring.jobName=%s --spring.cloud.task.batch.fail-on-job-failure=true && c: testtimestamp", DataFlowIT.randomJobName());
        this.mixedSuccessfulFailedAndUnknownExecutions(description, definition, TaskExecutionStatus.ERROR, DataFlowIT.asList("a"), DataFlowIT.asList("b"), DataFlowIT.asList("c"));
    }

    /**
     * Runs two consecutive splits (three-way, then two-way) through the
     * {@code allSuccessfulExecutions} helper with child labels t1 to t5.
     */
    @Test
    public void twoSplitTest() {
        logger.info("composed-task-twoSplit-test");
        final String description = "twoSplitTest";
        final String definition = "<t1: testtimestamp ||t2: testtimestamp||t3: testtimestamp> && <t4: testtimestamp||t5: testtimestamp>";
        this.allSuccessfulExecutions(description, definition, "t1", "t2", "t3", "t4", "t5");
    }

    /**
     * Runs a sequence containing an inner three-way split through the
     * {@code allSuccessfulExecutions} helper with child labels t1 to t5.
     */
    @Test
    public void sequentialAndSplitTest() {
        logger.info("composed-task-sequentialAndSplit-test");
        final String description = "sequentialAndSplitTest";
        final String definition = "<t1: testtimestamp && <t2: testtimestamp || t3: testtimestamp || t4: testtimestamp> && t5: testtimestamp>";
        this.allSuccessfulExecutions(description, definition, "t1", "t2", "t3", "t4", "t5");
    }

    /**
     * Runs a sequence whose failing {@code scenario} step (b) transitions to t2 on
     * {@code 'FAILED'}, with further steps and a split after it, and expects the parent
     * to end in {@code COMPLETE}. The three lists are presumably the successful, failed
     * and never-run child tasks, in that order (helper not visible here).
     */
    @Test
    public void sequentialTransitionAndSplitFailedInvalidTest() {
        logger.info("composed-task-sequentialTransitionAndSplitFailedInvalid-test");
        final String description = "ComposedTask Sequential Transition And Split Failed Invalid Test";
        final String definition = "t1: testtimestamp && b:scenario --io.spring.fail-task=true --io.spring.launch-batch-job=false 'FAILED' -> t2: testtimestamp && t3: testtimestamp && t4: testtimestamp && <t5: testtimestamp || t6: testtimestamp> && t7: testtimestamp";
        this.mixedSuccessfulFailedAndUnknownExecutions(description, definition, TaskExecutionStatus.COMPLETE, DataFlowIT.asList("t1", "t2"), DataFlowIT.asList("b"), DataFlowIT.asList("t3", "t4", "t5", "t6", "t7"));
    }

    /**
     * Runs a sequence whose split contains a nested flow ({@code t2 && t3}) through the
     * {@code allSuccessfulExecutions} helper with child labels t1 to t6.
     */
    @Test
    public void sequentialAndSplitWithFlowTest() {
        logger.info("composed-task-sequentialAndSplitWithFlow-test");
        final String description = "sequentialAndSplitWithFlowTest";
        final String definition = "t1: testtimestamp && <t2: testtimestamp && t3: testtimestamp || t4: testtimestamp ||t5: testtimestamp> && t6: testtimestamp";
        this.allSuccessfulExecutions(description, definition, "t1", "t2", "t3", "t4", "t5", "t6");
    }

    /**
     * Launches a composed task of the form {@code t1 && <t2 || b || t3> && t4} in which
     * the {@code scenario} step b is configured to fail its batch job, verifies the
     * state after the failed run, restarts the job execution and verifies the state
     * after the restart.
     *
     * <p>After the first run t1, t2 and t3 are expected to have one execution with exit
     * code 0, b one execution with exit code 1, and t4 no execution. After the restart
     * b is expected to have two executions and t4 one, both with exit code 0 for the
     * restarted parent execution.
     */
    @Test
    public void sequentialAndFailedSplitTest() {
        logger.info("composed-task-sequentialAndFailedSplit-test");
        TaskBuilder taskBuilder = Task.builder((DataFlowOperations)this.dataFlowOperations);
        try (Task task = taskBuilder.name(DataFlowIT.randomTaskName()).definition(String.format("t1: testtimestamp && <t2: testtimestamp ||b:scenario --io.spring.fail-batch=true --io.spring.jobName=%s --spring.cloud.task.batch.fail-on-job-failure=true || t3: testtimestamp> && t4: testtimestamp", DataFlowIT.randomJobName())).description("sequentialAndFailedSplitTest").build();){
            Assertions.assertThat((int)task.composedTaskChildTasks().size()).isEqualTo(5);
            Assertions.assertThat(task.composedTaskChildTasks().stream().map(Task::getTaskName).collect(Collectors.toList())).hasSameElementsAs(this.fullTaskNames(task, "b", "t1", "t2", "t3", "t4"));
            long launchId = task.launch(this.composedTaskLaunchArguments(new String[0]));
            // Servers older than 2.8.0-SNAPSHOT are expected to report COMPLETE here, newer ones ERROR.
            if (this.runtimeApps.dataflowServerVersionLowerThan("2.8.0-SNAPSHOT")) {
                Awaitility.await().until(() -> task.executionStatus(launchId) == TaskExecutionStatus.COMPLETE);
            } else {
                Awaitility.await().until(() -> task.executionStatus(launchId) == TaskExecutionStatus.ERROR);
            }
            Assertions.assertThat((int)task.executions().size()).isEqualTo(1);
            // The parent exit code is asserted to be 0 in both branches above.
            Assertions.assertThat((Integer)((TaskExecutionResource)task.execution(launchId).get()).getExitCode()).isEqualTo(0);
            task.executions().forEach(execution -> Assertions.assertThat((Integer)execution.getExitCode()).isEqualTo(0));
            this.childTasksBySuffix(task, "t1", "t2", "t3").forEach(childTask -> {
                Assertions.assertThat((int)childTask.executions().size()).isEqualTo(1);
                Assertions.assertThat((Integer)((TaskExecutionResource)childTask.executionByParentExecutionId(launchId).get()).getExitCode()).isEqualTo(0);
            });
            // The failing scenario step ran once and exited with 1.
            this.childTasksBySuffix(task, "b").forEach(childTask -> {
                Assertions.assertThat((int)childTask.executions().size()).isEqualTo(1);
                Assertions.assertThat((Integer)((TaskExecutionResource)childTask.executionByParentExecutionId(launchId).get()).getExitCode()).isEqualTo(1);
            });
            // The step after the split has not run yet.
            this.childTasksBySuffix(task, "t4").forEach(childTask -> Assertions.assertThat((int)childTask.executions().size()).isEqualTo(0));
            Assertions.assertThat((int)taskBuilder.allTasks().size()).isEqualTo(task.composedTaskChildTasks().size() + 1);
            Assertions.assertThat((int)task.executions().size()).isEqualTo(1);
            List jobExecutionIds = ((TaskExecutionResource)task.executions().stream().findFirst().get()).getJobExecutionIds();
            Assertions.assertThat((int)jobExecutionIds.size()).isEqualTo(1);
            // Restart the job execution; the restarted run is taken to be the execution with the highest id.
            this.dataFlowOperations.jobOperations().executionRestart(((Long)jobExecutionIds.get(0)).longValue());
            long launchId2 = task.executions().stream().mapToLong(TaskExecutionResource::getExecutionId).max().getAsLong();
            Awaitility.await().until(() -> task.executionStatus(launchId2) == TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((int)task.executions().size()).isEqualTo(2);
            Assertions.assertThat((Comparable)task.executionStatus(launchId2)).isEqualTo((Object)TaskExecutionStatus.COMPLETE);
            Assertions.assertThat((Integer)((TaskExecutionResource)task.execution(launchId2).get()).getExitCode()).isEqualTo(0);
            this.childTasksBySuffix(task, "b").forEach(childTask -> {
                Assertions.assertThat((int)childTask.executions().size()).isEqualTo(2);
                Assertions.assertThat((Integer)((TaskExecutionResource)childTask.executionByParentExecutionId(launchId2).get()).getExitCode()).isEqualTo(0);
            });
            this.childTasksBySuffix(task, "t4").forEach(childTask -> {
                Assertions.assertThat((int)childTask.executions().size()).isEqualTo(1);
                Assertions.assertThat((Integer)((TaskExecutionResource)childTask.executionByParentExecutionId(launchId2).get()).getExitCode()).isEqualTo(0);
            });
            Assertions.assertThat((int)task.jobExecutionResources().size()).isEqualTo(2);
        }
        // After the task resource is closed no task definitions may remain.
        Assertions.assertThat((int)taskBuilder.allTasks().size()).isEqualTo(0);
    }

    /**
     * Composed-task scenario in which app {@code b} is launched with
     * {@code --io.spring.fail-task=true}.
     * <p>
     * Launching and verification are delegated to
     * {@code mixedSuccessfulFailedAndUnknownExecutions}. Going by that helper's name, the three
     * lists are presumably the successful ({@code t1}), failed ({@code b}) and never-run
     * ({@code t2}) children - confirm against the helper.
     */
    @Test
    public void failedBasicTransitionTest() {
        logger.info("composed-task-failedBasicTransition-test");
        String label = "ComposedTask Sequential Failed Basic Transition Test";
        String definition = "b: scenario --io.spring.fail-task=true --io.spring.launch-batch-job=false 'FAILED' -> t1: testtimestamp * ->t2: testtimestamp";
        mixedSuccessfulFailedAndUnknownExecutions(label, definition, TaskExecutionStatus.COMPLETE,
                DataFlowIT.asList("t1"), DataFlowIT.asList("b"), DataFlowIT.asList("t2"));
    }

    /**
     * Composed-task scenario in which app {@code b} is launched without the fail flag.
     * <p>
     * Launching and verification are delegated to
     * {@code mixedSuccessfulFailedAndUnknownExecutions}. Going by that helper's name, the lists
     * are presumably successful ({@code b}, {@code t2}), failed (none) and never-run
     * ({@code t1}) children - confirm against the helper.
     */
    @Test
    public void successBasicTransitionTest() {
        logger.info("composed-task-successBasicTransition-test");
        String label = "ComposedTask Success Basic Transition Test";
        String definition = "b: scenario --io.spring.launch-batch-job=false 'FAILED' -> t1: testtimestamp * ->t2: testtimestamp";
        mixedSuccessfulFailedAndUnknownExecutions(label, definition, TaskExecutionStatus.COMPLETE,
                DataFlowIT.asList("b", "t2"), DataFlowIT.emptyList(), DataFlowIT.asList("t1"));
    }

    /**
     * Composed-task scenario that chains two transition groups ({@code b1} and {@code b2}) with
     * {@code &&}.
     * <p>
     * Launching and verification are delegated to
     * {@code mixedSuccessfulFailedAndUnknownExecutions}. Going by that helper's name, the lists
     * are presumably successful ({@code b1}, {@code b2}, {@code t3}), failed (none) and
     * never-run ({@code t1}, {@code t2}) children - confirm against the helper.
     */
    @Test
    public void basicTransitionWithTransitionTest() {
        logger.info("composed-task-basicTransitionWithTransition-test");
        String label = "basicTransitionWithTransitionTest";
        String definition = "b1: scenario  --io.spring.launch-batch-job=false 'FAILED' -> t1: testtimestamp  && b2: scenario --io.spring.launch-batch-job=false 'FAILED' -> t2: testtimestamp * ->t3: testtimestamp ";
        mixedSuccessfulFailedAndUnknownExecutions(label, definition, TaskExecutionStatus.COMPLETE,
                DataFlowIT.asList("b1", "b2", "t3"), DataFlowIT.emptyList(), DataFlowIT.asList("t1", "t2"));
    }

    /**
     * Composed-task scenario in which only the last group ({@code b2}) carries a wildcard
     * ({@code *}) transition.
     * <p>
     * Launching and verification are delegated to
     * {@code mixedSuccessfulFailedAndUnknownExecutions}. Going by that helper's name, the lists
     * are presumably successful ({@code b1}, {@code b2}, {@code t3}), failed (none) and
     * never-run ({@code t1}) children - confirm against the helper.
     */
    @Test
    public void wildCardOnlyInLastPositionTest() {
        logger.info("composed-task-wildCardOnlyInLastPosition-test");
        String label = "wildCardOnlyInLastPositionTest";
        String definition = "b1: scenario --io.spring.launch-batch-job=false 'FAILED' -> t1: testtimestamp  && b2: scenario --io.spring.launch-batch-job=false * ->t3: testtimestamp ";
        mixedSuccessfulFailedAndUnknownExecutions(label, definition, TaskExecutionStatus.COMPLETE,
                DataFlowIT.asList("b1", "b2", "t3"), DataFlowIT.emptyList(), DataFlowIT.asList("t1"));
    }

    /**
     * Launches the composed task {@code b1 && t1} with {@code b1} configured to fail its batch
     * job, then restarts the recorded job execution and verifies the second run.
     * <p>
     * First run: {@code b1} has one execution with exit code 1 and {@code t1} has none.
     * After the restart: the parent has two executions, {@code b1} has two, {@code t1} has one,
     * and the executions tied to the second parent run exit with 0. Once the task is closed no
     * task definitions may remain.
     */
    @Test
    public void failedCTRRetryTest() {
        logger.info("composed-task-failedCTRRetry-test");
        TaskBuilder taskBuilder = Task.builder(dataFlowOperations);
        String taskName = DataFlowIT.randomTaskName();
        String definition = String.format("b1:scenario --io.spring.fail-batch=true --io.spring.jobName=%s --spring.cloud.task.batch.fail-on-job-failure=true && t1: testtimestamp", DataFlowIT.randomJobName());
        try (Task task = taskBuilder.name(taskName).definition(definition).description("failedCTRRetryTest").build()) {
            Assertions.assertThat(task.composedTaskChildTasks().size()).isEqualTo(2);
            List<String> childNames = task.composedTaskChildTasks().stream()
                    .map(Task::getTaskName)
                    .collect(Collectors.toList());
            Assertions.assertThat(childNames).hasSameElementsAs(fullTaskNames(task, "b1", "t1"));

            // ---- first run: b1 fails, t1 never starts ----
            long launchId = task.launch(composedTaskLaunchArguments(new String[0]));
            // The status awaited for the failed parent run depends on the server version.
            TaskExecutionStatus firstRunStatus = runtimeApps.dataflowServerVersionLowerThan("2.8.0-SNAPSHOT")
                    ? TaskExecutionStatus.COMPLETE
                    : TaskExecutionStatus.ERROR;
            Awaitility.await().until(() -> task.executionStatus(launchId) == firstRunStatus);

            Assertions.assertThat(task.executions().size()).isEqualTo(1);
            Assertions.assertThat(task.execution(launchId).get().getExitCode()).isEqualTo(0);
            task.executions().forEach(execution -> Assertions.assertThat(execution.getExitCode()).isEqualTo(0));
            childTasksBySuffix(task, "b1").forEach(childTask -> {
                Assertions.assertThat(childTask.executions().size()).isEqualTo(1);
                Assertions.assertThat(childTask.executionByParentExecutionId(launchId).get().getExitCode()).isEqualTo(1);
            });
            childTasksBySuffix(task, "t1").forEach(childTask ->
                    Assertions.assertThat(childTask.executions().size()).isEqualTo(0));
            // Parent definition plus one definition per child.
            Assertions.assertThat(taskBuilder.allTasks().size()).isEqualTo(task.composedTaskChildTasks().size() + 1);
            Assertions.assertThat(task.executions().size()).isEqualTo(1);

            // ---- restart the single job execution recorded for the parent ----
            List<Long> jobExecutionIds = task.executions().stream().findFirst().get().getJobExecutionIds();
            Assertions.assertThat(jobExecutionIds.size()).isEqualTo(1);
            dataFlowOperations.jobOperations().executionRestart(jobExecutionIds.get(0).longValue());
            // The highest execution id is taken to be the restarted run.
            long launchId2 = task.executions().stream()
                    .mapToLong(TaskExecutionResource::getExecutionId)
                    .max()
                    .getAsLong();
            Awaitility.await().until(() -> task.executionStatus(launchId2) == TaskExecutionStatus.COMPLETE);

            // ---- second run: everything succeeds ----
            Assertions.assertThat(task.executions().size()).isEqualTo(2);
            Assertions.assertThat(task.execution(launchId2).get().getExitCode()).isEqualTo(0);
            childTasksBySuffix(task, "b1").forEach(childTask -> {
                Assertions.assertThat(childTask.executions().size()).isEqualTo(2);
                Assertions.assertThat(childTask.executionByParentExecutionId(launchId2).get().getExitCode()).isEqualTo(0);
            });
            childTasksBySuffix(task, "t1").forEach(childTask -> {
                Assertions.assertThat(childTask.executions().size()).isEqualTo(1);
                Assertions.assertThat(childTask.executionByParentExecutionId(launchId2).get().getExitCode()).isEqualTo(0);
            });
            Assertions.assertThat(task.jobExecutionResources().size()).isEqualTo(2);
        }
        Assertions.assertThat(taskBuilder.allTasks().size()).isEqualTo(0);
    }

    /**
     * Launches the {@code scenario} app with a job name equal to the task name and a random step
     * name, then checks that the launch succeeded and that the recorded job and step match.
     */
    @Test
    public void basicBatchSuccessTest() {
        logger.info("basic-batch-success-test");
        try (Task task = Task.builder(dataFlowOperations)
                .name(DataFlowIT.randomTaskName())
                .definition("scenario")
                .description("Test scenario batch app")
                .build()) {
            String stepName = randomStepName();
            long launchId = task.launch(createNewJobandStepScenario(task.getTaskName(), stepName));
            validateSuccessfulTaskLaunch(task, launchId);
            verifySuccessfulJobAndStepScenario(task, stepName);
        }
    }

    /**
     * Builds the two launch arguments that set the job and step name.
     *
     * @param jobName value appended to {@code --io.spring.jobName=}
     * @param stepName value appended to {@code --io.spring.stepName=}
     * @return a new, mutable list holding the job-name argument followed by the step-name
     *         argument; callers in this class append further arguments to it
     */
    private List<String> createNewJobandStepScenario(String jobName, String stepName) {
        String jobArgument = "--io.spring.jobName=" + jobName;
        String stepArgument = "--io.spring.stepName=" + stepName;
        // Wrap in ArrayList: Arrays.asList alone is fixed-size and callers add to the result.
        return new ArrayList<String>(Arrays.asList(jobArgument, stepArgument));
    }

    /**
     * Shorthand for {@link #validateSuccessfulTaskLaunch(Task, long, int)} that expects the task
     * to have exactly one execution.
     *
     * @param task the task that was launched
     * @param launchId id returned by the launch
     */
    private void validateSuccessfulTaskLaunch(Task task, long launchId) {
        final int expectedExecutions = 1;
        validateSuccessfulTaskLaunch(task, launchId, expectedExecutions);
    }

    /**
     * Waits for the given launch to reach {@code COMPLETE}, then asserts the task's total
     * execution count, that the execution can be looked up, and that it exited with code 0.
     *
     * @param task the task that was launched
     * @param launchId id returned by the launch
     * @param sizeExpected number of executions the task is expected to have in total
     */
    private void validateSuccessfulTaskLaunch(Task task, long launchId, int sizeExpected) {
        Awaitility.await().until(() -> TaskExecutionStatus.COMPLETE == task.executionStatus(launchId));

        int executionCount = task.executions().size();
        Assertions.assertThat(executionCount).isEqualTo(sizeExpected);

        boolean executionFound = task.execution(launchId).isPresent();
        Assertions.assertThat(executionFound).isTrue();

        Integer exitCode = task.execution(launchId).get().getExitCode();
        Assertions.assertThat(exitCode).isEqualTo(0);
    }

    /**
     * Asserts that the task has one execution with one job execution id and, for every job
     * execution named after the task, that it has a single step carrying the expected name.
     * <p>
     * NOTE(review): if no job execution matches the task name, the step checks run zero times
     * and pass vacuously.
     *
     * @param task the task whose executions are inspected
     * @param stepName step name every matching step execution must have
     */
    private void verifySuccessfulJobAndStepScenario(Task task, String stepName) {
        Assertions.assertThat(task.executions().size()).isEqualTo(1);

        List<Long> jobExecutionIds = task.executions().stream().findFirst().get().getJobExecutionIds();
        Assertions.assertThat(jobExecutionIds.size()).isEqualTo(1);

        task.jobExecutionResources().stream()
                .filter(jobExecution -> jobExecution.getName().equals(task.getTaskName()))
                .forEach(jobExecution -> {
                    Assertions.assertThat(jobExecution.getStepExecutionCount()).isEqualTo(1);
                    task.jobStepExecutions(jobExecution.getExecutionId().longValue())
                            .forEach(step -> Assertions.assertThat(step.getStepExecution().getStepName()).isEqualTo(stepName));
                });
    }

    /**
     * Produces a step name of the form {@code step-<suffix>}, where the suffix comes from
     * {@code DataFlowIT.randomSuffix()}.
     *
     * @return the generated step name
     */
    private String randomStepName() {
        return String.format("step-%s", DataFlowIT.randomSuffix());
    }

    /**
     * Runs the {@code scenario} batch app to successful completion and then, on servers at or
     * above 2.7.0, verifies that restarting the completed job execution is rejected with a
     * {@code DataFlowClientException} whose message says the job is not restartable.
     */
    @Test
    public void basicBatchSuccessRestartTest() {
        try (Task task = Task.builder(dataFlowOperations)
                .name(DataFlowIT.randomTaskName())
                .definition("scenario")
                .description("Test scenario batch app")
                .build()) {
            String stepName = randomStepName();
            long launchId = task.launch(createNewJobandStepScenario(task.getTaskName(), stepName));
            validateSuccessfulTaskLaunch(task, launchId);
            verifySuccessfulJobAndStepScenario(task, stepName);

            List<Long> jobExecutionIds = task.executions().stream().findFirst().get().getJobExecutionIds();
            boolean restartSupported = runtimeApps.dataflowServerVersionEqualOrGreaterThan("2.7.0");
            Assumptions.assumingThat(restartSupported, () -> {
                DataFlowClientException rejection = org.junit.jupiter.api.Assertions.assertThrows(
                        DataFlowClientException.class,
                        () -> dataFlowOperations.jobOperations().executionRestart(jobExecutionIds.get(0).longValue()));
                org.junit.jupiter.api.Assertions.assertTrue(
                        rejection.getMessage().contains(" and state 'COMPLETED' is not restartable"));
            });
        }
    }

    /**
     * Launches the {@code scenario} app with {@code --io.spring.failBatch=true}, then (on servers
     * at or above 2.7.0) restarts the job execution and verifies that two job executions exist
     * and that the first job instance holds both a {@code FAILED} and a {@code COMPLETED} one.
     */
    @Test
    public void basicBatchFailRestartTest() {
        logger.info("basic-batch-fail-restart-test");
        try (Task task = Task.builder(dataFlowOperations)
                .name(DataFlowIT.randomTaskName())
                .definition("scenario")
                .description("Test scenario batch app that will fail on first pass")
                .build()) {
            String stepName = randomStepName();
            List<String> args = createNewJobandStepScenario(task.getTaskName(), stepName);
            args.add("--io.spring.failBatch=true");
            long launchId = task.launch(args);
            // The task execution itself is expected to complete even though its batch job fails.
            validateSuccessfulTaskLaunch(task, launchId);

            List<Long> jobExecutionIds = task.executions().stream().findFirst().get().getJobExecutionIds();
            boolean restartSupported = runtimeApps.dataflowServerVersionEqualOrGreaterThan("2.7.0");
            Assumptions.assumingThat(restartSupported, () -> {
                dataFlowOperations.jobOperations().executionRestart(jobExecutionIds.get(0).longValue());
                Awaitility.await().until(() -> task.jobExecutionResources().size() == 2);
                Awaitility.await().until(() -> task.executions().stream().findFirst().get().getTaskExecutionStatus() == TaskExecutionStatus.COMPLETE);
                Assertions.assertThat(task.jobExecutionResources().size()).isEqualTo(2);

                // Collect the batch status of every job execution of the first job instance.
                List<BatchStatus> batchStatuses = task.jobInstanceResources().stream().findFirst().get()
                        .getJobExecutions().stream()
                        .map(jobExecutionResource -> jobExecutionResource.getJobExecution().getStatus())
                        .collect(Collectors.toList());
                Assertions.assertThat(batchStatuses).contains(BatchStatus.FAILED);
                Assertions.assertThat(batchStatuses).contains(BatchStatus.COMPLETED);
            });
        }
    }

    /**
     * Launches the task once with no version property, registers the test version, then launches
     * again with {@code version.testtimestamp} set to the test version. One execution is expected
     * per version.
     */
    @Test
    public void testLaunchOfDefaultThenVersion() {
        logger.info("multiple task app version test");
        minimumVersionCheck("testLaunchOfDefaultThenVersion");
        Task task = createTaskDefinition();

        long defaultLaunchId = task.launch();
        validateSuccessfulTaskLaunch(task, defaultLaunchId);
        registerNewTimestampVersion();
        validateSpecifiedVersion(task, CURRENT_VERSION_NUMBER);

        Map<String, String> versionProperty = Collections.singletonMap("version.testtimestamp", TEST_VERSION_NUMBER);
        long versionedLaunchId = task.launch(versionProperty, null);
        validateSuccessfulTaskLaunch(task, versionedLaunchId, 2);
        validateSpecifiedVersion(task, TEST_VERSION_NUMBER);
    }

    /**
     * Registers the test version before creating the task, launches with no version property,
     * and expects the single execution to use the current version.
     */
    @Test
    public void testCreateTaskWithTwoVersionsLaunchDefaultVersion() {
        minimumVersionCheck("testCreateTaskWithTwoVersionsLaunchDefaultVersion");
        registerNewTimestampVersion();

        Task task = createTaskDefinition();
        validateSuccessfulTaskLaunch(task, task.launch());
        validateSpecifiedVersion(task, CURRENT_VERSION_NUMBER);
    }

    /**
     * Registers the test version, launches the task pinned to that version, then launches it
     * again pinned to the current version. The first execution's resource URL must contain the
     * test version, and one execution must exist for the current version.
     */
    @Test
    public void testLaunchOfNewVersionThenPreviousVersion() {
        // Pass this test's own name so the skip message identifies the right test.
        this.minimumVersionCheck("testLaunchOfNewVersionThenPreviousVersion");
        this.registerNewTimestampVersion();
        Task task = this.createTaskDefinition();

        long launchId = task.launch(Collections.singletonMap("version.testtimestamp", TEST_VERSION_NUMBER), null);
        this.validateSuccessfulTaskLaunch(task, launchId);
        Assertions.assertThat(task.execution(launchId).get().getResourceUrl()).contains(TEST_VERSION_NUMBER);

        launchId = task.launch(Collections.singletonMap("version.testtimestamp", CURRENT_VERSION_NUMBER), null);
        this.validateSuccessfulTaskLaunch(task, launchId, 2);
        this.validateSpecifiedVersion(task, CURRENT_VERSION_NUMBER);
    }

    /**
     * Launches the task pinned to the test version, then launches it again with no version
     * property and expects both executions to use the test version.
     */
    @Test
    public void testWhenNoVersionIsSpecifiedPreviousVersionShouldBeUsed() {
        minimumVersionCheck("testWhenNoVersionIsSpecifiedPreviousVersionShouldBeUsed");
        registerNewTimestampVersion();
        Task task = createTaskDefinition();

        Map<String, String> versionProperty = Collections.singletonMap("version.testtimestamp", TEST_VERSION_NUMBER);
        long pinnedLaunchId = task.launch(versionProperty, null);
        validateSuccessfulTaskLaunch(task, pinnedLaunchId);
        validateSpecifiedVersion(task, TEST_VERSION_NUMBER);

        long unpinnedLaunchId = task.launch();
        validateSuccessfulTaskLaunch(task, unpinnedLaunchId, 2);
        validateSpecifiedVersion(task, TEST_VERSION_NUMBER, 2);
    }

    /**
     * Launches the task pinned to the test version without registering that version first and
     * expects a {@code DataFlowClientException} mentioning the unknown task app.
     */
    @Test
    public void testCreateTaskWithOneVersionLaunchInvalidVersion() {
        minimumVersionCheck("testCreateTaskWithOneVersionLaunchInvalidVersion");
        Task task = createTaskDefinition();
        Assertions.assertThatThrownBy(() -> task.launch(Collections.singletonMap("version.testtimestamp", TEST_VERSION_NUMBER), null))
                .isInstanceOf(DataFlowClientException.class)
                .hasMessageContaining("Unknown task app: testtimestamp");
    }

    /**
     * Verifies that a rejected launch pinned to the test version (not registered in this test)
     * does not prevent a following launch with no version property, which must succeed on the
     * current version.
     */
    @Test
    public void testInvalidVersionUsageShouldNotAffectSubsequentDefaultLaunch() {
        minimumVersionCheck("testInvalidVersionUsageShouldNotAffectSubsequentDefaultLaunch");
        Task task = createTaskDefinition();

        Assertions.assertThatThrownBy(() -> task.launch(Collections.singletonMap("version.testtimestamp", TEST_VERSION_NUMBER), null))
                .isInstanceOf(DataFlowClientException.class)
                .hasMessageContaining("Unknown task app: testtimestamp");

        long defaultLaunchId = task.launch();
        validateSuccessfulTaskLaunch(task, defaultLaunchId, 1);
        validateSpecifiedVersion(task, CURRENT_VERSION_NUMBER, 1);
    }

    /**
     * Launches the task pinned to the test version, unregisters that version via
     * {@code resetTimestampVersion()}, and expects a second launch pinned to it to be rejected
     * with a {@code DataFlowClientException}.
     */
    @Test
    public void testDeletePreviouslyUsedVersionShouldFailIfRelaunched() {
        minimumVersionCheck("testDeletePreviouslyUsedVersionShouldFailIfRelaunched");
        registerNewTimestampVersion();
        Task task = createTaskDefinition();

        long firstLaunchId = task.launch(Collections.singletonMap("version.testtimestamp", TEST_VERSION_NUMBER), null);
        validateSuccessfulTaskLaunch(task, firstLaunchId);

        resetTimestampVersion();
        Assertions.assertThatThrownBy(() -> task.launch(Collections.singletonMap("version.testtimestamp", TEST_VERSION_NUMBER), null))
                .isInstanceOf(DataFlowClientException.class)
                .hasMessageContaining("Unknown task app: testtimestamp");
    }

    /**
     * Launches on the current version, registers the test version and makes it the default,
     * then launches again and expects one execution on each version.
     */
    @Test
    public void testChangingTheAppDefaultVersionRunningBetweenChangesShouldBeSuccessful() {
        minimumVersionCheck("testChangingTheAppDefaultVersionRunningBetweenChangesShouldBeSuccessful");
        Task task = createTaskDefinition();

        long beforeSwitch = task.launch();
        validateSuccessfulTaskLaunch(task, beforeSwitch);
        validateSpecifiedVersion(task, CURRENT_VERSION_NUMBER);

        registerNewTimestampVersion();
        setDefaultVersionForTimestamp(TEST_VERSION_NUMBER);

        long afterSwitch = task.launch();
        validateSuccessfulTaskLaunch(task, afterSwitch, 2);
        validateSpecifiedVersion(task, TEST_VERSION_NUMBER);
    }

    /**
     * Launches a task on the current default version, switches the default to the test version
     * and launches again, then creates a second task definition, switches the default back to
     * the current version and verifies that the new task runs on it.
     */
    @Test
    public void testRollingBackDefaultToPreviousVersionAndRunningShouldBeSuccessful() {
        minimumVersionCheck("testRollingBackDefaultToPreviousVersionAndRunningShouldBeSuccessful");
        registerNewTimestampVersion();

        Task firstTask = createTaskDefinition();
        long firstLaunchId = firstTask.launch();
        validateSuccessfulTaskLaunch(firstTask, firstLaunchId);
        validateSpecifiedVersion(firstTask, CURRENT_VERSION_NUMBER);

        setDefaultVersionForTimestamp(TEST_VERSION_NUMBER);
        long secondLaunchId = firstTask.launch();
        validateSuccessfulTaskLaunch(firstTask, secondLaunchId, 2);
        validateSpecifiedVersion(firstTask, TEST_VERSION_NUMBER);

        // A fresh definition is created before the default is rolled back.
        Task rolledBackTask = createTaskDefinition();
        setDefaultVersionForTimestamp(CURRENT_VERSION_NUMBER);
        long rolledBackLaunchId = rolledBackTask.launch();
        validateSuccessfulTaskLaunch(rolledBackTask, rolledBackLaunchId);
        validateSpecifiedVersion(rolledBackTask, CURRENT_VERSION_NUMBER);
    }

    /**
     * Launches the task successfully, unregisters the current version of the
     * {@code testtimestamp} app, and expects the next launch to be rejected with a
     * {@code DataFlowClientException}.
     */
    @Test
    public void testUnregisteringAppShouldPreventTaskDefinitionLaunch() {
        minimumVersionCheck("testUnregisteringAppShouldPreventTaskDefinitionLaunch");
        Task task = createTaskDefinition();

        long launchId = task.launch();
        validateSuccessfulTaskLaunch(task, launchId);
        validateSpecifiedVersion(task, CURRENT_VERSION_NUMBER);

        dataFlowOperations.appRegistryOperations().unregister("testtimestamp", ApplicationType.task, CURRENT_VERSION_NUMBER);

        Assertions.assertThatThrownBy(() -> task.launch())
                .isInstanceOf(DataFlowClientException.class)
                .hasMessageContaining("Unknown task app: testtimestamp");
    }

    /**
     * Creates a randomly named task definition whose definition is {@code testtimestamp}.
     *
     * @return the task built by {@link #createTaskDefinition(String)}
     */
    private Task createTaskDefinition() {
        final String defaultDefinition = "testtimestamp";
        return createTaskDefinition(defaultDefinition);
    }

    /**
     * Creates a task with a random name for the given definition; the description embeds both
     * the generated name and the definition.
     *
     * @param definition task definition text passed to the builder
     * @return the built task
     */
    private Task createTaskDefinition(String definition) {
        String taskDefName = DataFlowIT.randomTaskName();
        String description = String.format("Test task definition %s using for app definition\"%s\"", taskDefName, definition);
        return Task.builder(dataFlowOperations)
                .name(taskDefName)
                .definition(definition)
                .description(description)
                .build();
    }

    /**
     * Skips the calling test (via a JUnit assumption) when the Data Flow server version is lower
     * than 2.8.0.
     *
     * @param testName name used as the prefix of the skip message
     */
    private void minimumVersionCheck(String testName) {
        boolean serverIsRecentEnough = !runtimeApps.dataflowServerVersionLowerThan("2.8.0");
        String skipMessage = testName + ": SKIP - SCDF 2.7.x and below!";
        Assumptions.assumeTrue(serverIsRecentEnough, skipMessage);
    }

    /**
     * Registers the {@code testtimestamp} app at {@code TEST_VERSION_NUMBER} by delegating to
     * {@link #registerTimestamp(String)}.
     */
    private void registerNewTimestampVersion() {
        registerTimestamp(TEST_VERSION_NUMBER);
    }

    /**
     * Registers the {@code testtimestamp} task app at the given version, using the docker
     * artefact when the platform type is {@code kubernetes} (case-insensitive) and the maven
     * artefact otherwise.
     *
     * @param versionNumber version passed on to {@code registerTask}
     */
    private void registerTimestamp(String versionNumber) {
        boolean onKubernetes = runtimeApps.getPlatformType().equalsIgnoreCase("kubernetes");
        String artefact = onKubernetes
                ? "docker:springcloudtask/timestamp-task"
                : "maven://io.spring:timestamp-task";
        registerTask("testtimestamp", artefact, versionNumber);
    }

    /**
     * Makes the given version the default for the {@code testtimestamp} task app.
     *
     * @param version version to make the default
     */
    private void setDefaultVersionForTimestamp(String version) {
        dataFlowOperations.appRegistryOperations().makeDefault("testtimestamp", ApplicationType.task, version);
    }

    /**
     * Registers the {@code testtimestamp} and {@code testtimestamp-batch} task apps at
     * {@code CURRENT_VERSION_NUMBER}, using docker artefacts when the platform type is
     * {@code kubernetes} (case-insensitive) and maven artefacts otherwise.
     */
    private void registerTasks() {
        boolean onKubernetes = runtimeApps.getPlatformType().equalsIgnoreCase("kubernetes");
        String timestampArtefact = onKubernetes
                ? "docker:springcloudtask/timestamp-task"
                : "maven://io.spring:timestamp-task";
        String batchArtefact = onKubernetes
                ? "docker:springcloudtask/timestamp-batch-task"
                : "maven://io.spring:timestamp-batch-task";
        registerTask("testtimestamp", timestampArtefact, CURRENT_VERSION_NUMBER);
        registerTask("testtimestamp-batch", batchArtefact, CURRENT_VERSION_NUMBER);
    }

    /**
     * Looks up the registration of the named task app and logs its links; the current test is
     * failed if the lookup throws a {@code DataFlowClientException}.
     *
     * @param name name of the task app to look up
     */
    private void assertTaskRegistration(String name) {
        try {
            AppRegistryOperations appRegistryOperations = this.dataFlowOperations.appRegistryOperations();
            DetailedAppRegistrationResource resource = appRegistryOperations.info(name, ApplicationType.task, false);
            logger.info("assertTaskRegistration:{}:{}", name, resource.getLinks());
        }
        catch (DataFlowClientException x) {
            // Keep the original exception as the cause so its stack trace appears in the report.
            org.junit.jupiter.api.Assertions.fail(x.getMessage(), x);
        }
    }

    /**
     * Registers a task app under {@code artefact + ":" + version}. A
     * {@code DataFlowClientException} from the registry is treated as expected and only logged
     * at debug level.
     *
     * @param name app name to register
     * @param artefact artefact coordinate without the version suffix
     * @param version version appended to the artefact after a colon
     */
    private void registerTask(String name, String artefact, String version) {
        String uri = artefact + ":" + version;
        try {
            dataFlowOperations.appRegistryOperations().register(name, ApplicationType.task, uri, null, false);
            logger.info("registerTask:{}:{}", name, uri);
        }
        catch (DataFlowClientException expected) {
            // Deliberate best effort: the failure is logged as "Expected" and not rethrown.
            logger.debug("registerTask:" + name + ":Expected:" + expected);
        }
    }

    /**
     * Unregisters the test version of {@code testtimestamp} (a
     * {@code DataFlowClientException} is logged at debug level and ignored), re-registers the
     * current version for the active platform, and makes the current version the default.
     */
    private void resetTimestampVersion() {
        try {
            dataFlowOperations.appRegistryOperations().unregister("testtimestamp", ApplicationType.task, TEST_VERSION_NUMBER);
        }
        catch (DataFlowClientException expected) {
            logger.debug("resetTimestampVersion:Expected:" + expected);
        }
        // registerTimestamp picks the docker or maven artefact based on the platform type.
        registerTimestamp(CURRENT_VERSION_NUMBER);
        setDefaultVersionForTimestamp(CURRENT_VERSION_NUMBER);
    }

    /**
     * Shorthand for {@link #validateSpecifiedVersion(Task, String, int)} that expects exactly one
     * execution of the given version.
     *
     * @param task task whose executions are inspected
     * @param version text expected inside the execution's resource URL
     */
    private void validateSpecifiedVersion(Task task, String version) {
        final int expectedMatches = 1;
        validateSpecifiedVersion(task, version, expectedMatches);
    }

    /**
     * Asserts how many of the task's executions have a resource URL containing the given
     * version text.
     *
     * @param task task whose executions are inspected
     * @param version text to look for in each execution's resource URL
     * @param countExpected number of executions expected to match
     */
    private void validateSpecifiedVersion(Task task, String version, int countExpected) {
        List<TaskExecutionResource> matching = task.executions().stream()
                .filter(execution -> execution.getResourceUrl().contains(version))
                .collect(Collectors.toList());
        Assertions.assertThat(matching.size()).isEqualTo(countExpected);
    }

    /**
     * Launches {@code testtimestamp} once with a deployment property and once without, then
     * expects both executions to report that property key among their deployment properties.
     */
    @Test
    public void basicTaskWithPropertiesTest() {
        logger.info("basic-task-with-properties-test");
        String testPropertyKey = "app.testtimestamp.test-prop-key";
        String testPropertyValue = "test-prop-value";
        try (Task task = Task.builder(dataFlowOperations)
                .name(DataFlowIT.randomTaskName())
                .definition("testtimestamp")
                .description("Test testtimestamp app that will use properties")
                .build()) {
            String stepName = randomStepName();
            List<String> args = createNewJobandStepScenario(task.getTaskName(), stepName);

            long launchId = task.launch(Collections.singletonMap(testPropertyKey, testPropertyValue), args);
            validateSuccessfulTaskLaunch(task, launchId);

            // Second launch passes arguments only, no properties.
            long launchId1 = task.launch(args);
            Awaitility.await().until(() -> task.executionStatus(launchId1) == TaskExecutionStatus.COMPLETE);
            Assertions.assertThat(task.executions().size()).isEqualTo(2);

            List<TaskExecutionResource> withProperty = task.executions().stream()
                    .filter(execution -> execution.getDeploymentProperties().containsKey(testPropertyKey))
                    .collect(Collectors.toList());
            Assertions.assertThat(withProperty.size()).isEqualTo(2);
        }
    }

    /**
     * Building a task whose definition names the app {@code foobar} must throw a
     * {@code DataFlowClientException} whose message says the application could not be found.
     */
    @Test
    public void taskLaunchInvalidTaskDefinition() {
        logger.info("task-launch-invalid-task-definition");
        DataFlowClientException failure = org.junit.jupiter.api.Assertions.assertThrows(
                DataFlowClientException.class,
                () -> Task.builder(dataFlowOperations)
                        .name(DataFlowIT.randomTaskName())
                        .definition("foobar")
                        .description("Test scenario with invalid task definition")
                        .build());
        org.junit.jupiter.api.Assertions.assertTrue(
                failure.getMessage().contains("The 'task:foobar' application could not be found."));
    }

    /**
     * Launches {@code testtimestamp} once with an extra command-line argument and once with only
     * the base arguments, then expects exactly one of the two executions to list the extra
     * argument.
     */
    @Test
    public void taskLaunchWithArguments() {
        // The label previously read "basic-batch-success-test", copied from another test.
        logger.info("task-launch-with-arguments");
        String argument = "--testtimestamp.format=YYYY";
        try (Task task = Task.builder((DataFlowOperations)this.dataFlowOperations).name(DataFlowIT.randomTaskName()).definition("testtimestamp").description("Test launch apps with arguments app").build()) {
            String stepName = this.randomStepName();
            List<String> baseArgs = this.createNewJobandStepScenario(task.getTaskName(), stepName);
            // Copy so the base list can be reused unchanged for the second launch.
            List<String> args = new ArrayList<>(baseArgs);
            args.add(argument);
            long launchId = task.launch(args);
            this.validateSuccessfulTaskLaunch(task, launchId);
            long launchId1 = task.launch(baseArgs);
            Awaitility.await().until(() -> task.executionStatus(launchId1) == TaskExecutionStatus.COMPLETE);
            Assertions.assertThat(task.executions().size()).isEqualTo(2);
            Assertions.assertThat(task.executions().stream().filter(execution -> execution.getArguments().contains(argument)).collect(Collectors.toList()).size()).isEqualTo(1);
        }
    }

    /**
     * Runs a {@code scenario} task, checks that exactly one task definition is listed while the
     * task is open, and after closing it calls
     * {@code verifyTaskDefAndTaskExecutionCount(taskName, 0, 1)}.
     */
    @Test
    public void taskDefinitionDelete() {
        logger.info("task-definition-delete");
        final String taskName;
        try (Task task = Task.builder(dataFlowOperations)
                .name(DataFlowIT.randomTaskName())
                .definition("scenario")
                .description("Test scenario batch app that will fail on first pass")
                .build()) {
            taskName = task.getTaskName();
            String stepName = randomStepName();
            long launchId = task.launch(createNewJobandStepScenario(task.getTaskName(), stepName));
            validateSuccessfulTaskLaunch(task, launchId);
            Assertions.assertThat(dataFlowOperations.taskOperations().list().getContent().size()).isEqualTo(1);
        }
        // presumably (name, expected definitions, expected executions) - confirm against helper
        verifyTaskDefAndTaskExecutionCount(taskName, 0, 1);
    }

    /**
     * Runs a {@code scenario} task, destroys it through {@code taskOperations().destroy(name,
     * true)}, and then calls {@code verifyTaskDefAndTaskExecutionCount(name, 0, 0)}.
     */
    @Test
    public void taskDefinitionDeleteWithCleanup() {
        Task task = Task.builder(dataFlowOperations)
                .name(DataFlowIT.randomTaskName())
                .definition("scenario")
                .description("Test scenario batch app that will fail on first pass")
                .build();
        String stepName = randomStepName();
        long launchId = task.launch(createNewJobandStepScenario(task.getTaskName(), stepName));
        validateSuccessfulTaskLaunch(task, launchId);

        // presumably the 'true' flag requests cleanup of executions - confirm against the API
        dataFlowOperations.taskOperations().destroy(task.getTaskName(), true);
        verifyTaskDefAndTaskExecutionCount(task.getTaskName(), 0, 0);
    }

    /**
     * Creates one execution for a task, cleans it up with {@code safeCleanupTaskExecution}, and
     * checks the executions with {@code verifyAllSpecifiedTaskExecutions} before ({@code true})
     * and after ({@code false}).
     */
    @Test
    public void testDeleteSingleTaskExecution() {
        minimumVersionCheck("testDeleteSingleTaskExecution");
        try (Task task = createTaskDefinition()) {
            List<Long> launchIds = createTaskExecutionsForDefinition(task, 1);
            verifyAllSpecifiedTaskExecutions(task, launchIds, true);

            safeCleanupTaskExecution(task, launchIds.get(0));
            verifyAllSpecifiedTaskExecutions(task, launchIds, false);
        }
    }

    /**
     * Creates four executions, cleans up every one except the fourth, and checks after each
     * cleanup that the execution is gone. The fourth execution must still be present at the end.
     */
    @Test
    public void testDeleteMultipleTaskExecution() {
        minimumVersionCheck("testDeleteMultipleTaskExecution");
        try (Task task = createTaskDefinition()) {
            List<Long> launchIds = createTaskExecutionsForDefinition(task, 4);
            verifyAllSpecifiedTaskExecutions(task, launchIds, true);

            long retainedLaunchId = launchIds.get(3);
            for (Long launchId : launchIds) {
                if (launchId.longValue() == retainedLaunchId) {
                    continue;
                }
                safeCleanupTaskExecution(task, launchId.longValue());
                Assertions.assertThat(task.execution(launchId.longValue()).isPresent()).isFalse();
            }
            Assertions.assertThat(task.execution(retainedLaunchId).isPresent()).isTrue();
        }
    }

    /**
     * Creates four executions, removes them all with {@code safeCleanupAllTaskExecutions}, and
     * checks the executions with {@code verifyAllSpecifiedTaskExecutions} before ({@code true})
     * and after ({@code false}).
     */
    @Test
    public void testDeleteAllTaskExecutionsShouldClearAllTaskExecutions() {
        minimumVersionCheck("testDeleteAllTaskExecutionsShouldClearAllTaskExecutions");
        try (Task task = createTaskDefinition()) {
            List<Long> launchIds = createTaskExecutionsForDefinition(task, 4);
            verifyAllSpecifiedTaskExecutions(task, launchIds, true);

            safeCleanupAllTaskExecutions(task);
            verifyAllSpecifiedTaskExecutions(task, launchIds, false);
        }
    }

    /**
     * Launches once with {@code app.testtimestamp.firstkey}, launches again with no properties,
     * and expects the second execution's app properties to contain {@code firstkey}.
     */
    @Test
    public void testDataFlowUsesLastAvailableTaskExecutionForItsProperties() {
        minimumVersionCheck("testDataFlowUsesLastAvailableTaskExecutionForItsProperties");
        try (Task task = createTaskDefinition()) {
            List<Long> firstLaunchIds = createTaskExecutionsForDefinition(task,
                    Collections.singletonMap("app.testtimestamp.firstkey", "firstvalue"), 1);
            verifyAllSpecifiedTaskExecutions(task, firstLaunchIds, true);

            long secondLaunchId = task.launch();
            Assertions.assertThat(task.execution(secondLaunchId).isPresent()).isTrue();
            validateSuccessfulTaskLaunch(task, secondLaunchId, 2);

            TaskExecutionResource secondExecution = task.execution(secondLaunchId).get();
            Map<?, ?> appProperties = secondExecution.getAppProperties();
            Assertions.assertThat(appProperties.containsKey("firstkey")).isTrue();
        }
    }

    /**
     * Launches three times with a different app property each time, deleting the second
     * execution before the third launch. The third execution's app properties must contain
     * {@code firstkey} and {@code thirdkey} but not {@code secondkey}.
     */
    @Test
    public void testDataFlowUsesAllPropertiesRegardlessIfPreviousExecutionWasDeleted() {
        minimumVersionCheck("testDataFlowUsesAllPropertiesRegardlessIfPreviousExecutionWasDeleted");
        try (Task task = createTaskDefinition()) {
            // First launch: firstkey.
            List<Long> firstLaunchIds = createTaskExecutionsForDefinition(task,
                    Collections.singletonMap("app.testtimestamp.firstkey", "firstvalue"), 1);
            verifyAllSpecifiedTaskExecutions(task, firstLaunchIds, true);

            // Second launch: secondkey, then delete that execution.
            long secondLaunchId = task.launch(
                    Collections.singletonMap("app.testtimestamp.secondkey", "secondvalue"), Collections.emptyList());
            Assertions.assertThat(task.execution(secondLaunchId).isPresent()).isTrue();
            validateSuccessfulTaskLaunch(task, secondLaunchId, 2);
            safeCleanupTaskExecution(task, secondLaunchId);
            Assertions.assertThat(task.execution(secondLaunchId).isPresent()).isFalse();

            // Third launch: thirdkey. Two executions remain after the deletion above.
            long thirdLaunchId = task.launch(
                    Collections.singletonMap("app.testtimestamp.thirdkey", "thirdvalue"), Collections.emptyList());
            Assertions.assertThat(task.execution(thirdLaunchId).isPresent()).isTrue();
            validateSuccessfulTaskLaunch(task, thirdLaunchId, 2);

            TaskExecutionResource thirdExecution = task.execution(thirdLaunchId).get();
            Map<?, ?> appProperties = thirdExecution.getAppProperties();
            Assertions.assertThat(appProperties.containsKey("firstkey")).isTrue();
            Assertions.assertThat(appProperties.containsKey("secondkey")).isFalse();
            Assertions.assertThat(appProperties.containsKey("thirdkey")).isTrue();
        }
    }

    /**
     * Launches the composed task {@code AAA: testtimestamp && BBB: testtimestamp} once,
     * confirms that child executions exist for both labels, cleans up the parent execution
     * and then confirms that neither the parent nor the {@code AAA}/{@code BBB} child
     * executions can be found any more.
     */
    @Test
    public void testDeletingComposedTaskExecutionDeletesAllItsChildTaskExecutions() {
        this.minimumVersionCheck("testDeletingComposedTaskExecutionDeletesAllItsChildTaskExecutions");
        try (Task task = this.createTaskDefinition("AAA: testtimestamp && BBB: testtimestamp")) {
            List<Long> parentLaunchIds = this.createTaskExecutionsForDefinition(task, 1);
            this.verifyAllSpecifiedTaskExecutions(task, parentLaunchIds, true);

            // Both children must have an execution before the parent is removed.
            Optional<?> aaaBeforeCleanup = task.composedTaskChildExecution("AAA");
            Optional<?> bbbBeforeCleanup = task.composedTaskChildExecution("BBB");
            Assertions.assertThat(aaaBeforeCleanup.isPresent()).isTrue();
            Assertions.assertThat(bbbBeforeCleanup.isPresent()).isTrue();

            this.safeCleanupTaskExecution(task, parentLaunchIds.get(0));
            this.verifyAllSpecifiedTaskExecutions(task, parentLaunchIds, false);

            // Look the children up again: removing the parent is expected to have removed them too.
            Optional<?> aaaAfterCleanup = task.composedTaskChildExecution("AAA");
            Optional<?> bbbAfterCleanup = task.composedTaskChildExecution("BBB");
            Assertions.assertThat(aaaAfterCleanup.isPresent()).isFalse();
            Assertions.assertThat(bbbAfterCleanup.isPresent()).isFalse();
        }
    }

    /**
     * Launches the {@code testtimestamp-batch} task once, checks that two job executions
     * were recorded and that the first one exposes its step executions, then cleans up the
     * task execution and expects the step-execution lookup for that job to fail with a
     * {@link DataFlowClientException} mentioning {@code "No JobExecution with id="}.
     */
    @Test
    public void testDeletingBatchTaskExecutionDeletesAllOfItsBatchRecords() {
        this.minimumVersionCheck("testDeletingBatchTaskExecutionDeletesAllOfItsBatchRecords");
        try (Task task = this.createTaskDefinition("testtimestamp-batch")) {
            List<Long> launchIds = Collections.singletonList(task.launch(Collections.emptyMap(), Collections.singletonList("testKey=" + task.getTaskName())));
            this.verifyAllSpecifiedTaskExecutions(task, launchIds, true);
            this.validateSuccessfulTaskLaunch(task, launchIds.get(0), 1);
            List jobExecutionIds = ((TaskExecutionResource)task.execution(launchIds.get(0).longValue()).get()).getJobExecutionIds();
            Assertions.assertThat(jobExecutionIds.size()).isEqualTo(2);
            long firstJobExecutionId = ((Long)jobExecutionIds.get(0)).longValue();
            // The previous form, assertThat(x.equals(1)), never invoked an assertion and so
            // verified nothing; assert the step count explicitly instead.
            // NOTE(review): assumes jobStepExecutions returns an Iterable of step executions
            // and that the first job has exactly one step - confirm.
            Assertions.assertThat(task.jobStepExecutions(firstJobExecutionId)).hasSize(1);
            this.safeCleanupTaskExecution(task, launchIds.get(0));
            this.verifyAllSpecifiedTaskExecutions(task, launchIds, false);
            // With the task execution removed, the job execution must no longer be resolvable.
            Assertions.assertThatThrownBy(() -> task.jobStepExecutions(firstJobExecutionId))
                    .isInstanceOf(DataFlowClientException.class)
                    .hasMessageContaining("No JobExecution with id=");
        }
    }

    /**
     * Launches the {@code testtimestamp-batch} task once, checks that two job executions
     * were recorded and that the first one exposes its step executions, cleans up the task
     * execution and then expects a restart of that job execution to fail with a
     * {@link DataFlowClientException} mentioning {@code "There is no JobExecution with id="}.
     */
    @Test
    public void testRestartingBatchTaskExecutionThatHasBeenDeleted() {
        this.minimumVersionCheck("testRestartingBatchTaskExecutionThatHasBeenDeleted");
        try (Task task = this.createTaskDefinition("testtimestamp-batch")) {
            List<Long> launchIds = Collections.singletonList(task.launch(Collections.emptyMap(), Collections.singletonList("testKey=" + task.getTaskName())));
            this.verifyAllSpecifiedTaskExecutions(task, launchIds, true);
            this.validateSuccessfulTaskLaunch(task, launchIds.get(0), 1);
            List jobExecutionIds = ((TaskExecutionResource)task.execution(launchIds.get(0).longValue()).get()).getJobExecutionIds();
            Assertions.assertThat(jobExecutionIds.size()).isEqualTo(2);
            long firstJobExecutionId = ((Long)jobExecutionIds.get(0)).longValue();
            // The previous form, assertThat(x.equals(1)), never invoked an assertion and so
            // verified nothing; assert the step count explicitly instead.
            // NOTE(review): assumes jobStepExecutions returns an Iterable of step executions
            // and that the first job has exactly one step - confirm.
            Assertions.assertThat(task.jobStepExecutions(firstJobExecutionId)).hasSize(1);
            this.safeCleanupTaskExecution(task, launchIds.get(0));
            // Restarting a job whose task execution was removed must be rejected by the server.
            Assertions.assertThatThrownBy(() -> this.dataFlowOperations.jobOperations().executionRestart(firstJobExecutionId))
                    .isInstanceOf(DataFlowClientException.class)
                    .hasMessageContaining("There is no JobExecution with id=");
        }
    }

    /**
     * Launches the given task {@code executionCount} times without any launch properties.
     *
     * @param task the task to launch
     * @param executionCount how many launches to perform
     * @return the launch ids, in launch order
     */
    private List<Long> createTaskExecutionsForDefinition(Task task, int executionCount) {
        Map<String, String> noProperties = Collections.emptyMap();
        return this.createTaskExecutionsForDefinition(task, noProperties, executionCount);
    }

    /**
     * Launches the given task {@code executionCount} times with the supplied properties and
     * no arguments. After every launch the new execution is asserted to be present and is
     * passed to {@code validateSuccessfulTaskLaunch} together with the running launch count.
     *
     * @param task the task to launch
     * @param properties launch properties handed to every launch
     * @param executionCount how many launches to perform
     * @return the launch ids, in launch order
     */
    private List<Long> createTaskExecutionsForDefinition(Task task, Map<String, String> properties, int executionCount) {
        List<Long> collectedIds = new ArrayList<>();
        for (int launchNumber = 1; launchNumber <= executionCount; launchNumber++) {
            long launchId = task.launch(properties, Collections.emptyList());
            collectedIds.add(launchId);
            Assertions.assertThat(task.execution(launchId).isPresent()).isTrue();
            // launchNumber is the number of launches made so far by this helper.
            this.validateSuccessfulTaskLaunch(task, launchId, launchNumber);
        }
        return collectedIds;
    }

    /**
     * Asserts, for every launch id in the list, that the task either has or does not have
     * an execution with that id.
     *
     * @param task the task whose executions are looked up
     * @param launchIds the execution ids to check
     * @param isPresent {@code true} to require every execution to exist, {@code false} to
     *        require every execution to be absent
     */
    private void verifyAllSpecifiedTaskExecutions(Task task, List<Long> launchIds, boolean isPresent) {
        for (Long launchId : launchIds) {
            boolean found = task.execution(launchId.longValue()).isPresent();
            if (isPresent) {
                Assertions.assertThat(found).isTrue();
            } else {
                Assertions.assertThat(found).isFalse();
            }
        }
    }

    /**
     * Asserts the number of task executions recorded under {@code taskName} and the total
     * number of task definitions known to the server.
     *
     * @param taskName task name used to filter the execution list
     * @param taskDefCount expected size of the task definition list
     * @param taskExecCount expected number of executions whose task name equals {@code taskName}
     */
    private void verifyTaskDefAndTaskExecutionCount(String taskName, int taskDefCount, int taskExecCount) {
        // Executions without a task name are ignored by the filter.
        int executionsForTask = (int) this.dataFlowOperations.taskOperations().executionList().getContent().stream()
                .filter(execution -> execution.getTaskName() != null && execution.getTaskName().equals(taskName))
                .count();
        Assertions.assertThat(executionsForTask).isEqualTo(taskExecCount);

        int definitionTotal = this.dataFlowOperations.taskOperations().list().getContent().size();
        Assertions.assertThat(definitionTotal).isEqualTo(taskDefCount);
    }

    /**
     * Runs the composed-task scenario in which the parent is expected to reach
     * {@code TaskExecutionStatus.COMPLETE} and every listed child is expected to succeed,
     * with no failed and no unknown children.
     *
     * @param taskDescription description given to the created task
     * @param taskDefinition composed task definition (DSL)
     * @param childLabels labels of the child tasks expected to succeed
     */
    private void allSuccessfulExecutions(String taskDescription, String taskDefinition, String ... childLabels) {
        List<String> expectedSuccesses = DataFlowIT.asList(childLabels);
        List<String> expectedFailures = DataFlowIT.emptyList();
        List<String> expectedUnknowns = DataFlowIT.emptyList();
        this.mixedSuccessfulFailedAndUnknownExecutions(taskDescription, taskDefinition, TaskExecutionStatus.COMPLETE, expectedSuccesses, expectedFailures, expectedUnknowns);
    }

    /**
     * Creates a composed task from {@code taskDefinition}, launches it once and verifies the
     * outcome of the parent and of each child task.
     *
     * <p>Checks performed, in order: the composed task's children match the union of the
     * three label lists; the parent reaches the awaited status; there is exactly one parent
     * execution with exit code 0; each child in {@code successfulTasks} has one execution
     * with exit code 0; each child in {@code failedTasks} has one execution with exit code 1;
     * each child in {@code unknownTasks} has no execution; the builder tracks the parent plus
     * all children while the task is open, and no tasks once the try block has closed it.
     *
     * @param taskDescription description given to the created task
     * @param taskDefinition composed task definition (DSL)
     * @param parentTaskExecutionStatus status the parent execution is awaited to reach on
     *        servers that are not lower than 2.8.0-SNAPSHOT
     * @param successfulTasks labels of children expected to have run with exit code 0
     * @param failedTasks labels of children expected to have run with exit code 1
     * @param unknownTasks labels of children expected not to have run at all
     */
    private void mixedSuccessfulFailedAndUnknownExecutions(String taskDescription, String taskDefinition, TaskExecutionStatus parentTaskExecutionStatus, List<String> successfulTasks, List<String> failedTasks, List<String> unknownTasks) {
        TaskBuilder taskBuilder = Task.builder((DataFlowOperations)this.dataFlowOperations);
        try (Task task = taskBuilder.name(DataFlowIT.randomTaskName()).definition(taskDefinition).description(taskDescription).build();){
            // Union of every child label the definition is expected to contain.
            ArrayList<String> allTasks = new ArrayList<String>(successfulTasks);
            allTasks.addAll(failedTasks);
            allTasks.addAll(unknownTasks);
            Assertions.assertThat((int)task.composedTaskChildTasks().size()).isEqualTo(allTasks.size());
            ((ListAssert)Assertions.assertThat(task.composedTaskChildTasks().stream().map(Task::getTaskName).collect(Collectors.toList())).as("verify composedTaskChildTasks is the same as all tasks", new Object[0])).hasSameElementsAs(this.fullTaskNames(task, allTasks.toArray(new String[0])));
            long launchId = task.launch(this.composedTaskLaunchArguments(new String[0]));
            // Servers lower than 2.8.0-SNAPSHOT are always awaited on COMPLETE; otherwise the
            // caller-supplied parent status is awaited.
            if (this.runtimeApps.dataflowServerVersionLowerThan("2.8.0-SNAPSHOT")) {
                Awaitility.await().until(() -> task.executionStatus(launchId) == TaskExecutionStatus.COMPLETE);
            } else {
                Awaitility.await().until(() -> task.executionStatus(launchId) == parentTaskExecutionStatus);
            }
            ((AbstractIntegerAssert)Assertions.assertThat((int)task.executions().size()).as("verify exactly one execution", new Object[0])).isEqualTo(1);
            ((AbstractIntegerAssert)Assertions.assertThat((Integer)((TaskExecutionResource)task.execution(launchId).get()).getExitCode()).as("verify successful execution of parent task", new Object[0])).isEqualTo(0);
            task.executions().forEach(execution -> ((AbstractIntegerAssert)Assertions.assertThat((Integer)execution.getExitCode()).as("verify successful execution of parent task", new Object[0])).isEqualTo(0));
            // Children expected to succeed: one execution each, exit code 0.
            this.childTasksBySuffix(task, successfulTasks.toArray(new String[0])).forEach(childTask -> {
                ((AbstractIntegerAssert)Assertions.assertThat((int)childTask.executions().size()).as("verify each child task ran once", new Object[0])).isEqualTo(1);
                ((AbstractIntegerAssert)Assertions.assertThat((Integer)((TaskExecutionResource)childTask.executionByParentExecutionId(launchId).get()).getExitCode()).as("verify each child task has a successful parent", new Object[0])).isEqualTo(0);
            });
            // Children expected to fail: one execution each, exit code 1.
            this.childTasksBySuffix(task, failedTasks.toArray(new String[0])).forEach(childTask -> {
                Assertions.assertThat((int)childTask.executions().size()).isEqualTo(1);
                Assertions.assertThat((Integer)((TaskExecutionResource)childTask.executionByParentExecutionId(launchId).get()).getExitCode()).isEqualTo(1);
            });
            // Children expected never to have been reached: no executions at all.
            this.childTasksBySuffix(task, unknownTasks.toArray(new String[0])).forEach(childTask -> Assertions.assertThat((int)childTask.executions().size()).isEqualTo(0));
            // The builder should know the parent plus every child while the task is open.
            Assertions.assertThat((int)taskBuilder.allTasks().size()).isEqualTo(task.composedTaskChildTasks().size() + 1);
        }
        // NOTE(review): presumably closing the task destroys it together with its children,
        // which is why the builder is expected to report no tasks here - confirm.
        Assertions.assertThat((int)taskBuilder.allTasks().size()).isEqualTo(0);
    }

    /**
     * Builds the full names of child tasks by prefixing each trimmed child name with the
     * parent task name and a hyphen.
     *
     * @param task the parent task supplying the name prefix
     * @param childTaskNames child names; surrounding whitespace is trimmed
     * @return one {@code <parentName>-<childName>} entry per input, in input order
     */
    private List<String> fullTaskNames(Task task, String ... childTaskNames) {
        List<String> qualifiedNames = new ArrayList<>();
        for (String childName : childTaskNames) {
            qualifiedNames.add(task.getTaskName() + "-" + childName.trim());
        }
        return qualifiedNames;
    }

    /**
     * Resolves the child tasks of a composed task by label.
     *
     * @param task the composed (parent) task
     * @param suffixes labels passed to {@code composedTaskChildTaskByLabel}
     * @return the matching child tasks, in input order; the lookup result is unwrapped with
     *         {@code get()}, so a label without a child fails the call
     */
    private List<Task> childTasksBySuffix(Task task, String ... suffixes) {
        List<Task> children = new ArrayList<>();
        for (String label : suffixes) {
            children.add((Task)task.composedTaskChildTaskByLabel(label).get());
        }
        return children;
    }

    /**
     * Cleans up every execution of the given task through {@code doSafeCleanupTasks}.
     *
     * @param task the task whose executions are cleaned up
     */
    private void safeCleanupAllTaskExecutions(Task task) {
        Runnable cleanupAll = () -> task.cleanupAllTaskExecutions();
        this.doSafeCleanupTasks(cleanupAll);
    }

    /**
     * Cleans up a single execution of the given task through {@code doSafeCleanupTasks}.
     *
     * @param task the task owning the execution
     * @param taskExecutionId id of the execution to clean up
     */
    private void safeCleanupTaskExecution(Task task, long taskExecutionId) {
        Runnable cleanupOne = () -> task.cleanupTaskExecution(taskExecutionId);
        this.doSafeCleanupTasks(cleanupOne);
    }

    /**
     * Runs a cleanup operation and tolerates the failure reported when the pod or job to
     * clean up is already gone.
     *
     * <p>A {@link DataFlowClientException} whose message contains
     * {@code "(reason: pod does not exist)"} or {@code "(reason: job does not exist)"} is
     * logged as a warning and suppressed. Any other {@link DataFlowClientException},
     * including one without a message, is rethrown unchanged.
     *
     * @param cleanupOperation the cleanup call to execute
     * @throws DataFlowClientException if the cleanup fails for any other reason
     */
    private void doSafeCleanupTasks(Runnable cleanupOperation) {
        try {
            cleanupOperation.run();
        }
        catch (DataFlowClientException ex) {
            String message = ex.getMessage();
            boolean resourceAlreadyGone = message != null
                    && (message.contains("(reason: pod does not exist)") || message.contains("(reason: job does not exist)"));
            if (!resourceAlreadyGone) {
                throw ex;
            }
            // Nothing is left to clean up on the platform side, so this is not a test failure.
            logger.warn("Unable to cleanup task executions: " + message);
        }
    }

    /**
     * @return a task name of the form {@code task-<suffix>}, where the suffix comes from
     *         {@code randomSuffix()}
     */
    private static String randomTaskName() {
        String suffix = DataFlowIT.randomSuffix();
        return "task-".concat(suffix);
    }

    /**
     * @return a job name of the form {@code job-<suffix>}, where the suffix comes from
     *         {@code randomSuffix()}
     */
    private static String randomJobName() {
        String suffix = DataFlowIT.randomSuffix();
        return "job-".concat(suffix);
    }

    /**
     * @return the first ten characters of a freshly generated random UUID string
     */
    private static String randomSuffix() {
        String uuidText = UUID.randomUUID().toString();
        return uuidText.substring(0, 10);
    }

    /**
     * Wraps the given names in a list via {@link Arrays#asList(Object[])}.
     *
     * @param names the names to wrap
     * @return a fixed-size list view backed by {@code names}
     */
    private static List<String> asList(String ... names) {
        List<String> nameView = Arrays.asList(names);
        return nameView;
    }

    /**
     * @return the shared immutable empty list from {@link Collections#emptyList()}, typed
     *         as a list of strings
     */
    private static List<String> emptyList() {
        return Collections.<String>emptyList();
    }

    // Class initializer: assigns the static dockerCompose field from
    // DockerComposeFactory.startDockerCompose, passing tempDockerComposeYamlFolder.
    // NOTE(review): presumably this brings up the docker-compose environment the tests
    // run against, once per class load - confirm against DockerComposeFactory.
    static {
        dockerCompose = DockerComposeFactory.startDockerCompose(tempDockerComposeYamlFolder);
    }
}

