package org.apache.apex.malhar.stream.FunctionOperator;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.StramLocalCluster;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.apex.malhar.stream.api.Option;
import org.apache.apex.malhar.stream.api.function.Function;
import org.apache.apex.malhar.stream.api.impl.StreamFactory;
import org.apache.apex.malhar.stream.api.operator.FunctionOperator;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.class */
public class FunctionOperatorTest {
    private static final int NumTuples = 10;
    private static final int NumFlatMapTuples = 100;
    private static final int divider = 2;
    private static final int listSize = 10;
    private static int TupleCount;
    private static int sum;

    /* loaded from: input_file:org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest$FmFunction.class */
    public static class FmFunction implements Function.FlatMapFunction<List<Integer>, Integer> {
        public Iterable<Integer> f(List<Integer> list) {
            ArrayList arrayList = new ArrayList();
            Iterator<Integer> it = list.iterator();
            while (it.hasNext()) {
                int intValue = it.next().intValue();
                if (intValue % 13 == 0 || intValue % 17 == 0) {
                    arrayList.add(Integer.valueOf(intValue * intValue));
                }
            }
            return arrayList;
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest$NumberGenerator.class */
    public static class NumberGenerator extends BaseOperator implements InputOperator {
        private int num;
        public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();

        public void setup(Context.OperatorContext operatorContext) {
            this.num = 0;
        }

        public void emitTuples() {
            if (this.num < 10) {
                this.output.emit(Integer.valueOf(this.num));
                this.num++;
            }
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest$NumberListGenerator.class */
    public static class NumberListGenerator extends BaseOperator implements InputOperator {
        private int numMem;
        private List<Integer> nums;
        public final transient DefaultOutputPort<List<Integer>> output = new DefaultOutputPort<>();

        public void setup(Context.OperatorContext operatorContext) {
            this.numMem = 0;
            this.nums = new ArrayList();
        }

        public void emitTuples() {
            this.nums.add(Integer.valueOf(this.numMem));
            this.numMem++;
            if (this.numMem >= FunctionOperatorTest.NumFlatMapTuples || this.nums.size() >= 10) {
                return;
            }
            this.output.emit(this.nums);
            this.nums.clear();
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest$ResultCollector.class */
    public static class ResultCollector extends BaseOperator {
        public final transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>() { // from class: org.apache.apex.malhar.stream.FunctionOperator.FunctionOperatorTest.ResultCollector.1
            public void process(Integer num) {
                FunctionOperatorTest.access$008();
                FunctionOperatorTest.access$112(num.intValue());
            }
        };

        public void setup(Context.OperatorContext operatorContext) {
            int unused = FunctionOperatorTest.TupleCount = 0;
            int unused2 = FunctionOperatorTest.sum = 0;
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest$Square.class */
    public static class Square implements Function.MapFunction<Integer, Integer> {
        public Integer f(Integer num) {
            return Integer.valueOf(num.intValue() * num.intValue());
        }
    }

    @Test
    public void testMapOperator() throws Exception {
        LocalMode newInstance = LocalMode.newInstance();
        DAG dag = newInstance.getDAG();
        NumberGenerator addOperator = dag.addOperator("numGen", new NumberGenerator());
        FunctionOperator.MapFunctionOperator addOperator2 = dag.addOperator("mapper", new FunctionOperator.MapFunctionOperator(new Square()));
        ResultCollector addOperator3 = dag.addOperator("collector", new ResultCollector());
        dag.addStream("raw numbers", addOperator.output, addOperator2.input);
        dag.addStream("mapped results", addOperator2.output, addOperator3.input);
        StramLocalCluster controller = newInstance.getController();
        controller.setHeartbeatMonitoringEnabled(false);
        controller.setExitCondition(new Callable<Boolean>() { // from class: org.apache.apex.malhar.stream.FunctionOperator.FunctionOperatorTest.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                return Boolean.valueOf(FunctionOperatorTest.TupleCount == 10);
            }
        });
        controller.run(5000L);
        Assert.assertEquals(sum, 285L);
    }

    @Test
    public void testMapOperatorStream() throws Exception {
        NumberGenerator numberGenerator = new NumberGenerator();
        ResultCollector resultCollector = new ResultCollector();
        StreamFactory.fromInput(numberGenerator, numberGenerator.output, new Option[0]).map(new Square(), new Option[0]).addOperator(resultCollector, resultCollector.input, (Operator.OutputPort) null, new Option[0]).runEmbedded(false, 10000L, new Callable<Boolean>() { // from class: org.apache.apex.malhar.stream.FunctionOperator.FunctionOperatorTest.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                return Boolean.valueOf(FunctionOperatorTest.TupleCount == 10);
            }
        });
        Assert.assertEquals(sum, 285L);
    }

    @Test
    public void testFlatMapOperator() throws Exception {
        LocalMode newInstance = LocalMode.newInstance();
        DAG dag = newInstance.getDAG();
        NumberListGenerator addOperator = dag.addOperator("numGen", new NumberListGenerator());
        FunctionOperator.FlatMapFunctionOperator addOperator2 = dag.addOperator("flatmap", new FunctionOperator.FlatMapFunctionOperator(new FmFunction()));
        ResultCollector addOperator3 = dag.addOperator("collector", new ResultCollector());
        dag.addStream("raw numbers", addOperator.output, addOperator2.input);
        dag.addStream("flatmap results", addOperator2.output, addOperator3.input);
        StramLocalCluster controller = newInstance.getController();
        controller.setHeartbeatMonitoringEnabled(false);
        controller.setExitCondition(new Callable<Boolean>() { // from class: org.apache.apex.malhar.stream.FunctionOperator.FunctionOperatorTest.3
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                return Boolean.valueOf(FunctionOperatorTest.TupleCount == 13);
            }
        });
        controller.run(5000L);
        Assert.assertEquals(sum, 39555L);
    }

    @Test
    public void testFlatMapOperatorStream() throws Exception {
        NumberListGenerator numberListGenerator = new NumberListGenerator();
        ResultCollector resultCollector = new ResultCollector();
        StreamFactory.fromInput(numberListGenerator, numberListGenerator.output, new Option[0]).flatMap(new FmFunction(), new Option[0]).addOperator(resultCollector, resultCollector.input, (Operator.OutputPort) null, new Option[0]).runEmbedded(false, 10000L, new Callable<Boolean>() { // from class: org.apache.apex.malhar.stream.FunctionOperator.FunctionOperatorTest.4
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                return Boolean.valueOf(FunctionOperatorTest.TupleCount == 13);
            }
        });
        Assert.assertEquals(sum, 39555L);
    }

    @Test
    public void testFilterOperator() throws Exception {
        LocalMode newInstance = LocalMode.newInstance();
        DAG dag = newInstance.getDAG();
        FunctionOperator.FilterFunctionOperator filterFunctionOperator = new FunctionOperator.FilterFunctionOperator(new Function.FilterFunction<Integer>() { // from class: org.apache.apex.malhar.stream.FunctionOperator.FunctionOperatorTest.5
            public boolean f(Integer num) {
                return num.intValue() % FunctionOperatorTest.divider == 0;
            }
        });
        NumberGenerator addOperator = dag.addOperator("numGen", new NumberGenerator());
        FunctionOperator.FilterFunctionOperator addOperator2 = dag.addOperator("filter", filterFunctionOperator);
        ResultCollector addOperator3 = dag.addOperator("collector", new ResultCollector());
        dag.addStream("raw numbers", addOperator.output, addOperator2.input);
        dag.addStream("filtered results", addOperator2.output, addOperator3.input);
        StramLocalCluster controller = newInstance.getController();
        controller.setHeartbeatMonitoringEnabled(false);
        controller.setExitCondition(new Callable<Boolean>() { // from class: org.apache.apex.malhar.stream.FunctionOperator.FunctionOperatorTest.6
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                return Boolean.valueOf(FunctionOperatorTest.TupleCount == 5);
            }
        });
        controller.run(5000L);
        Assert.assertEquals(sum, 20L);
    }

    @Test
    public void testFilterOperatorStream() throws Exception {
        NumberGenerator numberGenerator = new NumberGenerator();
        ResultCollector resultCollector = new ResultCollector();
        StreamFactory.fromInput(numberGenerator, numberGenerator.output, new Option[0]).filter(new Function.FilterFunction<Integer>() { // from class: org.apache.apex.malhar.stream.FunctionOperator.FunctionOperatorTest.7
            public boolean f(Integer num) {
                return num.intValue() % FunctionOperatorTest.divider == 0;
            }
        }, new Option[0]).addOperator(resultCollector, resultCollector.input, (Operator.OutputPort) null, new Option[0]).runEmbedded(false, 10000L, new Callable<Boolean>() { // from class: org.apache.apex.malhar.stream.FunctionOperator.FunctionOperatorTest.8
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                return Boolean.valueOf(FunctionOperatorTest.TupleCount == 5);
            }
        });
        Assert.assertEquals(sum, 20L);
    }

    static /* synthetic */ int access$008() {
        int i = TupleCount;
        TupleCount = i + 1;
        return i;
    }

    static /* synthetic */ int access$112(int i) {
        int i2 = sum + i;
        sum = i2;
        return i2;
    }
}
