package io.amient.affinity.core.storage.kafka;

import com.typesafe.config.Config;
import io.amient.affinity.core.storage.Storage;
import java.lang.ref.SoftReference;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.apache.kafka.common.serialization.ByteBufferSerializer;
import scala.Predef$;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.EmptyMethodCache;
import scala.runtime.MethodCache;
import scala.runtime.ScalaRunTime$;

/* compiled from: KafkaStorage.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005uw!B\u0001\u0003\u0011\u0003y\u0011\u0001D&bM.\f7\u000b^8sC\u001e,'BA\u0002\u0005\u0003\u0015Y\u0017MZ6b\u0015\t)a!A\u0004ti>\u0014\u0018mZ3\u000b\u0005\u001dA\u0011\u0001B2pe\u0016T!!\u0003\u0006\u0002\u0011\u00054g-\u001b8jifT!a\u0003\u0007\u0002\r\u0005l\u0017.\u001a8u\u0015\u0005i\u0011AA5p\u0007\u0001\u0001\"\u0001E\t\u000e\u0003\t1QA\u0005\u0002\t\u0002M\u0011AbS1gW\u0006\u001cFo\u001c:bO\u0016\u001c\"!\u0005\u000b\u0011\u0005UAR\"\u0001\f\u000b\u0003]\tQa]2bY\u0006L!!\u0007\f\u0003\r\u0005s\u0017PU3g\u0011\u0015Y\u0012\u0003\"\u0001\u001d\u0003\u0019a\u0014N\\5u}Q\tq\u0002C\u0003\u001f#\u0011\u0005q$\u0001\u0010D\u001f:3\u0015jR0L\u0003\u001a[\u0015i\u0018\"P\u001fR\u001bFKU!Q?N+%KV#S'V\t\u0001\u0005\u0005\u0002\"M5\t!E\u0003\u0002$I\u0005!A.\u00198h\u0015\u0005)\u0013\u0001\u00026bm\u0006L!a\n\u0012\u0003\rM#(/\u001b8h\u0011\u0015I\u0013\u0003\"\u0001+\u0003I\u0019uJ\u0014$J\u000f~[\u0015IR&B?R{\u0005+S\"\u0016\u0003-\u0002\"\u0001L\u0018\u000f\u0005Ui\u0013B\u0001\u0018\u0017\u0003\u0019\u0001&/\u001a3fM&\u0011q\u0005\r\u0006\u0003]YAQAM\t\u0005\u0002)\nQcQ(O\r&;ulS!G\u0017\u0006{\u0006KU(E+\u000e+%\u000bC\u00035#\u0011\u0005!&A\u000bD\u001f:3\u0015jR0L\u0003\u001a[\u0015iX\"P\u001dN+V*\u0012*\u0007\tI\u0011\u0001AN\n\u0003k]\u0002\"\u0001O\u001d\u000e\u0003\u0011I!A\u000f\u0003\u0003\u000fM#xN]1hK\"AA(\u000eB\u0001B\u0003%Q(\u0001\u0004d_:4\u0017n\u001a\t\u0003}\u0011k\u0011a\u0010\u0006\u0003y\u0001S!!\u0011\"\u0002\u0011QL\b/Z:bM\u0016T\u0011aQ\u0001\u0004G>l\u0017BA#@\u0005\u0019\u0019uN\u001c4jO\"Aq)\u000eB\u0001B\u0003%\u0001*A\u0005qCJ$\u0018\u000e^5p]B\u0011Q#S\u0005\u0003\u0015Z\u00111!\u00138u\u0011\u0015YR\u0007\"\u0001M)\riej\u0014\t\u0003!UBQ\u0001P&A\u0002uBQaR&A\u0002!Cq!U\u001bC\u0002\u0013\u0015!&A\u0004ce>\\WM]:\t\rM+\u0004\u0015!\u0004,\u0003!\u0011'o\\6feN\u0004\u0003bB+6\u0005\u0004%)AK\u0001\u0006i>\u0004\u0018n\u0019\u0005\u0007/V\u0002\u000bQB\u0016\u0002\rQ|\u0007/[2!\u0011\u001dIVG1A\u0005
\ni\u000bQ\u0002\u001d:pIV\u001cWM\u001d)s_B\u001cX#A.\u0011\u0005q{V\"A/\u000b\u0005y#\u0013\u0001B;uS2L!\u0001Y/\u0003\u0015A\u0013x\u000e]3si&,7\u000f\u0003\u0004ck\u0001\u0006IaW\u0001\u000faJ|G-^2feB\u0013x\u000e]:!\u0011\u001d!WG1A\u0005\u0002i\u000bQbY8ogVlWM\u001d)s_B\u001c\bB\u000246A\u0003%1,\u0001\bd_:\u001cX/\\3s!J|\u0007o\u001d\u0011\t\u000f!,$\u0019!C\tS\u0006i1.\u00194lCB\u0013x\u000eZ;dKJ,\u0012A\u001b\t\u0005WV<x/D\u0001m\u0015\tig.\u0001\u0005qe>$WoY3s\u0015\ty\u0007/A\u0004dY&,g\u000e^:\u000b\u0005\r\t(B\u0001:t\u0003\u0019\t\u0007/Y2iK*\tA/A\u0002pe\u001eL!A\u001e7\u0003\u001b-\u000bgm[1Qe>$WoY3s!\tA80D\u0001z\u0015\tQH%A\u0002oS>L!\u0001`=\u0003\u0015\tKH/\u001a\"vM\u001a,'\u000f\u0003\u0004\u007fk\u0001\u0006IA[\u0001\u000fW\u000647.\u0019)s_\u0012,8-\u001a:!\u0011%\t\t!\u000ea\u0001\n\u0013\t\u0019!A\u0004uC&d\u0017N\\4\u0016\u0005\u0005\u0015\u0001cA\u000b\u0002\b%\u0019\u0011\u0011\u0002\f\u0003\u000f\t{w\u000e\\3b]\"I\u0011QB\u001bA\u0002\u0013%\u0011qB\u0001\fi\u0006LG.\u001b8h?\u0012*\u0017\u000f\u0006\u0003\u0002\u0012\u0005]\u0001cA\u000b\u0002\u0014%\u0019\u0011Q\u0003\f\u0003\tUs\u0017\u000e\u001e\u0005\u000b\u00033\tY!!AA\u0002\u0005\u0015\u0011a\u0001=%c!A\u0011QD\u001b!B\u0013\t)!\u0001\u0005uC&d\u0017N\\4!Q\u0011\tY\"!\t\u0011\u0007U\t\u0019#C\u0002\u0002&Y\u0011\u0001B^8mCRLG.\u001a\u0005\n\u0003S)\u0004\u0019!C\u0005\u0003\u0007\t\u0011bY8ogVl\u0017N\\4\t\u0013\u00055R\u00071A\u0005\n\u0005=\u0012!D2p]N,X.\u001b8h?\u0012*\u0017\u000f\u0006\u0003\u0002\u0012\u0005E\u0002BCA\r\u0003W\t\t\u00111\u0001\u0002\u0006!A\u0011QG\u001b!B\u0013\t)!\u0001\u0006d_:\u001cX/\\5oO\u0002BC!a\r\u0002\"!I\u00111H\u001bC\u0002\u0013%\u0011QH\u0001\u000eG>t7/^7fe\u0016\u0013(o\u001c:\u0016\u0005\u0005}\u0002CBA!\u0003\u0017\ny%\u0004\u0002\u0002D)!\u0011QIA$\u0003\u0019\tGo\\7jG*\u0019\u0011\u0011J/\u0002\u0015\r|gnY;se\u0016tG/\u0003\u0003\u0002N\u0005\r#aD!u_6L7MU3gKJ,gnY3\u0011\t\u0005E\u0013\u0011\r\b\u0005\u0003'\niF\u0004\u0003\u0002V\u0005mSB
AA,\u0015\r\tIFD\u0001\u0007yI|w\u000e\u001e \n\u0003]I1!a\u0018\u0017\u0003\u001d\u0001\u0018mY6bO\u0016LA!a\u0019\u0002f\tIA\u000b\u001b:po\u0006\u0014G.\u001a\u0006\u0004\u0003?2\u0002\u0002CA5k\u0001\u0006I!a\u0010\u0002\u001d\r|gn];nKJ,%O]8sA!I\u0011QN\u001bC\u0002\u0013%\u0011qN\u0001\tG>t7/^7feV\u0011\u0011\u0011\u000f\n\u0005\u0003g\nYHB\u0004\u0002v\u0005]\u0004!!\u001d\u0003\u0019q\u0012XMZ5oK6,g\u000e\u001e \t\u0011\u0005eT\u0007)A\u0005\u0003c\n\u0011bY8ogVlWM\u001d\u0011\u0011\u0007\u0005\ni(C\u0002\u0002��\t\u0012a\u0001\u00165sK\u0006$\u0007BCAB\u0003g\u0012\r\u0011\"\u0001\u0002\u0006\u0006i1.\u00194lC\u000e{gn];nKJ,\"!a\"\u0011\r\u0005%\u0015QR<x\u001b\t\tYIC\u0002\u0002n9LA!a$\u0002\f\ni1*\u00194lC\u000e{gn];nKJD!\"a%\u0002t\t\u0007I\u0011AAK\u0003\t!\b/\u0006\u0002\u0002\u0018B!\u0011\u0011TAP\u001b\t\tYJC\u0002\u0002\u001eB\faaY8n[>t\u0017\u0002BAQ\u00037\u0013a\u0002V8qS\u000e\u0004\u0016M\u001d;ji&|g\u000e\u0003\u0006\u0002&\u0006M$\u0019!C\u0001\u0003O\u000b!cY8ogVlWM\u001d)beRLG/[8ogV\u0011\u0011\u0011\u0016\t\u00069\u0006-\u0016qS\u0005\u0004\u0003[k&\u0001\u0002'jgRD\u0001\"!-6\t\u0003A\u00111W\u0001\u0005S:LG\u000f\u0006\u0002\u0002\u0012!A\u0011qW\u001b\u0005\u0002!\t\u0019,\u0001\u0003c_>$\b\u0002CA^k\u0011\u0005\u0001\"a-\u0002\tQ\f\u0017\u000e\u001c\u0005\b\u0003\u007f+D\u0011KAZ\u0003\u0011\u0019Ho\u001c9\t\u000f\u0005\rW\u0007\"\u0001\u0002F\u0006)qO]5uKR1\u0011qYAk\u00033\u0004b!!3\u0002L\u0006=WBAA$\u0013\u0011\ti-a\u0012\u0003\r\u0019+H/\u001e:f!\rY\u0017\u0011[\u0005\u0004\u0003'd'A\u0004*fG>\u0014H-T3uC\u0012\fG/\u0019\u0005\b\u0003/\f\t\r1\u0001x\u0003\rYW-\u001f\u0005\b\u00037\f\t\r1\u0001x\u0003\u00151\u0018\r\\;f\u0001")
/* loaded from: input_file:io/amient/affinity/core/storage/kafka/KafkaStorage.class */
/*
 * Storage implementation that writes to, and is fed from, a single Kafka
 * topic partition.
 *
 * What this class shows directly:
 *  - write() publishes ByteBuffer key/value records through a KafkaProducer to
 *    the configured topic and to the partition number given at construction;
 *  - a separate consumer Thread (KafkaStorage$$anon$3, defined outside this
 *    file view) is created in the constructor and started by init();
 *  - boot() and tail() coordinate with that thread through the thread object's
 *    monitor and the volatile tailing/consuming flags.
 *
 * This source is decompiler output of a Scala class. The `??` placeholder
 * types, the name-mangled `io$amient$...$$` members and the "JADX WARN"
 * comments are decompilation artifacts, so the file is not compilable Java
 * as written.
 */
