package com.google.cloud.dataflow.sdk.util;

import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Iterables;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.SystemReduceFn;
import com.google.cloud.dataflow.sdk.values.KV;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
@SystemDoFnInternal
/* loaded from: input_file:com/google/cloud/dataflow/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.class */
public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends BoundedWindow> extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> {
    private final Aggregator<Long, Long> droppedDueToClosedWindow = createAggregator(ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, new Sum.SumLongFn());
    private final Aggregator<Long, Long> droppedDueToLateness = createAggregator(ReduceFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER, new Sum.SumLongFn());
    private final WindowingStrategy<?, W> strategy;
    private SystemReduceFn.Factory<K, InputT, OutputT, W> reduceFnFactory;

    public GroupAlsoByWindowsViaOutputBufferDoFn(WindowingStrategy<?, W> windowingStrategy, SystemReduceFn.Factory<K, InputT, OutputT, W> factory) {
        this.strategy = windowingStrategy;
        this.reduceFnFactory = factory;
    }

    @Override // com.google.cloud.dataflow.sdk.transforms.DoFn
    public void processElement(DoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>>.ProcessContext processContext) throws Exception {
        K key = processContext.element().getKey();
        BatchTimerInternals batchTimerInternals = new BatchTimerInternals(Instant.now());
        ReduceFnRunner<?, ?, ?, ?> reduceFnRunner = new ReduceFnRunner<>(key, this.strategy, batchTimerInternals, processContext.windowingInternals(), this.droppedDueToClosedWindow, this.droppedDueToLateness, this.reduceFnFactory.create(key));
        for (Iterable<WindowedValue<?>> iterable : Iterables.partition(processContext.element().getValue(), 1000)) {
            reduceFnRunner.processElements(iterable);
            batchTimerInternals.advanceInputWatermark(reduceFnRunner, iterable.iterator().next().getTimestamp());
            batchTimerInternals.advanceProcessingTime(reduceFnRunner, Instant.now());
        }
        batchTimerInternals.advanceInputWatermark(reduceFnRunner, BoundedWindow.TIMESTAMP_MAX_VALUE);
        batchTimerInternals.advanceProcessingTime(reduceFnRunner, BoundedWindow.TIMESTAMP_MAX_VALUE);
        reduceFnRunner.persist();
    }
}
