/*
 * Decompiled with CFR 0.152.
 */
package io.flinkspector.datastream;

import com.google.common.base.Preconditions;
import com.lmax.disruptor.RingBuffer;
import io.flinkspector.core.input.Input;
import io.flinkspector.core.runtime.OutputEvent;
import io.flinkspector.core.runtime.OutputVerifier;
import io.flinkspector.core.runtime.Runner;
import io.flinkspector.core.trigger.DefaultTestTrigger;
import io.flinkspector.core.trigger.VerifyFinishedTrigger;
import io.flinkspector.datastream.functions.ParallelFromStreamRecordsFunction;
import io.flinkspector.datastream.functions.TestSink;
import io.flinkspector.datastream.input.EventTimeInput;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.TestBaseUtils;

/**
 * Stream environment for tests that wires Flink data streams to a {@link Runner}.
 *
 * <p>The environment offers factory methods for sources built from plain
 * collections or from timestamped {@link StreamRecord}s, and for
 * {@link TestSink}s whose output is checked by an {@link OutputVerifier}
 * registered with the runner.
 */
public class DataStreamTestEnvironment extends TestStreamEnvironment {

    /** Executes the job; also hands out listener ids and the ring buffer used by test sinks. */
    private final Runner runner;

    /**
     * Creates an environment on top of an existing mini cluster.
     *
     * @param cluster     cluster passed to both the super class and the {@link Runner}
     * @param parallelism parallelism passed to the super class
     */
    public DataStreamTestEnvironment(LocalFlinkMiniCluster cluster, int parallelism) {
        super(cluster, parallelism);
        this.runner = new Runner(cluster) {

            // Lets the runner trigger execution of this environment's job graph.
            protected void executeEnvironment() throws Throwable {
                DataStreamTestEnvironment.this.execute();
            }
        };
    }

    /**
     * Starts a cluster via {@link TestBaseUtils#startCluster} and wraps it in a new environment.
     *
     * <p>The first two arguments handed to {@code startCluster} are {@code 1} and the
     * number of available processors (presumably the number of task managers and the
     * slots per task manager; confirm against the Flink version in use).
     *
     * @param parallelism parallelism of the created environment
     * @return a new test environment backed by the freshly started cluster
     * @throws Exception if the cluster cannot be started
     */
    public static DataStreamTestEnvironment createTestEnvironment(int parallelism) throws Exception {
        int taskSlots = Runtime.getRuntime().availableProcessors();
        LocalFlinkMiniCluster cluster = TestBaseUtils.startCluster(1, taskSlots, false, false, true);
        return new DataStreamTestEnvironment(cluster, parallelism);
    }

    /**
     * Delegates to {@link Runner#executeTest()}.
     *
     * @throws Throwable whatever the runner throws
     */
    public void executeTest() throws Throwable {
        this.runner.executeTest();
    }

    /**
     * Creates a sink whose output is checked by {@code verifier}, using a
     * {@link DefaultTestTrigger} as the finish trigger.
     *
     * @param verifier verifier registered with the runner
     * @param <IN>     type of the records received by the sink
     * @return the sink to attach to the stream under test
     */
    public <IN> TestSink<IN> createTestSink(OutputVerifier<IN> verifier) {
        return this.createTestSink(verifier, new DefaultTestTrigger());
    }

    /**
     * Creates a sink whose output is checked by {@code verifier}.
     *
     * @param verifier verifier registered with the runner
     * @param trigger  trigger registered together with the verifier
     * @param <IN>     type of the records received by the sink
     * @return the sink to attach to the stream under test
     */
    public <IN> TestSink<IN> createTestSink(OutputVerifier<IN> verifier, VerifyFinishedTrigger trigger) {
        // The id returned by the runner ties the sink to its registered verifier.
        int instance = this.runner.registerListener(verifier, trigger);
        return new TestSink<IN>(instance, (RingBuffer<OutputEvent>) this.runner.getRingBuffer());
    }

    /**
     * Creates a source from the given timestamped records; window flushing is disabled.
     *
     * @param data records to emit, must contain at least one element
     * @param <OUT> type of the record values
     * @return the created source
     * @throws IllegalArgumentException if no records are given or the first one is null
     */
    @SafeVarargs
    public final <OUT> DataStreamSource<OUT> fromElementsWithTimeStamp(StreamRecord<OUT>... data) {
        return this.fromCollectionWithTimestamp(Arrays.asList(data), false);
    }

    /**
     * Creates a source from an {@link EventTimeInput}, using its records and its
     * flush-windows setting.
     *
     * @param input provider of the timestamped records
     * @param <OUT> type of the record values
     * @return the created source
     */
    public <OUT> DataStreamSource<OUT> fromInput(EventTimeInput<OUT> input) {
        return this.fromCollectionWithTimestamp(input.getInput(), input.getFlushWindowsSetting());
    }

