package ai.starlake.utils.kafka;

import ai.starlake.config.Settings;
import ai.starlake.schema.model.Mode;
import ai.starlake.schema.model.Mode$;
import ai.starlake.schema.model.Mode$FILE$;
import ai.starlake.schema.model.Mode$STREAM$;
import ai.starlake.utils.FileLock;
import ai.starlake.utils.YamlSerializer$;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.DatasetLogging;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.Map$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try$;

/* compiled from: KafkaClient.scala */
@ScalaSignature(bytes = "\u0006\u0001\t\u001db\u0001\u0002\u000f\u001e\u0001\u0019B\u0001b\u0013\u0001\u0003\u0002\u0003\u0006I\u0001\u0014\u0005\t=\u0002\u0011\t\u0011)A\u0006?\")1\r\u0001C\u0001I\"9!\u000e\u0001b\u0001\n\u0003Y\u0007B\u0002;\u0001A\u0003%A\u000eC\u0004v\u0001\t\u0007I\u0011\u0001<\t\u000f\u0005\u0015\u0001\u0001)A\u0005o\"I\u0011q\u0001\u0001C\u0002\u0013\u0005\u0011\u0011\u0002\u0005\t\u0003#\u0001\u0001\u0015!\u0003\u0002\f!I\u00111\u0003\u0001C\u0002\u0013\u0005\u0011Q\u0003\u0005\t\u0003G\u0001\u0001\u0015!\u0003\u0002\u0018!Q\u0011Q\u0005\u0001\t\u0006\u0004%\t!a\n\t\u000f\u0005m\u0002\u0001\"\u0001\u0002>!9\u0011Q\t\u0001\u0005\u0002\u0005\u001d\u0003bBA'\u0001\u0011\u0005\u0011q\n\u0005\b\u0003?\u0002A\u0011AA1\u0011\u001d\t\u0019\t\u0001C\u0001\u0003\u000bCq!!)\u0001\t\u0013\t\u0019\u000bC\u0004\u0002(\u0002!\t!!+\t\u000f\u0005U\u0006\u0001\"\u0003\u00028\"9\u0011\u0011\u0019\u0001\u0005\n\u0005\r\u0007bBAh\u0001\u0011%\u0011\u0011\u001b\u0005\b\u0003+\u0004A\u0011AAl\u0011\u001d\tY\u000e\u0001C\u0001\u0003;Dq!!:\u0001\t\u0003\t9\u000fC\u0004\u0003\u0016\u0001!\tAa\u0006\t\u000f\tu\u0001\u0001\"\u0001\u0003 \tY1*\u00194lC\u000ec\u0017.\u001a8u\u0015\tqr$A\u0003lC\u001a\\\u0017M\u0003\u0002!C\u0005)Q\u000f^5mg*\u0011!eI\u0001\tgR\f'\u000f\\1lK*\tA%\u0001\u0002bS\u000e\u00011#\u0002\u0001([]\u001a\u0005C\u0001\u0015,\u001b\u0005I#\"\u0001\u0016\u0002\u000bM\u001c\u0017\r\\1\n\u00051J#AB!osJ+g\r\u0005\u0002/k5\tqF\u0003\u00021c\u0005a1oY1mC2|wmZ5oO*\u0011!gM\u0001\tif\u0004Xm]1gK*\tA'A\u0002d_6L!AN\u0018\u0003\u001bM#(/[2u\u0019><w-\u001b8h!\tA\u0014)D\u0001:\u0015\tQ4(A\u0002tc2T!\u0001P\u001f\u0002\u000bM\u0004\u0018M]6\u000b\u0005yz\u0014AB1qC\u000eDWMC\u0001A\u0003\ry'oZ\u0005\u0003\u0005f\u0012a\u0002R1uCN,G\u000fT8hO&tw\r\u0005\u0002E\u00136\tQI\u0003\u0002G\u000f\u0006!A.\u00198h\u0015\u0005A\u0015\u0001\u00026bm\u0006L!AS#\u0003\u001b\u0005+Ho\\\"m_N,\u0017M\u00197f\u0003-Y\u0017MZ6b\u0007>tg-[4\u0011\u00055[fB\u0001(Y\u001d\tyeK\u0004\u0002Q+:\u0011\u0011\u000bV\u0007\u0002%*\u00111+J\u0001\u0007yI|w\u000e\u001e \n\u0003\u0011J!AI\u0012\n\u0005]\u000b\u0013AB2p]\u001aLw-\u0003\u0002Z5\u0006A1+\u001a;uS:<7O\u0003\u0002XC%\u0011A,\u0018\u0002\f\u0017\u000647.Y\"p]\u001aLwM\u0003\u0002Z5\u0006A1/\u001a;uS:<7\u000f\u0005\u0002aC6\t!,\u0003\u0002c5\nA1+\u001a;uS:<7/\u0001\u0004=S:LGO\u0010\u000b\u0003K&$\"A\u001a5\u0011\u0005\u001d\u0004Q\"A\u000f\t\u000by\u001b\u00019A0\t\u000b-\u001b\u0001\u0019\u0001'\u0002!\r|W.\u001a;PM\u001a\u001cX\r^:N_\u0012,W#\u00017\u0011\u00055\u0014X\"\u00018\u000b\u0005=\u0004\u0018!B7pI\u0016d'BA9\"\u0003\u0019\u00198\r[3nC&\u00111O\u001c\u0002\u0005\u001b>$W-A\td_6,Go\u00144gg\u0016$8/T8eK\u0002\nQb]3sm\u0016\u0014x\n\u001d;j_:\u001cX#A<\u0011\tadxp \b\u0003sj\u0004\"!U\u0015\n\u0005mL\u0013A\u0002)sK\u0012,g-\u0003\u0002~}\n\u0019Q*\u00199\u000b\u0005mL\u0003c\u0001=\u0002\u0002%\u0019\u00111\u0001@\u0003\rM#(/\u001b8h\u00039\u0019XM\u001d<fe>\u0003H/[8og\u0002\n!cY8nKR|eMZ:fiN\u001cuN\u001c4jOV\u0011\u00111\u0002\t\u0004\u001b\u00065\u0011bAA\b;\n\u00012*\u00194lCR{\u0007/[2D_:4\u0017nZ\u0001\u0014G>lW\r^(gMN,Go]\"p]\u001aLw\rI\u0001\u0006aJ|\u0007o]\u000b\u0003\u0003/\u0001B!!\u0007\u0002 5\u0011\u00111\u0004\u0006\u0004\u0003;9\u0015\u0001B;uS2LA!!\t\u0002\u001c\tQ\u0001K]8qKJ$\u0018.Z:\u0002\rA\u0014x\u000e]:!\u0003\u0019\u0019G.[3oiV\u0011\u0011\u0011\u0006\t\u0005\u0003W\t9$\u0004\u0002\u0002.)!\u0011qFA\u0019\u0003\u0015\tG-\\5o\u0015\u0011\t\u0019$!\u000e\u0002\u000f\rd\u0017.\u001a8ug*\u0011a$P\u0005\u0005\u0003s\tiCA\u0006BI6Lgn\u00117jK:$\u0018!B2m_N,GCAA !\rA\u0013\u0011I\u0005\u0004\u0003\u0007J#\u0001B+oSR\f1\u0002Z3mKR,Gk\u001c9jGR!\u0011qHA%\u0011\u0019\tYE\u0004a\u0001\u007f\u0006IAo\u001c9jG:\u000bW.Z\u0001\u0018GJ,\u0017\r^3U_BL7-\u00134O_R\u0004&/Z:f]R$b!a\u0010\u0002R\u0005m\u0003bBA*\u001f\u0001\u0007\u0011QK\u0001\u0006i>\u0004\u0018n\u0019\t\u0005\u0003W\t9&\u0003\u0003\u0002Z\u00055\"\u0001\u0003(foR{\u0007/[2\t\r\u0005us\u00021\u0001x\u0003\u0011\u0019wN\u001c4\u0002\u001fQ|\u0007/[2QCJ$\u0018\u000e^5p]N$B!a\u0019\u0002\u0002B1\u0011QMA8\u0003krA!a\u001a\u0002l9\u0019\u0011+!\u001b\n\u0003)J1!!\u001c*\u0003\u001d\u0001\u0018mY6bO\u0016LA!!\u001d\u0002t\t!A*[:u\u0015\r\ti'\u000b\t\u0005\u0003o\ni(\u0004\u0002\u0002z)!\u00111PA\u001b\u0003\u0019\u0019w.\\7p]&!\u0011qPA=\u0005I!v\u000e]5d!\u0006\u0014H/\u001b;j_:LeNZ8\t\r\u0005-\u0003\u00031\u0001��\u0003=!x\u000e]5d\u000b:$wJ\u001a4tKR\u001cHCBAD\u00037\u000bi\n\u0005\u0004\u0002f\u0005=\u0014\u0011\u0012\t\bQ\u0005-\u0015qRAK\u0013\r\ti)\u000b\u0002\u0007)V\u0004H.\u001a\u001a\u0011\u0007!\n\t*C\u0002\u0002\u0014&\u00121!\u00138u!\rA\u0013qS\u0005\u0004\u00033K#\u0001\u0002'p]\u001eDa!a\u0013\u0012\u0001\u0004y\bBBAP#\u0001\u0007q/A\u0007bG\u000e,7o](qi&|gn]\u0001\u000bEVLG\u000e\u001a)s_B\u001cH\u0003BA\f\u0003KCa!a(\u0013\u0001\u00049\u0018\u0001\u0005;pa&\u001c7+\u0019<f\u001f\u001a47/\u001a;t)!\ty$a+\u00020\u0006E\u0006BBAW'\u0001\u0007q0A\bu_BL7mQ8oM&<g*Y7f\u0011\u0019\tyj\u0005a\u0001o\"9\u00111W\nA\u0002\u0005\u001d\u0015aB8gMN,Go]\u0001\u001ei>\u0004\u0018nY\"veJ,g\u000e^(gMN,Go\u001d$s_6\u001cFO]3b[R!\u0011\u0011XA`!\u0015A\u00131XAD\u0013\r\ti,\u000b\u0002\u0007\u001fB$\u0018n\u001c8\t\r\u00055F\u00031\u0001��\u0003A\u0019w.\\3u\u001f\u001a47/\u001a;t\u0019>\u001c7\u000e\u0006\u0003\u0002F\u00065\u0007\u0003BAd\u0003\u0013l\u0011aH\u0005\u0004\u0003\u0017|\"\u0001\u0003$jY\u0016dunY6\t\r\u00055V\u00031\u0001��\u0003m!x\u000e]5d\u0007V\u0014(/\u001a8u\u001f\u001a47/\u001a;t\rJ|WNR5mKR!\u0011\u0011XAj\u0011\u0019\tiK\u0006a\u0001\u007f\u0006\u0019Bo\u001c9jG\u000e+(O]3oi>3gm]3ugR!\u0011\u0011XAm\u0011\u0019\tik\u0006a\u0001\u007f\u0006iqN\u001a4tKR\u001c\u0018i\u001d&t_:$b!a8\u0002b\u0006\r\b\u0003\u0002\u0015\u0002<~Da!a\u0013\u0019\u0001\u0004y\bbBAZ1\u0001\u0007\u0011qQ\u0001\u0012G>t7/^7f)>\u0004\u0018n\u0019\"bi\u000eDG\u0003CAu\u0005\u000f\u0011IAa\u0005\u0011\u000f!\nY)a;\u0002\bB!\u0011Q\u001eB\u0001\u001d\u0011\ty/a@\u000f\t\u0005E\u0018Q \b\u0005\u0003g\fYP\u0004\u0003\u0002v\u0006ehbA)\u0002x&\t\u0001)\u0003\u0002?\u007f%\u0011A(P\u0005\u0003umJ1!!\u001c:\u0013\u0011\u0011\u0019A!\u0002\u0003\u0013\u0011\u000bG/\u0019$sC6,'bAA7s!1\u0011QV\rA\u0002}DqAa\u0003\u001a\u0001\u0004\u0011i!A\u0004tKN\u001c\u0018n\u001c8\u0011\u0007a\u0012y!C\u0002\u0003\u0012e\u0012Ab\u00159be.\u001cVm]:j_:DaaV\rA\u0002\u0005-\u0011!F2p]N,X.\u001a+pa&\u001c7\u000b\u001e:fC6Lgn\u001a\u000b\u0007\u0003W\u0014IBa\u0007\t\u000f\t-!\u00041\u0001\u0003\u000e!1qK\u0007a\u0001\u0003\u0017\t1b]5oWR{Gk\u001c9jGR1\u0011q\bB\u0011\u0005GAaaV\u000eA\u0002\u0005-\u0001b\u0002B\u00137\u0001\u0007\u00111^\u0001\u0003I\u001a\u0004")
/* loaded from: input_file:ai/starlake/utils/kafka/KafkaClient.class */
public class KafkaClient implements StrictLogging, DatasetLogging, AutoCloseable {
    private AdminClient client;
    private final Settings settings;
    private final Mode cometOffsetsMode;
    private final Map<String, String> serverOptions;
    private final Settings.KafkaTopicConfig cometOffsetsConfig;
    private final Properties props;
    private final Logger logger;
    private volatile boolean bitmap$0;