public class KafkaStorage extends Storage {
    /** Configuration passed to the constructor; read by the anonymous Properties initializers below. */
    public final Config io$amient$affinity$core$storage$kafka$KafkaStorage$$config;
    /** Partition number passed to the constructor; used as the target partition of every record in write(). */
    public final int io$amient$affinity$core$storage$kafka$KafkaStorage$$partition;
    /** Value of the CONFIG_KAFKA_BOOTSTRAP_SERVERS setting; installed as "bootstrap.servers" for producer and consumer. */
    private final String brokers;
    /** Value of the CONFIG_KAFKA_TOPIC setting; the topic write() sends to. */
    private final String topic;
    /** Producer properties assembled in the constructor. */
    private final Properties producerProps;
    /** Consumer properties assembled in the constructor. */
    private final Properties consumerProps;
    /** Producer created from producerProps; closed in stop(). */
    private final KafkaProducer<ByteBuffer, ByteBuffer> kafkaProducer;
    /** Starts as true; set to false by boot() and back to true by tail(), both while holding the consumer thread's monitor. */
    private volatile boolean io$amient$affinity$core$storage$kafka$KafkaStorage$$tailing;
    /** Starts as false; boot() keeps waiting while this is true. Presumably maintained by the consumer thread through the public setter - confirm in KafkaStorage$$anon$3. */
    private volatile boolean io$amient$affinity$core$storage$kafka$KafkaStorage$$consuming;
    /** Holds a failure to be rethrown by boot(); starts as null. Presumably set by the consumer thread - confirm in KafkaStorage$$anon$3. */
    private final AtomicReference<Throwable> io$amient$affinity$core$storage$kafka$KafkaStorage$$consumerError;
    /** The consumer thread (KafkaStorage$$anon$3); also used as the monitor object in boot() and tail(). */
    private final Thread consumer;
    /** Empty parameter-type array used for the reflective lookup of the no-arg "kafkaConsumer" method. */
    private static Class[] reflParams$Cache1 = new Class[0];
    /** Softly referenced cache of resolved "kafkaConsumer" Method objects, keyed by receiver class. */
    private static volatile SoftReference reflPoly$Cache1 = new SoftReference(new EmptyMethodCache());