    /**
     * Creates a source from the elements of an {@link Input}.
     *
     * @param input provider of the elements
     * @param <OUT> type of the elements
     * @return the created source
     */
    public <OUT> DataStreamSource<OUT> fromInput(Input<OUT> input) {
        return this.fromCollection(input.getInput());
    }

    /**
     * Creates a source from timestamped records, deriving the value type from the
     * value of the first record.
     *
     * @param data         records to emit, must not be null or empty
     * @param flushWindows passed through to the source function
     * @param <OUT>        type of the record values
     * @return the created source
     * @throws NullPointerException     if {@code data} is null
     * @throws IllegalArgumentException if {@code data} is empty or its first element is null
     * @throws RuntimeException         if no type information can be derived from the first value
     */
    public <OUT> DataStreamSource<OUT> fromCollectionWithTimestamp(Collection<StreamRecord<OUT>> data, Boolean flushWindows) {
        StreamRecord<OUT> first = firstRecordOf(data);
        TypeInformation<OUT> valueTypeInfo;
        try {
            valueTypeInfo = TypeExtractor.getForObject(first.getValue());
        } catch (Exception e) {
            // Keep the original failure as the cause so the reason is not lost.
            throw new RuntimeException(typeInfoFailureMessage(first), e);
        }
        return this.fromCollectionWithTimestamp(data, valueTypeInfo, flushWindows);
    }

    /**
     * Creates a source from timestamped records with an explicitly given value type.
     *
     * @param data         records to emit, must not be null or empty
     * @param outType      type information of the record values, used for the created source
     * @param flushWindows passed through to the source function
     * @param <OUT>        type of the record values
     * @return the created source, named "Collection Source"
     * @throws NullPointerException     if {@code data} is null
     * @throws IllegalArgumentException if {@code data} is empty or its first element is null
     * @throws RuntimeException         if no type information can be derived for the records
     *                                  or the source function cannot be created
     */
    public <OUT> DataStreamSource<OUT> fromCollectionWithTimestamp(Collection<StreamRecord<OUT>> data, TypeInformation<OUT> outType, Boolean flushWindows) {
        StreamRecord<OUT> first = firstRecordOf(data);

        // Type information of the record wrapper itself (not of its value): it
        // supplies the serializer the source function is constructed with.
        TypeInformation<StreamRecord<OUT>> recordTypeInfo;
        try {
            recordTypeInfo = TypeExtractor.getForObject(first);
        } catch (Exception e) {
            throw new RuntimeException(typeInfoFailureMessage(first), e);
        }

        // Let Flink validate the elements against the record class before they are serialized.
        FromElementsFunction.checkCollection(data, recordTypeInfo.getTypeClass());

        SourceFunction<OUT> function;
        try {
            function = new ParallelFromStreamRecordsFunction<OUT>(recordTypeInfo.createSerializer(this.getConfig()), data, flushWindows);
        } catch (IOException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
        return this.addSource(function, "Collection Source", outType);
    }

    /**
     * Creates a source from a plain collection by delegating to the super class's
     * {@code fromCollection}.
     *
     * @param input elements to emit
     * @param <OUT> type of the elements
     * @return the created source
     */
    public <OUT> DataStreamSource<OUT> fromInput(Collection<OUT> input) {
        return super.fromCollection(input);
    }

    /**
     * @return the value reported by {@link Runner#hasBeenStopped()}
     */
    public Boolean hasBeenStopped() {
        return this.runner.hasBeenStopped();
    }

    /**
     * @return the timeout interval currently configured on the runner
     */
    public Long getTimeoutInterval() {
        return this.runner.getTimeoutInterval();
    }

    /**
     * Sets the timeout interval on the runner.
     *
     * @param interval new timeout interval, passed through unchanged
     */
    public void setTimeoutInterval(long interval) {
        this.runner.setTimeoutInterval(interval);
    }

    /**
     * Validates the collection and returns its first record.
     *
     * @throws NullPointerException     if {@code data} is null
     * @throws IllegalArgumentException if {@code data} is empty or its first element is null
     */
    private static <OUT> StreamRecord<OUT> firstRecordOf(Collection<StreamRecord<OUT>> data) {
        Preconditions.checkNotNull(data, "Collection must not be null");
        if (data.isEmpty()) {
            throw new IllegalArgumentException("Collection must not be empty");
        }
        StreamRecord<OUT> first = data.iterator().next();
        if (first == null) {
            throw new IllegalArgumentException("Collection must not contain null elements");
        }
        return first;
    }

    /** Builds the error message used when type extraction fails for {@code first}. */
    private static String typeInfoFailureMessage(StreamRecord<?> first) {
        return "Could not create TypeInformation for type " + first.getClass() + "; please specify the TypeInformation manually via StreamExecutionEnvironment#fromElements(Collection, TypeInformation)";
    }
}

