package org.graylog2.inputs.transports;

import com.google.common.eventbus.EventBus;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.graylog.testing.kafka.KafkaContainer;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.journal.RawMessage;
import org.graylog2.plugin.lifecycles.Lifecycle;
import org.graylog2.plugin.system.SimpleNodeId;
import org.graylog2.shared.SuppressForbidden;
import org.graylog2.shared.utilities.StringUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
@ExtendWith({MockitoExtension.class})
/* loaded from: input_file:org/graylog2/inputs/transports/KafkaTransportIT.class */
class KafkaTransportIT {

    @Container
    private static final KafkaContainer KAFKA = KafkaContainer.create();

    @Captor
    ArgumentCaptor<RawMessage> messageCaptor;

    KafkaTransportIT() {
    }

    @SuppressForbidden("Executors.newSingleThreadScheduledExecutor is okay in tests")
    @Test
    void basicConsumer() throws Exception {
        KAFKA.createTopic("test");
        byte[] bytes = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
        ProducerRecord producerRecord = new ProducerRecord("test", bytes);
        KafkaProducer<String, byte[]> createByteArrayProducer = KAFKA.createByteArrayProducer();
        try {
            createByteArrayProducer.send(producerRecord).get(30L, TimeUnit.SECONDS);
            if (createByteArrayProducer != null) {
                createByteArrayProducer.close();
            }
            KafkaTransport kafkaTransport = new KafkaTransport(new Configuration(Map.of("legacy_mode", false, "threads", 1, "bootstrap_server", StringUtils.f("localhost:%d", new Object[]{Integer.valueOf(KAFKA.getKafkaPort())}), "fetch_min_bytes", 1, "fetch_wait_max", 100, "topic_filter", "test", "offset_reset", "smallest")), new LocalMetricRegistry(), new SimpleNodeId("node-1"), new EventBus(), (ServerStatus) Mockito.mock(ServerStatus.class), Executors.newSingleThreadScheduledExecutor());
            MessageInput messageInput = (MessageInput) Mockito.mock(MessageInput.class);
            Mockito.when(messageInput.getId()).thenReturn("TEST");
            kafkaTransport.lifecycleStateChange(Lifecycle.RUNNING);
            kafkaTransport.launch(messageInput);
            ((MessageInput) Mockito.verify(messageInput, Mockito.timeout(5000L).times(1))).processRawMessage((RawMessage) this.messageCaptor.capture());
            Assertions.assertThat((RawMessage) this.messageCaptor.getValue()).isNotNull().satisfies(new ThrowingConsumer[]{rawMessage -> {
                Assertions.assertThat(rawMessage.getId()).isNotNull();
                Assertions.assertThat(rawMessage.getPayload()).isEqualTo(bytes);
            }});
        } catch (Throwable th) {
            if (createByteArrayProducer != null) {
                try {
                    createByteArrayProducer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
