package org.graylog2.shared.journal;

import com.codahale.metrics.MetricRegistry;
import com.github.joschi.jadconfig.util.Size;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.eventbus.EventBus;
import com.google.common.primitives.Ints;
import java.io.File;
import java.io.FileFilter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.UUID;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.stream.IntStream;
import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.commons.io.filefilter.IOFileFilter;
import org.apache.commons.lang.RandomStringUtils;
import org.assertj.core.api.Assertions;
import org.graylog.shaded.kafka09.common.KafkaException;
import org.graylog.shaded.kafka09.log.LogSegment;
import org.graylog.shaded.kafka09.message.Message;
import org.graylog.shaded.kafka09.message.MessageSet;
import org.graylog.shaded.kafka09.utils.FileLock;
import org.graylog2.Configuration;
import org.graylog2.audit.NullAuditEventSender;
import org.graylog2.plugin.InstantMillisProvider;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.lifecycles.Lifecycle;
import org.graylog2.plugin.system.FilePersistedNodeIdProvider;
import org.graylog2.shared.journal.Journal;
import org.joda.time.DateTime;
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

/* loaded from: input_file:org/graylog2/shared/journal/LocalKafkaJournalTest.class */
public class LocalKafkaJournalTest {

    @Rule
    public final MockitoRule mockitoRule = MockitoJUnit.rule();

    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    private ServerStatus serverStatus;
    private ScheduledThreadPoolExecutor scheduler;
    private File journalDirectory;

    /**
     * Builds the fixtures shared by all tests: a single-threaded scheduler with its core
     * thread already started, a fresh journal directory, and a {@link ServerStatus} whose
     * node id comes from a newly written {@code node-id} file containing a random UUID.
     *
     * @throws IOException if the temporary folder or the node-id file cannot be created
     */
    @Before
    public void setUp() throws IOException {
        scheduler = new ScheduledThreadPoolExecutor(1);
        scheduler.prestartCoreThread();
        journalDirectory = temporaryFolder.newFolder();

        final File nodeIdFile = temporaryFolder.newFile("node-id");
        final byte[] nodeIdBytes = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
        Files.write(nodeIdFile.toPath(), nodeIdBytes);

        // Point the configuration at the node-id file written above.
        final Configuration configuration = new Configuration() {
            public String getNodeIdFile() {
                return nodeIdFile.getAbsolutePath();
            }
        };

        serverStatus = new ServerStatus(
                configuration,
                EnumSet.of(ServerStatus.Capability.SERVER),
                new EventBus("KafkaJournalTest"),
                NullAuditEventSender::new,
                new FilePersistedNodeIdProvider(nodeIdFile.getAbsolutePath()).get());
    }

    /**
     * Stops the scheduler created in {@code setUp()} once a test has finished.
     */
    @After
    public void tearDown() {
        final ScheduledThreadPoolExecutor executor = scheduler;
        executor.shutdown();
    }

    /**
     * Writes a single id/payload pair and checks that reading one entry back yields
     * exactly that payload.
     */
    @Test
    public void writeAndRead() throws IOException {
        final LocalKafkaJournal journal = new LocalKafkaJournal(
                journalDirectory.toPath(),
                scheduler,
                Size.megabytes(100L),
                Duration.standardHours(1L),
                Size.megabytes(5L),
                Duration.standardHours(1L),
                1000000L,
                Duration.standardMinutes(1L),
                100,
                new MetricRegistry(),
                serverStatus);

        final byte[] idBytes = "id".getBytes(StandardCharsets.UTF_8);
        final byte[] messageBytes = "message".getBytes(StandardCharsets.UTF_8);
        journal.write(idBytes, messageBytes);

        // getOnlyElement also verifies that exactly one entry came back.
        final Journal.JournalReadEntry onlyEntry = Iterators.getOnlyElement(journal.read(1L).iterator());
        final String payload = new String(onlyEntry.getPayload(), StandardCharsets.UTF_8);
        Assert.assertEquals("message", payload);
    }

