/*
 * Decompiled with CFR 0.152.
 */
package ai.starlake.utils.kafka;

import ai.starlake.config.Settings;
import ai.starlake.schema.model.Mode;
import ai.starlake.schema.model.Mode$;
import ai.starlake.schema.model.Mode$FILE$;
import ai.starlake.schema.model.Mode$STREAM$;
import ai.starlake.utils.FileLock;
import ai.starlake.utils.YamlSerializer$;
import ai.starlake.utils.kafka.KafkaClient$;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Future;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.DatasetLogging;
import org.apache.spark.sql.DatasetLogging$class;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamReader;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.Map$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.Try$;

@ScalaSignature(bytes="\u0006\u0001\tMc\u0001B\u0001\u0003\u0001-\u00111bS1gW\u0006\u001cE.[3oi*\u00111\u0001B\u0001\u0006W\u000647.\u0019\u0006\u0003\u000b\u0019\tQ!\u001e;jYNT!a\u0002\u0005\u0002\u0011M$\u0018M\u001d7bW\u0016T\u0011!C\u0001\u0003C&\u001c\u0001aE\u0003\u0001\u0019Ia\u0002\u0006\u0005\u0002\u000e!5\taBC\u0001\u0010\u0003\u0015\u00198-\u00197b\u0013\t\tbB\u0001\u0004B]f\u0014VM\u001a\t\u0003'ii\u0011\u0001\u0006\u0006\u0003+Y\tAb]2bY\u0006dwnZ4j]\u001eT!a\u0006\r\u0002\u0011QL\b/Z:bM\u0016T\u0011!G\u0001\u0004G>l\u0017BA\u000e\u0015\u00055\u0019FO]5di2{wmZ5oOB\u0011QDJ\u0007\u0002=)\u0011q\u0004I\u0001\u0004gFd'BA\u0011#\u0003\u0015\u0019\b/\u0019:l\u0015\t\u0019C%\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002K\u0005\u0019qN]4\n\u0005\u001dr\"A\u0004#bi\u0006\u001cX\r\u001e'pO\u001eLgn\u001a\t\u0003S9j\u0011A\u000b\u0006\u0003W1\nA\u0001\\1oO*\tQ&\u0001\u0003kCZ\f\u0017BA\u0018+\u00055\tU\u000f^8DY>\u001cX-\u00192mK\"A\u0011\u0007\u0001B\u0001B\u0003%!'A\u0006lC\u001a\\\u0017mQ8oM&<\u0007CA\u001aB\u001d\t!dH\u0004\u00026y9\u0011ag\u000f\b\u0003oij\u0011\u0001\u000f\u0006\u0003s)\ta\u0001\u0010:p_Rt\u0014\"A\u0005\n\u0005\u001dA\u0011BA\u001f\u0007\u0003\u0019\u0019wN\u001c4jO&\u0011q\bQ\u0001\t'\u0016$H/\u001b8hg*\u0011QHB\u0005\u0003\u0005\u000e\u00131bS1gW\u0006\u001cuN\u001c4jO*\u0011q\b\u0011\u0005\t\u000b\u0002\u0011\t\u0011)A\u0006\r\u0006A1/\u001a;uS:<7\u000f\u0005\u0002H\u00116\t\u0001)\u0003\u0002J\u0001\nA1+\u001a;uS:<7\u000fC\u0003L\u0001\u0011\u0005A*\u0001\u0004=S:LGO\u0010\u000b\u0003\u001bF#\"A\u0014)\u0011\u0005=\u0003Q\"\u0001\u0002\t\u000b\u0015S\u00059\u0001$\t\u000bER\u0005\u0019\u0001\u001a\t\u000fM\u0003!\u0019!C\u0001)\u0006\u00012m\\7fi>3gm]3ug6{G-Z\u000b\u0002+B\u0011akW\u0007\u0002/*\u0011\u0001,W\u0001\u0006[>$W\r\u001c\u0006\u00035\u001a\taa]2iK6\f\u0017B\u0001/X\u0005\u0011iu\u000eZ3\t\ry\u0003\u0001\u0015!\u0003V\u0003E\u0019w.\\3u\u001f\u001a47/\u001a;t\u001b>$W\r\t\u0005\bA\u0002\u0011\r\u0011\"\u0001b\u00035\u001
9XM\u001d<fe>\u0003H/[8ogV\t!\r\u0005\u0003dM&LgBA\u0007e\u0013\t)g\"\u0001\u0004Qe\u0016$WMZ\u0005\u0003O\"\u00141!T1q\u0015\t)g\u0002\u0005\u0002dU&\u00111\u000e\u001b\u0002\u0007'R\u0014\u0018N\\4\t\r5\u0004\u0001\u0015!\u0003c\u00039\u0019XM\u001d<fe>\u0003H/[8og\u0002Bqa\u001c\u0001C\u0002\u0013\u0005\u0001/\u0001\nd_6,Go\u00144gg\u0016$8oQ8oM&<W#A9\u0011\u0005M\u0012\u0018BA:D\u0005AY\u0015MZ6b)>\u0004\u0018nY\"p]\u001aLw\r\u0003\u0004v\u0001\u0001\u0006I!]\u0001\u0014G>lW\r^(gMN,Go]\"p]\u001aLw\r\t\u0005\bo\u0002\u0011\r\u0011\"\u0001y\u0003]\u0019XM\u001d<fe>\u0003H/[8ogB\u0013x\u000e]3si&,7/F\u0001z!\tQX0D\u0001|\u0015\taH&\u0001\u0003vi&d\u0017B\u0001@|\u0005)\u0001&o\u001c9feRLWm\u001d\u0005\b\u0003\u0003\u0001\u0001\u0015!\u0003z\u0003a\u0019XM\u001d<fe>\u0003H/[8ogB\u0013x\u000e]3si&,7\u000f\t\u0005\u000b\u0003\u000b\u0001\u0001R1A\u0005\u0002\u0005\u001d\u0011AB2mS\u0016tG/\u0006\u0002\u0002\nA!\u00111BA\f\u001b\t\tiA\u0003\u0003\u0002\u0010\u0005E\u0011!B1e[&t'\u0002BA\n\u0003+\tqa\u00197jK:$8O\u0003\u0002\u0004E%!\u0011\u0011DA\u0007\u0005-\tE-\\5o\u00072LWM\u001c;\t\u0015\u0005u\u0001\u0001#A!B\u0013\tI!A\u0004dY&,g\u000e\u001e\u0011\t\u000f\u0005\u0005\u0002\u0001\"\u0001\u0002$\u0005)1\r\\8tKR\u0011\u0011Q\u0005\t\u0004\u001b\u0005\u001d\u0012bAA\u0015\u001d\t!QK\\5u\u0011\u001d\ti\u0003\u0001C\u0001\u0003_\t1\u0002Z3mKR,Gk\u001c9jGR!\u0011QEA\u0019\u0011\u001d\t\u0019$a\u000bA\u0002%\f\u0011\u0002^8qS\u000et\u0015-\\3\t\u000f\u0005]\u0002\u0001\"\u0001\u0002:\u000592M]3bi\u0016$v\u000e]5d\u0013\u001atu\u000e\u001e)sKN,g\u000e\u001e\u000b\u0007\u0003K\tY$!\u0012\t\u0011\u0005u\u0012Q\u0007a\u0001\u0003\u007f\tQ\u0001^8qS\u000e\u0004B!a\u0003\u0002B%!\u00111IA\u0007\u0005!qUm\u001e+pa&\u001c\u0007bBA$\u0003k\u0001\rAY\u0001\u0005G>tg\rC\u0004\u0002L\u0001!\t!!\u0014\u0002\u001fQ|\u0007/[2F]\u0012|eMZ:fiN$b!a\u0014\u0002t\u0005U\u0004CBA)\u00037\n\tG\u0004\u0003\u0002T\u0005]cbA\u001c\u0002V%\tq\"C\u0002\u0002Z9\tq\u0001]1dW\u0006<W-\u0003\u0003\u00
02^\u0005}#\u0001\u0002'jgRT1!!\u0017\u000f!\u001di\u00111MA4\u0003[J1!!\u001a\u000f\u0005\u0019!V\u000f\u001d7feA\u0019Q\"!\u001b\n\u0007\u0005-dBA\u0002J]R\u00042!DA8\u0013\r\t\tH\u0004\u0002\u0005\u0019>tw\rC\u0004\u00024\u0005%\u0003\u0019A5\t\u000f\u0005]\u0014\u0011\na\u0001E\u0006i\u0011mY2fgN|\u0005\u000f^5p]NDq!a\u001f\u0001\t\u0013\ti(A\tfqR\u0014\u0018m\u0019;QCJ$\u0018\u000e^5p]N$b!a \u0002\u0010\u0006E\u0005CBAA\u00037\n\u0019ID\u0002\u000e\u0003/\u0002B!!\"\u0002\f6\u0011\u0011q\u0011\u0006\u0005\u0003\u0013\u000b)\"\u0001\u0004d_6lwN\\\u0005\u0005\u0003\u001b\u000b9I\u0001\bU_BL7\rU1si&$\u0018n\u001c8\t\u000f\u0005M\u0012\u0011\u0010a\u0001S\"A\u00111SA=\u0001\u0004\t)*\u0001\u0005d_:\u001cX/\\3s!\u0019\t9*a'jS6\u0011\u0011\u0011\u0014\u0006\u0005\u0003'\u000b\t\"\u0003\u0003\u0002\u001e\u0006e%!D&bM.\f7i\u001c8tk6,'\u000fC\u0004\u0002\"\u0002!I!a)\u0002\u00179,woQ8ogVlWM\u001d\u000b\u0007\u0003+\u000b)+a*\t\u000f\u0005M\u0012q\u0014a\u0001S\"9\u0011qOAP\u0001\u0004\u0011\u0007bBAV\u0001\u0011%\u0011QV\u0001\u000bEVLG\u000e\u001a)s_B\u001cHcA=\u00020\"9\u0011qOAU\u0001\u0004\u0011\u0007bBAZ\u0001\u0011\u0005\u0011QW\u0001\u0011i>\u0004\u0018nY*bm\u0016|eMZ:fiN$\u0002\"!\n\u00028\u0006m\u0016Q\u0018\u0005\b\u0003s\u000b\t\f1\u0001j\u0003=!x\u000e]5d\u0007>tg-[4OC6,\u0007bBA<\u0003c\u0003\rA\u0019\u0005\t\u0003\u007f\u000b\t\f1\u0001\u0002P\u00059qN\u001a4tKR\u001c\bbBAb\u0001\u0011\u0005\u0011QY\u0001\u0015C\u0012l\u0017N\u001c+pa&\u001c\u0007+\u0019:uSRLwN\\:\u0015\t\u0005\u001d\u0017q\u001a\t\u0007\u0003#\nY&!3\u0011\t\u0005\u0015\u00151Z\u0005\u0005\u0003\u001b\f9I\u0001\nU_BL7\rU1si&$\u0018n\u001c8J]\u001a|\u0007bBA\u001a\u0003\u0003\u0004\r!\u001b\u0005\b\u0003'\u0004A\u0011BAk\u0003u!x\u000e]5d\u0007V\u0014(/\u001a8u\u001f\u001a47/\u001a;t\rJ|Wn\u0015;sK\u0006lG\u0003BAl\u0003;\u0004R!DAm\u0003\u001fJ1!a7\u000f\u0005\u0019y\u0005\u000f^5p]\"9\u0011\u0011XAi\u0001\u0004I\u0007bBAq\u0001\u0011%\u00111]\u0001\u0011G>lW\r^(gMN,Go\u001d'pG.$B!!:\u0002
nB!\u0011q]Au\u001b\u0005!\u0011bAAv\t\tAa)\u001b7f\u0019>\u001c7\u000eC\u0004\u0002:\u0006}\u0007\u0019A5\t\u000f\u0005E\b\u0001\"\u0003\u0002t\u0006YBo\u001c9jG\u000e+(O]3oi>3gm]3ug\u001a\u0013x.\u001c$jY\u0016$B!a6\u0002v\"9\u0011\u0011XAx\u0001\u0004I\u0007bBA}\u0001\u0011\u0005\u00111`\u0001\u0014i>\u0004\u0018nY\"veJ,g\u000e^(gMN,Go\u001d\u000b\u0005\u0003/\fi\u0010C\u0004\u0002:\u0006]\b\u0019A5\t\u000f\t\u0005\u0001\u0001\"\u0001\u0003\u0004\u0005iqN\u001a4tKR\u001c\u0018i\u001d&t_:$bA!\u0002\u0003\b\t%\u0001\u0003B\u0007\u0002Z&Dq!a\r\u0002\u0000\u0002\u0007\u0011\u000e\u0003\u0005\u0002@\u0006}\b\u0019AA(\u0011\u001d\u0011i\u0001\u0001C\u0001\u0005\u001f\t\u0011cY8ogVlW\rV8qS\u000e\u0014\u0015\r^2i)!\u0011\tBa\f\u00032\tm\u0002cB\u0007\u0002d\tM\u0011q\n\t\u0005\u0005+\u0011IC\u0004\u0003\u0003\u0018\t\u001db\u0002\u0002B\r\u0005KqAAa\u0007\u0003$9!!Q\u0004B\u0011\u001d\r9$qD\u0005\u0002K%\u00111\u0005J\u0005\u0003C\tJ!a\b\u0011\n\u0007\u0005ec$\u0003\u0003\u0003,\t5\"!\u0003#bi\u00064%/Y7f\u0015\r\tIF\b\u0005\b\u0003s\u0013Y\u00011\u0001j\u0011!\u0011\u0019Da\u0003A\u0002\tU\u0012aB:fgNLwN\u001c\t\u0004;\t]\u0012b\u0001B\u001d=\ta1\u000b]1sWN+7o]5p]\"1QHa\u0003A\u0002EDqAa\u0010\u0001\t\u0003\u0011\t%A\u000bd_:\u001cX/\\3U_BL7m\u0015;sK\u0006l\u0017N\\4\u0015\r\tM!1\tB#\u0011!\u0011\u0019D!\u0010A\u0002\tU\u0002BB\u001f\u0003>\u0001\u0007\u0011\u000fC\u0004\u0003J\u0001!\tAa\u0013\u0002\u0017MLgn\u001b+p)>\u0004\u0018n\u0019\u000b\u0007\u0003K\u0011iEa\u0014\t\ru\u00129\u00051\u0001r\u0011!\u0011\tFa\u0012A\u0002\tM\u0011A\u00013g\u0001")
public class KafkaClient
implements StrictLogging,
DatasetLogging,
AutoCloseable {
    // Project settings. Read by the offset-persistence methods for the storage handler,
    // the lock directory and the Spark/Kafka server options.
    // NOTE(review): public with a mangled name, presumably so the compiler-generated
    // closures can reach it -- confirm against the Scala source.
    public final Settings ai$starlake$utils$kafka$KafkaClient$$settings;
    // Selects where offsets are persisted; topicSaveOffsets compares it to Mode.STREAM and Mode.FILE.
    private final Mode cometOffsetsMode;
    // Exposed through serverOptions(); how it is populated is not visible in this view.
    private final scala.collection.immutable.Map<String, String> serverOptions;
    // Configuration of the offsets store; its topicName() is used both as a Kafka topic
    // (stream mode) and as a parent path (file mode).
    private final Settings.KafkaTopicConfig cometOffsetsConfig;
    // Properties handed to AdminClient.create when the admin client is first needed.
    private final Properties serverOptionsProperties;
    // Lazily created admin client; only valid once bitmap$0 is true.
    private AdminClient client;
    // Logger returned by logger().
    private final Logger logger;
    // Set to true by client$lzycompute after the admin client has been created.
    private volatile boolean bitmap$0;

    /**
     * Creates the admin client from {@code serverOptionsProperties()} the first time it is
     * called and returns the cached instance afterwards. Runs under this object's monitor.
     *
     * @return the admin client held by this instance
     */
    private AdminClient client$lzycompute() {
        synchronized (this) {
            if (this.bitmap$0) {
                // Another caller already built the client while we waited for the monitor.
                return this.client;
            }
            this.client = AdminClient.create((Properties)this.serverOptionsProperties());
            this.bitmap$0 = true;
            return this.client;
        }
    }

    /**
     * Delegates to the static {@code DatasetLogging$class.DatasetHelper} implementation.
     *
     * @param ds dataset to wrap
     * @return the helper produced for {@code ds}
     */
    @Override
    public <T> DatasetLogging.DatasetHelper<T> DatasetHelper(Dataset<T> ds) {
        DatasetLogging.DatasetHelper<T> helper = DatasetLogging$class.DatasetHelper(this, ds);
        return helper;
    }

    /**
     * @return the logger stored on this instance
     */
    public Logger logger() {
        return logger;
    }

    /**
     * Stores the given logger on this instance.
     * NOTE(review): the name suggests this is the compiler-generated trait setter for
     * StrictLogging's {@code logger} -- confirm against the Scala source.
     *
     * @param x$1 logger to store
     */
    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger x$1) {
        logger = x$1;
    }

    /**
     * @return the mode that {@code topicSaveOffsets} compares against
     *         {@code Mode.STREAM} and {@code Mode.FILE}
     */
    public Mode cometOffsetsMode() {
        return cometOffsetsMode;
    }

    /**
     * @return the server options map stored on this instance
     */
    public scala.collection.immutable.Map<String, String> serverOptions() {
        return serverOptions;
    }

    /**
     * @return the topic configuration describing where offsets are stored
     */
    public Settings.KafkaTopicConfig cometOffsetsConfig() {
        return cometOffsetsConfig;
    }

    /**
     * @return the properties passed to {@code AdminClient.create} when the admin client is built
     */
    public Properties serverOptionsProperties() {
        return serverOptionsProperties;
    }

    /**
     * Returns the admin client, creating it on first access.
     *
     * @return the admin client held by this instance
     */
    public AdminClient client() {
        if (this.bitmap$0) {
            // Already initialised: skip the synchronized slow path.
            return this.client;
        }
        return this.client$lzycompute();
    }

    /**
     * Closes the admin client if, and only if, it has already been created.
     *
     * Going through {@code client()} here would build an {@code AdminClient} (opening
     * connections and possibly failing on bad configuration) for an instance that never
     * used one, just to close it again.
     */
    @Override
    public void close() {
        // bitmap$0 is volatile and is written after the client field, so a true value
        // guarantees the client reference is visible.
        if (this.bitmap$0) {
            this.client.close();
        }
    }

    /**
     * Requests deletion of {@code topicName} when the broker currently lists it.
     *
     * The topic list is fetched once and used both for the existence check and for the
     * info-level log, so the logged names are exactly those the decision was based on.
     * The deletion is only requested: its result is not awaited here.
     *
     * @param topicName name of the topic to delete
     */
    public void deleteTopic(String topicName) {
        Set<String> topicNames = this.client().listTopics().names().get();
        boolean found = topicNames.contains(topicName);
        if (this.logger().underlying().isInfoEnabled()) {
            this.logger().underlying().info(((TraversableOnce)JavaConverters$.MODULE$.asScalaSetConverter(topicNames).asScala()).toSet().mkString("\n"));
        }
        if (!found) {
            return;
        }
        if (this.logger().underlying().isInfoEnabled()) {
            this.logger().underlying().info("Deleting topic {}", new Object[]{topicName});
        }
        this.client().deleteTopics(JavaConverters$.MODULE$.asJavaCollectionConverter((Iterable)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{topicName}))).asJavaCollection());
    }

    /**
     * Creates {@code topic} with the given configuration unless a topic with the same name
     * is already listed by the broker. Blocks until the creation request completes.
     *
     * @param topic topic definition to create
     * @param conf  topic-level configuration entries applied to {@code topic}
     */
    public void createTopicIfNotPresent(NewTopic topic, scala.collection.immutable.Map<String, String> conf) {
        Set existingTopics = (Set)this.client().listTopics().names().get();
        if (existingTopics.contains(topic.name())) {
            return;
        }
        java.util.Map javaConf = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter(conf).asJava();
        NewTopic configuredTopic = topic.configs(javaConf);
        this.client().createTopics(Collections.singleton(configuredTopic)).all().get();
    }

    public List<Tuple2<Object, Object>> topicEndOffsets(String topicName, scala.collection.immutable.Map<String, String> accessOptions) {
        Try try_ = Try$.MODULE$.apply((Function0)new Serializable(this, topicName, accessOptions){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ KafkaClient $outer;
            private final String topicName$1;
            private final scala.collection.immutable.Map accessOptions$1;

            /*
             * WARNING - void declaration
             */
            public final List<Tuple2<Object, Object>> apply() {
                void var3_3;
                KafkaConsumer<String, String> consumer = this.$outer.ai$starlake$utils$kafka$KafkaClient$$newConsumer(this.topicName$1, (scala.collection.immutable.Map<String, String>)this.accessOptions$1);
                List<TopicPartition> partitions = this.$outer.ai$starlake$utils$kafka$KafkaClient$$extractPartitions(this.topicName$1, consumer);
                consumer.assign((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter(partitions).asJava());
                consumer.seekToEnd((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter(partitions).asJava());
                List result = (List)partitions.map((Function1)new Serializable(this, consumer){
                    public static final long serialVersionUID = 0L;
                    private final KafkaConsumer consumer$1;

                    public final Tuple2<Object, Object> apply(TopicPartition p) {
                        return new Tuple2.mcIJ.sp(p.partition(), this.consumer$1.position(p));
                    }
                    {
                        this.consumer$1 = consumer$1;
                    }
                }, List$.MODULE$.canBuildFrom());
                consumer.close();
                return var3_3;
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.topicName$1 = topicName$1;
                this.accessOptions$1 = accessOptions$1;
            }
        });
        if (try_ instanceof Failure) {
            Failure failure = (Failure)try_;
            Throwable e = failure.exception();
            e.printStackTrace();
            throw e;
        }
        if (try_ instanceof Success) {
            List value;
            Success success = (Success)try_;
            List list = value = (List)success.value();
            return list;
        }
        throw new MatchError((Object)try_);
    }

    /*
     * WARNING - void declaration
     */
    public List<TopicPartition> ai$starlake$utils$kafka$KafkaClient$$extractPartitions(String topicName, KafkaConsumer<String, String> consumer) {
        void var3_3;
        List partitions = ((TraversableOnce)((TraversableLike)JavaConverters$.MODULE$.asScalaBufferConverter(consumer.partitionsFor(topicName)).asScala()).map((Function1)new Serializable(this, topicName){
            public static final long serialVersionUID = 0L;
            private final String topicName$2;

            public final TopicPartition apply(PartitionInfo info) {
                return new TopicPartition(this.topicName$2, info.partition());
            }
            {
                this.topicName$2 = topicName$2;
            }
        }, Buffer$.MODULE$.canBuildFrom())).toList();
        return var3_3;
    }

    /**
     * Builds a Kafka consumer from {@code accessOptions}.
     *
     * When info logging is enabled, a header line naming the topic is logged followed by
     * one line per resulting property.
     *
     * @param topicName     topic name, used only in the log header
     * @param accessOptions consumer properties
     * @return a new consumer configured with those properties
     */
    public KafkaConsumer<String, String> ai$starlake$utils$kafka$KafkaClient$$newConsumer(String topicName, scala.collection.immutable.Map<String, String> accessOptions) {
        Properties props = this.buildProps(accessOptions);
        if (this.logger().underlying().isInfoEnabled()) {
            this.logger().underlying().info("access options for topic {} ==>", new Object[]{topicName});
            ((IterableLike)JavaConverters$.MODULE$.propertiesAsScalaMapConverter(props).asScala()).foreach((Function1)new Serializable(this){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ KafkaClient $outer;

                public final void apply(Tuple2<String, String> x0$2) {
                    if (x0$2 == null) {
                        throw new MatchError(x0$2);
                    }
                    String k = (String)x0$2._1();
                    String v = (String)x0$2._2();
                    if (this.$outer.logger().underlying().isInfoEnabled()) {
                        this.$outer.logger().underlying().info("\t{}={}", new Object[]{k, v});
                    }
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                }
            });
        }
        return new KafkaConsumer(props);
    }

    /*
     * WARNING - void declaration
     */
    private Properties buildProps(scala.collection.immutable.Map<String, String> accessOptions) {
        void var2_2;
        Properties props = new Properties();
        accessOptions.foreach((Function1)new Serializable(this, props){
            public static final long serialVersionUID = 0L;
            private final Properties props$1;

            public final Object apply(Tuple2<String, String> option2) {
                return this.props$1.put(option2._1(), option2._2());
            }
            {
                this.props$1 = props$1;
            }
        });
        return var2_2;
    }

    /**
     * Persists the given offsets for {@code topicConfigName} according to {@code cometOffsetsMode()}.
     *
     * <ul>
     *   <li>STREAM: each pair is sent to the topic {@code cometOffsetsConfig().topicName()} with
     *       key {@code "<topicConfigName>/<partition>"} and the offset rendered as a string as value.
     *       The producer is closed even if a send throws.</li>
     *   <li>FILE: while holding the lock returned by {@code cometOffsetsLock}, the pairs are rendered
     *       as {@code "partition,offset"} strings, serialized with {@code YamlSerializer} and written
     *       to {@code new Path(cometOffsetsConfig().topicName(), topicConfigName)}.</li>
     * </ul>
     *
     * @param topicConfigName logical name the offsets are stored under
     * @param accessOptions   producer properties (used in STREAM mode only)
     * @param offsets         {@code (partition, offset)} pairs to persist
     * @throws Exception with message "Should never happen" when the mode is neither STREAM nor FILE
     */
    public void topicSaveOffsets(String topicConfigName, scala.collection.immutable.Map<String, String> accessOptions, List<Tuple2<Object, Object>> offsets) {
        Mode mode = this.cometOffsetsMode();
        if (((Object)Mode$STREAM$.MODULE$).equals(mode)) {
            Properties props = this.buildProps(accessOptions);
            KafkaProducer producer = new KafkaProducer(props);
            try {
                offsets.foreach((Function1)new Serializable(this, topicConfigName, producer){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ KafkaClient $outer;
                    private final String topicConfigName$2;
                    private final KafkaProducer producer$1;

                    public final Future<RecordMetadata> apply(Tuple2<Object, Object> x0$3) {
                        Tuple2<Object, Object> tuple2 = x0$3;
                        if (tuple2 != null) {
                            int partition = tuple2._1$mcI$sp();
                            long offset = tuple2._2$mcJ$sp();
                            String key = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.topicConfigName$2, BoxesRunTime.boxToInteger((int)partition)}));
                            String value = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)offset)}));
                            Future future = this.producer$1.send(new ProducerRecord(this.$outer.cometOffsetsConfig().topicName(), (Object)key, (Object)value));
                            return future;
                        }
                        throw new MatchError(tuple2);
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.topicConfigName$2 = topicConfigName$2;
                        this.producer$1 = producer$1;
                    }
                });
            } finally {
                // Always release the producer, including when a send throws.
                producer.close();
            }
            return;
        }
        if (((Object)Mode$FILE$.MODULE$).equals(mode)) {
            FileLock qual$1 = this.cometOffsetsLock(topicConfigName);
            long x$2 = qual$1.doExclusively$default$1();
            Serializable x$3 = new Serializable(this, topicConfigName, offsets){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ KafkaClient $outer;
                private final String topicConfigName$2;
                private final List offsets$1;

                public final void apply() {
                    this.apply$mcV$sp();
                }

                public void apply$mcV$sp() {
                    Path cometOffsetsPath = new Path(this.$outer.cometOffsetsConfig().topicName(), this.topicConfigName$2);
                    if (this.$outer.logger().underlying().isInfoEnabled()) {
                        this.$outer.logger().underlying().info("Saving comet offsets to path {}", new Object[]{cometOffsetsPath});
                    }
                    this.$outer.ai$starlake$utils$kafka$KafkaClient$$settings.storageHandler().write(YamlSerializer$.MODULE$.serializeObject(this.offsets$1.map((Function1)new Serializable(this){
                        public static final long serialVersionUID = 0L;

                        public final String apply(Tuple2<Object, Object> x0$4) {
                            Tuple2<Object, Object> tuple2 = x0$4;
                            if (tuple2 != null) {
                                int partition = tuple2._1$mcI$sp();
                                long offset = tuple2._2$mcJ$sp();
                                String string = new StringBuilder().append((Object)((Object)BoxesRunTime.boxToInteger((int)partition)).toString()).append((Object)",").append((Object)((Object)BoxesRunTime.boxToLong((long)offset)).toString()).toString();
                                return string;
                            }
                            throw new MatchError(tuple2);
                        }
                    }, List$.MODULE$.canBuildFrom())), cometOffsetsPath);
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                    this.topicConfigName$2 = topicConfigName$2;
                    this.offsets$1 = offsets$1;
                }
            };
            qual$1.doExclusively(x$2, x$3);
            return;
        }
        // Exception type and message kept as-is for callers that may rely on them.
        throw new Exception("Should never happen");
    }

    /**
     * Describes {@code topicName} through the admin client and returns its partitions.
     *
     * @param topicName topic to describe
     * @return the partition descriptions reported for that topic
     */
    public List<TopicPartitionInfo> adminTopicPartitions(String topicName) {
        java.util.Map descriptions = (java.util.Map)this.client().describeTopics(Collections.singleton(topicName)).allTopicNames().get();
        TopicDescription description = (TopicDescription)descriptions.get(topicName);
        return ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(description.partitions()).asScala()).toList();
    }

    /*
     * WARNING - void declaration
     */
    private Option<List<Tuple2<Object, Object>>> topicCurrentOffsetsFromStream(String topicConfigName) {
        Properties props = new Properties();
        this.cometOffsetsConfig().allAccessOptions(this.ai$starlake$utils$kafka$KafkaClient$$settings.comet().kafka().sparkServerOptions()).foreach((Function1)new Serializable(this, props){
            public static final long serialVersionUID = 0L;
            private final Properties props$2;

            public final Object apply(Tuple2<String, String> option2) {
                return this.props$2.put(option2._1(), option2._2());
            }
            {
                this.props$2 = props$2;
            }
        });
        KafkaConsumer consumer = new KafkaConsumer(props);
        List<TopicPartition> partitions = this.ai$starlake$utils$kafka$KafkaClient$$extractPartitions(this.cometOffsetsConfig().topicName(), (KafkaConsumer<String, String>)consumer);
        consumer.assign((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter(partitions).asJava());
        consumer.seekToBeginning((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter(partitions).asJava());
        scala.collection.mutable.Map offsets = Map$.MODULE$.empty();
        ConsumerRecords records = consumer.poll(Duration.ofMillis(100L));
        while (true) {
            void var3_3;
            void var5_5;
            ConsumerRecords consumerRecords;
            if (records == null || records.isEmpty()) {
                Option res = ((TraversableLike)offsets.keys().map((Function1)new Serializable(this, offsets){
                    public static final long serialVersionUID = 0L;
                    private final scala.collection.mutable.Map offsets$2;

                    public final Tuple3<String, String, String> apply(String k) {
                        String[] tab = new StringOps(Predef$.MODULE$.augmentString(k)).split('/');
                        return new Tuple3((Object)tab[0], (Object)tab[1], this.offsets$2.apply((Object)k));
                    }
                    {
                        this.offsets$2 = offsets$2;
                    }
                }, Iterable$.MODULE$.canBuildFrom())).groupBy((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final String apply(Tuple3<String, String, String> x0$5) {
                        Tuple3<String, String, String> tuple3 = x0$5;
                        if (tuple3 != null) {
                            String topic;
                            String string = topic = (String)tuple3._1();
                            return string;
                        }
                        throw new MatchError(tuple3);
                    }
                }).mapValues((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final List<Tuple2<Object, Object>> apply(Iterable<Tuple3<String, String, String>> x$1) {
                        return ((TraversableOnce)x$1.map((Function1)new Serializable(this){
                            public static final long serialVersionUID = 0L;

                            public final Tuple2<Object, Object> apply(Tuple3<String, String, String> x0$6) {
                                Tuple3<String, String, String> tuple3 = x0$6;
                                if (tuple3 != null) {
                                    String partition = (String)tuple3._2();
                                    String offset = (String)tuple3._3();
                                    Tuple2.mcIJ.sp sp2 = new Tuple2.mcIJ.sp(new StringOps(Predef$.MODULE$.augmentString(partition)).toInt(), new StringOps(Predef$.MODULE$.augmentString(offset)).toLong());
                                    return sp2;
                                }
                                throw new MatchError(tuple3);
                            }
                        }, Iterable$.MODULE$.canBuildFrom())).toList();
                    }
                }).get((Object)topicConfigName);
                return res;
            }
            ((IterableLike)JavaConverters$.MODULE$.iterableAsScalaIterableConverter(consumerRecords.records(this.cometOffsetsConfig().topicName())).asScala()).foreach((Function1)new Serializable(this, (scala.collection.mutable.Map)var5_5){
                public static final long serialVersionUID = 0L;
                private final scala.collection.mutable.Map offsets$2;

                public final scala.collection.mutable.Map<String, String> apply(ConsumerRecord<String, String> r) {
                    return (scala.collection.mutable.Map)this.offsets$2.$plus$eq(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(r.key()), r.value()));
                }
                {
                    this.offsets$2 = offsets$2;
                }
            });
            consumerRecords = var3_3.poll(Duration.ofMillis(100L));
        }
    }

    /**
     * Builds the file lock guarding the offsets stored for {@code topicConfigName}.
     *
     * The lock file is named {@code comet_offsets_<topicConfigName>.lock} and lives under the
     * lock path taken from the settings.
     *
     * @param topicConfigName logical topic name the lock protects
     * @return a lock bound to that file and to the settings' storage handler
     */
    private FileLock cometOffsetsLock(String topicConfigName) {
        StringContext lockNameTemplate = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"comet_offsets_", ".lock"}));
        String lockFileName = lockNameTemplate.s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topicConfigName}));
        Path lockPath = new Path(this.ai$starlake$utils$kafka$KafkaClient$$settings.comet().lock().path(), lockFileName);
        return new FileLock(lockPath, this.ai$starlake$utils$kafka$KafkaClient$$settings.storageHandler());
    }

    /**
     * Loads the offsets saved for a topic configuration from the offsets file, while holding
     * the lock returned by {@code cometOffsetsLock}.
     *
     * <p>The file is located at {@code <cometOffsetsConfig.topicName>/<topicConfigName>}. Its
     * content is deserialized with the YAML mapper into a list of strings, each of which is
     * split on {@code ','} into a partition number (int) and an offset (long).
     *
     * @param topicConfigName name of the topic configuration whose offsets are requested
     * @return {@code Some(list of (partition, offset))} when the file exists, {@code None} otherwise
     */
    private Option<List<Tuple2<Object, Object>>> topicCurrentOffsetsFromFile(String topicConfigName) {
        FileLock offsetsLock = this.cometOffsetsLock(topicConfigName);
        long lockTimeout = offsetsLock.doExclusively$default$1();
        Serializable loadOffsets = new Serializable(this, topicConfigName){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ KafkaClient $outer;
            private final String topicConfigName$3;

            public final Option<List<Tuple2<Object, Object>>> apply() {
                Path offsetsFile = new Path(this.$outer.cometOffsetsConfig().topicName(), this.topicConfigName$3);
                // Nothing saved yet for this topic configuration: report it and return None.
                if (!this.$outer.ai$starlake$utils$kafka$KafkaClient$$settings.storageHandler().exists(offsetsFile)) {
                    if (this.$outer.logger().underlying().isInfoEnabled()) {
                        this.$outer.logger().underlying().info("Cannot load comet offsets: {} file does not exist", new Object[]{offsetsFile});
                    }
                    return None$.MODULE$;
                }
                if (this.$outer.logger().underlying().isInfoEnabled()) {
                    this.$outer.logger().underlying().info("Loading comet offsets to path {}", new Object[]{offsetsFile});
                }
                List rawEntries = (List)YamlSerializer$.MODULE$.mapper().readValue(this.$outer.ai$starlake$utils$kafka$KafkaClient$$settings.storageHandler().read(offsetsFile), List.class);
                List parsedOffsets = (List)rawEntries.map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    // Each entry has the form "<partition>,<offset>".
                    public final Tuple2<Object, Object> apply(String entry) {
                        String[] fields = new StringOps(Predef$.MODULE$.augmentString(entry)).split(',');
                        int partition = new StringOps(Predef$.MODULE$.augmentString(fields[0])).toInt();
                        long offset = new StringOps(Predef$.MODULE$.augmentString(fields[1])).toLong();
                        return new Tuple2.mcIJ.sp(partition, offset);
                    }
                }, List$.MODULE$.canBuildFrom());
                return new Some((Object)parsedOffsets);
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.topicConfigName$3 = topicConfigName$3;
            }
        };
        return (Option)offsetsLock.doExclusively(lockTimeout, loadOffsets);
    }

    /**
     * Returns the offsets currently saved for a topic configuration, reading them from the
     * location selected by the configured offsets mode.
     *
     * @param topicConfigName name of the topic configuration whose offsets are requested
     * @return the result of {@code topicCurrentOffsetsFromStream} in STREAM mode, or of
     *         {@code topicCurrentOffsetsFromFile} in FILE mode
     * @throws Exception with message "Should never happen" when the mode is neither STREAM nor FILE
     */
    public Option<List<Tuple2<Object, Object>>> topicCurrentOffsets(String topicConfigName) {
        Mode offsetsMode = this.cometOffsetsMode();
        if (((Object)Mode$STREAM$.MODULE$).equals(offsetsMode)) {
            return this.topicCurrentOffsetsFromStream(topicConfigName);
        }
        if (((Object)Mode$FILE$.MODULE$).equals(offsetsMode)) {
            return this.topicCurrentOffsetsFromFile(topicConfigName);
        }
        throw new Exception("Should never happen");
    }

    /**
     * Renders per-partition offsets as a JSON object keyed by topic name, e.g.
     * {@code {"my-topic":{"0": 12,"1": 34}}}.
     *
     * @param topicName name used as the single top-level key of the JSON object
     * @param offsets   list of (partition, offset) pairs
     * @return {@code None} when {@code offsets} is empty, otherwise {@code Some(json)}
     */
    public Option<String> offsetsAsJson(String topicName, List<Tuple2<Object, Object>> offsets) {
        if (offsets.isEmpty()) {
            return None$.MODULE$;
        }
        TraversableOnce renderedEntries = (TraversableOnce)offsets.map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            // Formats one pair as "<partition>": <offset>
            public final String apply(Tuple2<Object, Object> entry) {
                if (entry == null) {
                    throw new MatchError(entry);
                }
                int partition = entry._1$mcI$sp();
                long partitionOffset = entry._2$mcJ$sp();
                return "\"" + partition + "\": " + partitionOffset;
            }
        }, List$.MODULE$.canBuildFrom());
        String joinedEntries = renderedEntries.mkString(",");
        return new Some((Object)("{\"" + topicName + "\":{" + joinedEntries + "}}"));
    }

    public Tuple2<Dataset<Row>, List<Tuple2<Object, Object>>> consumeTopicBatch(String topicConfigName, SparkSession session2, Settings.KafkaTopicConfig config) {
        BoxedUnit boxedUnit;
        BoxedUnit boxedUnit2;
        BoxedUnit boxedUnit3;
        BoxedUnit boxedUnit4;
        BoxedUnit boxedUnit5;
        long EARLIEST_OFFSET = -2L;
        List startOffsets = (List)this.topicCurrentOffsets(topicConfigName).getOrElse((Function0)new Serializable(this, config, EARLIEST_OFFSET){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ KafkaClient $outer;
            private final Settings.KafkaTopicConfig config$1;
            public final long EARLIEST_OFFSET$1;

            /*
             * WARNING - void declaration
             */
            public final List<Tuple2<Object, Object>> apply() {
                void var2_2;
                KafkaConsumer<String, String> consumer = this.$outer.ai$starlake$utils$kafka$KafkaClient$$newConsumer(this.config$1.topicName(), this.config$1.allAccessOptions(this.$outer.ai$starlake$utils$kafka$KafkaClient$$settings.comet().kafka().sparkServerOptions()));
                List partitions = (List)this.$outer.ai$starlake$utils$kafka$KafkaClient$$extractPartitions(this.config$1.topicName(), consumer).map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$13 $outer;

                    public final Tuple2<Object, Object> apply(TopicPartition p) {
                        return new Tuple2.mcIJ.sp(p.partition(), this.$outer.EARLIEST_OFFSET$1);
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                    }
                }, List$.MODULE$.canBuildFrom());
                consumer.close();
                return var2_2;
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.config$1 = config$1;
                this.EARLIEST_OFFSET$1 = EARLIEST_OFFSET$1;
            }
        });
        if (this.logger().underlying().isInfoEnabled()) {
            startOffsets.foreach((Function1)new Serializable(this, topicConfigName){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ KafkaClient $outer;
                private final String topicConfigName$1;

                public final void apply(Tuple2<Object, Object> x0$8) {
                    Tuple2<Object, Object> tuple2 = x0$8;
                    if (tuple2 != null) {
                        BoxedUnit boxedUnit;
                        int partition = tuple2._1$mcI$sp();
                        long offsetStart = tuple2._2$mcJ$sp();
                        if (this.$outer.logger().underlying().isInfoEnabled()) {
                            this.$outer.logger().underlying().info("{} start-offset -> {}:{}", new Object[]{this.topicConfigName$1, BoxesRunTime.boxToInteger((int)partition), BoxesRunTime.boxToLong((long)offsetStart)});
                            boxedUnit = BoxedUnit.UNIT;
                        } else {
                            boxedUnit = BoxedUnit.UNIT;
                        }
                        BoxedUnit boxedUnit2 = boxedUnit;
                        return;
                    }
                    throw new MatchError(tuple2);
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                    this.topicConfigName$1 = topicConfigName$1;
                }
            });
            boxedUnit5 = BoxedUnit.UNIT;
        } else {
            boxedUnit5 = BoxedUnit.UNIT;
        }
        List<Tuple2<Object, Object>> endOffsets = this.topicEndOffsets(config.topicName(), config.allAccessOptions(this.ai$starlake$utils$kafka$KafkaClient$$settings.comet().kafka().sparkServerOptions()));
        if (this.logger().underlying().isInfoEnabled()) {
            endOffsets.foreach((Function1)new Serializable(this, topicConfigName){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ KafkaClient $outer;
                private final String topicConfigName$1;

                public final void apply(Tuple2<Object, Object> x0$9) {
                    Tuple2<Object, Object> tuple2 = x0$9;
                    if (tuple2 != null) {
                        BoxedUnit boxedUnit;
                        int partition = tuple2._1$mcI$sp();
                        long offsetEnd = tuple2._2$mcJ$sp();
                        if (this.$outer.logger().underlying().isInfoEnabled()) {
                            this.$outer.logger().underlying().info("{} end-offset -> {}:{}", new Object[]{this.topicConfigName$1, BoxesRunTime.boxToInteger((int)partition), BoxesRunTime.boxToLong((long)offsetEnd)});
                            boxedUnit = BoxedUnit.UNIT;
                        } else {
                            boxedUnit = BoxedUnit.UNIT;
                        }
                        BoxedUnit boxedUnit2 = boxedUnit;
                        return;
                    }
                    throw new MatchError(tuple2);
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                    this.topicConfigName$1 = topicConfigName$1;
                }
            });
            boxedUnit4 = BoxedUnit.UNIT;
        } else {
            boxedUnit4 = BoxedUnit.UNIT;
        }
        scala.collection.immutable.Map withOffsetsTopicOptions = config.allAccessOptions(this.ai$starlake$utils$kafka$KafkaClient$$settings.comet().kafka().sparkServerOptions()).$plus$plus((GenTraversableOnce)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"startingOffsets"), this.offsetsAsJson(config.topicName(), (List<Tuple2<Object, Object>>)startOffsets).getOrElse((Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "earliest";
            }
        })), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"endingOffsets"), this.offsetsAsJson(config.topicName(), endOffsets).getOrElse((Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "latest";
            }
        }))})));
        if (this.logger().underlying().isInfoEnabled()) {
            this.logger().underlying().info(new StringBuilder().append((Object)"withOffsetsTopicOptions:").append((Object)withOffsetsTopicOptions.toString()).toString());
            boxedUnit3 = BoxedUnit.UNIT;
        } else {
            boxedUnit3 = BoxedUnit.UNIT;
        }
        if (this.logger().underlying().isInfoEnabled()) {
            this.logger().underlying().info(new StringBuilder().append((Object)"settings.comet.kafka.sparkServerOptions:").append((Object)this.ai$starlake$utils$kafka$KafkaClient$$settings.comet().kafka().sparkServerOptions().toString()).toString());
            boxedUnit2 = BoxedUnit.UNIT;
        } else {
            boxedUnit2 = BoxedUnit.UNIT;
        }
        DataFrameReader reader = session2.read().format("kafka");
        Dataset df = reader.options((Map)withOffsetsTopicOptions).options(this.ai$starlake$utils$kafka$KafkaClient$$settings.comet().kafka().sparkServerOptions()).load().selectExpr(config.fields());
        if (this.logger().underlying().isInfoEnabled()) {
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info(this.DatasetHelper(df).schemaString());
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
        return new Tuple2((Object)df, endOffsets);
    }

    /**
     * Opens a streaming read on a Kafka topic.
     *
     * <p>The "kafka" stream source is configured with the topic's access options (built from
     * the Spark server options of the settings) and the loaded rows are projected with the
     * expressions returned by {@code config.fields()}.
     *
     * @param session2 Spark session used to create the stream reader
     * @param config   topic configuration supplying the access options and the select expressions
     * @return the dataset produced by the streaming source after the projection
     */
    public Dataset<Row> consumeTopicStreaming(SparkSession session2, Settings.KafkaTopicConfig config) {
        return session2.readStream().format("kafka").options(config.allAccessOptions(this.ai$starlake$utils$kafka$KafkaClient$$settings.comet().kafka().sparkServerOptions())).load().selectExpr(config.fields());
    }

    /**
     * Writes a dataset to a Kafka topic as a batch.
     *
     * <p>The dataset is first projected with the expressions returned by
     * {@code config.fields()}, then saved through the "kafka" format using the topic's
     * access options and the {@code topic} option set to {@code config.topicName()}.
     *
     * @param config topic configuration supplying the select expressions, access options and topic name
     * @param df     dataset to write
     */
    public void sinkToTopic(Settings.KafkaTopicConfig config, Dataset<Row> df) {
        df.selectExpr(config.fields()).write().format("kafka").options(config.allAccessOptions(this.ai$starlake$utils$kafka$KafkaClient$$settings.comet().kafka().sparkServerOptions())).option("topic", config.topicName()).save();
    }

    /**
     * Creates the client and prepares the place where offsets are stored.
     *
     * <p>The offsets mode is read from {@code settings.comet().kafka().cometOffsetsMode()} and
     * defaults to STREAM when that option is empty. In STREAM mode the constructor calls
     * {@code createTopicIfNotPresent} for the offsets topic with {@code cleanup.policy=compact}.
     * In FILE mode it creates, through the storage handler, the directory named after the
     * offsets topic when that path does not exist yet.
     *
     * @param kafkaConfig Kafka configuration supplying the server options and the topic
     *                    configurations (the {@code "comet_offsets"} entry is looked up)
     * @param settings    application settings (offsets mode, storage handler)
     * @throws Exception with message "Should never happen" when the mode is neither STREAM nor FILE
     */
    public KafkaClient(Settings.KafkaConfig kafkaConfig, Settings settings) {
        block4: {
            block3: {
                BoxedUnit boxedUnit;
                Mode mode;
                block2: {
                    this.ai$starlake$utils$kafka$KafkaClient$$settings = settings;
                    // Trait initialisers emitted by the Scala compiler (as rendered by the decompiler).
                    StrictLogging.class.$init$((StrictLogging)this);
                    DatasetLogging$class.$init$(this);
                    // Parse the configured mode string; fall back to STREAM when none is configured.
                    this.cometOffsetsMode = (Mode)settings.comet().kafka().cometOffsetsMode().map((Function1)new Serializable(this){
                        public static final long serialVersionUID = 0L;

                        public final Mode apply(String value) {
                            return Mode$.MODULE$.fromString(value);
                        }
                    }).getOrElse((Function0)new Serializable(this){
                        public static final long serialVersionUID = 0L;

                        public final Mode$STREAM$ apply() {
                            return Mode$STREAM$.MODULE$;
                        }
                    });
                    this.serverOptions = kafkaConfig.serverOptions();
                    // Configuration of the topic registered under the "comet_offsets" key.
                    this.cometOffsetsConfig = (Settings.KafkaTopicConfig)kafkaConfig.topics().apply((Object)"comet_offsets");
                    // Copy every server option into a java.util.Properties instance.
                    this.serverOptionsProperties = new Properties();
                    this.serverOptions().foreach((Function1)new Serializable(this){
                        public static final long serialVersionUID = 0L;
                        private final /* synthetic */ KafkaClient $outer;

                        public final Object apply(Tuple2<String, String> x0$1) {
                            Tuple2<String, String> tuple2 = x0$1;
                            if (tuple2 != null) {
                                String k = (String)tuple2._1();
                                String v = (String)tuple2._2();
                                Object object = this.$outer.serverOptionsProperties().put(k, v);
                                return object;
                            }
                            throw new MatchError(tuple2);
                        }
                        {
                            if ($outer == null) {
                                throw null;
                            }
                            this.$outer = $outer;
                        }
                    });
                    mode = this.cometOffsetsMode();
                    // STREAM mode: make sure the offsets topic exists, requesting a compacted cleanup policy.
                    if (!((Object)Mode$STREAM$.MODULE$).equals(mode)) break block2;
                    this.createTopicIfNotPresent(new NewTopic(this.cometOffsetsConfig().topicName(), this.cometOffsetsConfig().partitions(), this.cometOffsetsConfig().replicationFactor()), (scala.collection.immutable.Map<String, String>)((scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"cleanup.policy"), (Object)"compact")}))));
                    boxedUnit = BoxedUnit.UNIT;
                    break block3;
                }
                // FILE mode: the offsets topic name is used as a directory path; create it when missing.
                if (!((Object)Mode$FILE$.MODULE$).equals(mode)) break block4;
                boxedUnit = settings.storageHandler().exists(new Path(this.cometOffsetsConfig().topicName())) ? BoxedUnit.UNIT : BoxesRunTime.boxToBoolean((boolean)settings.storageHandler().mkdirs(new Path(this.cometOffsetsConfig().topicName())));
            }
            return;
        }
        // Reached only when the mode is neither STREAM nor FILE.
        throw new Exception("Should never happen");
    }
}