    @Override // org.apache.spark.sql.DatasetLogging
    public <T> DatasetLogging.DatasetHelper<T> DatasetHelper(Dataset<T> dataset) {
        DatasetLogging.DatasetHelper<T> DatasetHelper;
        DatasetHelper = DatasetHelper(dataset);
        return DatasetHelper;
    }

    public Logger logger() {
        return this.logger;
    }

    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger logger) {
        this.logger = logger;
    }

    public Mode cometOffsetsMode() {
        return this.cometOffsetsMode;
    }

    public Map<String, String> serverOptions() {
        return this.serverOptions;
    }

    public Settings.KafkaTopicConfig cometOffsetsConfig() {
        return this.cometOffsetsConfig;
    }

    public Properties props() {
        return this.props;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [ai.starlake.utils.kafka.KafkaClient] */
    private AdminClient client$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.client = AdminClient.create(props());
                r0 = this;
                r0.bitmap$0 = true;
            }
        }
        return this.client;
    }

    public AdminClient client() {
        return !this.bitmap$0 ? client$lzycompute() : this.client;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        client().close();
    }

    public void deleteTopic(String str) {
        boolean contains = ((Set) client().listTopics().names().get()).contains(str);
        if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info(((TraversableOnce) JavaConverters$.MODULE$.asScalaSetConverter((Set) client().listTopics().names().get()).asScala()).toSet().mkString("\n"));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        if (contains) {
            if (logger().underlying().isInfoEnabled()) {
                logger().underlying().info("Deleting topic {}", new Object[]{str});
                BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
            }
            client().deleteTopics(JavaConverters$.MODULE$.asJavaCollectionConverter(new $colon.colon(str, Nil$.MODULE$)).asJavaCollection());
        }
    }

    public void createTopicIfNotPresent(NewTopic newTopic, Map<String, String> map) {
        if (((Set) client().listTopics().names().get()).contains(newTopic.name())) {
            return;
        }
        client().createTopics(Collections.singleton(newTopic.configs((java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(map).asJava()))).all().get();
    }

    public List<TopicPartitionInfo> topicPartitions(String str) {
        return ((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(((TopicDescription) ((java.util.Map) client().describeTopics(Collections.singleton(str)).all().get()).get(str)).partitions()).asScala()).toList();
    }

    public List<Tuple2<Object, Object>> topicEndOffsets(String str, Map<String, String> map) {
        Failure apply = Try$.MODULE$.apply(() -> {
            Properties buildProps = this.buildProps(map);
            KafkaConsumer kafkaConsumer = new KafkaConsumer(buildProps);
            if (this.logger().underlying().isInfoEnabled()) {
                if (this.logger().underlying().isInfoEnabled()) {
                    this.logger().underlying().info("access options for topic {} ==>", new Object[]{str});
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                } else {
                    BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                }
                ((IterableLike) JavaConverters$.MODULE$.propertiesAsScalaMapConverter(buildProps).asScala()).foreach(tuple2 -> {
                    $anonfun$topicEndOffsets$2(this, tuple2);
                    return BoxedUnit.UNIT;
                });
                BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
            }
            List list = ((TraversableOnce) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(kafkaConsumer.partitionsFor(str)).asScala()).map(partitionInfo -> {
                return new TopicPartition(str, partitionInfo.partition());
            }, Buffer$.MODULE$.canBuildFrom())).toList();
            kafkaConsumer.assign((Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(list).asJava());
            kafkaConsumer.seekToEnd((Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(list).asJava());
            return (List) list.map(topicPartition -> {
                return new Tuple2.mcIJ.sp(topicPartition.partition(), kafkaConsumer.position(topicPartition));
            }, List$.MODULE$.canBuildFrom());
        });
        if (apply instanceof Failure) {
            Throwable exception = apply.exception();
            exception.printStackTrace();
            throw exception;
        }
        if (apply instanceof Success) {
            return (List) ((Success) apply).value();
        }
        throw new MatchError(apply);
    }

    private Properties buildProps(Map<String, String> map) {
        Properties properties = new Properties();
        map.foreach(tuple2 -> {
            return properties.put(tuple2._1(), tuple2._2());
        });
        return properties;
    }

    public void topicSaveOffsets(String str, Map<String, String> map, List<Tuple2<Object, Object>> list) {
        Mode cometOffsetsMode = cometOffsetsMode();
        if (Mode$STREAM$.MODULE$.equals(cometOffsetsMode)) {
            KafkaProducer kafkaProducer = new KafkaProducer(buildProps(map));
            list.foreach(tuple2 -> {
                if (tuple2 == null) {
                    throw new MatchError(tuple2);
                }
                return kafkaProducer.send(new ProducerRecord(this.cometOffsetsConfig().topicName(), new StringBuilder(1).append(str).append("/").append(tuple2._1$mcI$sp()).toString(), String.valueOf(BoxesRunTime.boxToLong(tuple2._2$mcJ$sp()))));
            });
            kafkaProducer.close();
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            return;
        }
        if (!Mode$FILE$.MODULE$.equals(cometOffsetsMode)) {
            throw new Exception("Should never happen");
        }
        FileLock cometOffsetsLock = cometOffsetsLock(str);
    }

    private Option<List<Tuple2<Object, Object>>> topicCurrentOffsetsFromStream(String str) {
        Properties properties = new Properties();
        cometOffsetsConfig().accessOptions().foreach(tuple2 -> {
            return properties.put(tuple2._1(), tuple2._2());
        });
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        List list = (List) topicPartitions(cometOffsetsConfig().topicName()).map(topicPartitionInfo -> {
            return new TopicPartition(this.cometOffsetsConfig().topicName(), topicPartitionInfo.partition());
        }, List$.MODULE$.canBuildFrom());
        kafkaConsumer.assign((Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(list).asJava());
        kafkaConsumer.seekToBeginning((Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(list).asJava());
        scala.collection.mutable.Map empty = Map$.MODULE$.empty();
        ConsumerRecords poll = kafkaConsumer.poll(Duration.ofMillis(100L));
        while (true) {
            ConsumerRecords consumerRecords = poll;
            if (consumerRecords == null || consumerRecords.isEmpty()) {
                break;
            }
            ((IterableLike) JavaConverters$.MODULE$.iterableAsScalaIterableConverter(consumerRecords.records(cometOffsetsConfig().topicName())).asScala()).foreach(consumerRecord -> {
                return empty.$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(consumerRecord.key()), consumerRecord.value()));
            });
            poll = kafkaConsumer.poll(Duration.ofMillis(100L));
        }
        return ((TraversableLike) empty.keys().map(str2 -> {
            String[] split = new StringOps(Predef$.MODULE$.augmentString(str2)).split('/');
            return new Tuple3(split[0], split[1], empty.apply(str2));
        }, Iterable$.MODULE$.canBuildFrom())).groupBy(tuple3 -> {
            if (tuple3 != null) {
                return (String) tuple3._1();
            }
            throw new MatchError(tuple3);
        }).mapValues(iterable -> {
            return ((TraversableOnce) iterable.map(tuple32 -> {
                if (tuple32 == null) {
                    throw new MatchError(tuple32);
                }
                return new Tuple2.mcIJ.sp(new StringOps(Predef$.MODULE$.augmentString((String) tuple32._2())).toInt(), new StringOps(Predef$.MODULE$.augmentString((String) tuple32._3())).toLong());
            }, Iterable$.MODULE$.canBuildFrom())).toList();
        }).get(str);
    }

    private FileLock cometOffsetsLock(String str) {
        return new FileLock(new Path(this.settings.comet().lock().path(), new StringBuilder(19).append("comet_offsets_").append(str).append(".lock").toString()), this.settings.storageHandler());
    }

    private Option<List<Tuple2<Object, Object>>> topicCurrentOffsetsFromFile(String str) {
        FileLock cometOffsetsLock = cometOffsetsLock(str);
        return (Option) cometOffsetsLock.doExclusively(cometOffsetsLock.doExclusively$default$1(), () -> {
            Path path = new Path(this.cometOffsetsConfig().topicName(), str);
            if (this.settings.storageHandler().exists(path)) {
                if (this.logger().underlying().isInfoEnabled()) {
                    this.logger().underlying().info("Loading comet offsets to path {}", new Object[]{path});
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                } else {
                    BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                }
                return new Some((List) ((List) YamlSerializer$.MODULE$.mapper().readValue(this.settings.storageHandler().read(path), List.class)).map(str2 -> {
                    String[] split = new StringOps(Predef$.MODULE$.augmentString(str2)).split(',');
                    return new Tuple2.mcIJ.sp(new StringOps(Predef$.MODULE$.augmentString(split[0])).toInt(), new StringOps(Predef$.MODULE$.augmentString(split[1])).toLong());
                }, List$.MODULE$.canBuildFrom()));
            }
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info("Cannot load comet offsets: {} file does not exist", new Object[]{path});
                BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
            }
            return None$.MODULE$;
        });
    }

    public Option<List<Tuple2<Object, Object>>> topicCurrentOffsets(String str) {
        Option<List<Tuple2<Object, Object>>> option;
        Mode cometOffsetsMode = cometOffsetsMode();
        if (Mode$STREAM$.MODULE$.equals(cometOffsetsMode)) {
            option = topicCurrentOffsetsFromStream(str);
        } else {
            if (!Mode$FILE$.MODULE$.equals(cometOffsetsMode)) {
                throw new Exception("Should never happen");
            }
            option = topicCurrentOffsetsFromFile(str);
        }
        return option;
    }

    public Option<String> offsetsAsJson(String str, List<Tuple2<Object, Object>> list) {
        if (list.isEmpty()) {
            return None$.MODULE$;
        }
        return new Some(new StringBuilder(7).append("{\"").append(str).append("\":{").append(((TraversableOnce) list.map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            int _1$mcI$sp = tuple2._1$mcI$sp();
            return new StringBuilder(4).append("\"").append(_1$mcI$sp).append("\": ").append(tuple2._2$mcJ$sp()).toString();
        }, List$.MODULE$.canBuildFrom())).mkString(",")).append("}}").toString());
    }

    public Tuple2<Dataset<Row>, List<Tuple2<Object, Object>>> consumeTopicBatch(String str, SparkSession sparkSession, Settings.KafkaTopicConfig kafkaTopicConfig) {
        long j = -2;
        List<Tuple2<Object, Object>> list = (List) topicCurrentOffsets(str).getOrElse(() -> {
            return (List) this.topicPartitions(kafkaTopicConfig.topicName()).map(topicPartitionInfo -> {
                return new Tuple2.mcIJ.sp(topicPartitionInfo.partition(), j);
            }, List$.MODULE$.canBuildFrom());
        });
        if (logger().underlying().isInfoEnabled()) {
            list.foreach(tuple2 -> {
                $anonfun$consumeTopicBatch$3(this, str, tuple2);
                return BoxedUnit.UNIT;
            });
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        List<Tuple2<Object, Object>> list2 = topicEndOffsets(kafkaTopicConfig.topicName(), kafkaTopicConfig.accessOptions());
        if (logger().underlying().isInfoEnabled()) {
            list2.foreach(tuple22 -> {
                $anonfun$consumeTopicBatch$4(this, str, tuple22);
                return BoxedUnit.UNIT;
            });
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
        }
        Dataset selectExpr = sparkSession.read().format("kafka").options(kafkaTopicConfig.accessOptions().$plus$plus(new $colon.colon(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("startingOffsets"), offsetsAsJson(kafkaTopicConfig.topicName(), list).getOrElse(() -> {
            return "earliest";
        })), new $colon.colon(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("endingOffsets"), offsetsAsJson(kafkaTopicConfig.topicName(), list2).getOrElse(() -> {
            return "latest";
        })), Nil$.MODULE$)))).load().selectExpr((Seq) kafkaTopicConfig.fields().map(str2 -> {
            return new StringBuilder(6).append("CAST(").append(str2).append(")").toString();
        }, List$.MODULE$.canBuildFrom()));
        if (!logger().underlying().isInfoEnabled()) {
            BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
        } else if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info(DatasetHelper(selectExpr).schemaString());
            BoxedUnit boxedUnit6 = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit7 = BoxedUnit.UNIT;
        }
        return new Tuple2<>(selectExpr, list2);
    }

    public Dataset<Row> consumeTopicStreaming(SparkSession sparkSession, Settings.KafkaTopicConfig kafkaTopicConfig) {
        return sparkSession.readStream().format("kafka").options(kafkaTopicConfig.accessOptions()).load().selectExpr((Seq) kafkaTopicConfig.fields().map(str -> {
            return new StringBuilder(6).append("CAST(").append(str).append(")").toString();
        }, List$.MODULE$.canBuildFrom()));
    }

    public void sinkToTopic(Settings.KafkaTopicConfig kafkaTopicConfig, Dataset<Row> dataset) {
        dataset.selectExpr((Seq) kafkaTopicConfig.fields().map(str -> {
            return new StringBuilder(6).append("CAST(").append(str).append(")").toString();
        }, List$.MODULE$.canBuildFrom())).write().format("kafka").options(kafkaTopicConfig.accessOptions()).option("topic", kafkaTopicConfig.topicName()).save();
    }

    public static final /* synthetic */ void $anonfun$topicEndOffsets$2(KafkaClient kafkaClient, Tuple2 tuple2) {
        BoxedUnit boxedUnit;
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        String str = (String) tuple2._1();
        String str2 = (String) tuple2._2();
        if (kafkaClient.logger().underlying().isInfoEnabled()) {
            kafkaClient.logger().underlying().info("\t{}={}", new Object[]{str, str2});
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
    }

    public static final /* synthetic */ void $anonfun$consumeTopicBatch$3(KafkaClient kafkaClient, String str, Tuple2 tuple2) {
        BoxedUnit boxedUnit;
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        int _1$mcI$sp = tuple2._1$mcI$sp();
        long _2$mcJ$sp = tuple2._2$mcJ$sp();
        if (kafkaClient.logger().underlying().isInfoEnabled()) {
            kafkaClient.logger().underlying().info("{} start-offset -> {}:{}", new Object[]{str, BoxesRunTime.boxToInteger(_1$mcI$sp), BoxesRunTime.boxToLong(_2$mcJ$sp)});
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
    }

    public static final /* synthetic */ void $anonfun$consumeTopicBatch$4(KafkaClient kafkaClient, String str, Tuple2 tuple2) {
        BoxedUnit boxedUnit;
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        int _1$mcI$sp = tuple2._1$mcI$sp();
        long _2$mcJ$sp = tuple2._2$mcJ$sp();
        if (kafkaClient.logger().underlying().isInfoEnabled()) {
            kafkaClient.logger().underlying().info("{} end-offset -> {}:{}", new Object[]{str, BoxesRunTime.boxToInteger(_1$mcI$sp), BoxesRunTime.boxToLong(_2$mcJ$sp)});
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
    }

    public KafkaClient(Settings.KafkaConfig kafkaConfig, Settings settings) {
        BoxedUnit boxToBoolean;
        this.settings = settings;
        StrictLogging.$init$(this);
        DatasetLogging.$init$(this);
        this.cometOffsetsMode = (Mode) settings.comet().kafka().cometOffsetsMode().map(str -> {
            return Mode$.MODULE$.fromString(str);
        }).getOrElse(() -> {
            return Mode$STREAM$.MODULE$;
        });
        this.serverOptions = kafkaConfig.serverOptions();
        this.cometOffsetsConfig = (Settings.KafkaTopicConfig) kafkaConfig.topics().apply("comet_offsets");
        this.props = new Properties();
        serverOptions().foreach(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            return this.props().put((String) tuple2._1(), (String) tuple2._2());
        });
        Mode cometOffsetsMode = cometOffsetsMode();
        if (Mode$STREAM$.MODULE$.equals(cometOffsetsMode)) {
            createTopicIfNotPresent(new NewTopic(cometOffsetsConfig().topicName(), cometOffsetsConfig().partitions(), cometOffsetsConfig().replicationFactor()), (Map) Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("cleanup.policy"), "compact")})));
            boxToBoolean = BoxedUnit.UNIT;
        } else {
            if (!Mode$FILE$.MODULE$.equals(cometOffsetsMode)) {
                throw new Exception("Should never happen");
            }
            boxToBoolean = !settings.storageHandler().exists(new Path(cometOffsetsConfig().topicName())) ? BoxesRunTime.boxToBoolean(settings.storageHandler().mkdirs(new Path(cometOffsetsConfig().topicName()))) : BoxedUnit.UNIT;
        }
    }
}