    /**
     * Checks that asking the journal for zero entries still returns the one entry that
     * was written.
     */
    @Test
    public void readAtLeastOne() throws Exception {
        final LocalKafkaJournal journal = new LocalKafkaJournal(
                journalDirectory.toPath(),
                scheduler,
                Size.megabytes(100L),
                Duration.standardHours(1L),
                Size.megabytes(5L),
                Duration.standardHours(1L),
                1000000L,
                Duration.standardMinutes(1L),
                100,
                new MetricRegistry(),
                serverStatus);

        final byte[] idBytes = "id".getBytes(StandardCharsets.UTF_8);
        final byte[] messageBytes = "message1".getBytes(StandardCharsets.UTF_8);
        journal.write(idBytes, messageBytes);

        // A requested count of 0 is expected to produce exactly one entry.
        final Journal.JournalReadEntry onlyEntry = Iterators.getOnlyElement(journal.read(0L).iterator());
        final String payload = new String(onlyEntry.getPayload(), StandardCharsets.UTF_8);
        Assert.assertEquals("message1", payload);
    }

    /**
     * Writes {@code i} bulks of small entries to the journal, one {@code write} call per bulk.
     *
     * <p>Each bulk holds at most {@code size.toBytes() / 32} entries. Filling a bulk stops
     * early once three times the accumulated id and payload lengths would exceed
     * {@code size.toBytes()}.
     *
     * @param localKafkaJournal journal that receives the entries
     * @param size              byte budget used to derive the bulk size and the early-stop limit
     * @param i                 number of bulks to write
     * @return the maximum number of entries per bulk, i.e. {@code size.toBytes() / 32} saturated to int
     */
    private int createBulkChunks(LocalKafkaJournal localKafkaJournal, Size size, int i) {
        final long byteBudget = size.toBytes();
        final int bulkSize = Ints.saturatedCast(byteBudget / 32);

        for (int bulk = 0; bulk < i; bulk++) {
            final ArrayList entries = Lists.newArrayListWithExpectedSize(bulkSize);
            long estimatedBytes = 0;
            int index = 0;
            while (index < bulkSize) {
                final byte[] idBytes = ("id" + index).getBytes(StandardCharsets.UTF_8);
                final byte[] messageBytes = ("message " + index).getBytes(StandardCharsets.UTF_8);

                // Entries are counted at three times their raw length against the budget.
                estimatedBytes += 3 * (idBytes.length + messageBytes.length);
                if (estimatedBytes > byteBudget) {
                    break;
                }
                entries.add(localKafkaJournal.createEntry(idBytes, messageBytes));
                index++;
            }
            localKafkaJournal.write(entries);
        }
        return bulkSize;
    }

    /**
     * Counts the regular files ending in {@code .log} directly inside the given directory.
     *
     * @param file directory to inspect
     * @return number of matching file names
     */
    private int countSegmentsInDir(File file) {
        final IOFileFilter regularFiles = FileFilterUtils.fileFileFilter();
        final IOFileFilter logSuffix = FileFilterUtils.suffixFileFilter(".log");
        final IOFileFilter segmentFilter = FileFilterUtils.and(new IOFileFilter[]{regularFiles, logSuffix});

        final String[] segmentNames = file.list(segmentFilter);
        return segmentNames.length;
    }

    /**
     * Builds a batch whose accumulated id and payload lengths just exceed a one-kilobyte
     * budget, writes it in one call and checks the value returned by {@code write}.
     *
     * <p>NOTE(review): the expected value is {@code size() - 1}; presumably {@code write}
     * returns the last written offset rather than a count — confirm against
     * {@code LocalKafkaJournal}.
     */
    @Test
    public void maxSegmentSize() throws Exception {
        final Size segmentSize = Size.kilobytes(1L);
        final LocalKafkaJournal journal = new LocalKafkaJournal(
                this.journalDirectory.toPath(),
                this.scheduler,
                segmentSize,
                Duration.standardHours(1L),
                Size.kilobytes(10L),
                Duration.standardDays(1L),
                1000000L,
                Duration.standardMinutes(1L),
                100,
                new MetricRegistry(),
                this.serverStatus);

        long size = 0;
        final long maxSize = segmentSize.toBytes();
        final ArrayList entries = Lists.newArrayList();
        while (size <= maxSize) {
            // Fresh arrays per entry; their lengths drive the loop's byte accounting.
            final byte[] idBytes = "the1-id".getBytes(StandardCharsets.UTF_8);
            final byte[] messageBytes = "the1-message".getBytes(StandardCharsets.UTF_8);

            size += idBytes.length + messageBytes.length;
            entries.add(journal.createEntry(idBytes, messageBytes));
        }

        Assertions.assertThat(journal.write(entries)).isEqualTo(entries.size() - 1);
    }

