package org.apache.kafka.clients.producer.internals;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/clients/producer/internals/ProducerBatchTest.class */
public class ProducerBatchTest {
    private final long now = 1488748346917L;
    private final MemoryRecordsBuilder memoryRecordsBuilder = MemoryRecords.builder(ByteBuffer.allocate(512), CompressionType.NONE, TimestampType.CREATE_TIME, 128);

    /* loaded from: input_file:org/apache/kafka/clients/producer/internals/ProducerBatchTest$MockCallback.class */
    private static class MockCallback implements Callback {
        private int invocations;
        private RecordMetadata metadata;
        private Exception exception;

        private MockCallback() {
            this.invocations = 0;
        }

        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            this.invocations++;
            this.metadata = recordMetadata;
            this.exception = exc;
        }
    }

    @Test
    public void testBatchAbort() throws Exception {
        ProducerBatch producerBatch = new ProducerBatch(new TopicPartition("topic", 1), this.memoryRecordsBuilder, 1488748346917L);
        MockCallback mockCallback = new MockCallback();
        FutureRecordMetadata tryAppend = producerBatch.tryAppend(1488748346917L, (byte[]) null, new byte[10], Record.EMPTY_HEADERS, mockCallback, 1488748346917L);
        KafkaException kafkaException = new KafkaException();
        producerBatch.abort(kafkaException);
        Assertions.assertTrue(tryAppend.isDone());
        Assertions.assertEquals(1, mockCallback.invocations);
        Assertions.assertEquals(kafkaException, mockCallback.exception);
        Assertions.assertNull(mockCallback.metadata);
        Assertions.assertFalse(producerBatch.complete(500L, 2342342341L));
        Assertions.assertFalse(producerBatch.completeExceptionally(new KafkaException(), num -> {
            return new KafkaException();
        }));
        Assertions.assertEquals(1, mockCallback.invocations);
        Assertions.assertTrue(tryAppend.isDone());
        try {
            tryAppend.get();
            Assertions.fail("Future should have thrown");
        } catch (ExecutionException e) {
            Assertions.assertEquals(kafkaException, e.getCause());
        }
    }

    @Test
    public void testBatchCannotAbortTwice() throws Exception {
        ProducerBatch producerBatch = new ProducerBatch(new TopicPartition("topic", 1), this.memoryRecordsBuilder, 1488748346917L);
        MockCallback mockCallback = new MockCallback();
        FutureRecordMetadata tryAppend = producerBatch.tryAppend(1488748346917L, (byte[]) null, new byte[10], Record.EMPTY_HEADERS, mockCallback, 1488748346917L);
        KafkaException kafkaException = new KafkaException();
        producerBatch.abort(kafkaException);
        Assertions.assertEquals(1, mockCallback.invocations);
        Assertions.assertEquals(kafkaException, mockCallback.exception);
        Assertions.assertNull(mockCallback.metadata);
        try {
            producerBatch.abort(new KafkaException());
            Assertions.fail("Expected exception from abort");
        } catch (IllegalStateException e) {
        }
        Assertions.assertEquals(1, mockCallback.invocations);
        Assertions.assertTrue(tryAppend.isDone());
        try {
            tryAppend.get();
            Assertions.fail("Future should have thrown");
        } catch (ExecutionException e2) {
            Assertions.assertEquals(kafkaException, e2.getCause());
        }
    }

    @Test
    public void testBatchCannotCompleteTwice() throws Exception {
        ProducerBatch producerBatch = new ProducerBatch(new TopicPartition("topic", 1), this.memoryRecordsBuilder, 1488748346917L);
        MockCallback mockCallback = new MockCallback();
        FutureRecordMetadata tryAppend = producerBatch.tryAppend(1488748346917L, (byte[]) null, new byte[10], Record.EMPTY_HEADERS, mockCallback, 1488748346917L);
        producerBatch.complete(500L, 10L);
        Assertions.assertEquals(1, mockCallback.invocations);
        Assertions.assertNull(mockCallback.exception);
        Assertions.assertNotNull(mockCallback.metadata);
        Assertions.assertThrows(IllegalStateException.class, () -> {
            producerBatch.complete(1000L, 20L);
        });
        RecordMetadata recordMetadata = tryAppend.get();
        Assertions.assertEquals(500L, recordMetadata.offset());
        Assertions.assertEquals(10L, recordMetadata.timestamp());
    }

    @Test
    public void testSplitPreservesHeaders() {
        for (CompressionType compressionType : CompressionType.values()) {
            ProducerBatch producerBatch = new ProducerBatch(new TopicPartition("topic", 1), MemoryRecords.builder(ByteBuffer.allocate(1024), (byte) 2, compressionType, TimestampType.CREATE_TIME, 0L), 1488748346917L);
            do {
            } while (producerBatch.tryAppend(1488748346917L, "hi".getBytes(), "there".getBytes(), new Header[]{new RecordHeader("header-key", "header-value".getBytes())}, (Callback) null, 1488748346917L) != null);
            Deque split = producerBatch.split(200);
            Assertions.assertTrue(split.size() >= 2, "This batch should be split to multiple small batches.");
            Iterator it = split.iterator();
            while (it.hasNext()) {
                Iterator it2 = ((ProducerBatch) it.next()).records().batches().iterator();
                while (it2.hasNext()) {
                    for (Record record : (RecordBatch) it2.next()) {
                        Assertions.assertEquals(1, record.headers().length, "Header size should be 1.");
                        Assertions.assertEquals("header-key", record.headers()[0].key(), "Header key should be 'header-key'.");
                        Assertions.assertEquals("header-value", new String(record.headers()[0].value()), "Header value should be 'header-value'.");
                    }
                }
            }
        }
    }

    @Test
    public void testSplitPreservesMagicAndCompressionType() {
        Iterator it = Arrays.asList((byte) 0, (byte) 1, (byte) 2).iterator();
        while (it.hasNext()) {
            byte byteValue = ((Byte) it.next()).byteValue();
            for (CompressionType compressionType : CompressionType.values()) {
                if ((compressionType != CompressionType.NONE || byteValue >= 2) && (compressionType != CompressionType.ZSTD || byteValue >= 2)) {
                    ProducerBatch producerBatch = new ProducerBatch(new TopicPartition("topic", 1), MemoryRecords.builder(ByteBuffer.allocate(1024), byteValue, compressionType, TimestampType.CREATE_TIME, 0L), 1488748346917L);
                    do {
                    } while (producerBatch.tryAppend(1488748346917L, "hi".getBytes(), "there".getBytes(), Record.EMPTY_HEADERS, (Callback) null, 1488748346917L) != null);
                    Deque<ProducerBatch> split = producerBatch.split(512);
                    Assertions.assertTrue(split.size() >= 2);
                    for (ProducerBatch producerBatch2 : split) {
                        Assertions.assertEquals(byteValue, producerBatch2.magic());
                        Assertions.assertTrue(producerBatch2.isSplitBatch());
                        for (RecordBatch recordBatch : producerBatch2.records().batches()) {
                            Assertions.assertEquals(byteValue, recordBatch.magic());
                            Assertions.assertEquals(0L, recordBatch.baseOffset());
                            Assertions.assertEquals(compressionType, recordBatch.compressionType());
                        }
                    }
                }
            }
        }
    }

    @Test
    public void testBatchExpiration() {
        ProducerBatch producerBatch = new ProducerBatch(new TopicPartition("topic", 1), this.memoryRecordsBuilder, 1488748346917L);
        Assertions.assertFalse(producerBatch.hasReachedDeliveryTimeout(10240L, 1488748346915L));
        Assertions.assertTrue(producerBatch.hasReachedDeliveryTimeout(10240L, 1488748346917L + 10240));
    }

    @Test
    public void testBatchExpirationAfterReenqueue() {
        ProducerBatch producerBatch = new ProducerBatch(new TopicPartition("topic", 1), this.memoryRecordsBuilder, 1488748346917L);
        producerBatch.reenqueued(1488748346917L);
        Assertions.assertFalse(producerBatch.hasReachedDeliveryTimeout(10240L, 1488748346915L));
    }

    @Test
    public void testShouldNotAttemptAppendOnceRecordsBuilderIsClosedForAppends() {
        ProducerBatch producerBatch = new ProducerBatch(new TopicPartition("topic", 1), this.memoryRecordsBuilder, 1488748346917L);
        Assertions.assertNotNull(producerBatch.tryAppend(1488748346917L, (byte[]) null, new byte[10], Record.EMPTY_HEADERS, (Callback) null, 1488748346917L));
        Assertions.assertTrue(this.memoryRecordsBuilder.hasRoomFor(1488748346917L, (byte[]) null, new byte[10], Record.EMPTY_HEADERS));
        this.memoryRecordsBuilder.closeForRecordAppends();
        Assertions.assertFalse(this.memoryRecordsBuilder.hasRoomFor(1488748346917L, (byte[]) null, new byte[10], Record.EMPTY_HEADERS));
        Assertions.assertNull(producerBatch.tryAppend(1488748346918L, (byte[]) null, new byte[10], Record.EMPTY_HEADERS, (Callback) null, 1488748346918L));
    }

    @Test
    public void testCompleteExceptionallyWithRecordErrors() {
        RuntimeException runtimeException = new RuntimeException();
        HashMap hashMap = new HashMap();
        hashMap.put(0, new RuntimeException());
        hashMap.put(3, new RuntimeException());
        testCompleteExceptionally(5, runtimeException, num -> {
            return (RuntimeException) hashMap.getOrDefault(num, runtimeException);
        });
    }

    @Test
    public void testCompleteExceptionallyWithNullRecordErrors() {
        int i = 5;
        RuntimeException runtimeException = new RuntimeException();
        Assertions.assertThrows(NullPointerException.class, () -> {
            testCompleteExceptionally(i, runtimeException, null);
        });
    }

    @Test
    public void testWithLeaderChangesAcrossRetries() {
        ProducerBatch producerBatch = new ProducerBatch(new TopicPartition("topic", 1), this.memoryRecordsBuilder, 1488748346917L);
        Assertions.assertEquals(OptionalInt.empty(), producerBatch.currentLeaderEpoch());
        Assertions.assertEquals(0, producerBatch.attemptsWhenLeaderLastChanged());
        producerBatch.maybeUpdateLeaderEpoch(OptionalInt.empty());
        Assertions.assertFalse(producerBatch.hasLeaderChangedForTheOngoingRetry());
        producerBatch.maybeUpdateLeaderEpoch(OptionalInt.of(100));
        Assertions.assertFalse(producerBatch.hasLeaderChangedForTheOngoingRetry(), "batch leader is assigned for 1st time");
        Assertions.assertEquals(100, producerBatch.currentLeaderEpoch().getAsInt());
        Assertions.assertEquals(0, producerBatch.attemptsWhenLeaderLastChanged());
        producerBatch.reenqueued(0L);
        producerBatch.maybeUpdateLeaderEpoch(OptionalInt.of(101));
        Assertions.assertTrue(producerBatch.hasLeaderChangedForTheOngoingRetry(), "batch leader has changed");
        Assertions.assertEquals(101, producerBatch.currentLeaderEpoch().getAsInt());
        Assertions.assertEquals(1, producerBatch.attemptsWhenLeaderLastChanged());
        producerBatch.maybeUpdateLeaderEpoch(OptionalInt.of(101));
        Assertions.assertTrue(producerBatch.hasLeaderChangedForTheOngoingRetry(), "batch leader has changed");
        Assertions.assertEquals(101, producerBatch.currentLeaderEpoch().getAsInt());
        Assertions.assertEquals(1, producerBatch.attemptsWhenLeaderLastChanged());
        producerBatch.reenqueued(0L);
        producerBatch.maybeUpdateLeaderEpoch(OptionalInt.of(101));
        Assertions.assertFalse(producerBatch.hasLeaderChangedForTheOngoingRetry(), "batch leader has not changed");
        Assertions.assertEquals(101, producerBatch.currentLeaderEpoch().getAsInt());
        Assertions.assertEquals(1, producerBatch.attemptsWhenLeaderLastChanged());
        producerBatch.maybeUpdateLeaderEpoch(OptionalInt.of(101 - 1));
        Assertions.assertFalse(producerBatch.hasLeaderChangedForTheOngoingRetry(), "batch leader has not changed");
        Assertions.assertEquals(101, producerBatch.currentLeaderEpoch().getAsInt());
        Assertions.assertEquals(1, producerBatch.attemptsWhenLeaderLastChanged());
        producerBatch.maybeUpdateLeaderEpoch(OptionalInt.empty());
        Assertions.assertFalse(producerBatch.hasLeaderChangedForTheOngoingRetry(), "batch leader has not changed");
        Assertions.assertEquals(101, producerBatch.currentLeaderEpoch().getAsInt());
        Assertions.assertEquals(1, producerBatch.attemptsWhenLeaderLastChanged());
    }

    private void testCompleteExceptionally(int i, RuntimeException runtimeException, Function<Integer, RuntimeException> function) {
        ProducerBatch producerBatch = new ProducerBatch(new TopicPartition("topic", 1), this.memoryRecordsBuilder, 1488748346917L);
        ArrayList arrayList = new ArrayList(i);
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(producerBatch.tryAppend(1488748346917L, (byte[]) null, new byte[10], Record.EMPTY_HEADERS, (Callback) null, 1488748346917L));
        }
        Assertions.assertEquals(i, producerBatch.recordCount);
        producerBatch.completeExceptionally(runtimeException, function);
        Assertions.assertTrue(producerBatch.isDone());
        for (int i3 = 0; i3 < arrayList.size(); i3++) {
            Assertions.assertEquals(function.apply(Integer.valueOf(i3)), (RuntimeException) TestUtils.assertFutureThrows((FutureRecordMetadata) arrayList.get(i3), RuntimeException.class));
        }
    }
}
