package org.citrusframework.kafka.endpoint;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Optional;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.citrusframework.context.TestContext;
import org.citrusframework.exceptions.CitrusRuntimeException;
import org.citrusframework.exceptions.MessageTimeoutException;
import org.citrusframework.message.Message;
import org.citrusframework.messaging.AbstractMessageConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/citrusframework/kafka/endpoint/KafkaConsumer.class */
public class KafkaConsumer extends AbstractMessageConsumer {
    private static Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
    protected final KafkaEndpointConfiguration endpointConfiguration;
    private org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> consumer;

    public KafkaConsumer(String str, KafkaEndpointConfiguration kafkaEndpointConfiguration) {
        super(str, kafkaEndpointConfiguration);
        this.endpointConfiguration = kafkaEndpointConfiguration;
        this.consumer = createConsumer();
    }

    public Message receive(TestContext testContext, long j) {
        String replaceDynamicContentInString = testContext.replaceDynamicContentInString((String) Optional.ofNullable(this.endpointConfiguration.getTopic()).orElseThrow(() -> {
            return new CitrusRuntimeException("Missing Kafka topic to receive messages from - add topic to endpoint configuration");
        }));
        if (log.isDebugEnabled()) {
            log.debug("Receiving Kafka message on topic: '" + replaceDynamicContentInString);
        }
        if (CollectionUtils.isEmpty(this.consumer.subscription())) {
            this.consumer.subscribe(Arrays.asList(StringUtils.commaDelimitedListToStringArray(replaceDynamicContentInString)));
        }
        ConsumerRecords poll = this.consumer.poll(Duration.ofMillis(j));
        if (poll.isEmpty()) {
            throw new MessageTimeoutException(j, replaceDynamicContentInString);
        }
        if (log.isDebugEnabled()) {
            poll.forEach(consumerRecord -> {
                log.debug("Received message: (" + consumerRecord.key() + ", " + consumerRecord.value() + ") at offset " + consumerRecord.offset());
            });
        }
        Message convertInbound = this.endpointConfiguration.getMessageConverter().convertInbound((ConsumerRecord<Object, Object>) poll.iterator().next(), this.endpointConfiguration, testContext);
        testContext.onInboundMessage(convertInbound);
        this.consumer.commitSync(Duration.ofMillis(this.endpointConfiguration.getTimeout()));
        log.info("Received Kafka message on topic: '" + replaceDynamicContentInString);
        return convertInbound;
    }

    public void stop() {
        try {
            if (!CollectionUtils.isEmpty(this.consumer.subscription())) {
                this.consumer.unsubscribe();
            }
        } finally {
            this.consumer.close(Duration.ofMillis(10000L));
        }
    }

    private org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> createConsumer() {
        HashMap hashMap = new HashMap();
        hashMap.put("client.id", Optional.ofNullable(this.endpointConfiguration.getClientId()).orElse("citrus_kafka_consumer_" + UUID.randomUUID().toString()));
        hashMap.put("group.id", this.endpointConfiguration.getConsumerGroup());
        hashMap.put("bootstrap.servers", Optional.ofNullable(this.endpointConfiguration.getServer()).orElse("localhost:9092"));
        hashMap.put("max.poll.records", 1);
        hashMap.put("enable.auto.commit", Boolean.valueOf(this.endpointConfiguration.isAutoCommit()));
        hashMap.put("auto.commit.interval.ms", Integer.valueOf(this.endpointConfiguration.getAutoCommitInterval()));
        hashMap.put("auto.offset.reset", this.endpointConfiguration.getOffsetReset());
        hashMap.put("key.deserializer", this.endpointConfiguration.getKeyDeserializer());
        hashMap.put("value.deserializer", this.endpointConfiguration.getValueDeserializer());
        hashMap.putAll(this.endpointConfiguration.getConsumerProperties());
        return new org.apache.kafka.clients.consumer.KafkaConsumer<>(hashMap);
    }

    public void setConsumer(org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> kafkaConsumer) {
        this.consumer = kafkaConsumer;
    }
}