    /** Returns the CONFIG_KAFKA_CONSUMER constant of the companion object (value not visible here). */
    public static String CONFIG_KAFKA_CONSUMER() {
        return KafkaStorage$.MODULE$.CONFIG_KAFKA_CONSUMER();
    }

    /** Returns the CONFIG_KAFKA_PRODUCER constant of the companion object (value not visible here). */
    public static String CONFIG_KAFKA_PRODUCER() {
        return KafkaStorage$.MODULE$.CONFIG_KAFKA_PRODUCER();
    }

    /** Returns the CONFIG_KAFKA_TOPIC constant of the companion object (value not visible here). */
    public static String CONFIG_KAFKA_TOPIC() {
        return KafkaStorage$.MODULE$.CONFIG_KAFKA_TOPIC();
    }

    /** Returns the CONFIG_KAFKA_BOOTSTRAP_SERVERS constant of the companion object (value not visible here). */
    public static String CONFIG_KAFKA_BOOTSTRAP_SERVERS() {
        return KafkaStorage$.MODULE$.CONFIG_KAFKA_BOOTSTRAP_SERVERS();
    }

    /**
     * Resolves the public no-arg method named "kafkaConsumer" on the given class,
     * using the softly referenced method cache.
     *
     * @param cls class on which to look the method up
     * @return the cached Method, or a freshly resolved one that has been passed
     *         through ScalaRunTime.ensureAccessible and added to the cache
     */
    public static Method reflMethod$Method1(Class cls) {
        EmptyMethodCache emptyMethodCache = (MethodCache) reflPoly$Cache1.get();
        // The soft reference may have been cleared; start over with an empty cache.
        if (emptyMethodCache == null) {
            emptyMethodCache = new EmptyMethodCache();
            reflPoly$Cache1 = new SoftReference(emptyMethodCache);
        }
        Method find = emptyMethodCache.find(cls);
        if (find != null) {
            return find;
        }
        // Cache miss: resolve reflectively, then publish a cache that includes the new entry.
        Method ensureAccessible = ScalaRunTime$.MODULE$.ensureAccessible(cls.getMethod("kafkaConsumer", reflParams$Cache1));
        reflPoly$Cache1 = new SoftReference(emptyMethodCache.add(cls, ensureAccessible));
        return ensureAccessible;
    }

