package org.apache.distributedlog.client;

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.twitter.finagle.IndividualRequestTimeoutException;
import com.twitter.util.Await;
import com.twitter.util.Future;
import com.twitter.util.Promise;
import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.LogRecordSet;
import org.apache.distributedlog.LogRecordSetBuffer;
import org.apache.distributedlog.exceptions.LogRecordTooLongException;
import org.apache.distributedlog.io.CompressionCodec;
import org.apache.distributedlog.service.DistributedLogClient;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/distributedlog/client/TestDistributedLogMultiStreamWriter.class */
public class TestDistributedLogMultiStreamWriter {
    @Test(timeout = 20000, expected = IllegalArgumentException.class)
    public void testBuildWithNullStreams() throws Exception {
        DistributedLogMultiStreamWriter.newBuilder().build();
    }

    @Test(timeout = 20000, expected = IllegalArgumentException.class)
    public void testBuildWithEmptyStreamList() throws Exception {
        DistributedLogMultiStreamWriter.newBuilder().streams(Lists.newArrayList()).build();
    }

    @Test(timeout = 20000, expected = NullPointerException.class)
    public void testBuildWithNullClient() throws Exception {
        DistributedLogMultiStreamWriter.newBuilder().streams(Lists.newArrayList(new String[]{"stream1", "stream2"})).build();
    }

    @Test(timeout = 20000, expected = NullPointerException.class)
    public void testBuildWithNullCodec() throws Exception {
        DistributedLogMultiStreamWriter.newBuilder().streams(Lists.newArrayList(new String[]{"stream1", "stream2"})).client((DistributedLogClient) Mockito.mock(DistributedLogClient.class)).compressionCodec((CompressionCodec.Type) null).build();
    }

    @Test(timeout = 20000, expected = IllegalArgumentException.class)
    public void testBuildWithInvalidSpeculativeSettings1() throws Exception {
        DistributedLogMultiStreamWriter.newBuilder().streams(Lists.newArrayList(new String[]{"stream1", "stream2"})).client((DistributedLogClient) Mockito.mock(DistributedLogClient.class)).compressionCodec(CompressionCodec.Type.LZ4).firstSpeculativeTimeoutMs(-1).build();
    }

    @Test(timeout = 20000, expected = IllegalArgumentException.class)
    public void testBuildWithInvalidSpeculativeSettings2() throws Exception {
        DistributedLogMultiStreamWriter.newBuilder().streams(Lists.newArrayList(new String[]{"stream1", "stream2"})).client((DistributedLogClient) Mockito.mock(DistributedLogClient.class)).compressionCodec(CompressionCodec.Type.LZ4).firstSpeculativeTimeoutMs(10).maxSpeculativeTimeoutMs(5).build();
    }

    @Test(timeout = 20000, expected = IllegalArgumentException.class)
    public void testBuildWithInvalidSpeculativeSettings3() throws Exception {
        DistributedLogMultiStreamWriter.newBuilder().streams(Lists.newArrayList(new String[]{"stream1", "stream2"})).client((DistributedLogClient) Mockito.mock(DistributedLogClient.class)).compressionCodec(CompressionCodec.Type.LZ4).firstSpeculativeTimeoutMs(10).maxSpeculativeTimeoutMs(20).speculativeBackoffMultiplier(-1.0f).build();
    }

    @Test(timeout = 20000, expected = IllegalArgumentException.class)
    public void testBuildWithInvalidSpeculativeSettings4() throws Exception {
        DistributedLogMultiStreamWriter.newBuilder().streams(Lists.newArrayList(new String[]{"stream1", "stream2"})).client((DistributedLogClient) Mockito.mock(DistributedLogClient.class)).compressionCodec(CompressionCodec.Type.LZ4).firstSpeculativeTimeoutMs(10).maxSpeculativeTimeoutMs(20).speculativeBackoffMultiplier(2.0f).requestTimeoutMs(10L).build();
    }

    @Test(timeout = 20000)
    public void testBuildMultiStreamWriter() throws Exception {
        DistributedLogMultiStreamWriter.newBuilder().streams(Lists.newArrayList(new String[]{"stream1", "stream2"})).client((DistributedLogClient) Mockito.mock(DistributedLogClient.class)).compressionCodec(CompressionCodec.Type.LZ4).firstSpeculativeTimeoutMs(10).maxSpeculativeTimeoutMs(20).speculativeBackoffMultiplier(2.0f).requestTimeoutMs(50L).build();
        Assert.assertTrue(true);
    }

    @Test(timeout = 20000)
    public void testBuildWithPeriodicalFlushEnabled() throws Exception {
        ScheduledExecutorService scheduledExecutorService = (ScheduledExecutorService) Mockito.mock(ScheduledExecutorService.class);
        ((ScheduledExecutorService) Mockito.verify(scheduledExecutorService, Mockito.times(1))).scheduleAtFixedRate(DistributedLogMultiStreamWriter.newBuilder().streams(Lists.newArrayList(new String[]{"stream1", "stream2"})).client((DistributedLogClient) Mockito.mock(DistributedLogClient.class)).compressionCodec(CompressionCodec.Type.LZ4).firstSpeculativeTimeoutMs(10).maxSpeculativeTimeoutMs(20).speculativeBackoffMultiplier(2.0f).requestTimeoutMs(50L).flushIntervalMs(1000).scheduler(scheduledExecutorService).build(), 1000000L, 1000000L, TimeUnit.MICROSECONDS);
    }