    /**
     * Writes a batch that starts with one payload of twice the one-kilobyte size, followed by
     * one payload sized so that payload, id and the Kafka log/message overheads add up to
     * exactly that size, followed by small filler entries, and checks the value returned by
     * {@code write}.
     *
     * <p>NOTE(review): the expected value is {@code size() - 2}; presumably the oversized
     * entry is skipped and {@code write} returns the last written offset — confirm against
     * {@code LocalKafkaJournal}.
     */
    @Test
    public void maxMessageSize() throws Exception {
        final Size segmentSize = Size.kilobytes(1L);
        final LocalKafkaJournal journal = new LocalKafkaJournal(
                this.journalDirectory.toPath(),
                this.scheduler,
                segmentSize,
                Duration.standardHours(1L),
                Size.kilobytes(10L),
                Duration.standardDays(1L),
                1000000L,
                Duration.standardMinutes(1L),
                100,
                new MetricRegistry(),
                this.serverStatus);

        long size = 0;
        final long maxSize = segmentSize.toBytes();
        final ArrayList entries = Lists.newArrayList();

        // Entry 1: payload twice as large as the configured size.
        final byte[] oversizedId = RandomStringUtils.randomAlphanumeric(6).getBytes(StandardCharsets.UTF_8);
        final byte[] oversizedPayload = RandomStringUtils
                .randomAlphanumeric(Ints.saturatedCast(segmentSize.toBytes() * 2))
                .getBytes(StandardCharsets.UTF_8);
        entries.add(journal.createEntry(oversizedId, oversizedPayload));

        // Entry 2: payload length chosen so payload + id + overheads equals the configured size.
        final byte[] boundaryId = RandomStringUtils.randomAlphanumeric(6).getBytes(StandardCharsets.UTF_8);
        final int boundaryPayloadLength = Ints.saturatedCast(
                segmentSize.toBytes() - MessageSet.LogOverhead() - Message.MessageOverhead() - boundaryId.length);
        final byte[] boundaryPayload = RandomStringUtils
                .randomAlphanumeric(boundaryPayloadLength)
                .getBytes(StandardCharsets.UTF_8);
        entries.add(journal.createEntry(boundaryId, boundaryPayload));

        // Filler entries until the accumulated raw length exceeds the configured size.
        while (size <= maxSize) {
            final byte[] idBytes = RandomStringUtils.randomAlphanumeric(6).getBytes(StandardCharsets.UTF_8);
            final byte[] messageBytes = "the-message".getBytes(StandardCharsets.UTF_8);

            size += idBytes.length + messageBytes.length;
            entries.add(journal.createEntry(idBytes, messageBytes));
        }

        Assertions.assertThat(journal.write(entries)).isEqualTo(entries.size() - 2);
    }

    /**
     * Writes three bulks with a one-kilobyte size and checks that the journal directory
     * contains exactly one {@code messagejournal-0} sub-directory holding three
     * {@code .log} files.
     */
    @Test
    public void segmentRotation() throws Exception {
        final Size segmentSize = Size.kilobytes(1L);
        final LocalKafkaJournal journal = new LocalKafkaJournal(
                this.journalDirectory.toPath(),
                this.scheduler,
                segmentSize,
                Duration.standardHours(1L),
                Size.kilobytes(10L),
                Duration.standardDays(1L),
                1000000L,
                Duration.standardMinutes(1L),
                100,
                new MetricRegistry(),
                this.serverStatus);
        createBulkChunks(journal, segmentSize, 3);

        final File[] files = this.journalDirectory.listFiles();
        Assert.assertNotNull(files);
        Assert.assertTrue("there should be files in the journal directory", files.length > 0);

        final File[] messageJournalDirs = this.journalDirectory.listFiles((FileFilter) FileFilterUtils.and(
                new IOFileFilter[]{FileFilterUtils.directoryFileFilter(), FileFilterUtils.nameFileFilter("messagejournal-0")}));
        Assert.assertNotNull(messageJournalDirs);
        Assert.assertEquals(1L, messageJournalDirs.length);

        final File[] logFiles = messageJournalDirs[0].listFiles((FileFilter) FileFilterUtils.and(
                new IOFileFilter[]{FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter(".log")}));
        Assert.assertNotNull(logFiles);
        // Three bulks were written, so three segment files are expected.
        Assert.assertEquals("should have three journal segments", 3L, logFiles.length);
    }

