package org.apache.hadoop.hdfs.qjournal.client;

import com.google.common.base.Supplier;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerTestUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.class */
public class TestIPCLoggerChannel {
    private static final String JID = "test-journalid";
    private IPCLoggerChannel ch;
    private static final int LIMIT_QUEUE_SIZE_MB = 1;
    private static final int LIMIT_QUEUE_SIZE_BYTES = 1048576;
    private static final Log LOG = LogFactory.getLog(TestIPCLoggerChannel.class);
    private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(12345, "mycluster", "my-bp", 0);
    private static final InetSocketAddress FAKE_ADDR = new InetSocketAddress(0);
    private static final byte[] FAKE_DATA = new byte[4096];
    private final Configuration conf = new Configuration();
    private final QJournalProtocol mockProxy = (QJournalProtocol) Mockito.mock(QJournalProtocol.class);

    @Before
    public void setupMock() {
        this.conf.setInt("dfs.qjournal.queued-edits.limit.mb", LIMIT_QUEUE_SIZE_MB);
        this.ch = new IPCLoggerChannel(this.conf, FAKE_NSINFO, JID, FAKE_ADDR) { // from class: org.apache.hadoop.hdfs.qjournal.client.TestIPCLoggerChannel.1
            protected QJournalProtocol getProxy() throws IOException {
                return TestIPCLoggerChannel.this.mockProxy;
            }
        };
        this.ch.setEpoch(1L);
    }

    @Test
    public void testSimpleCall() throws Exception {
        this.ch.sendEdits(1L, 1L, 3, FAKE_DATA).get();
        ((QJournalProtocol) Mockito.verify(this.mockProxy)).journal((RequestInfo) Mockito.any(), Mockito.eq(1L), Mockito.eq(1L), Mockito.eq(3), (byte[]) Mockito.same(FAKE_DATA));
    }

    @Test
    public void testQueueLimiting() throws Exception {
        GenericTestUtils.DelayAnswer delayAnswer = new GenericTestUtils.DelayAnswer(LOG);
        ((QJournalProtocol) Mockito.doAnswer(delayAnswer).when(this.mockProxy)).journal((RequestInfo) Mockito.any(), Mockito.eq(1L), Mockito.eq(1L), Mockito.eq(LIMIT_QUEUE_SIZE_MB), (byte[]) Mockito.same(FAKE_DATA));
        int length = LIMIT_QUEUE_SIZE_BYTES / FAKE_DATA.length;
        for (int i = LIMIT_QUEUE_SIZE_MB; i <= length; i += LIMIT_QUEUE_SIZE_MB) {
            this.ch.sendEdits(1L, i, LIMIT_QUEUE_SIZE_MB, FAKE_DATA);
        }
        Assert.assertEquals(DiskBalancerTestUtil.MB, this.ch.getQueuedEditsSize());
        try {
            this.ch.sendEdits(1L, length + LIMIT_QUEUE_SIZE_MB, LIMIT_QUEUE_SIZE_MB, FAKE_DATA).get(1L, TimeUnit.SECONDS);
            Assert.fail("Did not fail to queue more calls after queue was full");
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof LoggerTooFarBehindException)) {
                throw e;
            }
        }
        delayAnswer.proceed();
        GenericTestUtils.waitFor(new Supplier<Boolean>() { // from class: org.apache.hadoop.hdfs.qjournal.client.TestIPCLoggerChannel.2
            /* renamed from: get, reason: merged with bridge method [inline-methods] */
            public Boolean m236get() {
                return Boolean.valueOf(TestIPCLoggerChannel.this.ch.getQueuedEditsSize() == 0);
            }
        }, 10, 1000);
    }

    @Test
    public void testStopSendingEditsWhenOutOfSync() throws Exception {
        ((QJournalProtocol) Mockito.doThrow(new IOException("injected error")).when(this.mockProxy)).journal((RequestInfo) Mockito.any(), Mockito.eq(1L), Mockito.eq(1L), Mockito.eq(LIMIT_QUEUE_SIZE_MB), (byte[]) Mockito.same(FAKE_DATA));
        try {
            this.ch.sendEdits(1L, 1L, LIMIT_QUEUE_SIZE_MB, FAKE_DATA).get();
            Assert.fail("Injected JOOSE did not cause sendEdits() to throw");
        } catch (ExecutionException e) {
            GenericTestUtils.assertExceptionContains("injected", e);
        }
        ((QJournalProtocol) Mockito.verify(this.mockProxy)).journal((RequestInfo) Mockito.any(), Mockito.eq(1L), Mockito.eq(1L), Mockito.eq(LIMIT_QUEUE_SIZE_MB), (byte[]) Mockito.same(FAKE_DATA));
        Assert.assertTrue(this.ch.isOutOfSync());
        try {
            this.ch.sendEdits(1L, 2L, LIMIT_QUEUE_SIZE_MB, FAKE_DATA).get();
            Assert.fail("sendEdits() should throw until next roll");
        } catch (ExecutionException e2) {
            GenericTestUtils.assertExceptionContains("disabled until next roll", e2.getCause());
        }
        ((QJournalProtocol) Mockito.verify(this.mockProxy, Mockito.never())).journal((RequestInfo) Mockito.any(), Mockito.eq(1L), Mockito.eq(2L), Mockito.eq(LIMIT_QUEUE_SIZE_MB), (byte[]) Mockito.same(FAKE_DATA));
        ((QJournalProtocol) Mockito.verify(this.mockProxy)).heartbeat((RequestInfo) Mockito.any());
        this.ch.startLogSegment(3L, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
        Assert.assertFalse(this.ch.isOutOfSync());
        this.ch.sendEdits(3L, 3L, LIMIT_QUEUE_SIZE_MB, FAKE_DATA).get();
    }
}
