package org.graylog2.inputs.transports;

import com.codahale.metrics.Gauge;
import com.github.joschi.jadconfig.util.Size;
import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Uninterruptibles;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.SystemUtils;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.graylog2.inputs.transports.UdpTransport;
import org.graylog2.inputs.transports.netty.EventLoopGroupFactory;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.MisfireException;
import org.graylog2.plugin.inputs.util.ThroughputCounter;
import org.graylog2.shared.SuppressForbidden;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/graylog2/inputs/transports/UdpTransportTest.class */
public class UdpTransportTest {
    private static final String BIND_ADDRESS = "127.0.0.1";
    private static final int PORT = 0;
    private static final int RECV_BUFFER_SIZE = 1024;
    private static final ImmutableMap<String, Object> CONFIG_SOURCE = ImmutableMap.of("bind_address", BIND_ADDRESS, "port", Integer.valueOf(PORT), "recv_buffer_size", Integer.valueOf(RECV_BUFFER_SIZE), "number_worker_threads", 1);
    private static final Configuration CONFIGURATION = new Configuration(CONFIG_SOURCE);
    private final NettyTransportConfiguration nettyTransportConfiguration = new NettyTransportConfiguration("nio", "jdk", 1);
    private UdpTransport udpTransport;
    private EventLoopGroup eventLoopGroup;
    private EventLoopGroupFactory eventLoopGroupFactory;
    private ThroughputCounter throughputCounter;
    private LocalMetricRegistry localMetricRegistry;

    /* loaded from: input_file:org/graylog2/inputs/transports/UdpTransportTest$CountingChannelUpstreamHandler.class */
    public static class CountingChannelUpstreamHandler extends SimpleChannelInboundHandler<DatagramPacket> {
        private final List<Integer> bytesWritten = new CopyOnWriteArrayList();

        /* JADX INFO: Access modifiers changed from: protected */
        public void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket) throws Exception {
            this.bytesWritten.add(Integer.valueOf(((ByteBuf) datagramPacket.content()).readableBytes()));
            channelHandlerContext.fireChannelRead(ReferenceCountUtil.retain(datagramPacket));
        }

