package io.mantisrx.connector.kafka.sink;

import com.netflix.spectator.api.Registry;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.Metadata;
import io.mantisrx.runtime.PortRequest;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.runtime.parameter.type.StringParameter;
import io.mantisrx.runtime.parameter.validator.Validators;
import io.mantisrx.runtime.sink.SelfDocumentingSink;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

/* loaded from: input_file:io/mantisrx/connector/kafka/sink/KafkaSink.class */
public class KafkaSink<T> implements SelfDocumentingSink<T> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSink.class);
    private final Func1<T, byte[]> encoder;
    private final Registry registry;
    private final AtomicReference<KafkaProducer<byte[], byte[]>> kafkaProducerAtomicRef = new AtomicReference<>(null);

    KafkaSink(Registry registry, Func1<T, byte[]> func1) {
        this.encoder = func1;
        this.registry = registry;
    }

    public void call(Context context, PortRequest portRequest, Observable<T> observable) {
        if (this.kafkaProducerAtomicRef.get() == null) {
            this.kafkaProducerAtomicRef.compareAndSet(null, new KafkaProducer<>(new MantisKafkaProducerConfig(context).getProducerProperties()));
            logger.info("Kafka Producer initialized");
        }
        KafkaProducer<byte[], byte[]> kafkaProducer = this.kafkaProducerAtomicRef.get();
        String str = (String) context.getParameters().get(KafkaSinkJobParameters.TOPIC);
        Func1<T, byte[]> func1 = this.encoder;
        func1.getClass();
        observable.map(func1::call).flatMap(bArr -> {
            return Observable.from(kafkaProducer.send(new ProducerRecord(str, bArr))).subscribeOn(Schedulers.io());
        }).subscribe();
    }

    public List<ParameterDefinition<?>> getParameters() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new StringParameter().name(KafkaSinkJobParameters.TOPIC).description("Kafka topic to write to").validator(Validators.notNullOrEmpty()).required().build());
        arrayList.addAll(MantisKafkaProducerConfig.getJobParameterDefinitions());
        return arrayList;
    }

    public Metadata metadata() {
        return new Metadata.Builder().name("Mantis Kafka Sink").description("Writes the output of the job into the configured Kafka topic").build();
    }
}
