package org.apache.apex.malhar.stream.api.impl;

import com.datatorrent.api.Operator;
import com.datatorrent.lib.util.KeyValPair;
import java.util.List;
import org.apache.apex.malhar.lib.function.Function;
import org.apache.apex.malhar.lib.window.Accumulation;
import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.lib.window.accumulation.FoldFn;
import org.apache.apex.malhar.lib.window.accumulation.ReduceFn;
import org.apache.apex.malhar.lib.window.accumulation.SumLong;
import org.apache.apex.malhar.lib.window.accumulation.TopN;
import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl;
import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.Option;
import org.apache.apex.malhar.stream.api.WindowedStream;
import org.apache.apex.malhar.stream.api.impl.ApexStreamImpl;
import org.apache.hadoop.classification.InterfaceStability;
import org.joda.time.Duration;

@InterfaceStability.Evolving
/* loaded from: input_file:org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.class */
public class ApexWindowedStreamImpl<T> extends ApexStreamImpl<T> implements WindowedStream<T> {
    protected WindowOption windowOption;
    protected TriggerOption triggerOption;
    protected Duration allowedLateness;

    /* loaded from: input_file:org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl$ConvertFn.class */
    private static class ConvertFn<T> implements Function.MapFunction<T, Tuple<T>> {
        private ConvertFn() {
        }

        public Tuple<T> f(T t) {
            return t instanceof Tuple.TimestampedTuple ? (Tuple.TimestampedTuple) t : new Tuple.TimestampedTuple(System.currentTimeMillis(), t);
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* renamed from: f, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m2f(Object obj) {
            return f((ConvertFn<T>) obj);
        }
    }

    @Override // org.apache.apex.malhar.stream.api.WindowedStream
    public <STREAM extends WindowedStream<Tuple.WindowedTuple<Long>>> STREAM count(Option... optionArr) {
        WindowedStream windowedStream = (WindowedStream) map(new Function.MapFunction<T, Tuple<Long>>() { // from class: org.apache.apex.malhar.stream.api.impl.ApexWindowedStreamImpl.1
            public Tuple<Long> f(T t) {
                return t instanceof Tuple.TimestampedTuple ? new Tuple.TimestampedTuple(((Tuple.TimestampedTuple) t).getTimestamp(), 1L) : new Tuple.TimestampedTuple(System.currentTimeMillis(), 1L);
            }

            /* renamed from: f, reason: collision with other method in class */
            public /* bridge */ /* synthetic */ Object m1f(Object obj) {
                return f((AnonymousClass1) obj);
            }
        }, new Option[0]);
        Operator createWindowedOperator = createWindowedOperator(new SumLong());
        return (STREAM) windowedStream.addOperator(createWindowedOperator, ((WindowedOperatorImpl) createWindowedOperator).input, ((WindowedOperatorImpl) createWindowedOperator).output, optionArr);
    }

    @Override // org.apache.apex.malhar.stream.api.WindowedStream
    public <K, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, Long>>>> STREAM countByKey(Function.ToKeyValue<T, K, Long> toKeyValue, Option... optionArr) {
        WindowedStream windowedStream = (WindowedStream) map(toKeyValue, new Option[0]);
        Operator createKeyedWindowedOperator = createKeyedWindowedOperator(new SumLong());
        return (STREAM) windowedStream.addOperator(createKeyedWindowedOperator, ((KeyedWindowedOperatorImpl) createKeyedWindowedOperator).input, ((KeyedWindowedOperatorImpl) createKeyedWindowedOperator).output, optionArr);
    }

    @Override // org.apache.apex.malhar.stream.api.WindowedStream
    public <K, V, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, List<V>>>>> STREAM topByKey(int i, Function.ToKeyValue<T, K, V> toKeyValue, Option... optionArr) {
        TopN topN = new TopN();
        topN.setN(i);
        WindowedStream windowedStream = (WindowedStream) map(toKeyValue, new Option[0]);
        Operator createKeyedWindowedOperator = createKeyedWindowedOperator(topN);
        return (STREAM) windowedStream.addOperator(createKeyedWindowedOperator, ((KeyedWindowedOperatorImpl) createKeyedWindowedOperator).input, ((KeyedWindowedOperatorImpl) createKeyedWindowedOperator).output, optionArr);
    }