    /** @return the bootstrap servers string read from the configuration */
    public final String brokers() {
        return this.brokers;
    }

    /** @return the topic name read from the configuration */
    public final String topic() {
        return this.topic;
    }

    /** @return the producer properties assembled in the constructor */
    private Properties producerProps() {
        return this.producerProps;
    }

    /** @return the consumer properties assembled in the constructor */
    public Properties consumerProps() {
        return this.consumerProps;
    }

    /** @return the producer used by write() */
    public KafkaProducer<ByteBuffer, ByteBuffer> kafkaProducer() {
        return this.kafkaProducer;
    }

    /** @return current value of the volatile tailing flag */
    public boolean io$amient$affinity$core$storage$kafka$KafkaStorage$$tailing() {
        return this.io$amient$affinity$core$storage$kafka$KafkaStorage$$tailing;
    }

    /** Sets the volatile tailing flag. */
    private void io$amient$affinity$core$storage$kafka$KafkaStorage$$tailing_$eq(boolean z) {
        this.io$amient$affinity$core$storage$kafka$KafkaStorage$$tailing = z;
    }

    /** @return current value of the volatile consuming flag */
    public boolean io$amient$affinity$core$storage$kafka$KafkaStorage$$consuming() {
        return this.io$amient$affinity$core$storage$kafka$KafkaStorage$$consuming;
    }

