package io.flinkspector.datastream.functions;

import io.flinkspector.datastream.util.InputUtil;
import io.flinkspector.shade.com.google.common.collect.Iterables;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/* loaded from: input_file:io/flinkspector/datastream/functions/ParallelFromStreamRecordsFunction.class */
public class ParallelFromStreamRecordsFunction<T> extends RichParallelSourceFunction<T> implements ListCheckpointed<Integer> {
    private static final long serialVersionUID = 1;
    private final TypeSerializer<StreamRecord<T>> serializer;
    private final byte[] elementsSerialized;
    private final int numElements;
    private volatile int numElementsEmitted;
    private volatile int numElementsToSkip;
    private volatile boolean isRunning;
    private final Boolean flushOpenWindows;

    public ParallelFromStreamRecordsFunction(TypeSerializer<StreamRecord<T>> typeSerializer, Iterable<StreamRecord<T>> iterable) throws IOException {
        this(typeSerializer, iterable, false);
    }

    public ParallelFromStreamRecordsFunction(TypeSerializer<StreamRecord<T>> typeSerializer, Iterable<StreamRecord<T>> iterable, Boolean bool) throws IOException {
        this.isRunning = true;
        this.serializer = typeSerializer;
        this.elementsSerialized = serializeOutput(iterable, typeSerializer).toByteArray();
        this.numElements = Iterables.size(iterable);
        this.flushOpenWindows = bool;
    }

    public static <OUT> void checkCollection(Iterable<OUT> iterable, Class<OUT> cls) {
        for (OUT out : iterable) {
            if (out == null) {
                throw new IllegalArgumentException("The collection contains a null element");
            }
            if (!cls.isAssignableFrom(out.getClass())) {
                throw new IllegalArgumentException("The elements in the collection are not all subclasses of " + cls.getCanonicalName());
            }
        }
    }

    private static <T> ByteArrayOutputStream serializeOutput(Iterable<StreamRecord<T>> iterable, TypeSerializer<StreamRecord<T>> typeSerializer) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper(new DataOutputStream(byteArrayOutputStream));
        try {
            Iterator<StreamRecord<T>> it = iterable.iterator();
            while (it.hasNext()) {
                typeSerializer.serialize(it.next(), dataOutputViewStreamWrapper);
            }
            return byteArrayOutputStream;
        } catch (Exception e) {
            throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v40, types: [java.util.List] */
    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        ArrayList arrayList;
        int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
        DataInputViewStreamWrapper dataInputViewStreamWrapper = new DataInputViewStreamWrapper(new DataInputStream(new ByteArrayInputStream(this.elementsSerialized)));
        ArrayList arrayList2 = new ArrayList();
        for (int i = 0; i < this.numElements; i++) {
            arrayList2.add(this.serializer.deserialize(dataInputViewStreamWrapper));
        }
        if (numberOfParallelSubtasks <= 1) {
            arrayList = arrayList2;
        } else {
            if (numberOfParallelSubtasks > arrayList2.size()) {
                throw new IllegalStateException("Parallelism of source is higher than the maximum number of parallel sources");
            }
            arrayList = InputUtil.splitList(arrayList2, indexOfThisSubtask, numberOfParallelSubtasks);
        }
        List<Long> calculateWatermarks = InputUtil.calculateWatermarks(arrayList, this.flushOpenWindows.booleanValue());
        this.numElementsEmitted = this.numElementsToSkip;
        Object checkpointLock = sourceContext.getCheckpointLock();
        while (this.isRunning && this.numElementsEmitted < arrayList.size()) {
            StreamRecord streamRecord = (StreamRecord) arrayList.get(this.numElementsEmitted);
            synchronized (checkpointLock) {
                sourceContext.collectWithTimestamp(streamRecord.getValue(), streamRecord.getTimestamp());
                Long l = calculateWatermarks.get(this.numElementsEmitted);
                if (l.longValue() > 0) {
                    sourceContext.emitWatermark(new Watermark(l.longValue()));
                }
                this.numElementsEmitted++;
            }
        }
    }

    public void cancel() {
        this.isRunning = false;
    }

    public int getNumElements() {
        return this.numElements;
    }

    public int getNumElementsEmitted() {
        return this.numElementsEmitted;
    }

    public List<Integer> snapshotState(long j, long j2) throws Exception {
        return Collections.singletonList(Integer.valueOf(this.numElementsEmitted));
    }

    public void restoreState(List<Integer> list) throws Exception {
        this.numElementsToSkip = list.isEmpty() ? 0 : list.get(0).intValue();
    }
}
