package io.pipelite.channels.kafka;

import io.pipelite.channels.kafka.config.KafkaChannelConfiguration;
import io.pipelite.channels.kafka.support.KafkaConstants;
import io.pipelite.common.support.Preconditions;
import io.pipelite.spi.context.ContextEventListener;
import io.pipelite.spi.endpoint.DefaultProducer;
import io.pipelite.spi.endpoint.Endpoint;
import io.pipelite.spi.flow.exchange.Exchange;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/* loaded from: input_file:io/pipelite/channels/kafka/DefaultKafkaProducer.class */
public class DefaultKafkaProducer extends DefaultProducer implements ContextEventListener {
    private final KafkaChannelConfiguration configuration;
    private final KafkaProducer<Object, Object> kafkaProducer;
    private final String topicName;

    public DefaultKafkaProducer(Endpoint endpoint, KafkaChannelConfiguration kafkaChannelConfiguration) {
        super(endpoint);
        Preconditions.notNull(kafkaChannelConfiguration, "configuration is required and cannot be null");
        this.configuration = kafkaChannelConfiguration;
        this.topicName = endpoint.getEndpointURL().getResource();
        this.kafkaProducer = new KafkaProducer<>(createKafkaProperties(kafkaChannelConfiguration));
    }

    public void process(Exchange exchange) {
        Object inputPayloadAs = exchange.getInputPayloadAs(Object.class);
        if (inputPayloadAs != null) {
            this.kafkaProducer.send(new ProducerRecord(this.topicName, exchange.tryGetHeaderAs(KafkaConstants.KAFKA_RECORD_KEY_EXCHANGE_HEADER_NAME, Object.class).orElse(null), inputPayloadAs));
            this.kafkaProducer.flush();
        }
    }

    public void onContextStopped() {
        if (this.kafkaProducer != null) {
            this.kafkaProducer.close();
        }
    }

    private static Map<String, Object> createKafkaProperties(KafkaChannelConfiguration kafkaChannelConfiguration) {
        Map<String, Object> producerConfig = kafkaChannelConfiguration.getProducerConfig();
        producerConfig.putIfAbsent("bootstrap.servers", kafkaChannelConfiguration.getBootstrapServers());
        return producerConfig;
    }
}
