package io.amient.util.spark;

import io.amient.affinity.core.serde.AbstractSerde;
import io.amient.affinity.core.util.ObjectHashPartitioner;
import io.amient.affinity.kafka.KafkaClient;
import io.amient.affinity.kafka.PayloadAndOffset;
import io.amient.affinity.spark.KafkaSplit;
import java.util.function.Function;
import org.apache.spark.Partition;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.util.LongAccumulator;
import org.apache.spark.util.collection.ExternalAppendOnlyMap;
import org.apache.spark.util.collection.ExternalAppendOnlyMap$;
import scala.Function0;
import scala.Function2;
import scala.MatchError;
import scala.Predef$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.IntRef;

/* compiled from: KafkaRDD.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u001dg\u0001B\u0001\u0003\u0001-\u0011\u0001bS1gW\u0006\u0014F\t\u0012\u0006\u0003\u0007\u0011\tQa\u001d9be.T!!\u0002\u0004\u0002\tU$\u0018\u000e\u001c\u0006\u0003\u000f!\ta!Y7jK:$(\"A\u0005\u0002\u0005%|7\u0001A\u000b\u0004\u0019\u0001R3C\u0001\u0001\u000e!\rqa\u0003G\u0007\u0002\u001f)\u0011\u0001#E\u0001\u0004e\u0012$'BA\u0002\u0013\u0015\t\u0019B#\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002+\u0005\u0019qN]4\n\u0005]y!a\u0001*E\tB!\u0011\u0004\b\u0010*\u001b\u0005Q\"\"A\u000e\u0002\u000bM\u001c\u0017\r\\1\n\u0005uQ\"A\u0002+va2,'\u0007\u0005\u0002 A1\u0001A!B\u0011\u0001\u0005\u0004\u0011#!A&\u0012\u0005\r2\u0003CA\r%\u0013\t)#DA\u0004O_RD\u0017N\\4\u0011\u0005e9\u0013B\u0001\u0015\u001b\u0005\r\te.\u001f\t\u0003?)\"Qa\u000b\u0001C\u0002\t\u0012\u0011A\u0016\u0005\t[\u0001\u0011\t\u0011)A\u0005]\u0005\u00111o\u0019\t\u0003_Aj\u0011!E\u0005\u0003cE\u0011Ab\u00159be.\u001cuN\u001c;fqRD\u0001b\r\u0001\u0003\u0002\u0003\u0006I\u0001N\u0001\u0007G2LWM\u001c;\u0011\u0005URT\"\u0001\u001c\u000b\u0005]B\u0014!B6bM.\f'BA\u001d\u0007\u0003!\tgMZ5oSRL\u0018BA\u001e7\u0005-Y\u0015MZ6b\u00072LWM\u001c;\t\u0011u\u0002!\u0011!S\u0001\ny\n\u0001b[3z'\u0016\u0014H-\u001a\t\u00043}\n\u0015B\u0001!\u001b\u0005!a$-\u001f8b[\u0016t\u0004G\u0001\"L!\r\u0019\u0005JS\u0007\u0002\t*\u0011QIR\u0001\u0006g\u0016\u0014H-\u001a\u0006\u0003\u000fb\nAaY8sK&\u0011\u0011\n\u0012\u0002\u000e\u0003\n\u001cHO]1diN+'\u000fZ3\u0011\u0005}YE!\u0003'=\u0003\u0003\u0005\tQ!\u0001N\u0005\ryF%M\t\u0003=\u0019B\u0001b\u0014\u0001\u0003\u0002\u0013\u0006I\u0001U\u0001\u000bm\u0006dW/Z*fe\u0012,\u0007cA\r@#B\u0012!\u000b\u0016\t\u0004\u0007\"\u001b\u0006CA\u0010U\t%)f*!A\u0001\u0002\u000b\u0005aKA\u0002`II\n\"!\u000b\u0014\t\u0011a\u0003!\u0011!Q\u0001\ne\u000b\u0011bY8na\u0006\u001cG/\u001a3\u0011\u0005eQ\u0016BA.\u001b\u0005\u001d\u0011un\u001c7fC:D\u0001\"\u0018\u0001\u0003\u0004\u0003\u0006YAX\u0001\u000bKZLG-\u001a8dK\u0012\n\u0004cA0c=5\t\u0001M\u0003\u0002b5\u00059!/\u001a4mK\u000e$\u0018BA2a\u0005!\u0019E.Y:t)\u0006<\u0007\u0002C3\u0001\u0005\u0007\u0005\u000b1\u00024\u0002\u0015\u00154\u0018\u000eZ3oG\u0016$#\u0007E\u0002`E&BQ\u0001\u001b\u0001\u0005\u0002%\fa\u0001P5oSRtDC\u00026paF<X\u0010F\u0002l[:\u0004B\u0001\u001c\u0001\u001fS5\t!\u0001C\u0003^O\u0002\u000fa\fC\u0003fO\u0002\u000fa\rC\u0003.O\u0002\u0007a\u0006C\u00034O\u0002\u0007A\u0007\u0003\u0004>O\u0012\u0005\rA\u001d\t\u00043}\u001a\bG\u0001;w!\r\u0019\u0005*\u001e\t\u0003?Y$\u0011\u0002T9\u0002\u0002\u0003\u0005)\u0011A'\t\r=;G\u00111\u0001y!\rIr(\u001f\u0019\u0003ur\u00042a\u0011%|!\tyB\u0010B\u0005Vo\u0006\u0005\t\u0011!B\u0001-\"9\u0001l\u001aI\u0001\u0002\u0004I\u0006\u0002C@\u0001\u0005\u0004%\t!!\u0001\u0002\u0013\r|W\u000e]1di>\u0014XCAA\u0002!%I\u0012QAA\u0005\u0003\u0013\tI!C\u0002\u0002\bi\u0011\u0011BR;oGRLwN\u001c\u001a\u0011\u0007U\nY!C\u0002\u0002\u000eY\u0012\u0001\u0003U1zY>\fG-\u00118e\u001f\u001a47/\u001a;\t\u0011\u0005E\u0001\u0001)A\u0005\u0003\u0007\t!bY8na\u0006\u001cGo\u001c:!\u0011%\t)\u0002\u0001b\u0001\n\u0003\t9\"A\blC\u001a\\\u0017\rU1si&$\u0018n\u001c8t+\t\tI\u0002\u0005\u0004\u0002\u001c\u0005-\u0012\u0011\u0007\b\u0005\u0003;\t9C\u0004\u0003\u0002 \u0005\u0015RBAA\u0011\u0015\r\t\u0019CC\u0001\u0007yI|w\u000e\u001e \n\u0003mI1!!\u000b\u001b\u0003\u001d\u0001\u0018mY6bO\u0016LA!!\f\u00020\t!A*[:u\u0015\r\tIC\u0007\t\u0005\u0003g\ti$\u0004\u0002\u00026)!\u0011qGA\u001d\u0003\u0011a\u0017M\\4\u000b\u0005\u0005m\u0012\u0001\u00026bm\u0006LA!a\u0010\u00026\t9\u0011J\u001c;fO\u0016\u0014\b\u0002CA\"\u0001\u0001\u0006I!!\u0007\u0002!-\fgm[1QCJ$\u0018\u000e^5p]N\u0004\u0003bBA$\u0001\u0011E\u0011\u0011J\u0001\u000eO\u0016$\b+\u0019:uSRLwN\\:\u0016\u0005\u0005-\u0003#B\r\u0002N\u0005E\u0013bAA(5\t)\u0011I\u001d:bsB\u0019q&a\u0015\n\u0007\u0005U\u0013CA\u0005QCJ$\u0018\u000e^5p]\"9\u0011\u0011\f\u0001\u0005B\u0005m\u0013aB2p[B,H/\u001a\u000b\u0007\u0003;\n\u0019'a\u001a\u0011\u000b\u0005m\u0011q\f\r\n\t\u0005\u0005\u0014q\u0006\u0002\t\u0013R,'/\u0019;pe\"A\u0011QMA,\u0001\u0004\t\t&A\u0003ta2LG\u000f\u0003\u0005\u0002j\u0005]\u0003\u0019AA6\u0003\u001d\u0019wN\u001c;fqR\u00042aLA7\u0013\r\ty'\u0005\u0002\f)\u0006\u001c8nQ8oi\u0016DH\u000fC\u0004\u0002t\u0001!\t!!\u001e\u0002\rU\u0004H-\u0019;f)\u0011\t9(! \u0011\u0007e\tI(C\u0002\u0002|i\u0011A!\u00168ji\"9\u0011qPA9\u0001\u0004i\u0011\u0001\u00023bi\u0006<\u0011\"a!\u0003\u0003\u0003E\t!!\"\u0002\u0011-\u000bgm[1S\t\u0012\u00032\u0001\\AD\r!\t!!!A\t\u0002\u0005%5CBAD\u0003\u0017\u000b\t\nE\u0002\u001a\u0003\u001bK1!a$\u001b\u0005\u0019\te.\u001f*fMB\u0019\u0011$a%\n\u0007\u0005U%D\u0001\u0007TKJL\u0017\r\\5{C\ndW\rC\u0004i\u0003\u000f#\t!!'\u0015\u0005\u0005\u0015\u0005BCAO\u0003\u000f\u000b\n\u0011\"\u0001\u0002 \u0006YB\u0005\\3tg&t\u0017\u000e\u001e\u0013he\u0016\fG/\u001a:%I\u00164\u0017-\u001e7uIU*b!!)\u00028\u0006eVCAARU\rI\u0016QU\u0016\u0003\u0003O\u0003B!!+\u000246\u0011\u00111\u0016\u0006\u0005\u0003[\u000by+A\u0005v]\u000eDWmY6fI*\u0019\u0011\u0011\u0017\u000e\u0002\u0015\u0005tgn\u001c;bi&|g.\u0003\u0003\u00026\u0006-&!E;oG\",7m[3e-\u0006\u0014\u0018.\u00198dK\u00121\u0011%a'C\u0002\t\"aaKAN\u0005\u0004\u0011\u0003BCA_\u0003\u000f\u000b\t\u0011\"\u0003\u0002@\u0006Y!/Z1e%\u0016\u001cx\u000e\u001c<f)\t\t\t\r\u0005\u0003\u00024\u0005\r\u0017\u0002BAc\u0003k\u0011aa\u00142kK\u000e$\b")
/* loaded from: input_file:io/amient/util/spark/KafkaRDD.class */
public class KafkaRDD<K, V> extends RDD<Tuple2<K, V>> {
    public final KafkaClient io$amient$util$spark$KafkaRDD$$client;
    private final Function0<AbstractSerde<? super K>> keySerde;
    private final Function0<AbstractSerde<? super V>> valueSerde;
    private final boolean compacted;
    public final ClassTag<K> io$amient$util$spark$KafkaRDD$$evidence$1;
    public final ClassTag<V> io$amient$util$spark$KafkaRDD$$evidence$2;
    private final Function2<PayloadAndOffset, PayloadAndOffset, PayloadAndOffset> compactor;
    private final List<Integer> kafkaPartitions;