    /**
     * Writes three bulks into a journal configured with one-kilobyte sizes, then checks that
     * {@code cleanupLogs()} reports one cleaned segment and that the number of {@code .log}
     * files in {@code messagejournal-0} drops from three to two.
     */
    @Test
    public void segmentSizeCleanup() throws Exception {
        final Size segmentSize = Size.kilobytes(1L);
        final LocalKafkaJournal journal = new LocalKafkaJournal(
                this.journalDirectory.toPath(),
                this.scheduler,
                segmentSize,
                Duration.standardHours(1L),
                Size.kilobytes(1L),
                Duration.standardDays(1L),
                1000000L,
                Duration.standardMinutes(1L),
                100,
                new MetricRegistry(),
                this.serverStatus);
        final File messageJournalDir = new File(this.journalDirectory, "messagejournal-0");
        Assert.assertTrue(messageJournalDir.exists());

        createBulkChunks(journal, segmentSize, 3);
        // Flush before counting files on disk.
        journal.flushDirtyLogs();
        Assert.assertEquals(3L, countSegmentsInDir(messageJournalDir));

        Assert.assertEquals(1L, journal.cleanupLogs());
        Assert.assertEquals(2L, countSegmentsInDir(messageJournalDir));
    }

    /**
     * Drives Joda-Time's clock manually to check age-based cleanup: two bulks are written
     * 30 seconds apart, each segment's last-modified time is set to the clock value recorded
     * right after its bulk was flushed, and {@code cleanupLogs()} is called before and after
     * advancing the clock by a further 120 seconds.
     *
     * <p>The system clock is always restored in the {@code finally} block so other tests are
     * not affected.
     */
    @Test
    public void segmentAgeCleanup() throws Exception {
        final InstantMillisProvider clock = new InstantMillisProvider(DateTime.now(DateTimeZone.UTC));
        DateTimeUtils.setCurrentMillisProvider(clock);
        try {
            final Size segmentSize = Size.kilobytes(1L);
            final LocalKafkaJournal journal = new LocalKafkaJournal(
                    this.journalDirectory.toPath(),
                    this.scheduler,
                    segmentSize,
                    Duration.standardHours(1L),
                    Size.kilobytes(10L),
                    Duration.standardMinutes(1L),
                    1000000L,
                    Duration.standardMinutes(1L),
                    100,
                    new MetricRegistry(),
                    this.serverStatus);
            final File messageJournalDir = new File(this.journalDirectory, "messagejournal-0");
            Assert.assertTrue(messageJournalDir.exists());

            // Record the clock right after each flush so the two segments get distinct
            // modification times, 30 seconds apart.
            final long[] lastModified = new long[2];

            createBulkChunks(journal, segmentSize, 1);
            journal.flushDirtyLogs();
            lastModified[0] = clock.getMillis();

            clock.tick(Period.seconds(30));

            createBulkChunks(journal, segmentSize, 1);
            journal.flushDirtyLogs();
            lastModified[1] = clock.getMillis();

            int index = 0;
            for (LogSegment segment : journal.getSegments()) {
                Assert.assertTrue(index < 2);
                segment.lastModified_$eq(lastModified[index]);
                index++;
            }

            Assert.assertEquals("no segments should've been cleaned", 0L, journal.cleanupLogs());
            Assert.assertEquals("two segments segment should remain", 2L, countSegmentsInDir(messageJournalDir));

            clock.tick(Period.seconds(120));

            Assert.assertEquals("two segments should've been cleaned (only one will actually be removed...)", 2L, journal.cleanupLogs());
            Assert.assertEquals("one segment should remain", 1L, countSegmentsInDir(messageJournalDir));
        } finally {
            DateTimeUtils.setCurrentMillisSystem();
        }
    }

