package pl.touk.nussknacker.engine.flink.util.source;

import java.time.Duration;
import java.util.Optional;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import pl.touk.nussknacker.engine.api.Context;
import pl.touk.nussknacker.engine.api.context.ValidationContext;
import pl.touk.nussknacker.engine.flink.api.process.BasicFlinkContextInitializer;
import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomNodeContext;
import pl.touk.nussknacker.engine.flink.api.process.FlinkSource;
import scala.Function1;
import scala.collection.Seq;
import scala.collection.immutable.IndexedSeq;
import scala.reflect.ScalaSignature;

/* compiled from: EmitWatermarkAfterEachElementCollectionSource.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005=d\u0001\u0002\b\u0010\u0001yA\u0001\"\u000f\u0001\u0003\u0002\u0003\u0006IA\u000f\u0005\t\r\u0002\u0011\t\u0011)A\u0005\u000f\"AQ\u000b\u0001B\u0002B\u0003-a\u000bC\u0003`\u0001\u0011\u0005\u0001\rC\u0004h\u0001\t\u0007I\u0011\u00025\t\r1\u0004\u0001\u0015!\u0003j\u0011\u001di\u0007A1A\u0005\n9Da\u0001\u001e\u0001!\u0002\u0013y\u0007\"B;\u0001\t\u00032xaBA\u0015\u001f!\u0005\u00111\u0006\u0004\u0007\u001d=A\t!!\f\t\r}[A\u0011AA\u0018\u0011\u001d\t\td\u0003C\u0001\u0003g\u0011Q&R7ji^\u000bG/\u001a:nCJ\\\u0017I\u001a;fe\u0016\u000b7\r[#mK6,g\u000e^\"pY2,7\r^5p]N{WO]2f\u0015\t\u0001\u0012#\u0001\u0004t_V\u00148-\u001a\u0006\u0003%M\tA!\u001e;jY*\u0011A#F\u0001\u0006M2Lgn\u001b\u0006\u0003-]\ta!\u001a8hS:,'B\u0001\r\u001a\u0003-qWo]:l]\u0006\u001c7.\u001a:\u000b\u0005iY\u0012\u0001\u0002;pk.T\u0011\u0001H\u0001\u0003a2\u001c\u0001!\u0006\u0002 aM\u0019\u0001\u0001\t\u0014\u0011\u0005\u0005\"S\"\u0001\u0012\u000b\u0003\r\nQa]2bY\u0006L!!\n\u0012\u0003\r\u0005s\u0017PU3g!\r9CFL\u0007\u0002Q)\u0011\u0011FK\u0001\baJ|7-Z:t\u0015\tY3#A\u0002ba&L!!\f\u0015\u0003\u0017\u0019c\u0017N\\6T_V\u00148-\u001a\t\u0003_Ab\u0001\u0001B\u00032\u0001\t\u0007!GA\u0001U#\t\u0019d\u0007\u0005\u0002\"i%\u0011QG\t\u0002\b\u001d>$\b.\u001b8h!\t\ts'\u0003\u00029E\t\u0019\u0011I\\=\u0002\t1L7\u000f\u001e\t\u0004w\rscB\u0001\u001fB\u001d\ti\u0004)D\u0001?\u0015\tyT$\u0001\u0004=e>|GOP\u0005\u0002G%\u0011!II\u0001\ba\u0006\u001c7.Y4f\u0013\t!UIA\u0002TKFT!A\u0011\u0012\u0002#QLW.Z:uC6\u0004\u0018i]:jO:,'\u000fE\u0002I':j\u0011!\u0013\u0006\u0003\u0015.\u000b\u0011BZ;oGRLwN\\:\u000b\u0005-b%BA'O\u0003%\u0019HO]3b[&twM\u0003\u0002\u0015\u001f*\u0011\u0001+U\u0001\u0007CB\f7\r[3\u000b\u0003I\u000b1a\u001c:h\u0013\t!\u0016J\u0001\u0011BgNLwM\\3s/&$\b\u000eU;oGR,\u0018\r^3e/\u0006$XM]7be.\u001c\u0018AC3wS\u0012,gnY3%cA\u0019q+\u0018\u0018\u000e\u0003aS!!\u0017.\u0002\u0011QL\b/Z5oM>T!a\u0017/\u0002\r\r|W.\\8o\u0015\tYc*\u0003\u0002_1\nyA+\u001f9f\u0013:4wN]7bi
&|g.\u0001\u0004=S:LGO\u0010\u000b\u0004C\u00164GC\u00012e!\r\u0019\u0007AL\u0007\u0002\u001f!)Q\u000b\u0002a\u0002-\")\u0011\b\u0002a\u0001u!)a\t\u0002a\u0001\u000f\u0006\u00112m\u001c8uKb$\u0018J\\5uS\u0006d\u0017N_3s+\u0005I\u0007cA\u0014k]%\u00111\u000e\u000b\u0002\u001d\u0005\u0006\u001c\u0018n\u0019$mS:\\7i\u001c8uKb$\u0018J\\5uS\u0006d\u0017N_3s\u0003M\u0019wN\u001c;fqRLe.\u001b;jC2L'0\u001a:!\u0003M1G.\u001b8l'>,(oY3Gk:\u001cG/[8o+\u0005y\u0007c\u00019s]5\t\u0011O\u0003\u0002\u0011\u0013&\u00111/\u001d\u0002\u000f'>,(oY3Gk:\u001cG/[8o\u0003Q1G.\u001b8l'>,(oY3Gk:\u001cG/[8oA\u0005a1o\\;sG\u0016\u001cFO]3b[R)q/a\u0001\u0002\u000eA\u0019\u0001P\u001f?\u000e\u0003eT!aI&\n\u0005mL(A\u0003#bi\u0006\u001cFO]3b[B\u0011Qp`\u0007\u0002}*\u00111&F\u0005\u0004\u0003\u0003q(aB\"p]R,\u0007\u0010\u001e\u0005\b\u0003\u000bI\u0001\u0019AA\u0004\u0003\r)gN\u001e\t\u0004q\u0006%\u0011bAA\u0006s\nQ2\u000b\u001e:fC6,\u00050Z2vi&|g.\u00128wSJ|g.\\3oi\"9\u0011qB\u0005A\u0002\u0005E\u0011\u0001\u00054mS:\\gj\u001c3f\u0007>tG/\u001a=u!\r9\u00131C\u0005\u0004\u0003+A#A\u0006$mS:\\7)^:u_6tu\u000eZ3D_:$X\r\u001f;)\u000b\u0001\tI\"!\n\u0011\t\u0005m\u0011\u0011E\u0007\u0003\u0003;Q1!a\b#\u0003)\tgN\\8uCRLwN\\\u0005\u0005\u0003G\tiB\u0001\u0004o_^\f'O\\\u0011\u0003\u0003O\tqbY1u{\u0011,\u0007O]3dCRLwN\\\u0001.\u000b6LGoV1uKJl\u0017M]6BMR,'/R1dQ\u0016cW-\\3oi\u000e{G\u000e\\3di&|gnU8ve\u000e,\u0007CA2\f'\tY\u0001\u0005\u0006\u0002\u0002,\u000511M]3bi\u0016,B!!\u000e\u0002>QA\u0011qGA#\u0003\u0017\nY\u0006\u0006\u0003\u0002:\u0005}\u0002\u0003B2\u0001\u0003w\u00012aLA\u001f\t\u0015\tTB1\u00013\u0011%\t\t%DA\u0001\u0002\b\t\u0019%\u0001\u0006fm&$WM\\2fII\u0002BaV/\u0002<!9\u0011qI\u0007A\u0002\u0005%\u0013\u0001C3mK6,g\u000e^:\u0011\tm\u001a\u00151\b\u0005\b\u0003\u001bj\u0001\u0019AA(\u0003M)\u0007\u0010\u001e:bGR$\u0016.\\3ti\u0006l\u0007OR;o!\u001d\t\u0013\u0011KA\u001e\u0003+J1!a\u0015#\u0005%1UO\\2uS>t\u0017\u0007E\u0002\"\u0003/J1!!\u0017#\u0005\u0011auN\\4\t\u000f\u0005uS\u00021\u000
1\u0002`\u0005\tR.\u0019=PkR|em\u0014:eKJtWm]:\u0011\t\u0005\u0005\u00141N\u0007\u0003\u0003GRA!!\u001a\u0002h\u0005!A/[7f\u0015\t\tI'\u0001\u0003kCZ\f\u0017\u0002BA7\u0003G\u0012\u0001\u0002R;sCRLwN\u001c")
/* loaded from: input_file:pl/touk/nussknacker/engine/flink/util/source/EmitWatermarkAfterEachElementCollectionSource.class */
public class EmitWatermarkAfterEachElementCollectionSource<T> implements FlinkSource<T> {
    private final TypeInformation<T> evidence$1;
    private final BasicFlinkContextInitializer<T> contextInitializer = new BasicFlinkContextInitializer<>();
    private final SourceFunction<T> flinkSourceFunction;

