package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Function;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Optional;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Supplier;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Throwables;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Iterables;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Iterators;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.util.concurrent.Futures;
import com.google.cloud.dataflow.sdk.runners.worker.WindmillStateCache;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;
import com.google.cloud.dataflow.sdk.util.Weighted;
import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
import com.google.cloud.dataflow.sdk.util.state.BagState;
import com.google.cloud.dataflow.sdk.util.state.CombiningValueStateInternal;
import com.google.cloud.dataflow.sdk.util.state.MergingStateInternals;
import com.google.cloud.dataflow.sdk.util.state.State;
import com.google.cloud.dataflow.sdk.util.state.StateContents;
import com.google.cloud.dataflow.sdk.util.state.StateNamespace;
import com.google.cloud.dataflow.sdk.util.state.StateTable;
import com.google.cloud.dataflow.sdk.util.state.StateTag;
import com.google.cloud.dataflow.sdk.util.state.StateTags;
import com.google.cloud.dataflow.sdk.util.state.ValueState;
import com.google.cloud.dataflow.sdk.util.state.WatermarkStateInternal;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.annotation.concurrent.NotThreadSafe;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/WindmillStateInternals.class */
public class WindmillStateInternals extends MergingStateInternals {
    private WindmillStateCache.ForKey cache;
    Supplier<StateSampler.ScopedState> scopedReadStateSupplier;
    private StateTable workItemState;

    @VisibleForTesting
    static final ThreadLocal<Supplier<Boolean>> COMPACT_NOW = new ThreadLocal<Supplier<Boolean>>() { // from class: com.google.cloud.dataflow.sdk.runners.worker.WindmillStateInternals.1
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.lang.ThreadLocal
        public Supplier<Boolean> initialValue() {
            return new Supplier<Boolean>() { // from class: com.google.cloud.dataflow.sdk.runners.worker.WindmillStateInternals.1.1
                static final double RATE = 0.002d;
                Random random = new Random();
                long counter = nextSample();

                private long nextSample() {
                    return (long) Math.floor(Math.log(this.random.nextDouble()) / Math.log(0.998d));
                }

                /* JADX WARN: Can't rename method to resolve collision */
                @Override // com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Supplier
                public Boolean get() {
                    this.counter--;
                    if (this.counter >= 0) {
                        return false;
                    }
                    this.counter = nextSample();
                    return true;
                }
            };
        }
    };

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/WindmillStateInternals$CachingStateTable.class */
    private static class CachingStateTable extends StateTable {
        private final String stateFamily;
        private final WindmillStateReader reader;
        private final WindmillStateCache.ForKey cache;
        private final Supplier<StateSampler.ScopedState> scopedReadStateSupplier;

        public CachingStateTable(String str, WindmillStateReader windmillStateReader, WindmillStateCache.ForKey forKey, Supplier<StateSampler.ScopedState> supplier) {
            this.stateFamily = str;
            this.reader = windmillStateReader;
            this.cache = forKey;
            this.scopedReadStateSupplier = supplier;
        }

