package io.openlineage.proxy.api.models;

import io.openlineage.proxy.api.models.LineageStream;
import lombok.NonNull;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/openlineage/proxy/api/models/KafkaLineageStream.class */
/**
 * A {@link LineageStream} of type {@code KAFKA} that publishes each collected lineage event
 * to a Kafka topic.
 *
 * <p>The topic name, the message key and the producer properties are all read from the
 * {@link KafkaConfig} passed to the constructor. Every event is sent with the same key.
 */
public class KafkaLineageStream extends LineageStream {
    private static final Logger log = LoggerFactory.getLogger(KafkaLineageStream.class);

    /** Topic every event is published to. */
    private final String topicName;

    /** Key attached to every record sent by this stream. */
    private final String messageKey;

    /** Producer built from the configured properties; used for all sends. */
    private final KafkaProducer<String, String> producer;

    /**
     * Creates the stream and its underlying producer from the given configuration.
     *
     * @param kafkaConfig source of the topic name, message key and producer properties
     * @throws NullPointerException if {@code kafkaConfig} is {@code null}
     */
    public KafkaLineageStream(@NonNull KafkaConfig kafkaConfig) {
        super(LineageStream.Type.KAFKA);
        if (kafkaConfig == null) {
            throw new NullPointerException("kafkaConfig is marked non-null but is null");
        }
        this.topicName = kafkaConfig.getTopicName();
        this.messageKey = kafkaConfig.getMessageKey();
        this.producer = new KafkaProducer<>(kafkaConfig.getProperties());
    }

    /**
     * Publishes a lineage event to the configured topic.
     *
     * <p>Delivery is best-effort: failures are logged and never propagated to the caller.
     *
     * @param str the lineage event, already serialized as a string
     * @throws NullPointerException if {@code str} is {@code null}
     */
    @Override // io.openlineage.proxy.api.models.LineageStream
    public void collect(@NonNull String str) {
        if (str == null) {
            throw new NullPointerException("eventAsString is marked non-null but is null");
        }
        log.debug("Received lineage event: {}", str);
        final ProducerRecord<String, String> message =
                new ProducerRecord<>(this.topicName, this.messageKey, str);
        try {
            // send() completes asynchronously, so delivery failures are reported through the
            // completion callback rather than thrown here. Without the callback those
            // failures would be dropped silently.
            this.producer.send(message, (metadata, exception) -> {
                if (exception != null) {
                    log.error("Failed to collect lineage event: {}", str, exception);
                }
            });
        } catch (Exception e) {
            // Covers failures raised synchronously by send() itself.
            log.error("Failed to collect lineage event: {}", str, e);
        }
    }
}
