package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.Sleeper;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.LeaseWorkItemRequest;
import com.google.api.services.dataflow.model.LeaseWorkItemResponse;
import com.google.api.services.dataflow.model.ReportWorkItemStatusRequest;
import com.google.api.services.dataflow.model.ReportWorkItemStatusResponse;
import com.google.api.services.dataflow.model.WorkItem;
import com.google.api.services.dataflow.model.WorkItemServiceState;
import com.google.api.services.dataflow.model.WorkItemStatus;
import com.google.cloud.dataflow.sdk.options.DataflowWorkerHarnessOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableList;
import com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker;
import com.google.cloud.dataflow.sdk.runners.worker.logging.DataflowWorkerLoggingInitializer;
import com.google.cloud.dataflow.sdk.runners.worker.logging.DataflowWorkerLoggingMDC;
import com.google.cloud.dataflow.sdk.util.GcsIOChannelFactory;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
import com.google.cloud.dataflow.sdk.util.IntervalBoundedExponentialBackOff;
import com.google.cloud.dataflow.sdk.util.PropertyNames;
import com.google.cloud.dataflow.sdk.util.TimeUtil;
import com.google.cloud.dataflow.sdk.util.Transport;
import com.google.cloud.dataflow.sdk.util.common.worker.WorkProgressUpdater;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.Thread;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import javax.annotation.concurrent.ThreadSafe;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/DataflowWorkerHarness.class */
public class DataflowWorkerHarness {
    private static final Logger LOG = LoggerFactory.getLogger(DataflowWorkerHarness.class);
    private static final String APPLICATION_NAME = "DataflowWorkerHarness";
    static final int BACKOFF_INITIAL_INTERVAL_MILLIS = 5000;
    static final int BACKOFF_MAX_INTERVAL_MILLIS = 300000;

    /* JADX INFO: Access modifiers changed from: package-private */
    @ThreadSafe
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/DataflowWorkerHarness$DataflowWorkUnitClient.class */
    public static class DataflowWorkUnitClient extends DataflowWorker.WorkUnitClient {
        private static final ThreadLocal<DateTime> stageStartTime = new ThreadLocal<>();
        private final Dataflow dataflow;
        private final DataflowWorkerHarnessOptions options;

        static DataflowWorkUnitClient fromOptions(DataflowWorkerHarnessOptions dataflowWorkerHarnessOptions) {
            return new DataflowWorkUnitClient(Transport.newDataflowClient(dataflowWorkerHarnessOptions).build(), dataflowWorkerHarnessOptions);
        }

        DataflowWorkUnitClient(Dataflow dataflow, DataflowWorkerHarnessOptions dataflowWorkerHarnessOptions) {
            this.dataflow = dataflow;
            this.options = dataflowWorkerHarnessOptions;
        }

        @Override // com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.WorkUnitClient
        public WorkItem getWorkItem() throws IOException {
            LeaseWorkItemRequest leaseWorkItemRequest = new LeaseWorkItemRequest();
            leaseWorkItemRequest.setFactory(Transport.getJsonFactory());
            leaseWorkItemRequest.setWorkItemTypes(ImmutableList.of("map_task", "seq_map_task", "remote_source_task"));
            leaseWorkItemRequest.setWorkerCapabilities(ImmutableList.of(this.options.getWorkerId(), "remote_source", PropertyNames.CUSTOM_SOURCE_FORMAT));
            leaseWorkItemRequest.setWorkerId(this.options.getWorkerId());
            leaseWorkItemRequest.setCurrentWorkerTime(TimeUtil.toCloudTime(DateTime.now()));
            leaseWorkItemRequest.setRequestedLeaseDuration(TimeUtil.toCloudDuration(Duration.millis(WorkProgressUpdater.DEFAULT_LEASE_DURATION_MILLIS)));
            DataflowWorkerHarness.LOG.debug("Leasing work: {}", leaseWorkItemRequest);
            LeaseWorkItemResponse execute = this.dataflow.projects().jobs().workItems().lease(this.options.getProject(), this.options.getJobId(), leaseWorkItemRequest).execute();
            DataflowWorkerHarness.LOG.debug("Lease work response: {}", execute);
            List<WorkItem> workItems = execute.getWorkItems();
            if (workItems == null || workItems.isEmpty()) {
                return null;
            }
            if (workItems.size() > 1) {
                String valueOf = String.valueOf(execute);
                throw new IOException(new StringBuilder(77 + String.valueOf(valueOf).length()).append("This version of the SDK expects no more than one work item from the service: ").append(valueOf).toString());
            }
            WorkItem workItem = execute.getWorkItems().get(0);
            if (workItem == null || workItem.getId() == null) {
                return null;
            }
            if (workItem.getMapTask() != null) {
                String stageName = workItem.getMapTask().getStageName();
                DataflowWorkerLoggingMDC.setStageName(stageName);
                DataflowWorkerHarness.LOG.info("Starting MapTask stage {}", stageName);
            } else if (workItem.getSeqMapTask() != null) {
                String stageName2 = workItem.getSeqMapTask().getStageName();
                DataflowWorkerLoggingMDC.setStageName(stageName2);
                DataflowWorkerHarness.LOG.info("Starting SeqMapTask stage {}", stageName2);
            } else {
                DataflowWorkerLoggingMDC.setStageName(null);
            }
            stageStartTime.set(DateTime.now());
            DataflowWorkerLoggingMDC.setWorkId(Long.toString(workItem.getId().longValue()));
            return workItem;
        }