    @Override // org.apache.apex.malhar.stream.api.WindowedStream
    public <STREAM extends WindowedStream<Tuple.WindowedTuple<List<T>>>> STREAM top(int i, Option... optionArr) {
        TopN topN = new TopN();
        topN.setN(i);
        WindowedStream windowedStream = (WindowedStream) map(new ConvertFn(), new Option[0]);
        Operator createWindowedOperator = createWindowedOperator(topN);
        return (STREAM) windowedStream.addOperator(createWindowedOperator, ((WindowedOperatorImpl) createWindowedOperator).input, ((WindowedOperatorImpl) createWindowedOperator).output, optionArr);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.apex.malhar.stream.api.WindowedStream
    public <K, V, O, ACCU, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, O>>>> STREAM accumulateByKey(Accumulation<V, ACCU, O> accumulation, Function.ToKeyValue<T, K, V> toKeyValue, Option... optionArr) {
        WindowedStream windowedStream = (WindowedStream) map(toKeyValue, new Option[0]);
        Operator createKeyedWindowedOperator = createKeyedWindowedOperator(accumulation);
        return (STREAM) windowedStream.addOperator(createKeyedWindowedOperator, ((KeyedWindowedOperatorImpl) createKeyedWindowedOperator).input, ((KeyedWindowedOperatorImpl) createKeyedWindowedOperator).output, optionArr);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.apex.malhar.stream.api.WindowedStream
    public <O, ACCU, STREAM extends WindowedStream<Tuple.WindowedTuple<O>>> STREAM accumulate(Accumulation<T, ACCU, O> accumulation, Option... optionArr) {
        WindowedStream windowedStream = (WindowedStream) map(new ConvertFn(), new Option[0]);
        Operator createWindowedOperator = createWindowedOperator(accumulation);
        return (STREAM) windowedStream.addOperator(createWindowedOperator, ((WindowedOperatorImpl) createWindowedOperator).input, ((WindowedOperatorImpl) createWindowedOperator).output, optionArr);
    }

    @Override // org.apache.apex.malhar.stream.api.WindowedStream
    public <STREAM extends WindowedStream<Tuple.WindowedTuple<T>>> STREAM reduce(ReduceFn<T> reduceFn, Option... optionArr) {
        WindowedStream windowedStream = (WindowedStream) map(new ConvertFn(), new Option[0]);
        Operator createWindowedOperator = createWindowedOperator(reduceFn);
        return (STREAM) windowedStream.addOperator(createWindowedOperator, ((WindowedOperatorImpl) createWindowedOperator).input, ((WindowedOperatorImpl) createWindowedOperator).output, optionArr);
    }

    @Override // org.apache.apex.malhar.stream.api.WindowedStream
    public <K, V, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, V>>>> STREAM reduceByKey(ReduceFn<V> reduceFn, Function.ToKeyValue<T, K, V> toKeyValue, Option... optionArr) {
        WindowedStream windowedStream = (WindowedStream) map(toKeyValue, new Option[0]);
        Operator createKeyedWindowedOperator = createKeyedWindowedOperator(reduceFn);
        return (STREAM) windowedStream.addOperator(createKeyedWindowedOperator, ((KeyedWindowedOperatorImpl) createKeyedWindowedOperator).input, ((KeyedWindowedOperatorImpl) createKeyedWindowedOperator).output, optionArr);
    }

    @Override // org.apache.apex.malhar.stream.api.WindowedStream
    public <O, STREAM extends WindowedStream<Tuple.WindowedTuple<O>>> STREAM fold(FoldFn<T, O> foldFn, Option... optionArr) {
        WindowedStream windowedStream = (WindowedStream) map(new ConvertFn(), new Option[0]);
        Operator createWindowedOperator = createWindowedOperator(foldFn);
        return (STREAM) windowedStream.addOperator(createWindowedOperator, ((WindowedOperatorImpl) createWindowedOperator).input, ((WindowedOperatorImpl) createWindowedOperator).output, optionArr);
    }

    @Override // org.apache.apex.malhar.stream.api.WindowedStream
    public <K, V, O, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, O>>>> STREAM foldByKey(FoldFn<V, O> foldFn, Function.ToKeyValue<T, K, V> toKeyValue, Option... optionArr) {
        WindowedStream windowedStream = (WindowedStream) map(toKeyValue, new Option[0]);
        Operator createKeyedWindowedOperator = createKeyedWindowedOperator(foldFn);
        return (STREAM) windowedStream.addOperator(createKeyedWindowedOperator, ((KeyedWindowedOperatorImpl) createKeyedWindowedOperator).input, ((KeyedWindowedOperatorImpl) createKeyedWindowedOperator).output, optionArr);
    }

    @Override // org.apache.apex.malhar.stream.api.WindowedStream
    public <O, K, STREAM extends WindowedStream<KeyValPair<K, Iterable<O>>>> STREAM groupByKey(Function.ToKeyValue<T, K, O> toKeyValue, Option... optionArr) {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.apex.malhar.stream.api.WindowedStream
    public <STREAM extends WindowedStream<Iterable<T>>> STREAM group() {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.apex.malhar.stream.api.WindowedStream
    public <STREAM extends WindowedStream<T>> STREAM resetTrigger(TriggerOption triggerOption) {
        this.triggerOption = triggerOption;
        return this;
    }

    @Override // org.apache.apex.malhar.stream.api.WindowedStream
    public <STREAM extends WindowedStream<T>> STREAM resetAllowedLateness(Duration duration) {
        this.allowedLateness = duration;
        return this;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.apex.malhar.stream.api.impl.ApexStreamImpl
    protected <O> ApexStream<O> newStream(DagMeta dagMeta, ApexStreamImpl.Brick<O> brick) {
        ApexWindowedStreamImpl apexWindowedStreamImpl = new ApexWindowedStreamImpl();
        apexWindowedStreamImpl.graph = dagMeta;
        apexWindowedStreamImpl.lastBrick = brick;
        apexWindowedStreamImpl.windowOption = this.windowOption;
        apexWindowedStreamImpl.triggerOption = this.triggerOption;
        apexWindowedStreamImpl.allowedLateness = this.allowedLateness;
        return apexWindowedStreamImpl;
    }

    private <IN, ACCU, OUT> WindowedOperatorImpl<IN, ACCU, OUT> createWindowedOperator(Accumulation<? super IN, ACCU, OUT> accumulation) {
        WindowedOperatorImpl<IN, ACCU, OUT> windowedOperatorImpl = new WindowedOperatorImpl<>();
        windowedOperatorImpl.setDataStorage(new InMemoryWindowedStorage());
        windowedOperatorImpl.setRetractionStorage(new InMemoryWindowedStorage());
        windowedOperatorImpl.setWindowStateStorage(new InMemoryWindowedStorage());
        if (this.windowOption != null) {
            windowedOperatorImpl.setWindowOption(this.windowOption);
        }
        if (this.triggerOption != null) {
            windowedOperatorImpl.setTriggerOption(this.triggerOption);
        }
        if (this.allowedLateness != null) {
            windowedOperatorImpl.setAllowedLateness(this.allowedLateness);
        }
        windowedOperatorImpl.setAccumulation(accumulation);
        return windowedOperatorImpl;
    }

    private <K, V, ACCU, OUT> KeyedWindowedOperatorImpl<K, V, ACCU, OUT> createKeyedWindowedOperator(Accumulation<? super V, ACCU, OUT> accumulation) {
        KeyedWindowedOperatorImpl<K, V, ACCU, OUT> keyedWindowedOperatorImpl = new KeyedWindowedOperatorImpl<>();
        keyedWindowedOperatorImpl.setDataStorage(new InMemoryWindowedKeyedStorage());
        keyedWindowedOperatorImpl.setRetractionStorage(new InMemoryWindowedKeyedStorage());
        keyedWindowedOperatorImpl.setWindowStateStorage(new InMemoryWindowedStorage());
        if (this.windowOption != null) {
            keyedWindowedOperatorImpl.setWindowOption(this.windowOption);
        }
        if (this.triggerOption != null) {
            keyedWindowedOperatorImpl.setTriggerOption(this.triggerOption);
        }
        if (this.allowedLateness != null) {
            keyedWindowedOperatorImpl.setAllowedLateness(this.allowedLateness);
        }
        keyedWindowedOperatorImpl.setAccumulation(accumulation);
        return keyedWindowedOperatorImpl;
    }
}
