package com.google.cloud.dataflow.sdk.runners;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.NanoClock;
import com.google.api.client.util.Sleeper;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.JobMessage;
import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.MetricUpdate;
import com.google.cloud.dataflow.sdk.PipelineResult;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Throwables;
import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowAggregatorTransforms;
import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowMetricUpdateExtractor;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.util.AttemptAndTimeBoundedExponentialBackOff;
import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff;
import com.google.cloud.dataflow.sdk.util.MapAggregatorValues;
import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
import com.google.cloud.dataflow.sdk.util.TimeUtil;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.class */
/**
 * A {@link PipelineResult} handle for a job identified by a project id and a job id, queried
 * through a {@link Dataflow} client.
 *
 * <p>Once a terminal state has been observed it is cached in this object and later state queries
 * are answered from the cache without contacting the service. The cached fields are plain mutable
 * fields with no synchronization.
 */
public class DataflowPipelineJob implements PipelineResult {
    private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineJob.class);

    /** Number of back-off attempts used while polling for job messages in {@code waitToFinish}. */
    static final int MESSAGES_POLLING_ATTEMPTS = 10;

    /** Number of attempts used by {@link #getState()} when fetching the job status. */
    static final int STATUS_POLLING_ATTEMPTS = 5;

    /** Initial back-off interval, in milliseconds, for the message polling loop. */
    static final long MESSAGES_POLLING_INTERVAL = TimeUnit.SECONDS.toMillis(2);

    /** Initial back-off interval, in milliseconds, for job status requests. */
    static final long STATUS_POLLING_INTERVAL = TimeUnit.SECONDS.toMillis(2);

    private final String jobId;
    private final String projectId;
    private final Dataflow dataflowClient;
    private final DataflowAggregatorTransforms aggregatorTransforms;

    /** The terminal state of the job, or {@code null} if no terminal state has been observed. */
    @Nullable
    private PipelineResult.State terminalState = null;

    /** Handle for the job that replaced this one, or {@code null} if none has been reported. */
    @Nullable
    private DataflowPipelineJob replacedByJob = null;

    /** Metric updates fetched after the job was seen in a terminal state, or {@code null}. */
    private List<MetricUpdate> terminalMetricUpdates;

    /**
     * Creates a handle for an existing job.
     *
     * @param projectId the id of the project that owns the job
     * @param jobId the id of the job
     * @param dataflowClient the client used for all service requests
     * @param aggregatorTransforms the aggregator bookkeeping used by
     *     {@link #getAggregatorValues(Aggregator)}
     */
    public DataflowPipelineJob(String projectId, String jobId, Dataflow dataflowClient, DataflowAggregatorTransforms aggregatorTransforms) {
        this.projectId = projectId;
        this.jobId = jobId;
        this.dataflowClient = dataflowClient;
        this.aggregatorTransforms = aggregatorTransforms;
    }

    /** Returns the id of this job. */
    public String getJobId() {
        return this.jobId;
    }

    /** Returns the id of the project this job belongs to. */
    public String getProjectId() {
        return this.projectId;
    }

    /**
     * Returns a handle for the job that replaced this one.
     *
     * @throws IllegalStateException if no terminal state has been observed yet, or if the job
     *     terminated without the service reporting a replacement job id
     */
    public DataflowPipelineJob getReplacedByJob() {
        if (this.terminalState == null) {
            throw new IllegalStateException("getReplacedByJob() called before job terminated");
        }
        if (this.replacedByJob == null) {
            throw new IllegalStateException("getReplacedByJob() called for job that was not replaced");
        }
        return this.replacedByJob;
    }

    /** Returns the client used to communicate with the service. */
    public Dataflow getDataflowClient() {
        return this.dataflowClient;
    }

    /**
     * Polls the job until it reaches a terminal state or the back-off is exhausted, using the
     * default {@link Sleeper} and {@link NanoClock}.
     *
     * @param timeToWait the time bound for waiting; a value that converts to zero or fewer
     *     milliseconds selects a back-off that is bounded by attempts only
     * @param timeUnit the unit of {@code timeToWait}
     * @param messageHandler receives newly fetched job messages; may be {@code null}, in which
     *     case messages are not fetched
     * @return the terminal state, or {@code null} if polling stopped before one was observed
     * @throws IOException if the back-off or message retrieval fails with an I/O error that is
     *     not handled by the polling loop
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    @Nullable
    public PipelineResult.State waitToFinish(long timeToWait, TimeUnit timeUnit, MonitoringUtil.JobMessagesHandler messageHandler) throws IOException, InterruptedException {
        return waitToFinish(timeToWait, timeUnit, messageHandler, Sleeper.DEFAULT, NanoClock.SYSTEM);
    }

    /**
     * Same as {@link #waitToFinish(long, TimeUnit, MonitoringUtil.JobMessagesHandler)}, with the
     * sleeper and clock supplied by the caller so tests can control time.
     *
     * @param timeToWait the time bound for waiting; a value that converts to zero or fewer
     *     milliseconds selects a back-off that is bounded by attempts only
     * @param timeUnit the unit of {@code timeToWait}
     * @param messageHandler receives newly fetched job messages; may be {@code null}
     * @param sleeper used to sleep between polls
     * @param nanoClock clock handed to the time-bounded back-off
     * @return the terminal state, or {@code null} if polling stopped before one was observed
     * @throws IOException if the back-off or message retrieval fails with an I/O error that is
     *     not handled by the polling loop
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    @Nullable
    @VisibleForTesting
    PipelineResult.State waitToFinish(long timeToWait, TimeUnit timeUnit, MonitoringUtil.JobMessagesHandler messageHandler, Sleeper sleeper, NanoClock nanoClock) throws IOException, InterruptedException {
        MonitoringUtil monitor = new MonitoringUtil(this.projectId, this.dataflowClient);
        long lastTimestamp = 0;
        long maxWaitMillis = timeUnit.toMillis(timeToWait);

        BackOff backOff;
        if (maxWaitMillis > 0) {
            backOff = new AttemptAndTimeBoundedExponentialBackOff(MESSAGES_POLLING_ATTEMPTS, MESSAGES_POLLING_INTERVAL, maxWaitMillis, AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS, nanoClock);
        } else {
            backOff = new AttemptBoundedExponentialBackOff(MESSAGES_POLLING_ATTEMPTS, MESSAGES_POLLING_INTERVAL);
        }

        PipelineResult.State state;
        do {
            // Fetch the state before the messages so that, when a terminal state is seen, the
            // messages fetched afterwards in the same iteration are still delivered.
            state = getStateWithRetries(1, sleeper);
            boolean hasError = state == PipelineResult.State.UNKNOWN;

            if (messageHandler != null && !hasError) {
                try {
                    List<JobMessage> newMessages = monitor.getJobMessages(this.jobId, lastTimestamp);
                    if (!newMessages.isEmpty()) {
                        // Remember the time of the last message seen; it is passed to the next
                        // getJobMessages call.
                        JobMessage lastMessage = newMessages.get(newMessages.size() - 1);
                        lastTimestamp = TimeUtil.fromCloudTime(lastMessage.getTime()).getMillis();
                        messageHandler.process(newMessages);
                    }
                } catch (GoogleJsonResponseException | SocketTimeoutException e) {
                    hasError = true;
                    LOG.warn("There were problems getting current job messages: {}.", e.getMessage());
                    LOG.debug("Exception information:", e);
                }
            }

            if (!hasError) {
                // A clean iteration resets the back-off, so only consecutive failures count
                // against the attempt limit.
                backOff.reset();
                if (state.isTerminal()) {
                    return state;
                }
            }
        } while (BackOffUtils.next(sleeper, backOff));

        LOG.warn("No terminal state was returned.  State value {}", state);
        return null;
    }

    /**
     * Requests cancellation of this job by updating it with a requested state of
     * {@code JOB_STATE_CANCELLED}.
     *
     * @throws IOException if the update request fails
     */
    public void cancel() throws IOException {
        Job content = new Job();
        content.setProjectId(this.projectId);
        content.setId(this.jobId);
        content.setRequestedState("JOB_STATE_CANCELLED");
        this.dataflowClient.projects().jobs().update(this.projectId, this.jobId, content).execute();
    }

    /**
     * Returns the cached terminal state if one has been observed; otherwise queries the service
     * with up to {@link #STATUS_POLLING_ATTEMPTS} attempts.
     */
    @Override // com.google.cloud.dataflow.sdk.PipelineResult
    public PipelineResult.State getState() {
        if (this.terminalState != null) {
            return this.terminalState;
        }
        return getStateWithRetries(STATUS_POLLING_ATTEMPTS, Sleeper.DEFAULT);
    }

    /**
     * Returns the current job state, retrying failed status requests.
     *
     * @param attempts the attempt bound handed to the status back-off
     * @param sleeper used to sleep between attempts
     * @return the cached terminal state if present; otherwise the state reported by the service,
     *     or {@link PipelineResult.State#UNKNOWN} if the status could not be retrieved
     */
    @VisibleForTesting
    PipelineResult.State getStateWithRetries(int attempts, Sleeper sleeper) {
        if (this.terminalState != null) {
            return this.terminalState;
        }
        try {
            Job job = getJobWithRetries(attempts, sleeper);
            return MonitoringUtil.toState(job.getCurrentState());
        } catch (IOException e) {
            // getJobWithRetries has already logged the failure; report it as UNKNOWN.
            return PipelineResult.State.UNKNOWN;
        }
    }

    /**
     * Fetches the job from the service, retrying on {@link IOException}. If the fetched job is in
     * a terminal state, that state is cached, along with a handle for the replacement job when
     * the service reports a replacement job id.
     *
     * @param attempts the attempt bound handed to the status back-off
     * @param sleeper used to sleep between attempts
     * @return the job as returned by the service
     * @throws IOException the failure of the last attempt, once the back-off is exhausted
     */
    @VisibleForTesting
    Job getJobWithRetries(int attempts, Sleeper sleeper) throws IOException {
        AttemptBoundedExponentialBackOff backOff = new AttemptBoundedExponentialBackOff(attempts, STATUS_POLLING_INTERVAL);

        // The loop is left only by returning the job or by rethrowing the last failure.
        while (true) {
            try {
                Job job = this.dataflowClient.projects().jobs().get(this.projectId, this.jobId).execute();
                PipelineResult.State currentState = MonitoringUtil.toState(job.getCurrentState());
                if (currentState.isTerminal()) {
                    this.terminalState = currentState;
                    // Only create a replacement handle when a replacement id is reported, so that
                    // getReplacedByJob() can tell "not replaced" apart from "replaced".
                    String replacedByJobId = job.getReplacedByJobId();
                    if (replacedByJobId != null) {
                        this.replacedByJob = new DataflowPipelineJob(getProjectId(), replacedByJobId, this.dataflowClient, this.aggregatorTransforms);
                    }
                }
                return job;
            } catch (IOException e) {
                LOG.warn("There were problems getting current job status: {}.", e.getMessage());
                LOG.debug("Exception information:", e);
                if (!nextBackOff(sleeper, backOff)) {
                    throw e;
                }
            }
        }
    }

    /**
     * Advances the back-off, sleeping if another attempt is allowed.
     *
     * @return the result of {@link BackOffUtils#next(Sleeper, BackOff)}
     * @throws RuntimeException wrapping any {@link IOException} or {@link InterruptedException}
     *     raised while backing off
     */
    private boolean nextBackOff(Sleeper sleeper, BackOff backOff) {
        try {
            return BackOffUtils.next(sleeper, backOff);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw Throwables.propagate(e);
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Retrieves the values of the given aggregator from the job's metrics.
     *
     * @param aggregator the aggregator to look up
     * @return the aggregator values built from the job's metric updates
     * @throws AggregatorRetrievalException if fetching the metrics fails with an
     *     {@link IOException}
     * @throws IllegalArgumentException if the aggregator is not used in this pipeline
     */
    @Override // com.google.cloud.dataflow.sdk.PipelineResult
    public <OutputT> AggregatorValues<OutputT> getAggregatorValues(Aggregator<?, OutputT> aggregator) throws AggregatorRetrievalException {
        try {
            return new MapAggregatorValues<OutputT>(fromMetricUpdates(aggregator));
        } catch (IOException e) {
            throw new AggregatorRetrievalException("IOException when retrieving Aggregator values for Aggregator " + aggregator, e);
        }
    }

    /**
     * Extracts the values of the given aggregator from the job's metric updates. Metric updates
     * fetched after the job was seen in a terminal state are cached and reused by later calls.
     *
     * @throws IOException if the metrics request fails
     * @throws IllegalArgumentException if the aggregator is not used in this pipeline
     */
    private <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator) throws IOException {
        if (!this.aggregatorTransforms.contains(aggregator)) {
            throw new IllegalArgumentException("Aggregator " + aggregator + " is not used in this pipeline");
        }

        List<MetricUpdate> metricUpdates;
        if (this.terminalMetricUpdates != null) {
            metricUpdates = this.terminalMetricUpdates;
        } else {
            // Read the state before fetching the metrics: the result is cached only if the job
            // was already terminal when the request was issued.
            boolean terminal = getState().isTerminal();
            JobMetrics jobMetrics = this.dataflowClient.projects().jobs().getMetrics(this.projectId, this.jobId).execute();
            metricUpdates = jobMetrics.getMetrics();
            if (terminal && metricUpdates != null) {
                this.terminalMetricUpdates = metricUpdates;
            }
        }
        return DataflowMetricUpdateExtractor.fromMetricUpdates(aggregator, this.aggregatorTransforms, metricUpdates);
    }
}
