/*
 * Decompiled with CFR 0.152.
 */
package io.flinkspector.datastream.functions;

import com.google.common.collect.Iterables;
import io.flinkspector.datastream.util.InputUtil;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class ParallelFromStreamRecordsFunction<T>
extends RichParallelSourceFunction<T>
implements ListCheckpointed<Integer> {
    private static final long serialVersionUID = 1L;
    private final TypeSerializer<StreamRecord<T>> serializer;
    private final byte[] elementsSerialized;
    private final int numElements;
    private volatile int numElementsEmitted;
    private volatile int numElementsToSkip;
    private volatile boolean isRunning = true;
    private Boolean flushOpenWindows = false;

    public ParallelFromStreamRecordsFunction(TypeSerializer<StreamRecord<T>> serializer, Iterable<StreamRecord<T>> input) throws IOException {
        this.serializer = serializer;
        this.elementsSerialized = ParallelFromStreamRecordsFunction.serializeOutput(input, serializer).toByteArray();
        this.numElements = Iterables.size(input);
    }

    public ParallelFromStreamRecordsFunction(TypeSerializer<StreamRecord<T>> serializer, Iterable<StreamRecord<T>> input, Boolean flushOpenWindows) throws IOException {
        this(serializer, input);
        this.flushOpenWindows = flushOpenWindows;
    }

    public static <OUT> void checkCollection(Iterable<OUT> elements, Class<OUT> viewedAs) {
        OUT elem;
        Iterator<OUT> i$ = elements.iterator();
        do {
            if (!i$.hasNext()) {
                return;
            }
            elem = i$.next();
            if (elem != null) continue;
            throw new IllegalArgumentException("The collection contains a null element");
        } while (viewedAs.isAssignableFrom(elem.getClass()));
        throw new IllegalArgumentException("The elements in the collection are not all subclasses of " + viewedAs.getCanonicalName());
    }

    private static <T> ByteArrayOutputStream serializeOutput(Iterable<StreamRecord<T>> elements, TypeSerializer<StreamRecord<T>> serializer) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper((OutputStream)new DataOutputStream(baos));
        try {
            for (StreamRecord<T> element : elements) {
                serializer.serialize(element, (DataOutputView)wrapper);
            }
        }
        catch (Exception e) {
            throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
        }
        return baos;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run(SourceFunction.SourceContext<T> ctx) throws Exception {
        List outputSplit;
        int numberOfSubTasks = this.getRuntimeContext().getNumberOfParallelSubtasks();
        int indexOfThisSubTask = this.getRuntimeContext().getIndexOfThisSubtask();
        ByteArrayInputStream bais = new ByteArrayInputStream(this.elementsSerialized);
        DataInputViewStreamWrapper input = new DataInputViewStreamWrapper((InputStream)new DataInputStream(bais));
        ArrayList output = new ArrayList();
        for (int i = 0; i < this.numElements; ++i) {
            output.add(this.serializer.deserialize((DataInputView)input));
        }
        if (numberOfSubTasks > 1) {
            if (numberOfSubTasks > output.size()) {
                throw new IllegalStateException("Parallelism of source is higher than the maximum number of parallel sources");
            }
            outputSplit = InputUtil.splitList(output, indexOfThisSubTask, numberOfSubTasks);
        } else {
            outputSplit = output;
        }
        List<Long> watermarks = InputUtil.calculateWatermarks(outputSplit, this.flushOpenWindows);
        this.numElementsEmitted = this.numElementsToSkip;
        Object lock = ctx.getCheckpointLock();
        while (this.isRunning && this.numElementsEmitted < outputSplit.size()) {
            StreamRecord next = outputSplit.get(this.numElementsEmitted);
            Object object = lock;
            synchronized (object) {
                ctx.collectWithTimestamp(next.getValue(), next.getTimestamp());
                Long watermark = watermarks.get(this.numElementsEmitted);
                if (watermark > 0L) {
                    ctx.emitWatermark(new Watermark(watermark.longValue()));
                }
                ++this.numElementsEmitted;
            }
        }
    }

    public void cancel() {
        this.isRunning = false;
    }

    public int getNumElements() {
        return this.numElements;
    }

    public int getNumElementsEmitted() {
        return this.numElementsEmitted;
    }

    public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
        return Collections.singletonList(this.numElementsEmitted);
    }

    public void restoreState(List<Integer> state) throws Exception {
        this.numElementsToSkip = state.isEmpty() ? 0 : state.get(0);
    }
}