    public Function2<PayloadAndOffset, PayloadAndOffset, PayloadAndOffset> compactor() {
        return this.compactor;
    }

    public List<Integer> kafkaPartitions() {
        return this.kafkaPartitions;
    }

    public Partition[] getPartitions() {
        return (Partition[]) ((TraversableOnce) kafkaPartitions().map(new KafkaRDD$$anonfun$getPartitions$1(this, IntRef.create(-1)), List$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.apply(Partition.class));
    }

    public Iterator<Tuple2<K, V>> compute(Partition partition, TaskContext taskContext) {
        Iterator iterator;
        int partition2 = ((KafkaSplit) partition).partition();
        Tuple2 tuple2 = (Tuple2) ((IterableLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(this.io$amient$util$spark$KafkaRDD$$client.getOffsets(partition2)).asScala()).head();
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        Tuple2 tuple22 = new Tuple2((Long) tuple2._1(), (Long) tuple2._2());
        java.util.Iterator it = this.io$amient$util$spark$KafkaRDD$$client.iterator(partition2, (Long) tuple22._1(), (Long) tuple22._2());
        AbstractSerde abstractSerde = (AbstractSerde) this.keySerde.apply();
        AbstractSerde abstractSerde2 = (AbstractSerde) this.valueSerde.apply();
        taskContext.addTaskCompletionListener(new KafkaRDD$$anonfun$compute$2(this, it, abstractSerde, abstractSerde2));
        Iterator map = ((Iterator) JavaConverters$.MODULE$.asScalaIteratorConverter(it).asScala()).map(new KafkaRDD$$anonfun$2(this));
        if (this.compacted) {
            ExternalAppendOnlyMap externalAppendOnlyMap = new ExternalAppendOnlyMap(new KafkaRDD$$anonfun$3(this), compactor(), compactor(), ExternalAppendOnlyMap$.MODULE$.$lessinit$greater$default$4(), ExternalAppendOnlyMap$.MODULE$.$lessinit$greater$default$5(), ExternalAppendOnlyMap$.MODULE$.$lessinit$greater$default$6(), ExternalAppendOnlyMap$.MODULE$.$lessinit$greater$default$7());
            externalAppendOnlyMap.insertAll(map);
            iterator = externalAppendOnlyMap.iterator();
        } else {
            iterator = map;
        }
        return iterator.map(new KafkaRDD$$anonfun$compute$3(this, abstractSerde, abstractSerde2)).collect(new KafkaRDD$$anonfun$compute$1(this));
    }

    public void update(RDD<Tuple2<K, V>> rdd) {
        LongAccumulator longAccumulator = new LongAccumulator();
        context().register(longAccumulator);
        log().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Producing into ", " ..."})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.io$amient$util$spark$KafkaRDD$$client})));
        context().runJob(rdd, new KafkaRDD$$anonfun$update$1(this, longAccumulator), ClassTag$.MODULE$.Unit());
        log().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Produced ", " messages into ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{longAccumulator.value(), this.io$amient$util$spark$KafkaRDD$$client})));
    }

    public final void io$amient$util$spark$KafkaRDD$$updatePartition$1(final TaskContext taskContext, Iterator iterator, final LongAccumulator longAccumulator) {
        this.io$amient$util$spark$KafkaRDD$$client.publish((java.util.Iterator) JavaConverters$.MODULE$.asJavaIteratorConverter(iterator.map(new KafkaRDD$$anonfun$4(this, new ObjectHashPartitioner(), (AbstractSerde) this.keySerde.apply(), (AbstractSerde) this.valueSerde.apply()))).asJava(), new Function<Long, Boolean>(this, longAccumulator, taskContext) { // from class: io.amient.util.spark.KafkaRDD$$anon$1
            private final LongAccumulator produced$1;
            private final TaskContext context$1;

            @Override // java.util.function.Function
            public Boolean apply(Long l) {
                this.produced$1.add(l);
                return Predef$.MODULE$.boolean2Boolean(!this.context$1.isInterrupted());
            }

            {
                this.produced$1 = longAccumulator;
                this.context$1 = taskContext;
            }
        });
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public KafkaRDD(SparkContext sparkContext, KafkaClient kafkaClient, Function0<AbstractSerde<? super K>> function0, Function0<AbstractSerde<? super V>> function02, boolean z, ClassTag<K> classTag, ClassTag<V> classTag2) {
        super(sparkContext, Nil$.MODULE$, ClassTag$.MODULE$.apply(Tuple2.class));
        this.io$amient$util$spark$KafkaRDD$$client = kafkaClient;
        this.keySerde = function0;
        this.valueSerde = function02;
        this.compacted = z;
        this.io$amient$util$spark$KafkaRDD$$evidence$1 = classTag;
        this.io$amient$util$spark$KafkaRDD$$evidence$2 = classTag2;
        this.compactor = new KafkaRDD$$anonfun$1(this);
        this.kafkaPartitions = ((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(kafkaClient.getPartitions()).asScala()).toList();
    }
}