        @Override // com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.WorkUnitClient
        public WorkItemServiceState reportWorkItemStatus(WorkItemStatus workItemStatus) throws IOException {
            DateTime dateTime;
            DateTime now = DateTime.now();
            workItemStatus.setFactory(Transport.getJsonFactory());
            DataflowWorkerHarness.LOG.debug("Reporting work status: {}", workItemStatus);
            if (workItemStatus.getCompleted().booleanValue() && DataflowWorkerLoggingMDC.getStageName() != null && (dateTime = stageStartTime.get()) != null) {
                DataflowWorkerHarness.LOG.info("Finished processing stage {} with {} errors in {} seconds ", DataflowWorkerLoggingMDC.getStageName(), Integer.valueOf(workItemStatus.getErrors() == null ? 0 : workItemStatus.getErrors().size()), Double.valueOf(new Interval(dateTime, now).toDurationMillis() / 1000.0d));
            }
            ReportWorkItemStatusResponse execute = this.dataflow.projects().jobs().workItems().reportStatus(this.options.getProject(), this.options.getJobId(), new ReportWorkItemStatusRequest().setWorkerId(this.options.getWorkerId()).setWorkItemStatuses(Collections.singletonList(workItemStatus)).setCurrentWorkerTime(TimeUtil.toCloudTime(now))).execute();
            if (execute == null || execute.getWorkItemServiceStates() == null || execute.getWorkItemServiceStates().size() != 1) {
                throw new IOException("This version of the SDK expects exactly one work item service state from the service");
            }
            WorkItemServiceState workItemServiceState = execute.getWorkItemServiceStates().get(0);
            DataflowWorkerHarness.LOG.debug("ReportWorkItemStatus result: {}", workItemServiceState);
            return workItemServiceState;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/DataflowWorkerHarness$WorkerThread.class */
    public static class WorkerThread implements Callable<Boolean> {
        private final DataflowWorker worker;
        private final Sleeper sleeper;
        private final BackOff backOff = DataflowWorkerHarness.access$100();

        WorkerThread(DataflowWorker dataflowWorker, Sleeper sleeper) {
            this.worker = dataflowWorker;
            this.sleeper = sleeper;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Boolean call() {
            while (true) {
                try {
                    boolean doWork = doWork();
                    if (doWork) {
                        this.backOff.reset();
                    }
                    if (!doWork && !BackOffUtils.next(this.sleeper, this.backOff)) {
                        break;
                    }
                } catch (IOException e) {
                    DataflowWorkerHarness.LOG.error("Already tried several attempts at working on tasks. Aborting.", (Throwable) e);
                } catch (InterruptedException e2) {
                    DataflowWorkerHarness.LOG.error("Interrupted during thread execution or sleep.", (Throwable) e2);
                } catch (Throwable th) {
                    DataflowWorkerHarness.LOG.error("Thread {} died.", Long.valueOf(Thread.currentThread().getId()), th);
                }
            }
            return false;
        }

        private boolean doWork() {
            try {
                DataflowWorkerHarness.LOG.debug("Thread starting getAndPerformWork.");
                boolean andPerformWork = this.worker.getAndPerformWork();
                DataflowWorkerHarness.LOG.debug("{} processing one WorkItem.", andPerformWork ? "Finished" : "Failed");
                return andPerformWork;
            } catch (IOException e) {
                DataflowWorkerHarness.LOG.debug("There was a problem getting work.", (Throwable) e);
                return false;
            } catch (Exception e2) {
                DataflowWorkerHarness.LOG.error("There was an unhandled error caused by the Dataflow SDK.", (Throwable) e2);
                return false;
            }
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/DataflowWorkerHarness$WorkerUncaughtExceptionHandler.class */
    static class WorkerUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
        static final WorkerUncaughtExceptionHandler INSTANCE = new WorkerUncaughtExceptionHandler();
        private static final PrintStream originalStandardError = System.err;

        WorkerUncaughtExceptionHandler() {
        }

        @Override // java.lang.Thread.UncaughtExceptionHandler
        public void uncaughtException(Thread thread, Throwable th) {
            try {
                try {
                    DataflowWorkerHarness.LOG.error("Uncaught exception in main thread. Exiting with status code 1.", th);
                    System.err.println("Uncaught exception in main thread. Exiting with status code 1.");
                    th.printStackTrace();
                    System.exit(1);
                } catch (Throwable th2) {
                    originalStandardError.println("Uncaught exception in main thread. Exiting with status code 1.");
                    th.printStackTrace(originalStandardError);
                    originalStandardError.println("UncaughtExceptionHandler caused another exception to be thrown, as follows:");
                    th2.printStackTrace(originalStandardError);
                    System.exit(1);
                }
            } catch (Throwable th3) {
                System.exit(1);
                throw th3;
            }
        }
    }

    private static BackOff createBackOff() {
        return new IntervalBoundedExponentialBackOff(BACKOFF_MAX_INTERVAL_MILLIS, 5000L);
    }

    public static void main(String[] strArr) throws Exception {
        Thread.setDefaultUncaughtExceptionHandler(WorkerUncaughtExceptionHandler.INSTANCE);
        DataflowWorkerLoggingInitializer.initialize();
        DataflowWorkerHarnessOptions createFromSystemPropertiesInternal = PipelineOptionsFactory.createFromSystemPropertiesInternal();
        DataflowWorkerLoggingInitializer.configure(createFromSystemPropertiesInternal);
        Sleeper sleeper = Sleeper.DEFAULT;
        DataflowWorker create = create(createFromSystemPropertiesInternal);
        int i = 18081;
        if (System.getProperties().containsKey("status_port")) {
            i = Integer.parseInt(System.getProperty("status_port"));
        }
        create.runStatusServer(i);
        processWork(createFromSystemPropertiesInternal, create, sleeper);
    }

    static void processWork(DataflowWorkerHarnessOptions dataflowWorkerHarnessOptions, DataflowWorker dataflowWorker, Sleeper sleeper) throws InterruptedException {
        int chooseNumberOfThreads = chooseNumberOfThreads(dataflowWorkerHarnessOptions);
        ExecutorService executorService = dataflowWorkerHarnessOptions.getExecutorService();
        LinkedList linkedList = new LinkedList();
        LOG.debug("Starting {} worker threads", Integer.valueOf(chooseNumberOfThreads));
        for (int i = 0; i < chooseNumberOfThreads; i++) {
            linkedList.add(new WorkerThread(dataflowWorker, sleeper));
        }
        LOG.debug("Waiting for {} worker threads", Integer.valueOf(chooseNumberOfThreads));
        executorService.invokeAll(linkedList);
        LOG.error("All threads died.");
    }

    static DataflowWorker create(DataflowWorkerHarnessOptions dataflowWorkerHarnessOptions) {
        DataflowWorkerLoggingMDC.setJobId(dataflowWorkerHarnessOptions.getJobId());
        DataflowWorkerLoggingMDC.setWorkerId(dataflowWorkerHarnessOptions.getWorkerId());
        dataflowWorkerHarnessOptions.setAppName(APPLICATION_NAME);
        IOChannelUtils.setIOFactory("gs", new GcsIOChannelFactory(dataflowWorkerHarnessOptions));
        return new DataflowWorker(DataflowWorkUnitClient.fromOptions(dataflowWorkerHarnessOptions), dataflowWorkerHarnessOptions);
    }

    private static int chooseNumberOfThreads(DataflowWorkerHarnessOptions dataflowWorkerHarnessOptions) {
        return dataflowWorkerHarnessOptions.getNumberOfWorkerHarnessThreads() != 0 ? dataflowWorkerHarnessOptions.getNumberOfWorkerHarnessThreads() : Math.max(Runtime.getRuntime().availableProcessors(), 1);
    }

    static /* synthetic */ BackOff access$100() {
        return createBackOff();
    }
}
