package org.eclipse.edc.connector.dataplane.azure.datafactory;

import com.azure.core.credential.AzureSasCredential;
import com.azure.resourcemanager.datafactory.models.PipelineResource;
import com.azure.resourcemanager.datafactory.models.PipelineRun;
import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.eclipse.edc.azure.blob.AzureSasToken;
import org.eclipse.edc.azure.blob.api.BlobStoreApi;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:org/eclipse/edc/connector/dataplane/azure/datafactory/AzureDataFactoryTransferManager.class */
public class AzureDataFactoryTransferManager {
    private static final String COMPLETE_BLOB_NAME = ".complete";
    private final Monitor monitor;
    private final Duration maxDuration;
    private final Clock clock;
    private final DataFactoryClient client;
    private final DataFactoryPipelineFactory pipelineFactory;
    private final BlobStoreApi blobStoreApi;
    private final TypeManager typeManager;
    private final KeyVaultClient keyVaultClient;
    private final Duration pollDelay;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/edc/connector/dataplane/azure/datafactory/AzureDataFactoryTransferManager$DataFactoryPipelineRunStates.class */
    public enum DataFactoryPipelineRunStates {
        Queued(false, false),
        InProgress(false, false),
        Succeeded(true, false),
        Failed(false, true),
        Canceling(false, true),
        Cancelled(false, true);

        final boolean succeeded;
        final boolean failed;

        DataFactoryPipelineRunStates(boolean z, boolean z2) {
            this.succeeded = z;
            this.failed = z2;
        }
    }

    public AzureDataFactoryTransferManager(Monitor monitor, DataFactoryClient dataFactoryClient, DataFactoryPipelineFactory dataFactoryPipelineFactory, Duration duration, Clock clock, BlobStoreApi blobStoreApi, TypeManager typeManager, KeyVaultClient keyVaultClient, Duration duration2) {
        this.monitor = monitor;
        this.client = dataFactoryClient;
        this.pipelineFactory = dataFactoryPipelineFactory;
        this.maxDuration = duration;
        this.clock = clock;
        this.blobStoreApi = blobStoreApi;
        this.typeManager = typeManager;
        this.keyVaultClient = keyVaultClient;
        this.pollDelay = duration2;
    }

    public CompletableFuture<StreamResult<Object>> transfer(DataFlowStartMessage dataFlowStartMessage) {
        PipelineResource createPipeline = this.pipelineFactory.createPipeline(dataFlowStartMessage);
        DataAddress destinationDataAddress = dataFlowStartMessage.getDestinationDataAddress();
        AzureSasToken azureSasToken = (AzureSasToken) this.typeManager.readValue(this.keyVaultClient.getSecret(destinationDataAddress.getKeyName()).getValue(), AzureSasToken.class);
        String stringProperty = destinationDataAddress.getStringProperty("account");
        String stringProperty2 = destinationDataAddress.getStringProperty("container");
        String runId = this.client.runPipeline(createPipeline).runId();
        this.monitor.debug("Created ADF pipeline for " + dataFlowStartMessage.getProcessId() + ". Run id is " + runId, new Throwable[0]);
        return awaitRunCompletion(runId).thenApply(streamResult -> {
            return streamResult.succeeded() ? complete(stringProperty, stringProperty2, azureSasToken.getSas()) : streamResult;
        }).exceptionally((Function<Throwable, ? extends U>) th -> {
            this.monitor.severe("Unhandled exception raised when transferring data", new Throwable[]{th});
            return StreamResult.error("Unhandled exception raised when transferring data" + ":" + th.getMessage());
        });
    }

    @NotNull
    private CompletableFuture<StreamResult<Object>> awaitRunCompletion(String str) {
        this.monitor.debug("Awaiting ADF pipeline completion for run " + str, new Throwable[0]);
        Instant plus = this.clock.instant().plus((TemporalAmount) this.maxDuration);
        while (this.clock.instant().isBefore(plus)) {
            PipelineRun pipelineRun = this.client.getPipelineRun(str);
            String status = pipelineRun.status();
            String message = pipelineRun.message();
            this.monitor.debug("ADF run status is " + status + " with message [" + message + "] for run " + str, new Throwable[0]);
            try {
                DataFactoryPipelineRunStates valueOf = DataFactoryPipelineRunStates.valueOf(status);
                if (valueOf.succeeded) {
                    return CompletableFuture.completedFuture(StreamResult.success());
                }
                if (valueOf.failed) {
                    return CompletableFuture.completedFuture(StreamResult.error(String.format("ADF run in state %s with message: %s", status, message)));
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(this.pollDelay.toMillis());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            } catch (IllegalArgumentException e2) {
                return CompletableFuture.completedFuture(StreamResult.error(String.format("ADF run in unexpected state %s with message: %s", status, message)));
            }
        }
        this.client.cancelPipelineRun(str);
        return CompletableFuture.completedFuture(StreamResult.error("ADF run timed out"));
    }

    private StreamResult<Object> complete(String str, String str2, String str3) {
        try {
            this.blobStoreApi.getBlobAdapter(str, str2, COMPLETE_BLOB_NAME, new AzureSasCredential(str3)).getOutputStream().close();
            return StreamResult.success();
        } catch (IOException e) {
            return StreamResult.error(String.format("Error creating blob %s on account %s", COMPLETE_BLOB_NAME, str));
        }
    }
}
