package org.apache.flume.sink.kafka;

import com.google.common.base.Charsets;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.util.Utf8;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.shared.kafka.test.KafkaPartitionTestUtil;
import org.apache.flume.shared.kafka.test.PartitionOption;
import org.apache.flume.shared.kafka.test.PartitionTestScenario;
import org.apache.flume.sink.kafka.util.TestUtil;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;

/* loaded from: input_file:org/apache/flume/sink/kafka/TestKafkaSink.class */
public class TestKafkaSink {
    private static final TestUtil testUtil = TestUtil.getInstance();
    private final Set<String> usedTopics = new HashSet();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flume.sink.kafka.TestKafkaSink$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flume/sink/kafka/TestKafkaSink$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flume$shared$kafka$test$PartitionOption = new int[PartitionOption.values().length];

        static {
            try {
                $SwitchMap$org$apache$flume$shared$kafka$test$PartitionOption[PartitionOption.VALIDBUTOUTOFRANGE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flume$shared$kafka$test$PartitionOption[PartitionOption.NOTSET.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flume$shared$kafka$test$PartitionOption[PartitionOption.NOTANUMBER.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    @BeforeClass
    public static void setup() {
        testUtil.prepare();
        ArrayList arrayList = new ArrayList(3);
        arrayList.add("default-flume-topic");
        arrayList.add(TestConstants.STATIC_TOPIC);
        arrayList.add(TestConstants.CUSTOM_TOPIC);
        arrayList.add("test-avro-header-topic");
        testUtil.initTopicList(arrayList);
    }

    @AfterClass
    public static void tearDown() {
        testUtil.tearDown();
    }

    @Test
    public void testKafkaProperties() {
        KafkaSink kafkaSink = new KafkaSink();
        Context context = new Context();
        context.put("kafka.kafka.topic", "");
        context.put("kafka.producer.value.serializer", "override.default.serializer");
        context.put("kafka.producer.fake.property", "kafka.property.value");
        context.put("kafka.bootstrap.servers", "localhost:9092,localhost:9092");
        context.put("brokerList", "real-broker-list");
        Configurables.configure(kafkaSink, context);
        Properties kafkaProps = kafkaSink.getKafkaProps();
        Assert.assertEquals(kafkaProps.getProperty("key.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
        Assert.assertEquals(kafkaProps.getProperty("value.serializer"), "override.default.serializer");
        Assert.assertEquals(kafkaProps.getProperty("fake.property"), "kafka.property.value");
        Assert.assertEquals(kafkaProps.getProperty("bootstrap.servers"), "localhost:9092,localhost:9092");
    }

    @Test
    public void testOldProperties() {
        KafkaSink kafkaSink = new KafkaSink();
        Context context = new Context();
        context.put("topic", "test-topic");
        context.put("batchSize", "300");
        context.put("brokerList", "localhost:9092,localhost:9092");
        context.put("requiredAcks", "all");
        Configurables.configure(kafkaSink, context);
        Properties kafkaProps = kafkaSink.getKafkaProps();
        Assert.assertEquals(kafkaSink.getTopic(), "test-topic");
        Assert.assertEquals(kafkaSink.getBatchSize(), 300L);
        Assert.assertEquals(kafkaProps.getProperty("bootstrap.servers"), "localhost:9092,localhost:9092");
        Assert.assertEquals(kafkaProps.getProperty("acks"), "all");
    }

    @Test
    public void testDefaultTopic() {
        KafkaSink kafkaSink = new KafkaSink();
        Context prepareDefaultContext = prepareDefaultContext();
        Configurables.configure(kafkaSink, prepareDefaultContext);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure(memoryChannel, prepareDefaultContext);
        kafkaSink.setChannel(memoryChannel);
        kafkaSink.start();
        Transaction transaction = memoryChannel.getTransaction();
        transaction.begin();
        memoryChannel.put(EventBuilder.withBody("default-topic-test".getBytes()));
        transaction.commit();
        transaction.close();
        try {
            if (kafkaSink.process() == Sink.Status.BACKOFF) {
                Assert.fail("Error Occurred");
            }
        } catch (EventDeliveryException e) {
        }
        checkMessageArrived("default-topic-test", "default-flume-topic");
    }

    private void checkMessageArrived(String str, String str2) {
        ConsumerRecords<String, String> pollConsumerRecords = pollConsumerRecords(str2);
        Assert.assertNotNull(pollConsumerRecords);
        Assert.assertTrue(pollConsumerRecords.count() > 0);
        Assert.assertEquals(str, ((ConsumerRecord) pollConsumerRecords.iterator().next()).value());
    }

    @Test
    public void testStaticTopic() {
        Context prepareDefaultContext = prepareDefaultContext();
        prepareDefaultContext.put("kafka.topic", TestConstants.STATIC_TOPIC);
        try {
            if (prepareAndSend(prepareDefaultContext, "static-topic-test") == Sink.Status.BACKOFF) {
                Assert.fail("Error Occurred");
            }
        } catch (EventDeliveryException e) {
        }
        checkMessageArrived("static-topic-test", TestConstants.STATIC_TOPIC);
    }

    @Test
    public void testTopicAndKeyFromHeader() {
        KafkaSink kafkaSink = new KafkaSink();
        Context prepareDefaultContext = prepareDefaultContext();
        Configurables.configure(kafkaSink, prepareDefaultContext);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure(memoryChannel, prepareDefaultContext);
        kafkaSink.setChannel(memoryChannel);
        kafkaSink.start();
        HashMap hashMap = new HashMap();
        hashMap.put("topic", TestConstants.CUSTOM_TOPIC);
        hashMap.put("key", TestConstants.CUSTOM_KEY);
        Transaction transaction = memoryChannel.getTransaction();
        transaction.begin();
        memoryChannel.put(EventBuilder.withBody("test-topic-and-key-from-header".getBytes(), hashMap));
        transaction.commit();
        transaction.close();
        try {
            if (kafkaSink.process() == Sink.Status.BACKOFF) {
                Assert.fail("Error Occurred");
            }
        } catch (EventDeliveryException e) {
        }
        checkMessageArrived("test-topic-and-key-from-header", TestConstants.CUSTOM_TOPIC);
    }

    @Test
    public void testTopicFromConfHeader() {
        KafkaSink kafkaSink = new KafkaSink();
        Context prepareDefaultContext = prepareDefaultContext();
        prepareDefaultContext.put("topicHeader", "customTopicHeader");
        Configurables.configure(kafkaSink, prepareDefaultContext);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure(memoryChannel, prepareDefaultContext);
        kafkaSink.setChannel(memoryChannel);
        kafkaSink.start();
        HashMap hashMap = new HashMap();
        hashMap.put("customTopicHeader", TestConstants.CUSTOM_TOPIC);
        Transaction transaction = memoryChannel.getTransaction();
        transaction.begin();
        memoryChannel.put(EventBuilder.withBody("test-topic-from-config-header".getBytes(), hashMap));
        transaction.commit();
        transaction.close();
        try {
            if (kafkaSink.process() == Sink.Status.BACKOFF) {
                Assert.fail("Error Occurred");
            }
        } catch (EventDeliveryException e) {
        }
        checkMessageArrived("test-topic-from-config-header", TestConstants.CUSTOM_TOPIC);
    }

    @Test
    public void testTopicNotFromConfHeader() {
        KafkaSink kafkaSink = new KafkaSink();
        Context prepareDefaultContext = prepareDefaultContext();
        prepareDefaultContext.put("allowTopicOverride", "false");
        prepareDefaultContext.put("topicHeader", "foo");
        Configurables.configure(kafkaSink, prepareDefaultContext);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure(memoryChannel, prepareDefaultContext);
        kafkaSink.setChannel(memoryChannel);
        kafkaSink.start();
        HashMap hashMap = new HashMap();
        hashMap.put("topic", TestConstants.CUSTOM_TOPIC);
        hashMap.put("foo", TestConstants.CUSTOM_TOPIC);
        Transaction transaction = memoryChannel.getTransaction();
        transaction.begin();
        memoryChannel.put(EventBuilder.withBody("test-topic-from-config-header".getBytes(), hashMap));
        transaction.commit();
        transaction.close();
        try {
            if (kafkaSink.process() == Sink.Status.BACKOFF) {
                Assert.fail("Error Occurred");
            }
        } catch (EventDeliveryException e) {
        }
        checkMessageArrived("test-topic-from-config-header", "default-flume-topic");
    }

    @Test
    public void testReplaceSubStringOfTopicWithHeaders() {
        KafkaSink kafkaSink = new KafkaSink();
        Context prepareDefaultContext = prepareDefaultContext();
        prepareDefaultContext.put("kafka.topic", TestConstants.HEADER_TOPIC);
        Configurables.configure(kafkaSink, prepareDefaultContext);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure(memoryChannel, prepareDefaultContext);
        kafkaSink.setChannel(memoryChannel);
        kafkaSink.start();
        HashMap hashMap = new HashMap();
        hashMap.put(TestConstants.HEADER_1_KEY, TestConstants.HEADER_1_VALUE);
        Transaction transaction = memoryChannel.getTransaction();
        transaction.begin();
        memoryChannel.put(EventBuilder.withBody("test-replace-substring-of-topic-with-headers".getBytes(), hashMap));
        transaction.commit();
        transaction.close();
        try {
            if (kafkaSink.process() == Sink.Status.BACKOFF) {
                Assert.fail("Error Occurred");
            }
        } catch (EventDeliveryException e) {
        }
        checkMessageArrived("test-replace-substring-of-topic-with-headers", "test-avro-header-topic");
    }

    @Test
    public void testAvroEvent() throws IOException {
        KafkaSink kafkaSink = new KafkaSink();
        Context prepareDefaultContext = prepareDefaultContext();
        prepareDefaultContext.put("useFlumeEventFormat", "true");
        Configurables.configure(kafkaSink, prepareDefaultContext);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure(memoryChannel, prepareDefaultContext);
        kafkaSink.setChannel(memoryChannel);
        kafkaSink.start();
        HashMap hashMap = new HashMap();
        hashMap.put("topic", TestConstants.CUSTOM_TOPIC);
        hashMap.put("key", TestConstants.CUSTOM_KEY);
        hashMap.put(TestConstants.HEADER_1_KEY, TestConstants.HEADER_1_VALUE);
        Transaction transaction = memoryChannel.getTransaction();
        transaction.begin();
        memoryChannel.put(EventBuilder.withBody("test-avro-event".getBytes(), hashMap));
        transaction.commit();
        transaction.close();
        try {
            if (kafkaSink.process() == Sink.Status.BACKOFF) {
                Assert.fail("Error Occurred");
            }
        } catch (EventDeliveryException e) {
        }
        ConsumerRecords<String, String> pollConsumerRecords = pollConsumerRecords(TestConstants.CUSTOM_TOPIC);
        Assert.assertNotNull(pollConsumerRecords);
        Assert.assertTrue(pollConsumerRecords.count() > 0);
        ConsumerRecord consumerRecord = (ConsumerRecord) pollConsumerRecords.iterator().next();
        AvroFlumeEvent avroFlumeEvent = (AvroFlumeEvent) new SpecificDatumReader(AvroFlumeEvent.class).read((Object) null, DecoderFactory.get().directBinaryDecoder(new ByteArrayInputStream(((String) consumerRecord.value()).getBytes()), (BinaryDecoder) null));
        String str = new String(avroFlumeEvent.getBody().array(), Charsets.UTF_8);
        Map headers = avroFlumeEvent.getHeaders();
        Assert.assertEquals("test-avro-event", str);
        Assert.assertEquals(TestConstants.CUSTOM_KEY, consumerRecord.key());
        Assert.assertEquals(TestConstants.HEADER_1_VALUE, ((CharSequence) headers.get(new Utf8(TestConstants.HEADER_1_KEY))).toString());
        Assert.assertEquals(TestConstants.CUSTOM_KEY, ((CharSequence) headers.get(new Utf8("key"))).toString());
    }

    private ConsumerRecords<String, String> pollConsumerRecords(String str) {
        return pollConsumerRecords(str, 20);
    }

    private ConsumerRecords<String, String> pollConsumerRecords(String str, int i) {
        ConsumerRecords<String, String> consumerRecords = null;
        for (int i2 = 0; i2 < i; i2++) {
            consumerRecords = testUtil.getNextMessageFromConsumer(str);
            if (consumerRecords.count() > 0) {
                break;
            }
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
            }
        }
        return consumerRecords;
    }

    @Test
    public void testEmptyChannel() throws EventDeliveryException {
        KafkaSink kafkaSink = new KafkaSink();
        Context prepareDefaultContext = prepareDefaultContext();
        Configurables.configure(kafkaSink, prepareDefaultContext);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure(memoryChannel, prepareDefaultContext);
        kafkaSink.setChannel(memoryChannel);
        kafkaSink.start();
        if (kafkaSink.process() != Sink.Status.BACKOFF) {
            Assert.fail("Error Occurred");
        }
        Assert.assertNotNull(pollConsumerRecords("default-flume-topic", 2));
        Assert.assertEquals(r0.count(), 0L);
    }

    @Test
    public void testPartitionHeaderSet() throws Exception {
        doPartitionHeader(PartitionTestScenario.PARTITION_ID_HEADER_ONLY);
    }

    @Test
    public void testPartitionHeaderNotSet() throws Exception {
        doPartitionHeader(PartitionTestScenario.NO_PARTITION_HEADERS);
    }

    @Test
    public void testStaticPartitionAndHeaderSet() throws Exception {
        doPartitionHeader(PartitionTestScenario.STATIC_HEADER_AND_PARTITION_ID);
    }

    @Test
    public void testStaticPartitionHeaderNotSet() throws Exception {
        doPartitionHeader(PartitionTestScenario.STATIC_HEADER_ONLY);
    }

    @Test
    public void testPartitionHeaderMissing() throws Exception {
        doPartitionErrors(PartitionOption.NOTSET);
    }

    @Test
    public void testPartitionHeaderOutOfRange() throws Exception {
        KafkaSink kafkaSink = new KafkaSink();
        try {
            doPartitionErrors(PartitionOption.VALIDBUTOUTOFRANGE, kafkaSink);
            Assert.fail();
        } catch (EventDeliveryException e) {
        }
        Assert.assertEquals(1L, ((SinkCounter) Whitebox.getInternalState(kafkaSink, "counter")).getEventWriteFail());
    }

    @Test(expected = EventDeliveryException.class)
    public void testPartitionHeaderInvalid() throws Exception {
        doPartitionErrors(PartitionOption.NOTANUMBER);
    }

    @Test
    public void testDefaultSettingsOnReConfigure() {
        Context prepareDefaultContext = prepareDefaultContext();
        prepareDefaultContext.put("kafka.producer.compression.type", "snappy");
        KafkaSink kafkaSink = new KafkaSink();
        Configurables.configure(kafkaSink, prepareDefaultContext);
        Assert.assertEquals("snappy", kafkaSink.getKafkaProps().getProperty("compression.type"));
        Configurables.configure(kafkaSink, prepareDefaultContext());
        Assert.assertNull(kafkaSink.getKafkaProps().getProperty("compression.type"));
    }

    private void doPartitionErrors(PartitionOption partitionOption) throws Exception {
        doPartitionErrors(partitionOption, new KafkaSink());
    }

    private void doPartitionErrors(PartitionOption partitionOption, Sink sink) throws Exception {
        Context prepareDefaultContext = prepareDefaultContext();
        prepareDefaultContext.put("partitionIdHeader", "partition-header");
        Configurables.configure(sink, prepareDefaultContext);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure(memoryChannel, prepareDefaultContext);
        sink.setChannel(memoryChannel);
        sink.start();
        String findUnusedTopic = findUnusedTopic();
        createTopic(findUnusedTopic, 5);
        Transaction transaction = memoryChannel.getTransaction();
        transaction.begin();
        HashMap hashMap = new HashMap();
        hashMap.put("topic", findUnusedTopic);
        switch (AnonymousClass1.$SwitchMap$org$apache$flume$shared$kafka$test$PartitionOption[partitionOption.ordinal()]) {
            case 1:
                hashMap.put("partition-header", "9");
                break;
            case 2:
                hashMap.put("wrong-header", "2");
                break;
            case 3:
                hashMap.put("partition-header", "not-a-number");
                break;
        }
        memoryChannel.put(EventBuilder.withBody(String.valueOf(9).getBytes(), hashMap));
        transaction.commit();
        transaction.close();
        Assert.assertEquals(Sink.Status.READY, sink.process());
        deleteTopic(findUnusedTopic);
    }

    private void doPartitionHeader(PartitionTestScenario partitionTestScenario) throws Exception {
        Integer num = 3;
        String findUnusedTopic = findUnusedTopic();
        createTopic(findUnusedTopic, 5);
        Context prepareDefaultContext = prepareDefaultContext();
        prepareDefaultContext.put("flumeBatchSize", "100");
        if (partitionTestScenario == PartitionTestScenario.PARTITION_ID_HEADER_ONLY || partitionTestScenario == PartitionTestScenario.STATIC_HEADER_AND_PARTITION_ID) {
            prepareDefaultContext.put("partitionIdHeader", "partition-header");
        }
        if (partitionTestScenario == PartitionTestScenario.STATIC_HEADER_AND_PARTITION_ID || partitionTestScenario == PartitionTestScenario.STATIC_HEADER_ONLY) {
            prepareDefaultContext.put("defaultPartitionId", num.toString());
        }
        KafkaSink kafkaSink = new KafkaSink();
        Configurables.configure(kafkaSink, prepareDefaultContext);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure(memoryChannel, prepareDefaultContext);
        kafkaSink.setChannel(memoryChannel);
        kafkaSink.start();
        HashMap hashMap = new HashMap(5);
        for (int i = 0; i < 5; i++) {
            hashMap.put(Integer.valueOf(i), new ArrayList());
        }
        Transaction transaction = memoryChannel.getTransaction();
        transaction.begin();
        for (Event event : KafkaPartitionTestUtil.generateSkewedMessageList(partitionTestScenario, 50, hashMap, 5, num.intValue())) {
            event.getHeaders().put("topic", findUnusedTopic);
            memoryChannel.put(event);
        }
        transaction.commit();
        transaction.close();
        Assert.assertEquals(Sink.Status.READY, kafkaSink.process());
        Properties properties = new Properties();
        properties.put("bootstrap.servers", testUtil.getKafkaServerUrl());
        properties.put("group.id", "group_1");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("auto.offset.reset", "earliest");
        KafkaPartitionTestUtil.checkResultsAgainstSkew(partitionTestScenario, hashMap, KafkaPartitionTestUtil.retrieveRecordsFromPartitions(findUnusedTopic, 5, properties), num.intValue(), 50);
        memoryChannel.stop();
        kafkaSink.stop();
        deleteTopic(findUnusedTopic);
    }

    private Context prepareDefaultContext() {
        Context context = new Context();
        context.put("kafka.bootstrap.servers", testUtil.getKafkaServerUrl());
        context.put("flumeBatchSize", "1");
        return context;
    }

    private Sink.Status prepareAndSend(Context context, String str) throws EventDeliveryException {
        KafkaSink kafkaSink = new KafkaSink();
        Configurables.configure(kafkaSink, context);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure(memoryChannel, context);
        kafkaSink.setChannel(memoryChannel);
        kafkaSink.start();
        Transaction transaction = memoryChannel.getTransaction();
        transaction.begin();
        memoryChannel.put(EventBuilder.withBody(str.getBytes()));
        transaction.commit();
        transaction.close();
        return kafkaSink.process();
    }

    private void createTopic(String str, int i) {
        testUtil.createTopics(Collections.singletonList(str), i);
    }

    private void deleteTopic(String str) {
        testUtil.deleteTopic(str);
    }

    private String findUnusedTopic() {
        String str = null;
        boolean z = false;
        while (!z) {
            str = RandomStringUtils.randomAlphabetic(8);
            if (!this.usedTopics.contains(str)) {
                this.usedTopics.add(str);
                z = true;
            }
        }
        return str;
    }
}