        @Override // com.google.cloud.dataflow.sdk.util.state.StateTable
        protected StateTag.StateBinder binderForNamespace(final StateNamespace stateNamespace) {
            return new StateTag.StateBinder() { // from class: com.google.cloud.dataflow.sdk.runners.worker.WindmillStateInternals.CachingStateTable.1
                @Override // com.google.cloud.dataflow.sdk.util.state.StateTag.StateBinder
                public <T> BagState<T> bindBag(StateTag<BagState<T>> stateTag, Coder<T> coder) {
                    WindmillBag windmillBag = (WindmillBag) CachingStateTable.this.cache.get(stateNamespace, stateTag);
                    if (windmillBag == null) {
                        windmillBag = new WindmillBag(stateNamespace, stateTag, CachingStateTable.this.stateFamily, coder);
                    }
                    windmillBag.initializeForWorkItem(CachingStateTable.this.reader, CachingStateTable.this.scopedReadStateSupplier);
                    return windmillBag;
                }

                @Override // com.google.cloud.dataflow.sdk.util.state.StateTag.StateBinder
                public <W extends BoundedWindow> WatermarkStateInternal bindWatermark(StateTag<WatermarkStateInternal> stateTag, OutputTimeFn<? super W> outputTimeFn) {
                    WindmillWatermarkState windmillWatermarkState = (WindmillWatermarkState) CachingStateTable.this.cache.get(stateNamespace, stateTag);
                    if (windmillWatermarkState == null) {
                        windmillWatermarkState = new WindmillWatermarkState(stateNamespace, stateTag, CachingStateTable.this.stateFamily, outputTimeFn);
                    }
                    windmillWatermarkState.initializeForWorkItem(CachingStateTable.this.reader, CachingStateTable.this.scopedReadStateSupplier);
                    return windmillWatermarkState;
                }

                @Override // com.google.cloud.dataflow.sdk.util.state.StateTag.StateBinder
                public <InputT, AccumT, OutputT> CombiningValueStateInternal<InputT, AccumT, OutputT> bindCombiningValue(StateTag<CombiningValueStateInternal<InputT, AccumT, OutputT>> stateTag, Coder<AccumT> coder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
                    WindmillCombiningValue windmillCombiningValue = new WindmillCombiningValue(stateNamespace, stateTag, CachingStateTable.this.stateFamily, coder, combineFn, CachingStateTable.this.cache);
                    windmillCombiningValue.initializeForWorkItem(CachingStateTable.this.reader, CachingStateTable.this.scopedReadStateSupplier);
                    return windmillCombiningValue;
                }

                @Override // com.google.cloud.dataflow.sdk.util.state.StateTag.StateBinder
                public <T> ValueState<T> bindValue(StateTag<ValueState<T>> stateTag, Coder<T> coder) {
                    WindmillValue windmillValue = (WindmillValue) CachingStateTable.this.cache.get(stateNamespace, stateTag);
                    if (windmillValue == null) {
                        windmillValue = new WindmillValue(stateNamespace, stateTag, CachingStateTable.this.stateFamily, coder);
                    }
                    windmillValue.initializeForWorkItem(CachingStateTable.this.reader, CachingStateTable.this.scopedReadStateSupplier);
                    return windmillValue;
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/WindmillStateInternals$ConcatIterables.class */
    public static class ConcatIterables<T> implements Iterable<T> {
        List<Iterable<T>> iterables = new ArrayList();

        public void extendWith(Iterable<T> iterable) {
            this.iterables.add(iterable);
        }

        @Override // java.lang.Iterable
        public Iterator<T> iterator() {
            return Iterators.concat(Iterables.transform(this.iterables, new Function<Iterable<T>, Iterator<T>>() { // from class: com.google.cloud.dataflow.sdk.runners.worker.WindmillStateInternals.ConcatIterables.1
                @Override // com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Function
                public Iterator<T> apply(Iterable<T> iterable) {
                    return iterable.iterator();
                }
            }).iterator());
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/WindmillStateInternals$SimpleWindmillState.class */
    private static abstract class SimpleWindmillState extends WindmillState {
        private SimpleWindmillState() {
            super();
        }

        @Override // com.google.cloud.dataflow.sdk.runners.worker.WindmillStateInternals.WindmillState
        public final Future<Windmill.WorkItemCommitRequest> persist(WindmillStateCache.ForKey forKey) throws IOException {
            return Futures.immediateFuture(persistDirectly(forKey));
        }

        protected abstract Windmill.WorkItemCommitRequest persistDirectly(WindmillStateCache.ForKey forKey) throws IOException;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/WindmillStateInternals$WindmillBag.class */
    public static class WindmillBag<T> extends SimpleWindmillState implements BagState<T> {
        private final StateNamespace namespace;
        private final StateTag<BagState<T>> address;
        private final ByteString stateKey;
        private final String stateFamily;
        private final Coder<T> elemCoder;
        private boolean cleared;
        private ConcatIterables<T> cachedValues;
        private List<T> localAdditions;
        private long encodedSize;

        private WindmillBag(StateNamespace stateNamespace, StateTag<BagState<T>> stateTag, String str, Coder<T> coder) {
            super();
            this.cachedValues = null;
            this.localAdditions = new ArrayList();
            this.encodedSize = 0L;
            this.namespace = stateNamespace;
            this.address = stateTag;
            this.stateKey = WindmillStateInternals.encodeKey(stateNamespace, stateTag);
            this.stateFamily = str;
            this.elemCoder = coder;
        }

        @Override // com.google.cloud.dataflow.sdk.util.state.State
        public void clear() {
            this.cleared = true;
            this.cachedValues = new ConcatIterables<>();
            this.localAdditions.clear();
            this.encodedSize = 0L;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* JADX WARN: Failed to calculate best type for var: r7v0 ??
        java.lang.NullPointerException
         */
        /* JADX WARN: Failed to calculate best type for var: r8v0 ??
        java.lang.NullPointerException
         */
        /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException
         */
        /* JADX WARN: Not initialized variable reg: 7, insn: 0x00c0: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r7 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:52:0x00c0 */
        /* JADX WARN: Not initialized variable reg: 8, insn: 0x00c4: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r8 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:54:0x00c4 */
        /* JADX WARN: Type inference failed for: r7v0, types: [com.google.cloud.dataflow.sdk.util.common.worker.StateSampler$ScopedState] */
        /* JADX WARN: Type inference failed for: r8v0, types: [java.lang.Throwable] */
        public Iterable<T> fetchData(Future<Iterable<T>> future) {
            ?? r7;
            ?? r8;
            try {
                try {
                    StateSampler.ScopedState scopedReadState = scopedReadState();
                    Throwable th = null;
                    if (this.cachedValues != null) {
                        ConcatIterables<T> concatIterables = this.cachedValues;
                        if (scopedReadState != null) {
                            if (0 != 0) {
                                try {
                                    scopedReadState.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                scopedReadState.close();
                            }
                        }
                        return concatIterables;
                    }
                    Iterable<T> iterable = future.get();
                    if (!(iterable instanceof Weighted)) {
                        if (scopedReadState != null) {
                            if (0 != 0) {
                                try {
                                    scopedReadState.close();
                                } catch (Throwable th3) {
                                    th.addSuppressed(th3);
                                }
                            } else {
                                scopedReadState.close();
                            }
                        }
                        return iterable;
                    }
                    this.cachedValues = new ConcatIterables<>();
                    this.cachedValues.extendWith(iterable);
                    this.encodedSize = ((Weighted) iterable).getWeight();
                    ConcatIterables<T> concatIterables2 = this.cachedValues;
                    if (scopedReadState != null) {
                        if (0 != 0) {
                            try {
                                scopedReadState.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            scopedReadState.close();
                        }
                    }
                    return concatIterables2;
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException("Unable to read state", e);
                }
            } catch (Throwable th5) {
                if (r7 != 0) {
                    if (r8 != 0) {
                        try {
                            r7.close();
                        } catch (Throwable th6) {
                            r8.addSuppressed(th6);
                        }
                    } else {
                        r7.close();
                    }
                }
                throw th5;
            }
        }

        public boolean valuesAreCached() {
            return this.cachedValues != null;
        }

        @Override // com.google.cloud.dataflow.sdk.util.state.MergeableState
        public StateContents<Iterable<T>> get() {
            final Future<Iterable<T>> listFuture = this.cachedValues != null ? null : this.reader.listFuture(this.stateKey, this.stateFamily, this.elemCoder);
            return new StateContents<Iterable<T>>() { // from class: com.google.cloud.dataflow.sdk.runners.worker.WindmillStateInternals.WindmillBag.1
                @Override // com.google.cloud.dataflow.sdk.util.state.StateContents
                public Iterable<T> read() {
                    return Iterables.concat(WindmillBag.this.fetchData(listFuture), WindmillBag.this.localAdditions);
                }
            };
        }

        @Override // com.google.cloud.dataflow.sdk.util.state.MergeableState
        public StateContents<Boolean> isEmpty() {
            final Future<Iterable<T>> listFuture = this.cachedValues != null ? null : this.reader.listFuture(this.stateKey, this.stateFamily, this.elemCoder);
            return new StateContents<Boolean>() { // from class: com.google.cloud.dataflow.sdk.runners.worker.WindmillStateInternals.WindmillBag.2
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // com.google.cloud.dataflow.sdk.util.state.StateContents
                public Boolean read() {
                    return Boolean.valueOf(Iterables.isEmpty(WindmillBag.this.fetchData(listFuture)) && WindmillBag.this.localAdditions.isEmpty());
                }
            };
        }

        @Override // com.google.cloud.dataflow.sdk.util.state.MergeableState
        public void add(T t) {
            this.localAdditions.add(t);
        }

        @Override // com.google.cloud.dataflow.sdk.runners.worker.WindmillStateInternals.SimpleWindmillState
        public Windmill.WorkItemCommitRequest persistDirectly(WindmillStateCache.ForKey forKey) throws IOException {
            Windmill.WorkItemCommitRequest.Builder newBuilder = Windmill.WorkItemCommitRequest.newBuilder();
            if (this.cleared) {
                newBuilder.addListUpdatesBuilder().setTag(this.stateKey).setStateFamily(this.stateFamily).setEndTimestamp(Long.MAX_VALUE);
            }
            if (!this.localAdditions.isEmpty()) {
                byte[] bArr = {0};
                Windmill.TagList.Builder stateFamily = newBuilder.addListUpdatesBuilder().setTag(this.stateKey).setStateFamily(this.stateFamily);
                for (T t : this.localAdditions) {
                    ByteString.Output newOutput = ByteString.newOutput();
                    newOutput.write(bArr);
                    this.elemCoder.encode(t, newOutput, Coder.Context.OUTER);
                    ByteString byteString = newOutput.toByteString();
                    if (this.cachedValues != null) {
                        this.encodedSize += byteString.size() - 1;
                    }
                    stateFamily.addValuesBuilder().setData(byteString).setTimestamp(Long.MAX_VALUE);
                }
            }
            if (this.cachedValues != null) {
                this.cachedValues.extendWith(this.localAdditions);
                this.localAdditions = new ArrayList();
                forKey.put(this.namespace, this.address, this, this.encodedSize);
            } else {
                this.localAdditions.clear();
            }
            this.cleared = false;
            return newBuilder.buildPartial();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/WindmillStateInternals$WindmillCombiningValue.class */
    public static class WindmillCombiningValue<InputT, AccumT, OutputT> extends WindmillState implements CombiningValueStateInternal<InputT, AccumT, OutputT> {
        private final WindmillBag<AccumT> bag;
        private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
        private AccumT localAdditionsAccum;
        private boolean hasLocalAdditions;

        private WindmillCombiningValue(StateNamespace stateNamespace, StateTag<CombiningValueStateInternal<InputT, AccumT, OutputT>> stateTag, String str, Coder<AccumT> coder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn, WindmillStateCache.ForKey forKey) {
            super();
            this.hasLocalAdditions = false;
            StateTag convertToBagTagInternal = StateTags.convertToBagTagInternal(stateTag);
            WindmillBag<AccumT> windmillBag = (WindmillBag) forKey.get(stateNamespace, convertToBagTagInternal);
            this.bag = windmillBag != null ? windmillBag : new WindmillBag<>(stateNamespace, convertToBagTagInternal, str, coder);
            this.combineFn = combineFn;
            this.localAdditionsAccum = combineFn.createAccumulator();
        }

        @Override // com.google.cloud.dataflow.sdk.runners.worker.WindmillStateInternals.WindmillState
        void initializeForWorkItem(WindmillStateReader windmillStateReader, Supplier<StateSampler.ScopedState> supplier) {
            super.initializeForWorkItem(windmillStateReader, supplier);
            this.bag.initializeForWorkItem(windmillStateReader, supplier);
        }

        @Override // com.google.cloud.dataflow.sdk.util.state.MergeableState
        public StateContents<OutputT> get() {
            final StateContents<AccumT> accum = getAccum();
            return new StateContents<OutputT>() { // from class: com.google.cloud.dataflow.sdk.runners.worker.WindmillStateInternals.WindmillCombiningValue.1
                /* JADX WARN: Multi-variable type inference failed */
                @Override // com.google.cloud.dataflow.sdk.util.state.StateContents
                public OutputT read() {
                    return (OutputT) WindmillCombiningValue.this.combineFn.extractOutput(accum.read());
                }
            };
        }

        @Override // com.google.cloud.dataflow.sdk.util.state.MergeableState
        public void add(InputT inputt) {
            this.hasLocalAdditions = true;
            this.localAdditionsAccum = this.combineFn.addInput(this.localAdditionsAccum, inputt);
        }

        @Override // com.google.cloud.dataflow.sdk.util.state.State
        public void clear() {
            this.bag.clear();
            this.localAdditionsAccum = this.combineFn.createAccumulator();
            this.hasLocalAdditions = false;
        }

        @Override // com.google.cloud.dataflow.sdk.runners.worker.WindmillStateInternals.WindmillState
        public Future<Windmill.WorkItemCommitRequest> persist(WindmillStateCache.ForKey forKey) throws IOException {
            if (this.hasLocalAdditions) {
                if (WindmillStateInternals.COMPACT_NOW.get().get().booleanValue() || this.bag.valuesAreCached()) {
                    this.localAdditionsAccum = getAccum().read();
                }
                this.bag.add(this.combineFn.compact(this.localAdditionsAccum));
                this.localAdditionsAccum = this.combineFn.createAccumulator();
                this.hasLocalAdditions = false;
            }
            return this.bag.persist(forKey);
        }

        @Override // com.google.cloud.dataflow.sdk.util.state.CombiningValueStateInternal
        public StateContents<AccumT> getAccum() {
            final StateContents<Iterable<AccumT>> stateContents = this.bag.get();
            return new StateContents<AccumT>() { // from class: com.google.cloud.dataflow.sdk.runners.worker.WindmillStateInternals.WindmillCombiningValue.2
                @Override // com.google.cloud.dataflow.sdk.util.state.StateContents
                public AccumT read() {
                    AccumT accumt = (AccumT) WindmillCombiningValue.this.combineFn.mergeAccumulators(Iterables.concat((Iterable) stateContents.read(), Collections.singleton(WindmillCombiningValue.this.localAdditionsAccum)));
                    WindmillCombiningValue.this.bag.clear();
                    WindmillCombiningValue.this.localAdditionsAccum = accumt;
                    WindmillCombiningValue.this.hasLocalAdditions = true;
                    return accumt;
                }
            };
        }

        @Override // com.google.cloud.dataflow.sdk.util.state.MergeableState
        public StateContents<Boolean> isEmpty() {
            final StateContents<Boolean> isEmpty = this.bag.isEmpty();
            return new StateContents<Boolean>() { // from class: com.google.cloud.dataflow.sdk.runners.worker.WindmillStateInternals.WindmillCombiningValue.3
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // com.google.cloud.dataflow.sdk.util.state.StateContents
                public Boolean read() {
                    return Boolean.valueOf(!WindmillCombiningValue.this.hasLocalAdditions && ((Boolean) isEmpty.read()).booleanValue());
                }
            };
        }

        @Override // com.google.cloud.dataflow.sdk.util.state.CombiningValueStateInternal
        public void addAccum(AccumT accumt) {
            this.hasLocalAdditions = true;
            this.localAdditionsAccum = this.combineFn.mergeAccumulators(Arrays.asList(this.localAdditionsAccum, accumt));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @NotThreadSafe
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/WindmillStateInternals$WindmillState.class */
    public static abstract class WindmillState {
        protected Supplier<StateSampler.ScopedState> scopedReadStateSupplier;
        protected WindmillStateReader reader;

        private WindmillState() {
        }

        abstract Future<Windmill.WorkItemCommitRequest> persist(WindmillStateCache.ForKey forKey) throws IOException;

        void initializeForWorkItem(WindmillStateReader windmillStateReader, Supplier<StateSampler.ScopedState> supplier) {
            this.reader = windmillStateReader;
            this.scopedReadStateSupplier = supplier;
        }

        StateSampler.ScopedState scopedReadState() {
            return this.scopedReadStateSupplier.get();
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/WindmillStateInternals$WindmillValue.class */
    private static class WindmillValue<T> extends SimpleWindmillState implements ValueState<T> {
        private final StateNamespace namespace;
        private final StateTag<ValueState<T>> address;
        private final ByteString stateKey;
        private final String stateFamily;
        private final Coder<T> coder;
        private boolean modified;
        private boolean valueIsKnown;
        private T value;

        private WindmillValue(StateNamespace stateNamespace, StateTag<ValueState<T>> stateTag, String str, Coder<T> coder) {
            super();
            this.modified = false;
            this.valueIsKnown = false;
            this.namespace = stateNamespace;
            this.address = stateTag;
            this.stateKey = WindmillStateInternals.encodeKey(stateNamespace, stateTag);
            this.stateFamily = str;
            this.coder = coder;
        }

        @Override // com.google.cloud.dataflow.sdk.util.state.State
        public void clear() {
            this.modified = true;
            this.valueIsKnown = true;
            this.value = null;
        }

        @Override // com.google.cloud.dataflow.sdk.util.state.ValueState
        public StateContents<T> get() {
            final Future<T> immediateFuture = this.valueIsKnown ? Futures.immediateFuture(this.value) : this.reader.valueFuture(this.stateKey, this.stateFamily, this.coder);
            return new StateContents<T>() { // from class: com.google.cloud.dataflow.sdk.runners.worker.WindmillStateInternals.WindmillValue.1
                @Override // com.google.cloud.dataflow.sdk.util.state.StateContents
                public T read() {
                    try {
                        StateSampler.ScopedState scopedReadState = WindmillValue.this.scopedReadState();
                        Throwable th = null;
                        try {
                            WindmillValue.this.valueIsKnown = true;
                            T t = (T) immediateFuture.get();
                            if (scopedReadState != null) {
                                if (0 != 0) {
                                    try {
                                        scopedReadState.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    scopedReadState.close();
                                }
                            }
                            return t;
                        } catch (Throwable th3) {
                            if (scopedReadState != null) {
                                if (0 != 0) {
                                    try {
                                        scopedReadState.close();
                                    } catch (Throwable th4) {
                                        th.addSuppressed(th4);
                                    }
                                } else {
                                    scopedReadState.close();
                                }
                            }
                            throw th3;
                        }
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException("Unable to read value from state", e);
                    }
                }
            };
        }

        @Override // com.google.cloud.dataflow.sdk.util.state.ValueState
        public void set(T t) {
            this.modified = true;
            this.valueIsKnown = true;
            this.value = t;
        }

        @Override // com.google.cloud.dataflow.sdk.runners.worker.WindmillStateInternals.SimpleWindmillState
        protected Windmill.WorkItemCommitRequest persistDirectly(WindmillStateCache.ForKey forKey) throws IOException {
            if (!this.modified) {
                return Windmill.WorkItemCommitRequest.newBuilder().buildPartial();
            }
            ByteString.Output newOutput = ByteString.newOutput();
            if (this.value != null) {
                this.coder.encode(this.value, newOutput, Coder.Context.OUTER);
            }
            ByteString byteString = newOutput.toByteString();
            Windmill.WorkItemCommitRequest.Builder newBuilder = Windmill.WorkItemCommitRequest.newBuilder();
            forKey.put(this.namespace, this.address, this, byteString.size());
            this.modified = false;
            newBuilder.addValueUpdatesBuilder().setTag(this.stateKey).setStateFamily(this.stateFamily).getValueBuilder().setData(byteString).setTimestamp(Long.MAX_VALUE);
            return newBuilder.buildPartial();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/WindmillStateInternals$WindmillWatermarkState.class */
    public static class WindmillWatermarkState extends WindmillState implements WatermarkStateInternal {
        private static final int ENCODED_SIZE = 8;
        private final OutputTimeFn<?> outputTimeFn;
        private final StateNamespace namespace;
        private final StateTag<WatermarkStateInternal> address;
        private final ByteString stateKey;
        private final String stateFamily;
        private boolean cleared;
        private Optional<Instant> cachedValue;
        private Instant localAdditions;

        private WindmillWatermarkState(StateNamespace stateNamespace, StateTag<WatermarkStateInternal> stateTag, String str, OutputTimeFn<?> outputTimeFn) {
            super();
            this.cleared = false;
            this.cachedValue = null;
            this.localAdditions = null;
            this.namespace = stateNamespace;
            this.address = stateTag;
            this.stateKey = WindmillStateInternals.encodeKey(stateNamespace, stateTag);
            this.stateFamily = str;
            this.outputTimeFn = outputTimeFn;
        }

        @Override // com.google.cloud.dataflow.sdk.util.state.State
        public void clear() {
            this.cleared = true;
            this.cachedValue = Optional.absent();
            this.localAdditions = null;
        }

        @Override // com.google.cloud.dataflow.sdk.util.state.WatermarkStateInternal
        public void releaseExtraneousHolds() {
        }

        @Override // com.google.cloud.dataflow.sdk.util.state.MergeableState
        public StateContents<Instant> get() {
            final Future<Instant> immediateFuture = this.cachedValue != null ? Futures.immediateFuture(this.cachedValue.orNull()) : this.reader.watermarkFuture(this.stateKey, this.stateFamily);
            return new StateContents<Instant>() { // from class: com.google.cloud.dataflow.sdk.runners.worker.WindmillStateInternals.WindmillWatermarkState.1
                /* JADX WARN: Can't rename method to resolve collision */
                /* JADX WARN: Finally extract failed */
                @Override // com.google.cloud.dataflow.sdk.util.state.StateContents
                public Instant read() {
                    try {
                        StateSampler.ScopedState scopedReadState = WindmillWatermarkState.this.scopedReadState();
                        Throwable th = null;
                        try {
                            Instant instant = (Instant) immediateFuture.get();
                            if (instant == null || instant.equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
                                WindmillWatermarkState.this.cachedValue = Optional.absent();
                            } else {
                                WindmillWatermarkState.this.cachedValue = Optional.of(instant);
                            }
                            if (scopedReadState != null) {
                                if (0 != 0) {
                                    try {
                                        scopedReadState.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    scopedReadState.close();
                                }
                            }
                            return WindmillWatermarkState.this.localAdditions == null ? (Instant) WindmillWatermarkState.this.cachedValue.orNull() : !WindmillWatermarkState.this.cachedValue.isPresent() ? WindmillWatermarkState.this.localAdditions : WindmillWatermarkState.this.outputTimeFn.combine(WindmillWatermarkState.this.localAdditions, (Instant) WindmillWatermarkState.this.cachedValue.get());
                        } catch (Throwable th3) {
                            if (scopedReadState != null) {
                                if (0 != 0) {
                                    try {
                                        scopedReadState.close();
                                    } catch (Throwable th4) {
                                        th.addSuppressed(th4);
                                    }
                                } else {
                                    scopedReadState.close();
                                }
                            }
                            throw th3;
                        }
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException("Unable to read state", e);
                    }
                }
            };
        }

        @Override // com.google.cloud.dataflow.sdk.util.state.MergeableState
        public StateContents<Boolean> isEmpty() {
            throw new UnsupportedOperationException();
        }

        @Override // com.google.cloud.dataflow.sdk.util.state.MergeableState
        public void add(Instant instant) {
            this.localAdditions = this.localAdditions == null ? instant : this.outputTimeFn.combine(instant, this.localAdditions);
        }

        @Override // com.google.cloud.dataflow.sdk.runners.worker.WindmillStateInternals.WindmillState
        public Future<Windmill.WorkItemCommitRequest> persist(final WindmillStateCache.ForKey forKey) {
            Future<Windmill.WorkItemCommitRequest> combineWithPersisted;
            if (!this.cleared && this.localAdditions == null) {
                return Futures.immediateFuture(Windmill.WorkItemCommitRequest.newBuilder().buildPartial());
            }
            if (this.cleared && this.localAdditions == null) {
                Windmill.WorkItemCommitRequest.Builder newBuilder = Windmill.WorkItemCommitRequest.newBuilder();
                newBuilder.addWatermarkHoldsBuilder().setTag(this.stateKey).setStateFamily(this.stateFamily).setReset(true);
                combineWithPersisted = Futures.immediateFuture(newBuilder.buildPartial());
            } else if (this.cleared && this.localAdditions != null) {
                Windmill.WorkItemCommitRequest.Builder newBuilder2 = Windmill.WorkItemCommitRequest.newBuilder();
                newBuilder2.addWatermarkHoldsBuilder().setTag(this.stateKey).setStateFamily(this.stateFamily).setReset(true).addTimestamps(WindmillTimeUtils.harnessToWindmillTimestamp(this.localAdditions));
                this.cachedValue = Optional.of(this.localAdditions);
                combineWithPersisted = Futures.immediateFuture(newBuilder2.buildPartial());
            } else {
                if (this.cleared || this.localAdditions == null) {
                    throw new IllegalStateException("Unreachable condition");
                }
                combineWithPersisted = combineWithPersisted();
            }
            return Futures.lazyTransform(combineWithPersisted, new Function<Windmill.WorkItemCommitRequest, Windmill.WorkItemCommitRequest>() { // from class: com.google.cloud.dataflow.sdk.runners.worker.WindmillStateInternals.WindmillWatermarkState.2
                @Override // com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Function
                public Windmill.WorkItemCommitRequest apply(Windmill.WorkItemCommitRequest workItemCommitRequest) {
                    WindmillWatermarkState.this.cleared = false;
                    WindmillWatermarkState.this.localAdditions = null;
                    if (WindmillWatermarkState.this.cachedValue != null) {
                        forKey.put(WindmillWatermarkState.this.namespace, WindmillWatermarkState.this.address, WindmillWatermarkState.this, 8L);
                    }
                    return workItemCommitRequest;
                }
            });
        }

        private Future<Windmill.WorkItemCommitRequest> combineWithPersisted() {
            if (!(false | this.outputTimeFn.dependsOnlyOnWindow()) && !this.outputTimeFn.dependsOnlyOnEarliestInputTimestamp()) {
                return Futures.lazyTransform(this.cachedValue != null ? Futures.immediateFuture(this.cachedValue.orNull()) : this.reader.watermarkFuture(this.stateKey, this.stateFamily), new Function<Instant, Windmill.WorkItemCommitRequest>() { // from class: com.google.cloud.dataflow.sdk.runners.worker.WindmillStateInternals.WindmillWatermarkState.3
                    @Override // com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Function
                    public Windmill.WorkItemCommitRequest apply(Instant instant) {
                        WindmillWatermarkState.this.cachedValue = Optional.of(instant != null ? WindmillWatermarkState.this.outputTimeFn.combine(instant, WindmillWatermarkState.this.localAdditions) : WindmillWatermarkState.this.localAdditions);
                        Windmill.WorkItemCommitRequest.Builder newBuilder = Windmill.WorkItemCommitRequest.newBuilder();
                        newBuilder.addWatermarkHoldsBuilder().setTag(WindmillWatermarkState.this.stateKey).setStateFamily(WindmillWatermarkState.this.stateFamily).setReset(true).addTimestamps(WindmillTimeUtils.harnessToWindmillTimestamp((Instant) WindmillWatermarkState.this.cachedValue.get()));
                        return newBuilder.buildPartial();
                    }
                });
            }
            Windmill.WorkItemCommitRequest.Builder newBuilder = Windmill.WorkItemCommitRequest.newBuilder();
            newBuilder.addWatermarkHoldsBuilder().setTag(this.stateKey).setStateFamily(this.stateFamily).addTimestamps(WindmillTimeUtils.harnessToWindmillTimestamp(this.localAdditions));
            if (this.cachedValue != null) {
                this.cachedValue = Optional.of(this.cachedValue.isPresent() ? this.outputTimeFn.combine(this.cachedValue.get(), this.localAdditions) : this.localAdditions);
            }
            return Futures.immediateFuture(newBuilder.buildPartial());
        }
    }

    public WindmillStateInternals(String str, WindmillStateReader windmillStateReader, WindmillStateCache.ForKey forKey, Supplier<StateSampler.ScopedState> supplier) {
        this.cache = forKey;
        this.scopedReadStateSupplier = supplier;
        this.workItemState = new CachingStateTable(str, windmillStateReader, forKey, supplier);
    }

    public void persist(Windmill.WorkItemCommitRequest.Builder builder) {
        ArrayList arrayList = new ArrayList();
        for (Object obj : this.workItemState.values()) {
            if (!(obj instanceof WindmillState)) {
                throw new IllegalStateException(String.format("%s wasn't created by %s -- unable to persist it", obj.getClass().getSimpleName(), getClass().getSimpleName()));
            }
            try {
                arrayList.add(((WindmillState) obj).persist(this.cache));
            } catch (IOException e) {
                throw new RuntimeException("Unable to persist state", e);
            }
        }
        this.workItemState.clear();
        try {
            StateSampler.ScopedState scopedState = this.scopedReadStateSupplier.get();
            Throwable th = null;
            try {
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    builder.mergeFrom((Windmill.WorkItemCommitRequest) ((Future) it.next()).get());
                }
                if (scopedState != null) {
                    if (0 != 0) {
                        try {
                            scopedState.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        scopedState.close();
                    }
                }
            } finally {
            }
        } catch (InterruptedException | ExecutionException e2) {
            throw new RuntimeException("Failed to retrieve Windmill state during persist()", e2);
        }
    }

    @VisibleForTesting
    static ByteString encodeKey(StateNamespace stateNamespace, StateTag<?> stateTag) {
        try {
            ByteString.Output newOutput = ByteString.newOutput();
            OutputStreamWriter outputStreamWriter = new OutputStreamWriter(newOutput, StandardCharsets.UTF_8);
            stateNamespace.appendTo(outputStreamWriter);
            outputStreamWriter.write(43);
            stateTag.appendTo(outputStreamWriter);
            outputStreamWriter.flush();
            return newOutput.toByteString();
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    @Override // com.google.cloud.dataflow.sdk.util.state.StateInternals
    public <T extends State> T state(StateNamespace stateNamespace, StateTag<T> stateTag) {
        return (T) this.workItemState.get(stateNamespace, stateTag);
    }
}