    /**
     * Writes three bulks and checks how {@code cleanupLogs()} reacts as the committed offset
     * advances: nothing is cleaned with no commit or a commit at half a bulk, one segment is
     * cleaned after committing {@code bulkSize + 1}, and one more after committing
     * {@code bulkSize * 4}, leaving a single {@code .log} file.
     */
    @Test
    public void segmentCommittedCleanup() throws Exception {
        final Size segmentSize = Size.kilobytes(1L);
        final LocalKafkaJournal journal = new LocalKafkaJournal(
                this.journalDirectory.toPath(),
                this.scheduler,
                segmentSize,
                Duration.standardHours(1L),
                Size.petabytes(1L),
                Duration.standardDays(1L),
                1000000L,
                Duration.standardMinutes(1L),
                100,
                new MetricRegistry(),
                this.serverStatus);
        final File messageJournalDir = new File(this.journalDirectory, "messagejournal-0");
        Assert.assertTrue(messageJournalDir.exists());

        final int bulkSize = createBulkChunks(journal, segmentSize, 3);
        // Flush before counting files on disk.
        journal.flushDirtyLogs();
        Assert.assertEquals(3L, countSegmentsInDir(messageJournalDir));

        // No offset committed yet.
        Assert.assertEquals(0L, journal.cleanupLogs());
        Assert.assertEquals(3L, countSegmentsInDir(messageJournalDir));

        journal.markJournalOffsetCommitted(bulkSize / 2);
        Assert.assertEquals("should not touch segments", 0L, journal.cleanupLogs());
        Assert.assertEquals(3L, countSegmentsInDir(messageJournalDir));

        journal.markJournalOffsetCommitted(bulkSize + 1);
        Assert.assertEquals("first segment should've been purged", 1L, journal.cleanupLogs());
        Assert.assertEquals(2L, countSegmentsInDir(messageJournalDir));

        journal.markJournalOffsetCommitted(bulkSize * 4);
        Assert.assertEquals("only purge one segment, not the active one", 1L, journal.cleanupLogs());
        Assert.assertEquals(1L, countSegmentsInDir(messageJournalDir));
    }

    /**
     * Takes the {@code .lock} file in the journal directory before constructing a journal and
     * checks that construction fails with a {@link RuntimeException} caused by a
     * {@link KafkaException}. The test is skipped (via {@code Assume}) if the lock file cannot
     * be created or locked.
     */
    @Test
    public void lockedJournalDir() throws Exception {
        final File lockFile = new File(journalDirectory, ".lock");
        Assume.assumeTrue(lockFile.createNewFile());
        final FileLock heldLock = new FileLock(lockFile);
        Assume.assumeTrue(heldLock.tryLock());

        try {
            new LocalKafkaJournal(
                    journalDirectory.toPath(),
                    scheduler,
                    Size.megabytes(100L),
                    Duration.standardHours(1L),
                    Size.megabytes(5L),
                    Duration.standardHours(1L),
                    1000000L,
                    Duration.standardMinutes(1L),
                    100,
                    new MetricRegistry(),
                    serverStatus);
            // fail() throws an Error, so the catch below does not swallow it.
            Assert.fail("Expected exception");
        } catch (Exception expected) {
            Assertions.assertThat(expected)
                    .isExactlyInstanceOf(RuntimeException.class)
                    .hasMessageStartingWith("org.graylog.shaded.kafka09.common.KafkaException: Failed to acquire lock on file .lock in")
                    .hasCauseExactlyInstanceOf(KafkaException.class);
        }
    }

    /**
     * Starts from a running server status, writes four bulks into a journal constructed with
     * a threshold argument of 90, and checks that the lifecycle is {@code THROTTLED} after a
     * flush and cleanup.
     */
    @Test
    public void serverStatusThrottledIfJournalUtilizationIsHigherThanThreshold() throws Exception {
        serverStatus.running();

        final Size segmentSize = Size.kilobytes(1L);
        final LocalKafkaJournal journal = new LocalKafkaJournal(
                journalDirectory.toPath(),
                scheduler,
                segmentSize,
                Duration.standardSeconds(1L),
                Size.kilobytes(4L),
                Duration.standardHours(1L),
                1000000L,
                Duration.standardSeconds(1L),
                90,
                new MetricRegistry(),
                serverStatus);

        createBulkChunks(journal, segmentSize, 4);
        journal.flushDirtyLogs();
        journal.cleanupLogs();

        final Lifecycle lifecycleAfterCleanup = serverStatus.getLifecycle();
        Assertions.assertThat(lifecycleAfterCleanup).isEqualTo(Lifecycle.THROTTLED);
    }

