package com.google.cloud.dataflow.sdk.runners.worker;

import com.beust.jcommander.Parameters;
import com.google.api.services.dataflow.model.MapTask;
import com.google.api.services.dataflow.model.ParallelInstruction;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.DataflowWorkerHarnessOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Function;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.runners.dataflow.CustomSources;
import com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness;
import com.google.cloud.dataflow.sdk.runners.worker.MapTaskExecutorFactory;
import com.google.cloud.dataflow.sdk.runners.worker.UserCodeTimeTracker;
import com.google.cloud.dataflow.sdk.runners.worker.logging.DataflowWorkerLoggingInitializer;
import com.google.cloud.dataflow.sdk.runners.worker.logging.DataflowWorkerLoggingMDC;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.WindmillServerStub;
import com.google.cloud.dataflow.sdk.util.BoundedQueueExecutor;
import com.google.cloud.dataflow.sdk.util.MemoryMonitor;
import com.google.cloud.dataflow.sdk.util.PropertyNames;
import com.google.cloud.dataflow.sdk.util.Serializer;
import com.google.cloud.dataflow.sdk.util.Transport;
import com.google.cloud.dataflow.sdk.util.UserCodeException;
import com.google.cloud.dataflow.sdk.util.common.Counter;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor;
import com.google.cloud.dataflow.sdk.util.common.worker.OutputObjectAndByteCounter;
import com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver;
import com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation;
import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.math3.optimization.direct.CMAESOptimizer;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/StreamingDataflowWorker.class */
public class StreamingDataflowWorker {
    static final int MAX_PROCESSING_THREADS = 300;
    static final long THREAD_EXPIRATION_TIME_SEC = 60;
    static final int MAX_WORK_UNITS_QUEUED = 100;
    static final long MAX_COMMIT_BYTES = 33554432;
    static final int DEFAULT_STATUS_PORT = 8081;
    static final String DEFAULT_WINDMILL_SERVER_CLASS_NAME = "com.google.cloud.dataflow.sdk.runners.worker.windmill.WindmillServer";
    static final int MAX_COMMIT_QUEUE_BYTES = 524288000;
    private static final long MAX_GET_WORK_FETCH_BYTES = 67108864;
    private static final long MAX_GET_WORK_ITEMS = 100;
    private BoundedQueueExecutor workUnitExecutor;
    private WindmillServerStub windmillServer;
    private Thread dispatchThread;
    private StateFetcher stateFetcher;
    private DataflowWorkerHarnessOptions options;
    private Server statusServer;
    private final MetricTrackingWindmillServerStub metricTrackingWindmillServer;
    private Timer globalCountersUpdatesTimer;
    private static final Logger LOG = LoggerFactory.getLogger(StreamingDataflowWorker.class);
    private static final MemoryMonitor memoryMonitor = new MemoryMonitor();
    private final KeyedWeightBoundedQueue<String, Windmill.WorkItemCommitRequest> commitQueue = new KeyedWeightBoundedQueue<>(MAX_COMMIT_QUEUE_BYTES, new Function<Windmill.WorkItemCommitRequest, Integer>() { // from class: com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker.1
        @Override // com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Function
        public Integer apply(Windmill.WorkItemCommitRequest workItemCommitRequest) {
            return Integer.valueOf(workItemCommitRequest.getSerializedSize());
        }
    });
    private WindmillStateCache stateCache = new WindmillStateCache();
    private final UserCodeTimeTracker userCodeTimeTracker = new UserCodeTimeTracker();
    private final AtomicInteger nextStateSamplerId = new AtomicInteger();
    private final ConcurrentMap<String, MapTask> instructionMap = new ConcurrentHashMap();
    private final ConcurrentMap<String, ConcurrentLinkedQueue<WorkerAndContext>> mapTaskExecutors = new ConcurrentHashMap();
    private final ConcurrentMap<String, ActiveWorkForComputation> activeWorkMap = new ConcurrentHashMap();
    private final ConcurrentMap<String, ConcurrentMap<ByteString, ReaderCacheEntry>> readerCache = new ConcurrentHashMap();
    private ConcurrentMap<Long, Runnable> commitCallbacks = new ConcurrentHashMap();
    private ConcurrentMap<String, String> stateNameMap = new ConcurrentHashMap();
    private ConcurrentMap<String, String> systemNameToComputationIdMap = new ConcurrentHashMap();
    private ThreadFactory threadFactory = new ThreadFactory() { // from class: com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker.2
        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            return thread;
        }
    };
    private ExecutorService commitExecutor = new ThreadPoolExecutor(1, 1, Long.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue(2), new ThreadFactory() { // from class: com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker.3
        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            thread.setPriority(10);
            thread.setName("CommitThread");
            return thread;
        }
    }, new ThreadPoolExecutor.DiscardPolicy());
    private AtomicBoolean running = new AtomicBoolean();
    private long clientId = new Random().nextLong();
    private final AtomicReference<Throwable> lastException = new AtomicReference<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/StreamingDataflowWorker$ActiveWorkForComputation.class */
    public static class ActiveWorkForComputation {
        private Map<ByteString, Queue<Work>> activeWork = new HashMap();
        private BoundedQueueExecutor executor;

        ActiveWorkForComputation(BoundedQueueExecutor boundedQueueExecutor) {
            this.executor = boundedQueueExecutor;
        }

        public synchronized boolean activateWork(ByteString byteString, Work work) {
            Queue<Work> queue = this.activeWork.get(byteString);
            if (queue == null) {
                LinkedList linkedList = new LinkedList();
                this.activeWork.put(byteString, linkedList);
                linkedList.add(work);
                return true;
            }
            if (queue.peek().getWorkToken() == work.getWorkToken()) {
                return false;
            }
            queue.add(work);
            return false;
        }

        public synchronized void completeWork(ByteString byteString) {
            Queue<Work> queue = this.activeWork.get(byteString);
            queue.poll();
            if (queue.peek() != null) {
                this.executor.forceExecute(queue.peek());
            } else {
                this.activeWork.remove(byteString);
            }
        }

        public synchronized void printActiveWork(PrintWriter printWriter) {
            printWriter.println("<ul>");
            for (Map.Entry<ByteString, Queue<Work>> entry : this.activeWork.entrySet()) {
                Queue<Work> value = entry.getValue();
                printWriter.print("<li>Key: ");
                printWriter.print(entry.getKey().toStringUtf8());
                printWriter.print(" Token: ");
                printWriter.print(value.peek().getWorkToken());
                if (value.size() > 1) {
                    printWriter.print("(");
                    printWriter.print(value.size() - 1);
                    printWriter.print(" queued)");
                }
                printWriter.println("</li>");
            }
            printWriter.println("</ul>");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/StreamingDataflowWorker$Commit.class */
    public class Commit implements Runnable {
        private Commit() {
        }

        @Override // java.lang.Runnable
        public void run() {
            Windmill.WorkItemCommitRequest workItemCommitRequest;
            while (true) {
                Windmill.CommitWorkRequest.Builder newBuilder = Windmill.CommitWorkRequest.newBuilder();
                long j = 33554432;
                for (String str : StreamingDataflowWorker.this.commitQueue.keySet()) {
                    Windmill.ComputationCommitWorkRequest.Builder newBuilder2 = Windmill.ComputationCommitWorkRequest.newBuilder();
                    while (j > 0 && (workItemCommitRequest = (Windmill.WorkItemCommitRequest) StreamingDataflowWorker.this.commitQueue.poll(str)) != null) {
                        j -= workItemCommitRequest.getSerializedSize();
                        newBuilder2.addRequests(workItemCommitRequest);
                    }
                    if (newBuilder2.getRequestsCount() > 0) {
                        newBuilder2.setComputationId(str);
                        newBuilder.addRequests(newBuilder2);
                    }
                }
                if (newBuilder.getRequestsCount() <= 0) {
                    return;
                }
                Windmill.CommitWorkRequest build = newBuilder.build();
                StreamingDataflowWorker.LOG.trace("Commit: {}", build);
                StreamingDataflowWorker.this.commitWork(build);
                for (Windmill.ComputationCommitWorkRequest computationCommitWorkRequest : build.getRequestsList()) {
                    ActiveWorkForComputation activeWorkForComputation = (ActiveWorkForComputation) StreamingDataflowWorker.this.activeWorkMap.get(computationCommitWorkRequest.getComputationId());
                    Iterator<Windmill.WorkItemCommitRequest> it = computationCommitWorkRequest.getRequestsList().iterator();
                    while (it.hasNext()) {
                        activeWorkForComputation.completeWork(it.next().getKey());
                    }
                }
            }
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/StreamingDataflowWorker$KeyTokenInvalidException.class */
    public static class KeyTokenInvalidException extends RuntimeException {
        /* JADX WARN: Illegal instructions before constructor call */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public KeyTokenInvalidException(java.lang.String r6) {
            /*
                r5 = this;
                r0 = r5
                java.lang.String r1 = "Unable to fetch data due to token mismatch for key "
                r2 = r6
                java.lang.String r2 = java.lang.String.valueOf(r2)
                r3 = r2
                int r3 = r3.length()
                if (r3 == 0) goto L14
                java.lang.String r1 = r1.concat(r2)
                goto L1d
            L14:
                java.lang.String r2 = new java.lang.String
                r3 = r2; r2 = r1; r1 = r3; 
                r4 = r2; r2 = r3; r3 = r4; 
                r2.<init>(r3)
            L1d:
                r0.<init>(r1)
                return
            */
            throw new UnsupportedOperationException("Method not decompiled: com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker.KeyTokenInvalidException.<init>(java.lang.String):void");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/StreamingDataflowWorker$KeyedWeightBoundedQueue.class */
    public static class KeyedWeightBoundedQueue<K, V> {
        private final ConcurrentMap<K, ConcurrentLinkedQueue<V>> queueMap = new ConcurrentHashMap();
        private final int maxWeight;
        private final Semaphore limit;
        private final Function<V, Integer> weigher;

        public KeyedWeightBoundedQueue(int i, Function<V, Integer> function) {
            this.maxWeight = i;
            this.limit = new Semaphore(i, true);
            this.weigher = function;
        }

        public void addQueue(K k) {
            this.queueMap.put(k, new ConcurrentLinkedQueue<>());
        }

        public void put(K k, V v) {
            this.limit.acquireUninterruptibly(Math.max(this.maxWeight, this.weigher.apply(v).intValue()));
            ((ConcurrentLinkedQueue) Preconditions.checkNotNull(this.queueMap.get(k), "Must create a queue by calling addQueue() before put. Missing key %s", k)).add(v);
        }

        public Set<K> keySet() {
            return this.queueMap.keySet();
        }

        @Nullable
        public V poll(K k) {
            V poll = this.queueMap.get(k).poll();
            if (poll != null) {
                this.limit.release(Math.max(this.maxWeight, this.weigher.apply(poll).intValue()));
            }
            return poll;
        }

        public int queueSize(K k) {
            return this.queueMap.get(k).size();
        }

        public int weight() {
            return this.maxWeight - this.limit.availablePermits();
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/StreamingDataflowWorker$ReaderCacheEntry.class */
    public static class ReaderCacheEntry {
        UnboundedSource.UnboundedReader<?> reader;
        long token;

        public ReaderCacheEntry(UnboundedSource.UnboundedReader<?> unboundedReader, long j) {
            this.reader = unboundedReader;
            this.token = j;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/StreamingDataflowWorker$StatusHandler.class */
    public class StatusHandler extends AbstractHandler {
        private StatusHandler() {
        }

        @Override // org.eclipse.jetty.server.Handler
        public void handle(String str, Request request, HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws IOException, ServletException {
            httpServletResponse.setContentType("text/html;charset=utf-8");
            httpServletResponse.setStatus(200);
            request.setHandled(true);
            PrintWriter writer = httpServletResponse.getWriter();
            writer.println("<html><body>");
            if (str.equals("/healthz")) {
                writer.println("ok");
            } else if (str.equals("/threadz")) {
                StreamingDataflowWorker.this.printThreads(writer);
            } else if (str.equals("/heapz")) {
                StreamingDataflowWorker.this.dumpHeap(writer);
            } else if (str.equals("/cachez")) {
                StreamingDataflowWorker.this.stateCache.printDetailedHtml(writer);
            } else {
                StreamingDataflowWorker.this.printHeader(writer);
                StreamingDataflowWorker.this.printResources(writer);
                StreamingDataflowWorker.this.printMetrics(writer);
                StreamingDataflowWorker.this.printLastException(writer);
                StreamingDataflowWorker.this.printSpecs(writer);
            }
            writer.println("</body></html>");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/StreamingDataflowWorker$Work.class */
    public static abstract class Work implements Runnable {
        private final long workToken;

        public Work(long j) {
            this.workToken = j;
        }

        public long getWorkToken() {
            return this.workToken;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/StreamingDataflowWorker$WorkerAndContext.class */
    public static class WorkerAndContext {
        public MapTaskExecutor worker;
        public StreamingModeExecutionContext context;

        public WorkerAndContext(MapTaskExecutor mapTaskExecutor, StreamingModeExecutionContext streamingModeExecutionContext) {
            this.worker = mapTaskExecutor;
            this.context = streamingModeExecutionContext;
        }

        public MapTaskExecutor getWorker() {
            return this.worker;
        }

        public StreamingModeExecutionContext getContext() {
            return this.context;
        }
    }

    public static boolean isKeyTokenInvalidException(Throwable th) {
        while (th != null) {
            if (th instanceof KeyTokenInvalidException) {
                return true;
            }
            th = th.getCause();
        }
        return false;
    }

    private static boolean isOutOfMemoryError(Throwable th) {
        while (th != null) {
            if (th instanceof OutOfMemoryError) {
                return true;
            }
            th = th.getCause();
        }
        return false;
    }

    static MapTask parseMapTask(String str) throws IOException {
        return (MapTask) Transport.getJsonFactory().fromString(str, MapTask.class);
    }

    public static void main(String[] strArr) throws Exception {
        Thread.setDefaultUncaughtExceptionHandler(DataflowWorkerHarness.WorkerUncaughtExceptionHandler.INSTANCE);
        new Thread(memoryMonitor).start();
        DataflowWorkerLoggingInitializer.initialize();
        DataflowWorkerHarnessOptions createFromSystemPropertiesInternal = PipelineOptionsFactory.createFromSystemPropertiesInternal();
        createFromSystemPropertiesInternal.setAppName("StreamingWorkerHarness");
        createFromSystemPropertiesInternal.setStreaming(true);
        DataflowWorkerLoggingInitializer.configure(createFromSystemPropertiesInternal);
        String property = System.getProperty("windmill.hostport");
        if (property == null) {
            throw new Exception("-Dwindmill.hostport must be set to the location of the windmill server");
        }
        int i = DEFAULT_STATUS_PORT;
        if (System.getProperties().containsKey("status_port")) {
            i = Integer.parseInt(System.getProperty("status_port"));
        }
        String str = DEFAULT_WINDMILL_SERVER_CLASS_NAME;
        if (System.getProperties().containsKey("windmill.serverclassname")) {
            str = System.getProperty("windmill.serverclassname");
        }
        ArrayList arrayList = new ArrayList();
        for (String str2 : strArr) {
            arrayList.add(parseMapTask(str2));
        }
        StreamingDataflowWorker streamingDataflowWorker = new StreamingDataflowWorker(arrayList, (WindmillServerStub) Class.forName(str).getDeclaredConstructor(String.class).newInstance(property), createFromSystemPropertiesInternal);
        streamingDataflowWorker.start();
        streamingDataflowWorker.runStatusServer(i);
    }

    public StreamingDataflowWorker(List<MapTask> list, WindmillServerStub windmillServerStub, DataflowWorkerHarnessOptions dataflowWorkerHarnessOptions) {
        this.options = dataflowWorkerHarnessOptions;
        this.workUnitExecutor = new BoundedQueueExecutor(chooseMaximumNumberOfThreads(dataflowWorkerHarnessOptions), THREAD_EXPIRATION_TIME_SEC, TimeUnit.SECONDS, 100, this.threadFactory);
        this.windmillServer = windmillServerStub;
        this.metricTrackingWindmillServer = new MetricTrackingWindmillServerStub(windmillServerStub, memoryMonitor);
        this.stateFetcher = new StateFetcher(this.metricTrackingWindmillServer);
        Iterator<MapTask> it = list.iterator();
        while (it.hasNext()) {
            addComputation(it.next());
        }
        DataflowWorkerLoggingMDC.setJobId(dataflowWorkerHarnessOptions.getJobId());
        DataflowWorkerLoggingMDC.setWorkerId(dataflowWorkerHarnessOptions.getWorkerId());
    }

    private static int chooseMaximumNumberOfThreads(DataflowWorkerHarnessOptions dataflowWorkerHarnessOptions) {
        if (dataflowWorkerHarnessOptions.getNumberOfWorkerHarnessThreads() != 0) {
            return dataflowWorkerHarnessOptions.getNumberOfWorkerHarnessThreads();
        }
        return 300;
    }

    void addStateNameMappings(Map<String, String> map) {
        this.stateNameMap.putAll(map);
    }

    public void start() {
        this.running.set(true);
        this.dispatchThread = this.threadFactory.newThread(new Runnable() { // from class: com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker.4
            @Override // java.lang.Runnable
            public void run() {
                StreamingDataflowWorker.this.dispatchLoop();
            }
        });
        this.dispatchThread.setPriority(1);
        this.dispatchThread.setName("DispatchThread");
        this.dispatchThread.start();
        this.globalCountersUpdatesTimer = new Timer("GlobalCountersUpdates");
        this.globalCountersUpdatesTimer.schedule(new TimerTask() { // from class: com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker.5
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                StreamingDataflowWorker.this.reportPeriodicStats();
            }
        }, 1000L, 1000L);
        reportHarnessStartup();
    }

    public void stop() {
        try {
            if (this.globalCountersUpdatesTimer != null) {
                this.globalCountersUpdatesTimer.cancel();
            }
            if (this.statusServer != null) {
                this.statusServer.stop();
            }
            this.running.set(false);
            this.dispatchThread.join();
            this.workUnitExecutor.shutdown();
            if (!this.workUnitExecutor.awaitTermination(5L, TimeUnit.MINUTES)) {
                throw new RuntimeException("Work executor did not terminate within 5 minutes");
            }
            for (ConcurrentLinkedQueue<WorkerAndContext> concurrentLinkedQueue : this.mapTaskExecutors.values()) {
                while (true) {
                    WorkerAndContext poll = concurrentLinkedQueue.poll();
                    if (poll != null) {
                        poll.getWorker().close();
                    }
                }
            }
            this.commitExecutor.shutdown();
            if (!this.commitExecutor.awaitTermination(5L, TimeUnit.MINUTES)) {
                throw new RuntimeException("Commit executor did not terminate within 5 minutes");
            }
        } catch (Exception e) {
            LOG.warn("Exception while shutting down: ", (Throwable) e);
        }
    }

    public void runStatusServer(int i) {
        this.statusServer = new Server(i);
        this.statusServer.setHandler(new StatusHandler());
        try {
            this.statusServer.start();
            LOG.info("Status server started on port {}", Integer.valueOf(i));
            this.statusServer.join();
        } catch (Exception e) {
            LOG.warn("Status server failed to start: ", (Throwable) e);
        }
    }

    private void addComputation(MapTask mapTask) {
        String systemName = this.systemNameToComputationIdMap.containsKey(mapTask.getSystemName()) ? this.systemNameToComputationIdMap.get(mapTask.getSystemName()) : mapTask.getSystemName();
        if (this.instructionMap.containsKey(systemName)) {
            return;
        }
        LOG.info("Adding config for {}: {}", systemName, mapTask);
        this.commitQueue.addQueue(systemName);
        this.instructionMap.put(systemName, mapTask);
        this.mapTaskExecutors.put(systemName, new ConcurrentLinkedQueue<>());
        this.activeWorkMap.put(systemName, new ActiveWorkForComputation(this.workUnitExecutor));
        this.readerCache.put(systemName, new ConcurrentHashMap());
    }

    private static void sleep(int i) {
        try {
            Thread.sleep(i);
        } catch (InterruptedException e) {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void dispatchLoop() {
        Windmill.GetWorkResponse work;
        LOG.info("Dispatch starting");
        while (this.running.get()) {
            memoryMonitor.waitForResources("GetWork");
            int i = 1;
            do {
                work = getWork();
                if (work.getWorkCount() > 0) {
                    break;
                }
                sleep(i);
                i = Math.min(1000, i * 2);
            } while (this.running.get());
            for (Windmill.ComputationWorkItems computationWorkItems : work.getWorkList()) {
                final String computationId = computationWorkItems.getComputationId();
                if (!this.instructionMap.containsKey(computationId)) {
                    getConfig(computationId);
                }
                final MapTask mapTask = this.instructionMap.get(computationId);
                if (mapTask == null) {
                    LOG.warn("Received work for unknown computation: {}. Known computations are {}", computationId, this.instructionMap.keySet());
                } else {
                    final Instant windmillToHarnessInputWatermark = WindmillTimeUtils.windmillToHarnessInputWatermark(computationWorkItems.getInputDataWatermark());
                    ActiveWorkForComputation activeWorkForComputation = this.activeWorkMap.get(computationId);
                    for (final Windmill.WorkItem workItem : computationWorkItems.getWorkList()) {
                        final Instant windmillToHarnessOutputWatermark = WindmillTimeUtils.windmillToHarnessOutputWatermark(workItem.getOutputDataWatermark());
                        Preconditions.checkState(windmillToHarnessInputWatermark == null || windmillToHarnessOutputWatermark == null || !windmillToHarnessOutputWatermark.isAfter(windmillToHarnessInputWatermark));
                        Work work2 = new Work(workItem.getWorkToken()) { // from class: com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker.6
                            @Override // java.lang.Runnable
                            public void run() {
                                StreamingDataflowWorker.this.process(computationId, mapTask, windmillToHarnessInputWatermark, windmillToHarnessOutputWatermark, workItem);
                            }
                        };
                        if (activeWorkForComputation.activateWork(workItem.getKey(), work2)) {
                            this.workUnitExecutor.execute(work2);
                        }
                    }
                }
            }
        }
        LOG.info("Dispatch done");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Finally extract failed */
    public void process(final String str, final MapTask mapTask, @Nullable final Instant instant, @Nullable final Instant instant2, final Windmill.WorkItem workItem) {
        MapTaskExecutor worker;
        StreamingModeExecutionContext context;
        String str2;
        LOG.debug("Starting processing for {}:\n{}", str, workItem);
        Windmill.WorkItemCommitRequest.Builder workToken = Windmill.WorkItemCommitRequest.newBuilder().setKey(workItem.getKey()).setWorkToken(workItem.getWorkToken());
        AutoCloseable autoCloseable = null;
        try {
            try {
                String valueOf = String.valueOf(workItem.getKey().toStringUtf8());
                String valueOf2 = String.valueOf(Long.toString(workItem.getWorkToken()));
                DataflowWorkerLoggingMDC.setWorkId(new StringBuilder(1 + String.valueOf(valueOf).length() + String.valueOf(valueOf2).length()).append(valueOf).append(Parameters.DEFAULT_OPTION_PREFIXES).append(valueOf2).toString());
                DataflowWorkerLoggingMDC.setStageName(str);
                WorkerAndContext poll = this.mapTaskExecutors.get(str).poll();
                if (poll == null) {
                    CounterSet counterSet = new CounterSet(new Counter[0]);
                    context = new StreamingModeExecutionContext(mapTask.getSystemName(), this.readerCache.get(str), this.stateNameMap, this.stateCache.forComputation(str));
                    StateSampler stateSampler = new StateSampler(String.valueOf(mapTask.getStageName()).concat(Parameters.DEFAULT_OPTION_PREFIXES), counterSet.getAddCounterMutator());
                    int incrementAndGet = this.nextStateSamplerId.incrementAndGet();
                    stateSampler.addSamplingCallback(new UserCodeTimeTracker.StateSamplerCallback(this.userCodeTimeTracker, incrementAndGet));
                    this.userCodeTimeTracker.workStarted(stateSampler.getPrefix(), incrementAndGet, counterSet.getAddCounterMutator());
                    worker = MapTaskExecutorFactory.create(this.options, mapTask, context, counterSet, stateSampler);
                    ReadOperation readOperation = worker.getReadOperation();
                    readOperation.setProgressUpdatePeriodMs(-1L);
                    Preconditions.checkState(worker.supportsRestart(), "Streaming runner requires all operations support restart.");
                    ParallelInstruction parallelInstruction = mapTask.getInstructions().get(0);
                    if (CustomSources.class.getName().equals(parallelInstruction.getRead().getSource().getSpec().get(PropertyNames.OBJECT_TYPE_NAME))) {
                        Coder coder = (Coder) Serializer.deserialize(parallelInstruction.getOutputs().get(0).getCodec(), Coder.class);
                        OutputReceiver outputReceiver = readOperation.receivers[0];
                        OutputObjectAndByteCounter samplingPeriod = new OutputObjectAndByteCounter(new MapTaskExecutorFactory.ElementByteSizeObservableCoder(coder), worker.getOutputCounters().getAddCounterMutator()).setSamplingPeriod(100);
                        String valueOf3 = String.valueOf(mapTask.getSystemName());
                        if (valueOf3.length() != 0) {
                            str2 = "dataflow_input_size-".concat(valueOf3);
                        } else {
                            str2 = r3;
                            String str3 = new String("dataflow_input_size-");
                        }
                        outputReceiver.addOutputCounter(samplingPeriod.countBytes(str2));
                    }
                } else {
                    worker = poll.getWorker();
                    context = poll.getContext();
                }
                WindmillStateReader windmillStateReader = new WindmillStateReader(this.metricTrackingWindmillServer, str, workItem.getKey(), workItem.getWorkToken());
                StateFetcher byteTrackingView = this.stateFetcher.byteTrackingView();
                context.start(workItem, instant, instant2, windmillStateReader, byteTrackingView, workToken);
                Iterator<Long> it = context.getReadyCommitCallbackIds().iterator();
                while (it.hasNext()) {
                    final Runnable remove = this.commitCallbacks.remove(it.next());
                    if (remove != null) {
                        this.workUnitExecutor.forceExecute(new Runnable() { // from class: com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker.7
                            @Override // java.lang.Runnable
                            public void run() {
                                try {
                                    remove.run();
                                } catch (Throwable th) {
                                    StreamingDataflowWorker.LOG.error("Source checkpoint finalization failed:", th);
                                }
                            }
                        });
                    }
                }
                worker.execute();
                this.commitCallbacks.putAll(context.flushState());
                long j = 0;
                Iterator<Windmill.InputMessageBundle> it2 = workItem.getMessageBundlesList().iterator();
                while (it2.hasNext()) {
                    while (it2.next().getMessagesList().iterator().hasNext()) {
                        j += r0.next().getSerializedSize();
                    }
                }
                long bytesRead = windmillStateReader.getBytesRead() + byteTrackingView.getBytesRead();
                long serializedSize = Windmill.WorkItemCommitRequest.newBuilder(workToken.build()).clearOutputMessages().build().getSerializedSize();
                CounterSet outputCounters = worker.getOutputCounters();
                outputCounters.getAddCounterMutator().addCounter(Counter.longs("WindmillShuffleBytesRead", Counter.AggregationKind.SUM)).addValue(Long.valueOf(j));
                outputCounters.getAddCounterMutator().addCounter(Counter.longs("WindmillStateBytesRead", Counter.AggregationKind.SUM)).addValue(Long.valueOf(bytesRead));
                outputCounters.getAddCounterMutator().addCounter(Counter.longs("WindmillStateBytesWritten", Counter.AggregationKind.SUM)).addValue(Long.valueOf(serializedSize));
                buildCounters(outputCounters, workToken);
                this.mapTaskExecutors.get(str).offer(new WorkerAndContext(worker, context));
                autoCloseable = null;
                this.commitQueue.put(str, workToken.build());
                scheduleCommit();
                LOG.debug("Processing done for work token: {}", Long.valueOf(workItem.getWorkToken()));
                DataflowWorkerLoggingMDC.setWorkId(null);
                DataflowWorkerLoggingMDC.setStageName(null);
            } catch (Throwable th) {
                try {
                    if (autoCloseable != null) {
                        try {
                            autoCloseable.close();
                        } catch (Exception e) {
                            LOG.warn("Failed to close worker: ", (Throwable) e);
                        }
                    }
                    Throwable cause = th instanceof UserCodeException ? th.getCause() : th;
                    if (isOutOfMemoryError(cause)) {
                        reportFailure(str, workItem, cause);
                        LOG.error("Received OutOfMemoryError, crashing.  Error was ", cause);
                        System.exit(1);
                    } else if (isKeyTokenInvalidException(cause)) {
                        LOG.debug("Execution of work for {} for key {} failed due to token expiration, will not retry locally.", str, workItem.getKey().toStringUtf8());
                        this.activeWorkMap.get(str).completeWork(workItem.getKey());
                    } else {
                        LOG.error("Execution of work for {} for key {} failed, retrying.", str, workItem.getKey().toStringUtf8());
                        LOG.error("\nError: ", cause);
                        this.lastException.set(cause);
                        LOG.debug("Failed work: {}", workItem);
                        if (reportFailure(str, workItem, cause)) {
                            sleep(10000);
                            this.workUnitExecutor.forceExecute(new Runnable() { // from class: com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker.8
                                @Override // java.lang.Runnable
                                public void run() {
                                    StreamingDataflowWorker.this.process(str, mapTask, instant, instant2, workItem);
                                }
                            });
                        } else {
                            LOG.debug("Aborting processing due to exception reporting failure");
                            this.activeWorkMap.get(str).completeWork(workItem.getKey());
                        }
                    }
                    DataflowWorkerLoggingMDC.setWorkId(null);
                    DataflowWorkerLoggingMDC.setStageName(null);
                } catch (Throwable th2) {
                    throw th2;
                }
            }
        } catch (Throwable th3) {
            DataflowWorkerLoggingMDC.setWorkId(null);
            DataflowWorkerLoggingMDC.setStageName(null);
            throw th3;
        }
    }

    private void scheduleCommit() {
        this.commitExecutor.execute(new Commit());
    }

    private Windmill.GetWorkResponse getWork() {
        return this.windmillServer.getWork(Windmill.GetWorkRequest.newBuilder().setClientId(this.clientId).setMaxItems(100L).setMaxBytes(MAX_GET_WORK_FETCH_BYTES).build());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void commitWork(Windmill.CommitWorkRequest commitWorkRequest) {
        this.windmillServer.commitWork(commitWorkRequest);
    }

    private void getConfig(String str) {
        Windmill.GetConfigResponse config = this.windmillServer.getConfig(Windmill.GetConfigRequest.newBuilder().addComputations(str).build());
        for (Windmill.GetConfigResponse.SystemNameToComputationIdMapEntry systemNameToComputationIdMapEntry : config.getSystemNameToComputationIdMapList()) {
            this.systemNameToComputationIdMap.put(systemNameToComputationIdMapEntry.getSystemName(), systemNameToComputationIdMapEntry.getComputationId());
        }
        for (String str2 : config.getCloudWorksList()) {
            try {
                addComputation(parseMapTask(str2));
            } catch (IOException e) {
                LOG.warn("Parsing MapTask failed: {}", str2);
                LOG.warn("Error: ", (Throwable) e);
            }
        }
        for (Windmill.GetConfigResponse.NameMapEntry nameMapEntry : config.getNameMapList()) {
            this.stateNameMap.put(nameMapEntry.getUserName(), nameMapEntry.getSystemName());
        }
    }

    private void buildCounters(CounterSet counterSet, Windmill.WorkItemCommitRequest.Builder builder) {
        Windmill.Counter.Kind kind;
        Iterator<Counter<?>> it = counterSet.iterator();
        while (it.hasNext()) {
            Counter<?> next = it.next();
            Windmill.Counter.Builder newBuilder = Windmill.Counter.newBuilder();
            Object obj = null;
            switch (next.getKind()) {
                case SUM:
                    kind = Windmill.Counter.Kind.SUM;
                    break;
                case MAX:
                    kind = Windmill.Counter.Kind.MAX;
                    break;
                case MIN:
                    kind = Windmill.Counter.Kind.MIN;
                    break;
                case MEAN:
                    kind = Windmill.Counter.Kind.MEAN;
                    Counter.CounterMean<?> andResetMeanDelta = next.getAndResetMeanDelta();
                    long count = andResetMeanDelta.getCount();
                    obj = andResetMeanDelta.getAggregate();
                    if (count > 0) {
                        newBuilder.setMeanCount(count);
                        break;
                    } else {
                        break;
                    }
                default:
                    LOG.debug("Unhandled counter type: {}", next.getKind());
                    continue;
            }
            if (next.getKind() != Counter.AggregationKind.MEAN) {
                obj = next.getAndResetDelta();
            }
            if (addKnownTypeToCounterBuilder(obj, newBuilder)) {
                newBuilder.setName(next.getName()).setKind(kind);
                builder.addCounterUpdates(newBuilder);
            }
        }
    }

    private boolean addKnownTypeToCounterBuilder(Object obj, Windmill.Counter.Builder builder) {
        if (obj instanceof Double) {
            double doubleValue = ((Double) obj).doubleValue();
            if (doubleValue == CMAESOptimizer.DEFAULT_STOPFITNESS) {
                return true;
            }
            builder.setDoubleScalar(doubleValue);
            return true;
        }
        if (obj instanceof Long) {
            long longValue = ((Long) obj).longValue();
            if (longValue == 0) {
                return true;
            }
            builder.setIntScalar(longValue);
            return true;
        }
        if (!(obj instanceof Integer)) {
            LOG.debug("Unhandled aggregate class: {}", obj.getClass());
            return false;
        }
        long longValue2 = ((Integer) obj).longValue();
        if (longValue2 == 0) {
            return true;
        }
        builder.setIntScalar(longValue2);
        return true;
    }

    private Windmill.Exception buildExceptionReport(Throwable th) {
        Windmill.Exception.Builder newBuilder = Windmill.Exception.newBuilder();
        newBuilder.addStackFrames(th.toString());
        for (StackTraceElement stackTraceElement : th.getStackTrace()) {
            newBuilder.addStackFrames(stackTraceElement.toString());
        }
        if (th.getCause() != null) {
            newBuilder.setCause(buildExceptionReport(th.getCause()));
        }
        return newBuilder.build();
    }

    private boolean reportFailure(String str, Windmill.WorkItem workItem, Throwable th) {
        return !this.windmillServer.reportStats(Windmill.ReportStatsRequest.newBuilder().setComputationId(str).setKey(workItem.getKey()).setWorkToken(workItem.getWorkToken()).addExceptions(buildExceptionReport(th)).build()).getFailed();
    }

    private void reportHarnessStartup() {
        if (this.windmillServer.reportStats(Windmill.ReportStatsRequest.newBuilder().addCounterUpdates(Windmill.Counter.newBuilder().setName("dataflow_java_harness_restarts").setKind(Windmill.Counter.Kind.SUM).setIntScalar(1L)).build()).getFailed()) {
            LOG.warn("Failed to notify windmill on harness startup. dataflow_java_harness_restarts will  not be incremented.");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void reportPeriodicStats() {
        Runtime runtime = Runtime.getRuntime();
        long freeMemory = runtime.totalMemory() - runtime.freeMemory();
        if (this.windmillServer.reportStats(Windmill.ReportStatsRequest.newBuilder().addCounterUpdates(Windmill.Counter.newBuilder().setName("dataflow_java_harness_memory_utilization").setKind(Windmill.Counter.Kind.MEAN).setCumulative(true).setIntScalar(freeMemory).setMeanCount(runtime.maxMemory())).build()).getFailed()) {
            LOG.warn("Failed to send periodic counters to windmill.");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void printHeader(PrintWriter printWriter) {
        printWriter.println("<h1>Streaming Worker Harness</h1>");
        printWriter.println(new StringBuilder(18).append("Running: ").append(this.running.get()).append("<br>").toString());
        printWriter.println(new StringBuilder(28).append("ID: ").append(this.clientId).append("<br>").toString());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void printMetrics(PrintWriter printWriter) {
        printWriter.println("<h2>Metrics</h2>");
        int poolSize = this.workUnitExecutor.getPoolSize();
        printWriter.println(new StringBuilder(43).append("Worker Threads: ").append(poolSize).append("/").append(this.workUnitExecutor.getMaximumPoolSize()).append("<br>").toString());
        printWriter.println(new StringBuilder(31).append("Active Threads: ").append(this.workUnitExecutor.getActiveCount()).append("<br>").toString());
        printWriter.println(new StringBuilder(44).append("Work Queue Size: ").append(this.workUnitExecutor.getQueue().size()).append("/").append(100).append("<br>").toString());
        printWriter.print("Commit Queues: (");
        printWriter.print(this.commitQueue.weight() >> 20);
        printWriter.println("MB)<ul>");
        for (String str : this.commitQueue.keySet()) {
            printWriter.print("<li>");
            printWriter.print(str);
            printWriter.print(": ");
            printWriter.print(this.commitQueue.queueSize(str));
            printWriter.println("</li>");
        }
        printWriter.println("</ul>");
        this.stateCache.printSummaryHtml(printWriter);
        this.metricTrackingWindmillServer.printHtml(printWriter);
        printWriter.println("Active Keys: <ul>");
        for (Map.Entry<String, ActiveWorkForComputation> entry : this.activeWorkMap.entrySet()) {
            printWriter.print("<li>");
            printWriter.print(entry.getKey());
            printWriter.print(":");
            entry.getValue().printActiveWork(printWriter);
            printWriter.println("</li>");
        }
        printWriter.println("</ul>");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void printResources(PrintWriter printWriter) {
        printWriter.append("<h2>Resources</h2>\n");
        String describeMemory = memoryMonitor.describeMemory();
        printWriter.append((CharSequence) new StringBuilder(15 + String.valueOf(describeMemory).length()).append("Memory is ").append(describeMemory).append("<br>\n").toString());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void printSpecs(PrintWriter printWriter) {
        printWriter.append("<h2>Specs</h2>\n");
        for (Map.Entry<String, MapTask> entry : this.instructionMap.entrySet()) {
            String key = entry.getKey();
            printWriter.println(new StringBuilder(9 + String.valueOf(key).length()).append("<h3>").append(key).append("</h3>").toString());
            printWriter.print("<script>document.write(JSON.stringify(");
            printWriter.print(entry.getValue().toString());
            printWriter.println(", null, \"&nbsp&nbsp\").replace(/\\n/g, \"<br>\"))</script>");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void printLastException(PrintWriter printWriter) {
        Throwable th = this.lastException.get();
        if (th != null) {
            printWriter.println("<h2>Last Exception</h2>");
            StringWriter stringWriter = new StringWriter();
            th.printStackTrace(new PrintWriter(stringWriter));
            printWriter.println(stringWriter.toString().replace("\t", "&nbsp&nbsp").replace("\n", "<br>"));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void printThreads(PrintWriter printWriter) {
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread key = entry.getKey();
            String valueOf = String.valueOf(key);
            String valueOf2 = String.valueOf(key.getState());
            printWriter.println(new StringBuilder(20 + String.valueOf(valueOf).length() + String.valueOf(valueOf2).length()).append("Thread: ").append(valueOf).append(" State: ").append(valueOf2).append("<br>").toString());
            for (StackTraceElement stackTraceElement : entry.getValue()) {
                String valueOf3 = String.valueOf(stackTraceElement);
                printWriter.println(new StringBuilder(14 + String.valueOf(valueOf3).length()).append("&nbsp&nbsp").append(valueOf3).append("<br>").toString());
            }
            printWriter.println("<br>");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void dumpHeap(PrintWriter printWriter) {
        String str;
        try {
            String valueOf = String.valueOf(MemoryMonitor.dumpHeap());
            if (valueOf.length() != 0) {
                str = "Dumped heap to ".concat(valueOf);
            } else {
                str = r2;
                String str2 = new String("Dumped heap to ");
            }
            printWriter.println(str);
        } catch (Exception e) {
            printWriter.println("Failed to dump heap: <br>");
            StringWriter stringWriter = new StringWriter();
            e.printStackTrace(new PrintWriter(stringWriter));
            printWriter.println(stringWriter.toString().replace("\t", "&nbsp&nbsp").replace("\n", "<br>"));
        }
    }
}