    /** Sets the volatile consuming flag (public, so it can be called from outside this class). */
    public void io$amient$affinity$core$storage$kafka$KafkaStorage$$consuming_$eq(boolean z) {
        this.io$amient$affinity$core$storage$kafka$KafkaStorage$$consuming = z;
    }

    /** @return the holder of the error that boot() rethrows when it is non-null */
    public AtomicReference<Throwable> io$amient$affinity$core$storage$kafka$KafkaStorage$$consumerError() {
        return this.io$amient$affinity$core$storage$kafka$KafkaStorage$$consumerError;
    }

    /** @return the consumer thread created in the constructor */
    private Thread consumer() {
        return this.consumer;
    }

    /** Starts the consumer thread. */
    public void init() {
        consumer().start();
    }

    /**
     * Switches out of tailing mode and blocks until the consuming flag is false.
     *
     * Runs while holding the consumer thread's monitor. If the tailing flag is
     * already false the method returns immediately. Otherwise it clears the
     * flag and repeatedly waits on the monitor for up to 6000 ms per iteration.
     * After each wait, a non-null consumerError causes the thread's
     * "kafkaConsumer" (obtained reflectively) to be woken up and the stored
     * error to be thrown.
     *
     * The wait can be interrupted and the stored error is an arbitrary
     * Throwable; neither is declared because the original is Scala.
     */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Thread] */
    /* JADX WARN: Type inference failed for: r0v14, types: [java.lang.Thread, java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v19, types: [java.lang.reflect.Method] */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    public void boot() {
        ?? consumer = consumer();
        synchronized (consumer) {
            // Not tailing: nothing to switch, return straight away.
            if (!io$amient$affinity$core$storage$kafka$KafkaStorage$$tailing()) {
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                return;
            }
            io$amient$affinity$core$storage$kafka$KafkaStorage$$tailing_$eq(false);
            do {
                // Releases the monitor for at most 6 seconds, then re-checks error and consuming state.
                consumer().wait(6000L);
                if (io$amient$affinity$core$storage$kafka$KafkaStorage$$consumerError().get() != null) {
                    consumer = consumer();
                    try {
                        // Reflective call of kafkaConsumer() on the thread object, then KafkaConsumer.wakeup().
                        ((KafkaConsumer) reflMethod$Method1(consumer.getClass()).invoke(consumer, new Object[0])).wakeup();
                        throw io$amient$affinity$core$storage$kafka$KafkaStorage$$consumerError().get();
                    } catch (InvocationTargetException e) {
                        // Surface the real failure of the reflective call instead of the wrapper.
                        throw e.getCause();
                    }
                }
            } while (io$amient$affinity$core$storage$kafka$KafkaStorage$$consuming());
        }
    }

    /**
     * Switches into tailing mode.
     *
     * Runs while holding the consumer thread's monitor. If the tailing flag is
     * already true nothing happens; otherwise the flag is set and one thread
     * waiting on the monitor is notified.
     */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Thread] */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v9 */
    public void tail() {
        ?? consumer = consumer();
        synchronized (consumer) {
            if (io$amient$affinity$core$storage$kafka$KafkaStorage$$tailing()) {
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                io$amient$affinity$core$storage$kafka$KafkaStorage$$tailing_$eq(true);
                consumer().notify();
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            }
            // NOTE(review): self-assignment looks like a decompiler artifact with no effect - confirm.
            consumer = consumer;
        }
    }

    /** Interrupts the consumer thread and closes the producer; the close runs even if the interrupt throws. */
    public void stop() {
        try {
            consumer().interrupt();
        } finally {
            kafkaProducer().close();
        }
    }

    /**
     * Sends one record to the configured topic and this storage's partition.
     *
     * @param byteBuffer  passed as the third ProducerRecord argument (the record key)
     * @param byteBuffer2 passed as the fourth ProducerRecord argument (the record value)
     * @return the Future returned by KafkaProducer.send
     */
    public Future<RecordMetadata> write(ByteBuffer byteBuffer, ByteBuffer byteBuffer2) {
        return kafkaProducer().send(new ProducerRecord(topic(), Predef$.MODULE$.int2Integer(this.io$amient$affinity$core$storage$kafka$KafkaStorage$$partition), byteBuffer, byteBuffer2));
    }

    /**
     * Builds the storage: reads brokers and topic from the configuration,
     * assembles producer and consumer properties, creates the producer and
     * creates (but does not start) the consumer thread.
     *
     * The optional producer/consumer configuration sections may not set the
     * properties this class fixes itself. For the producer these are
     * bootstrap.servers, key.serializer and value.serializer. For the consumer
     * they are bootstrap.servers, enable.auto.commit, key.deserializer and
     * value.deserializer.
     *
     * NOTE(review): the exception messages below contain the typos "overriden"
     * and "KafkaStroage"; they are runtime strings and were left unchanged.
     *
     * @param config configuration holding at least the bootstrap servers and topic settings
     * @param i      partition number this storage writes to
     * @throws IllegalArgumentException if a section sets one of the fixed properties listed above
     */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public KafkaStorage(Config config, int i) {
        super(config, i);
        this.io$amient$affinity$core$storage$kafka$KafkaStorage$$config = config;
        this.io$amient$affinity$core$storage$kafka$KafkaStorage$$partition = i;
        this.brokers = config.getString(KafkaStorage$.MODULE$.CONFIG_KAFKA_BOOTSTRAP_SERVERS());
        this.topic = config.getString(KafkaStorage$.MODULE$.CONFIG_KAFKA_TOPIC());
        // NOTE(review): inside the two anonymous Properties initializers, `this` appears to stand for the
        // enclosing KafkaStorage (decompiler rendering of the outer reference) - confirm against the Scala source.
        this.producerProps = new Properties(this) { // from class: io.amient.affinity.core.storage.kafka.KafkaStorage$$anon$1
            {
                if (this.io$amient$affinity$core$storage$kafka$KafkaStorage$$config.hasPath(KafkaStorage$.MODULE$.CONFIG_KAFKA_PRODUCER())) {
                    Config config2 = this.io$amient$affinity$core$storage$kafka$KafkaStorage$$config.getConfig(KafkaStorage$.MODULE$.CONFIG_KAFKA_PRODUCER());
                    if (config2.hasPath("bootstrap.servers")) {
                        throw new IllegalArgumentException("bootstrap.servers cannot be overriden for KafkaStroage producer");
                    }
                    if (config2.hasPath("key.serializer")) {
                        throw new IllegalArgumentException("key.serializer cannot be overriden for KafkaStroage producer");
                    }
                    if (config2.hasPath("value.serializer")) {
                        throw new IllegalArgumentException("value.serializer cannot be overriden for KafkaStroage producer");
                    }
                    // Applies a function (defined elsewhere) to every entry of the producer section;
                    // presumably it copies each entry into these properties - confirm in KafkaStorage$$anon$1$$anonfun$1.
                    ((IterableLike) JavaConverters$.MODULE$.asScalaSetConverter(config2.entrySet()).asScala()).foreach(new KafkaStorage$$anon$1$$anonfun$1(this));
                }
                // Fixed settings are written last.
                put("bootstrap.servers", this.brokers());
                put("key.serializer", ByteBufferSerializer.class.getName());
                put("value.serializer", ByteBufferSerializer.class.getName());
            }
        };
        this.consumerProps = new Properties(this) { // from class: io.amient.affinity.core.storage.kafka.KafkaStorage$$anon$2
            {
                if (this.io$amient$affinity$core$storage$kafka$KafkaStorage$$config.hasPath(KafkaStorage$.MODULE$.CONFIG_KAFKA_CONSUMER())) {
                    Config config2 = this.io$amient$affinity$core$storage$kafka$KafkaStorage$$config.getConfig(KafkaStorage$.MODULE$.CONFIG_KAFKA_CONSUMER());
                    if (config2.hasPath("bootstrap.servers")) {
                        throw new IllegalArgumentException("bootstrap.servers cannot be overriden for KafkaStroage consumer");
                    }
                    if (config2.hasPath("enable.auto.commit")) {
                        throw new IllegalArgumentException("enable.auto.commit cannot be overriden for KafkaStroage consumer");
                    }
                    if (config2.hasPath("key.deserializer")) {
                        throw new IllegalArgumentException("key.deserializer cannot be overriden for KafkaStroage consumer");
                    }
                    if (config2.hasPath("value.deserializer")) {
                        throw new IllegalArgumentException("value.deserializer cannot be overriden for KafkaStroage consumer");
                    }
                    // Applies a function (defined elsewhere) to every entry of the consumer section;
                    // presumably it copies each entry into these properties - confirm in KafkaStorage$$anon$2$$anonfun$2.
                    ((IterableLike) JavaConverters$.MODULE$.asScalaSetConverter(config2.entrySet()).asScala()).foreach(new KafkaStorage$$anon$2$$anonfun$2(this));
                }
                // Fixed settings are written last; auto-commit is always disabled.
                put("bootstrap.servers", this.brokers());
                put("enable.auto.commit", "false");
                put("key.deserializer", ByteBufferDeserializer.class.getName());
                put("value.deserializer", ByteBufferDeserializer.class.getName());
            }
        };
        this.kafkaProducer = new KafkaProducer<>(producerProps());
        // Initial state: tailing, not consuming, no recorded error.
        this.io$amient$affinity$core$storage$kafka$KafkaStorage$$tailing = true;
        this.io$amient$affinity$core$storage$kafka$KafkaStorage$$consuming = false;
        this.io$amient$affinity$core$storage$kafka$KafkaStorage$$consumerError = new AtomicReference<>(null);
        // The thread is only constructed here; init() starts it.
        this.consumer = new KafkaStorage$$anon$3(this);
    }
}