    /**
     * Starts from a throttled server status, creates a journal without writing anything to
     * it, and checks that the lifecycle is {@code RUNNING} after a flush and cleanup.
     */
    @Test
    public void serverStatusUnthrottledIfJournalUtilizationIsLowerThanThreshold() throws Exception {
        serverStatus.throttle();

        final LocalKafkaJournal journal = new LocalKafkaJournal(
                journalDirectory.toPath(),
                scheduler,
                Size.kilobytes(1L),
                Duration.standardSeconds(1L),
                Size.kilobytes(4L),
                Duration.standardHours(1L),
                1000000L,
                Duration.standardSeconds(1L),
                90,
                new MetricRegistry(),
                serverStatus);

        journal.flushDirtyLogs();
        journal.cleanupLogs();

        final Lifecycle lifecycleAfterCleanup = serverStatus.getLifecycle();
        Assertions.assertThat(lifecycleAfterCleanup).isEqualTo(Lifecycle.RUNNING);
    }

    /**
     * Writes two bulks, cuts the last byte off the first segment file
     * ({@code messagejournal-0/00000000000000000000.log}) and checks that two consecutive
     * reads of up to 25 entries return 24 and then 25 entries.
     */
    @Test
    public void truncatedSegment() throws Exception {
        final Size segmentSize = Size.kilobytes(1L);
        final LocalKafkaJournal journal = new LocalKafkaJournal(
                this.journalDirectory.toPath(),
                this.scheduler,
                segmentSize,
                Duration.standardHours(1L),
                Size.kilobytes(10L),
                Duration.standardDays(1L),
                1000000L,
                Duration.standardMinutes(1L),
                100,
                new MetricRegistry(),
                this.serverStatus);
        createBulkChunks(journal, segmentSize, 2);

        final Path firstSegmentPath =
                Paths.get(this.journalDirectory.getAbsolutePath(), "messagejournal-0", "00000000000000000000.log");
        Assertions.assertThat(firstSegmentPath).isRegularFile();

        // Open in append mode so the existing content is kept, then drop the final byte.
        // The channel is closed before reading so the journal sees the truncated file.
        final File firstSegment = firstSegmentPath.toFile();
        try (FileChannel channel = new FileOutputStream(firstSegment, true).getChannel()) {
            channel.truncate(firstSegment.length() - 1);
        }

        Assertions.assertThat(journal.read(25L)).hasSize(24);
        Assertions.assertThat(journal.read(25L)).hasSize(25);
    }

    /**
     * Ignored stress scenario: one million parallel iterations in which every thousandth
     * iteration writes an entry and all others read one entry and commit its offset.
     */
    @Test
    @Ignore
    public void readNext() throws Exception {
        final LocalKafkaJournal journal = new LocalKafkaJournal(
                journalDirectory.toPath(),
                scheduler,
                Size.kilobytes(1L),
                Duration.standardHours(1L),
                Size.kilobytes(10L),
                Duration.standardDays(1L),
                1000000L,
                Duration.standardMinutes(1L),
                100,
                new MetricRegistry(),
                serverStatus);
        final byte[] idBytes = "id".getBytes(StandardCharsets.UTF_8);
        final byte[] messageBytes = "message".getBytes(StandardCharsets.UTF_8);

        IntStream.range(0, 1000000).parallel().forEach(iteration -> {
            if (iteration % 1000 != 0) {
                // Common case: read a single entry and commit whatever came back.
                journal.read(1L).forEach(entry -> journal.markJournalOffsetCommitted(entry.getOffset()));
                return;
            }
            // Every thousandth iteration writes instead of reading.
            journal.write(idBytes, messageBytes);
        });
    }
}