    @Test(timeout = 20000)
    public void testBuildWithPeriodicalFlushDisabled() throws Exception {
        ScheduledExecutorService scheduledExecutorService = (ScheduledExecutorService) Mockito.mock(ScheduledExecutorService.class);
        Runnable build = DistributedLogMultiStreamWriter.newBuilder().streams(Lists.newArrayList(new String[]{"stream1", "stream2"})).client((DistributedLogClient) Mockito.mock(DistributedLogClient.class)).compressionCodec(CompressionCodec.Type.LZ4).firstSpeculativeTimeoutMs(10).maxSpeculativeTimeoutMs(20).speculativeBackoffMultiplier(2.0f).requestTimeoutMs(50L).flushIntervalMs(0).scheduler(scheduledExecutorService).build();
        ((ScheduledExecutorService) Mockito.verify(scheduledExecutorService, Mockito.times(0))).scheduleAtFixedRate(build, 1000L, 1000L, TimeUnit.MILLISECONDS);
        build.close();
    }

    @Test(timeout = 20000)
    public void testFlushWhenBufferIsFull() throws Exception {
        DistributedLogClient distributedLogClient = (DistributedLogClient) Mockito.mock(DistributedLogClient.class);
        Mockito.when(distributedLogClient.writeRecordSet((String) Mockito.any(), (LogRecordSetBuffer) Mockito.any())).thenReturn(Future.value(new DLSN(1L, 1L, 999L)));
        DistributedLogMultiStreamWriter build = DistributedLogMultiStreamWriter.newBuilder().streams(Lists.newArrayList(new String[]{"stream1", "stream2"})).client(distributedLogClient).compressionCodec(CompressionCodec.Type.LZ4).firstSpeculativeTimeoutMs(100000).maxSpeculativeTimeoutMs(200000).speculativeBackoffMultiplier(2.0f).requestTimeoutMs(500000L).flushIntervalMs(0).bufferSize(0).scheduler(Executors.newSingleThreadScheduledExecutor()).build();
        build.write(ByteBuffer.wrap("test".getBytes(Charsets.UTF_8)));
        ((DistributedLogClient) Mockito.verify(distributedLogClient, Mockito.times(1))).writeRecordSet((String) Mockito.any(), (LogRecordSetBuffer) Mockito.any());
        build.close();
    }

    @Test(timeout = 20000)
    public void testFlushWhenExceedMaxLogRecordSetSize() throws Exception {
        DistributedLogClient distributedLogClient = (DistributedLogClient) Mockito.mock(DistributedLogClient.class);
        Mockito.when(distributedLogClient.writeRecordSet((String) Mockito.any(), (LogRecordSetBuffer) Mockito.any())).thenReturn(Future.value(new DLSN(1L, 1L, 999L)));
        DistributedLogMultiStreamWriter build = DistributedLogMultiStreamWriter.newBuilder().streams(Lists.newArrayList(new String[]{"stream1", "stream2"})).client(distributedLogClient).compressionCodec(CompressionCodec.Type.LZ4).firstSpeculativeTimeoutMs(100000).maxSpeculativeTimeoutMs(200000).speculativeBackoffMultiplier(2.0f).requestTimeoutMs(500000L).flushIntervalMs(0).bufferSize(Integer.MAX_VALUE).scheduler(Executors.newSingleThreadScheduledExecutor()).build();
        byte[] bArr = new byte[1040084];
        build.write(ByteBuffer.wrap(bArr));
        ((DistributedLogClient) Mockito.verify(distributedLogClient, Mockito.times(0))).writeRecordSet((String) Mockito.any(), (LogRecordSetBuffer) Mockito.any());
        LogRecordSet.Writer logRecordSetWriter = build.getLogRecordSetWriter();
        Assert.assertEquals(1L, logRecordSetWriter.getNumRecords());
        Assert.assertEquals(20 + bArr.length, logRecordSetWriter.getNumBytes());
        build.write(ByteBuffer.wrap(bArr));
        ((DistributedLogClient) Mockito.verify(distributedLogClient, Mockito.times(1))).writeRecordSet((String) Mockito.any(), (LogRecordSetBuffer) Mockito.any());
        LogRecordSet.Writer logRecordSetWriter2 = build.getLogRecordSetWriter();
        Assert.assertEquals(1L, logRecordSetWriter2.getNumRecords());
        Assert.assertEquals(20 + bArr.length, logRecordSetWriter2.getNumBytes());
        Assert.assertTrue(logRecordSetWriter != logRecordSetWriter2);
        build.close();
    }