    public static <T> EmitWatermarkAfterEachElementCollectionSource<T> create(Seq<T> seq, Function1<T, Object> function1, Duration duration, TypeInformation<T> typeInformation) {
        return EmitWatermarkAfterEachElementCollectionSource$.MODULE$.create(seq, function1, duration, typeInformation);
    }

    private BasicFlinkContextInitializer<T> contextInitializer() {
        return this.contextInitializer;
    }

    private SourceFunction<T> flinkSourceFunction() {
        return this.flinkSourceFunction;
    }

    public DataStream<Context> sourceStream(StreamExecutionEnvironment streamExecutionEnvironment, FlinkCustomNodeContext flinkCustomNodeContext) {
        return streamExecutionEnvironment.addSource(flinkSourceFunction(), this.evidence$1).name(new StringBuilder(8).append(flinkCustomNodeContext.metaData().id()).append("-").append(flinkCustomNodeContext.nodeId()).append("-source").toString()).map(contextInitializer().initContext(flinkCustomNodeContext.metaData().id(), flinkCustomNodeContext.nodeId()), flinkCustomNodeContext.typeInformationDetection().forContext((ValidationContext) flinkCustomNodeContext.validationContext().left().get()));
    }

    public EmitWatermarkAfterEachElementCollectionSource(Seq<T> seq, final AssignerWithPunctuatedWatermarks<T> assignerWithPunctuatedWatermarks, TypeInformation<T> typeInformation) {
        this.evidence$1 = typeInformation;
        final IndexedSeq indexedSeq = seq.toIndexedSeq();
        final EmitWatermarkAfterEachElementCollectionSource emitWatermarkAfterEachElementCollectionSource = null;
        this.flinkSourceFunction = new SourceFunction<T>(emitWatermarkAfterEachElementCollectionSource, indexedSeq, assignerWithPunctuatedWatermarks) { // from class: pl.touk.nussknacker.engine.flink.util.source.EmitWatermarkAfterEachElementCollectionSource$$anon$1
            private int toConsumeIndex = 0;
            private volatile boolean isRunning = true;
            private final IndexedSeq seq$1;
            private final AssignerWithPunctuatedWatermarks copyOfAssigner$1;

            private int toConsumeIndex() {
                return this.toConsumeIndex;
            }

            private void toConsumeIndex_$eq(int i) {
                this.toConsumeIndex = i;
            }

            private boolean isRunning() {
                return this.isRunning;
            }

            private void isRunning_$eq(boolean z) {
                this.isRunning = z;
            }

            public void run(SourceFunction.SourceContext<T> sourceContext) {
                while (isRunning() && toConsumeIndex() < this.seq$1.size()) {
                    Object apply = this.seq$1.apply(toConsumeIndex());
                    long extractTimestamp = this.copyOfAssigner$1.extractTimestamp(apply, -1L);
                    sourceContext.collectWithTimestamp(apply, extractTimestamp);
                    sourceContext.emitWatermark(this.copyOfAssigner$1.checkAndGetNextWatermark(apply, extractTimestamp));
                    toConsumeIndex_$eq(toConsumeIndex() + 1);
                }
            }

            public void cancel() {
                isRunning_$eq(false);
            }

            {
                this.seq$1 = indexedSeq;
                this.copyOfAssigner$1 = assignerWithPunctuatedWatermarks;
            }
        };
    }
}