        List<Integer> getBytesWritten() {
            return this.bytesWritten;
        }
    }

    @Before
    @SuppressForbidden("Executors#newSingleThreadExecutor() is okay for tests")
    public void setUp() throws Exception {
        this.eventLoopGroupFactory = new EventLoopGroupFactory(this.nettyTransportConfiguration);
        this.localMetricRegistry = new LocalMetricRegistry();
        this.eventLoopGroup = this.eventLoopGroupFactory.create(1, this.localMetricRegistry, "test");
        this.throughputCounter = new ThroughputCounter(this.eventLoopGroup);
        this.udpTransport = new UdpTransport(CONFIGURATION, this.eventLoopGroupFactory, this.nettyTransportConfiguration, this.throughputCounter, this.localMetricRegistry);
    }

    @After
    public void tearDown() {
        this.eventLoopGroup.shutdownGracefully();
    }

    @Test
    public void transportReceivesDataSmallerThanRecvBufferSize() throws Exception {
        CountingChannelUpstreamHandler countingChannelUpstreamHandler = new CountingChannelUpstreamHandler();
        UdpTransport launchTransportForBootStrapTest = launchTransportForBootStrapTest(countingChannelUpstreamHandler);
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(launchTransportForBootStrapTest.getLocalAddress() != null);
        });
        sendUdpDatagram(BIND_ADDRESS, ((InetSocketAddress) launchTransportForBootStrapTest.getLocalAddress()).getPort(), 100);
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(!countingChannelUpstreamHandler.getBytesWritten().isEmpty());
        });
        launchTransportForBootStrapTest.stop();
        Assertions.assertThat(countingChannelUpstreamHandler.getBytesWritten()).containsOnly(new Integer[]{100});
    }

    @Test
    public void transportReceivesDataExactlyRecvBufferSize() throws Exception {
        CountingChannelUpstreamHandler countingChannelUpstreamHandler = new CountingChannelUpstreamHandler();
        UdpTransport launchTransportForBootStrapTest = launchTransportForBootStrapTest(countingChannelUpstreamHandler);
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(launchTransportForBootStrapTest.getLocalAddress() != null);
        });
        sendUdpDatagram(BIND_ADDRESS, ((InetSocketAddress) launchTransportForBootStrapTest.getLocalAddress()).getPort(), 1008);
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(!countingChannelUpstreamHandler.getBytesWritten().isEmpty());
        });
        launchTransportForBootStrapTest.stop();
        Assertions.assertThat(countingChannelUpstreamHandler.getBytesWritten()).containsOnly(new Integer[]{1008});
    }

    @Test
    public void transportDiscardsDataLargerRecvBufferSizeOnMacOsX() throws Exception {
        Assume.assumeTrue("Skipping test intended for MacOS X systems", SystemUtils.IS_OS_MAC_OSX);
        CountingChannelUpstreamHandler countingChannelUpstreamHandler = new CountingChannelUpstreamHandler();
        UdpTransport launchTransportForBootStrapTest = launchTransportForBootStrapTest(countingChannelUpstreamHandler);
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(launchTransportForBootStrapTest.getLocalAddress() != null);
        });
        sendUdpDatagram(BIND_ADDRESS, ((InetSocketAddress) launchTransportForBootStrapTest.getLocalAddress()).getPort(), 2048);
        Uninterruptibles.sleepUninterruptibly(2L, TimeUnit.SECONDS);
        launchTransportForBootStrapTest.stop();
        Assertions.assertThat(countingChannelUpstreamHandler.getBytesWritten()).isEmpty();
    }

    @Test
    public void transportCanRecvLargeUDPPacketsOnLinux() throws Exception {
        Assume.assumeTrue("Skipping test intended for Linux systems", SystemUtils.IS_OS_LINUX);
        CountingChannelUpstreamHandler countingChannelUpstreamHandler = new CountingChannelUpstreamHandler();
        UdpTransport launchTransportForBootStrapTest = launchTransportForBootStrapTest(countingChannelUpstreamHandler);
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(launchTransportForBootStrapTest.getLocalAddress() != null);
        });
        sendUdpDatagram(BIND_ADDRESS, ((InetSocketAddress) launchTransportForBootStrapTest.getLocalAddress()).getPort(), 65507);
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(!countingChannelUpstreamHandler.getBytesWritten().isEmpty());
        });
        launchTransportForBootStrapTest.stop();
        Assertions.assertThat(countingChannelUpstreamHandler.getBytesWritten()).containsExactly(new Integer[]{65507});
    }

    @Test
    public void receiveBufferSizeIsDefaultSize() {
        Assertions.assertThat(this.udpTransport.getBootstrap((MessageInput) Mockito.mock(MessageInput.class)).config().options().get(ChannelOption.SO_RCVBUF)).isEqualTo(Integer.valueOf(RECV_BUFFER_SIZE));
    }

    @Test
    public void receiveBufferSizeIsNotLimited() {
        int saturatedCast = Ints.saturatedCast(Size.megabytes(1L).toBytes());
        Assertions.assertThat(new UdpTransport(new Configuration(ImmutableMap.of("bind_address", BIND_ADDRESS, "port", Integer.valueOf(PORT), "recv_buffer_size", Integer.valueOf(saturatedCast))), this.eventLoopGroupFactory, this.nettyTransportConfiguration, this.throughputCounter, new LocalMetricRegistry()).getBootstrap((MessageInput) Mockito.mock(MessageInput.class)).config().options().get(ChannelOption.SO_RCVBUF)).isEqualTo(Integer.valueOf(saturatedCast));
    }

    @Test
    public void getMetricSetReturnsLocalMetricRegistry() {
        Assertions.assertThat(this.udpTransport.getMetricSet()).isSameAs(this.localMetricRegistry);
    }

    @Test
    public void testDefaultReceiveBufferSize() {
        Assertions.assertThat(new UdpTransport.Config().getRequestedConfiguration().getField("recv_buffer_size").getDefaultValue()).isEqualTo(262144);
    }

    @Test
    public void testTrafficCounter() throws Exception {
        CountingChannelUpstreamHandler countingChannelUpstreamHandler = new CountingChannelUpstreamHandler();
        UdpTransport launchTransportForBootStrapTest = launchTransportForBootStrapTest(countingChannelUpstreamHandler);
        try {
            Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> {
                return Boolean.valueOf(launchTransportForBootStrapTest.getLocalAddress() != null);
            });
            InetSocketAddress inetSocketAddress = (InetSocketAddress) launchTransportForBootStrapTest.getLocalAddress();
            sendUdpDatagram(BIND_ADDRESS, inetSocketAddress.getPort(), 512);
            Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> {
                return Boolean.valueOf(countingChannelUpstreamHandler.getBytesWritten().size() == 1);
            });
            Assertions.assertThat(countingChannelUpstreamHandler.getBytesWritten()).containsExactly(new Integer[]{512});
            sendUdpDatagram(BIND_ADDRESS, inetSocketAddress.getPort(), 512);
            Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> {
                return Boolean.valueOf(countingChannelUpstreamHandler.getBytesWritten().size() == 2);
            });
            Assertions.assertThat(countingChannelUpstreamHandler.getBytesWritten()).containsExactly(new Integer[]{512, 512});
            launchTransportForBootStrapTest.stop();
            Assertions.assertThat((Long) ((Gauge) this.throughputCounter.gauges().get("read_bytes_total")).getValue()).isEqualTo(1024L);
        } catch (Throwable th) {
            launchTransportForBootStrapTest.stop();
            throw th;
        }
    }

    private UdpTransport launchTransportForBootStrapTest(final ChannelInboundHandler channelInboundHandler) throws MisfireException {
        UdpTransport udpTransport = new UdpTransport(CONFIGURATION, this.eventLoopGroupFactory, this.nettyTransportConfiguration, this.throughputCounter, new LocalMetricRegistry()) { // from class: org.graylog2.inputs.transports.UdpTransportTest.1
            protected LinkedHashMap<String, Callable<? extends ChannelHandler>> getChannelHandlers(MessageInput messageInput) {
                LinkedHashMap<String, Callable<? extends ChannelHandler>> linkedHashMap = new LinkedHashMap<>();
                linkedHashMap.put("logging", () -> {
                    return new LoggingHandler(LogLevel.INFO);
                });
                ChannelInboundHandler channelInboundHandler2 = channelInboundHandler;
                linkedHashMap.put("counter", () -> {
                    return channelInboundHandler2;
                });
                linkedHashMap.putAll(super.getChannelHandlers(messageInput));
                return linkedHashMap;
            }
        };
        MessageInput messageInput = (MessageInput) Mockito.mock(MessageInput.class);
        Mockito.when(messageInput.getId()).thenReturn("TEST");
        Mockito.when(messageInput.getName()).thenReturn("TEST");
        udpTransport.launch(messageInput);
        return udpTransport;
    }

    private void sendUdpDatagram(String str, int i, int i2) throws IOException {
        byte[] bArr = new byte[i2];
        java.net.DatagramPacket datagramPacket = new java.net.DatagramPacket(bArr, bArr.length, InetAddress.getByName(str), i);
        DatagramSocket datagramSocket = new DatagramSocket();
        try {
            datagramSocket.send(datagramPacket);
            datagramSocket.close();
        } catch (Throwable th) {
            try {
                datagramSocket.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }
}
