package org.trellisldp.kafka;

import java.util.Objects;
import java.util.Properties;
import javax.inject.Inject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.trellisldp.api.Event;
import org.trellisldp.api.EventSerializationService;
import org.trellisldp.api.EventService;
import org.trellisldp.api.NoopEventSerializationService;

/* loaded from: input_file:org/trellisldp/kafka/KafkaEventService.class */
public class KafkaEventService implements EventService {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaEventService.class);
    public static final String CONFIG_KAFKA_TOPIC = "trellis.kafka.topic";
    private final EventSerializationService serializer;
    private final Producer<String, String> producer;
    private final String topic;

    public KafkaEventService() {
        this(new NoopEventSerializationService());
    }

    @Inject
    public KafkaEventService(EventSerializationService eventSerializationService) {
        this(eventSerializationService, ConfigProvider.getConfig());
    }

    private KafkaEventService(EventSerializationService eventSerializationService, Config config) {
        this(eventSerializationService, buildProducer(config), (String) config.getValue(CONFIG_KAFKA_TOPIC, String.class));
    }

    public KafkaEventService(EventSerializationService eventSerializationService, Producer<String, String> producer) {
        this(eventSerializationService, producer, (String) ConfigProvider.getConfig().getValue(CONFIG_KAFKA_TOPIC, String.class));
    }

    public KafkaEventService(EventSerializationService eventSerializationService, Producer<String, String> producer, String str) {
        this.serializer = (EventSerializationService) Objects.requireNonNull(eventSerializationService, "The Event serializer may not be null!");
        this.producer = (Producer) Objects.requireNonNull(producer, "Kafka producer may not be null!");
        this.topic = (String) Objects.requireNonNull(str, "Kafka topic name may not be null!");
    }

    public void emit(Event event) {
        Objects.requireNonNull(event, "Cannot emit a null event!");
        LOGGER.debug("Sending message to Kafka topic: {}", this.topic);
        this.producer.send(new ProducerRecord(this.topic, event.getObject().map((v0) -> {
            return v0.getIRIString();
        }).orElse(null), this.serializer.serialize(event)));
    }

    private static Producer<String, String> buildProducer(Config config) {
        Properties properties = new Properties();
        properties.setProperty("acks", (String) config.getOptionalValue("trellis.kafka.acks", String.class).orElse("all"));
        properties.setProperty("batch.size", (String) config.getOptionalValue("trellis.kafka.batch.size", String.class).orElse("16384"));
        properties.setProperty("retries", (String) config.getOptionalValue("trellis.kafka.retries", String.class).orElse("0"));
        properties.setProperty("linger.ms", (String) config.getOptionalValue("trellis.kafka.linger.ms", String.class).orElse("1"));
        properties.setProperty("buffer.memory", (String) config.getOptionalValue("trellis.kafka.buffer.memory", String.class).orElse("33554432"));
        config.getPropertyNames().forEach(str -> {
            if (!str.startsWith("trellis.kafka.") || CONFIG_KAFKA_TOPIC.equals(str)) {
                return;
            }
            properties.setProperty(str.substring("trellis.kafka.".length()), (String) config.getValue(str, String.class));
        });
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("bootstrap.servers", (String) config.getValue("trellis.kafka.bootstrap.servers", String.class));
        return new KafkaProducer(properties);
    }
}