    @Test(timeout = 20000)
    public void testWriteTooLargeRecord() throws Exception {
        DistributedLogMultiStreamWriter build = DistributedLogMultiStreamWriter.newBuilder().streams(Lists.newArrayList(new String[]{"stream1", "stream2"})).client((DistributedLogClient) Mockito.mock(DistributedLogClient.class)).compressionCodec(CompressionCodec.Type.LZ4).firstSpeculativeTimeoutMs(100000).maxSpeculativeTimeoutMs(200000).speculativeBackoffMultiplier(2.0f).requestTimeoutMs(5000000L).flushIntervalMs(0).bufferSize(0).build();
        Future write = build.write(ByteBuffer.wrap(new byte[1040394]));
        Assert.assertTrue(write.isDefined());
        try {
            Await.result(write);
            Assert.fail("Should fail on writing too long record");
        } catch (LogRecordTooLongException e) {
        }
        build.close();
    }

    @Test(timeout = 20000)
    public void testSpeculativeWrite() throws Exception {
        DistributedLogClient distributedLogClient = (DistributedLogClient) Mockito.mock(DistributedLogClient.class);
        DistributedLogMultiStreamWriter build = DistributedLogMultiStreamWriter.newBuilder().streams(Lists.newArrayList(new String[]{"stream1", "stream2"})).client(distributedLogClient).compressionCodec(CompressionCodec.Type.LZ4).firstSpeculativeTimeoutMs(10).maxSpeculativeTimeoutMs(20).speculativeBackoffMultiplier(2.0f).requestTimeoutMs(5000000L).flushIntervalMs(0).bufferSize(0).build();
        final String stream = build.getStream(1);
        final DLSN dlsn = new DLSN(99L, 88L, 0L);
        ((DistributedLogClient) Mockito.doAnswer(new Answer() { // from class: org.apache.distributedlog.client.TestDistributedLogMultiStreamWriter.1
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                return ((String) invocationOnMock.getArguments()[0]).equals(stream) ? Future.value(dlsn) : new Promise();
            }
        }).when(distributedLogClient)).writeRecordSet((String) Mockito.any(), (LogRecordSetBuffer) Mockito.any());
        Assert.assertEquals(dlsn, (DLSN) Await.result(build.write(ByteBuffer.wrap("test-test".getBytes(Charsets.UTF_8)))));
        build.close();
    }

    @Test(timeout = 20000)
    public void testPeriodicalFlush() throws Exception {
        DistributedLogClient distributedLogClient = (DistributedLogClient) Mockito.mock(DistributedLogClient.class);
        DistributedLogMultiStreamWriter build = DistributedLogMultiStreamWriter.newBuilder().streams(Lists.newArrayList(new String[]{"stream1", "stream2"})).client(distributedLogClient).compressionCodec(CompressionCodec.Type.LZ4).firstSpeculativeTimeoutMs(10).maxSpeculativeTimeoutMs(20).speculativeBackoffMultiplier(2.0f).requestTimeoutMs(5000000L).flushIntervalMs(10).bufferSize(Integer.MAX_VALUE).build();
        final DLSN dlsn = new DLSN(99L, 88L, 0L);
        ((DistributedLogClient) Mockito.doAnswer(new Answer() { // from class: org.apache.distributedlog.client.TestDistributedLogMultiStreamWriter.2
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                return Future.value(dlsn);
            }
        }).when(distributedLogClient)).writeRecordSet((String) Mockito.any(), (LogRecordSetBuffer) Mockito.any());
        Assert.assertEquals(dlsn, (DLSN) Await.result(build.write(ByteBuffer.wrap("test-test".getBytes(Charsets.UTF_8)))));
        build.close();
    }

    @Test(timeout = 20000)
    public void testFailRequestAfterRetriedAllStreams() throws Exception {
        DistributedLogClient distributedLogClient = (DistributedLogClient) Mockito.mock(DistributedLogClient.class);
        Mockito.when(distributedLogClient.writeRecordSet((String) Mockito.any(), (LogRecordSetBuffer) Mockito.any())).thenReturn(new Promise());
        DistributedLogMultiStreamWriter build = DistributedLogMultiStreamWriter.newBuilder().streams(Lists.newArrayList(new String[]{"stream1", "stream2"})).client(distributedLogClient).compressionCodec(CompressionCodec.Type.LZ4).firstSpeculativeTimeoutMs(10).maxSpeculativeTimeoutMs(20).speculativeBackoffMultiplier(2.0f).requestTimeoutMs(5000000L).flushIntervalMs(10).bufferSize(Integer.MAX_VALUE).build();
        try {
            Await.result(build.write(ByteBuffer.wrap("test-test".getBytes(Charsets.UTF_8))));
            Assert.fail("Should fail the request after retries all streams");
        } catch (IndividualRequestTimeoutException e) {
            long inMilliseconds = e.timeout().inMilliseconds();
            Assert.assertTrue(inMilliseconds >= 30 && inMilliseconds < 5000000);
        }
        build.close();
    }
}
