package org.apache.hadoop.fs.azurebfs;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.hamcrest.core.IsEqual;
import org.hamcrest.core.IsNot;
import org.junit.Test;

/* loaded from: input_file:org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.class */
/**
 * Integration tests for {@code flush()}, {@code hflush()} and {@code hsync()} on output
 * streams created through {@link AzureBlobFileSystem}.
 *
 * <p>Each test writes random bytes to a fresh path, flushes in one of the supported ways and
 * then checks either the bytes that can be read back or the resulting file length.
 */
public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
    private static final int BASE_SIZE = 1024;
    private static final int ONE_THOUSAND = 1000;
    /** Size of a single write payload: 3 * 1000 * 1024 = 3,072,000 bytes. */
    private static final int TEST_BUFFER_SIZE = 3 * ONE_THOUSAND * BASE_SIZE;
    private static final int ONE_MB = BASE_SIZE * BASE_SIZE;
    private static final int FLUSH_TIMES = 200;
    private static final int THREAD_SLEEP_TIME = 1000;
    /** Size of the payload used by the hflush/hsync/flush visibility tests: 8 MB. */
    private static final int TEST_FILE_LENGTH = 8 * ONE_MB;
    private static final int WAITING_TIME = 1000;
    /** Buffer size handed to {@code FileSystem.open(Path, int)} by the read-back tests. */
    private static final int READ_BUFFER_SIZE = 4 * ONE_MB;
    /** Pause between consecutive flush calls in the repeated-flush tests. */
    private static final long FLUSH_PAUSE_MILLIS = 10L;
    /** Number of threads writing concurrently in the heavy-write tests. */
    private static final int WRITER_THREADS = 10;

    /**
     * Writes the same payload twice, calling {@code flush()} repeatedly after each write, then
     * closes the stream and checks that every buffer-sized read returns the payload.
     */
    @Test
    public void testAbfsOutputStreamAsyncFlushWithRetainUncommittedData() throws Exception {
        final AzureBlobFileSystem fs = getFileSystem();
        final Path testFilePath = path(this.methodName.getMethodName());
        final byte[] written = newRandomBytes(TEST_BUFFER_SIZE);

        try (FSDataOutputStream stream = fs.create(testFilePath)) {
            for (int pass = 0; pass < 2; pass++) {
                stream.write(written);
                for (int flush = 0; flush < FLUSH_TIMES; flush++) {
                    stream.flush();
                    Thread.sleep(FLUSH_PAUSE_MILLIS);
                }
            }
        }

        final byte[] readBack = new byte[TEST_BUFFER_SIZE];
        try (FSDataInputStream in = fs.open(testFilePath, READ_BUFFER_SIZE)) {
            while (in.available() != 0) {
                // Fill the whole buffer so a short read cannot make the comparison fail.
                assertNotEquals("read returned -1", -1L, readUntilFullOrEof(in, readBack));
                assertArrayEquals("buffer read from stream", readBack, written);
            }
        }
    }

    /**
     * Writes one payload, calls {@code hsync()} and {@code hflush()} repeatedly, and checks that
     * the payload can be read back while the output stream is still open.
     */
    @Test
    public void testAbfsOutputStreamSyncFlush() throws Exception {
        final AzureBlobFileSystem fs = getFileSystem();
        final Path testFilePath = path(this.methodName.getMethodName());

        try (FSDataOutputStream stream = fs.create(testFilePath)) {
            final byte[] written = newRandomBytes(TEST_BUFFER_SIZE);
            stream.write(written);
            for (int round = 0; round < FLUSH_TIMES; round++) {
                stream.hsync();
                stream.hflush();
                Thread.sleep(FLUSH_PAUSE_MILLIS);
            }

            final byte[] readBack = new byte[TEST_BUFFER_SIZE];
            try (FSDataInputStream in = fs.open(testFilePath, READ_BUFFER_SIZE)) {
                assertNotEquals(-1L, readUntilFullOrEof(in, readBack));
                assertArrayEquals(readBack, written);
            }
        }
    }

    /**
     * Submits {@link #FLUSH_TIMES} concurrent writes of one payload and calls {@code hsync()}
     * while any of them is still running; afterwards the file length must equal the total
     * number of bytes written.
     */
    @Test
    public void testWriteHeavyBytesToFileSyncFlush() throws Exception {
        final AzureBlobFileSystem fs = getFileSystem();
        final Path testFilePath = path(this.methodName.getMethodName());
        final ExecutorService executor = Executors.newFixedThreadPool(WRITER_THREADS);
        try {
            try (FSDataOutputStream stream = fs.create(testFilePath)) {
                final ArrayList<Future<Void>> pendingWrites =
                        submitConcurrentWrites(executor, stream, newRandomBytes(TEST_BUFFER_SIZE));
                boolean allDone = false;
                while (!allDone) {
                    allDone = true;
                    for (Future<Void> write : pendingWrites) {
                        if (!write.isDone()) {
                            stream.hsync();
                            allDone = false;
                            Thread.sleep(THREAD_SLEEP_TIME);
                        }
                    }
                }
            }
        } finally {
            // Always release the worker threads, including when the test fails part-way.
            executor.shutdownNow();
        }

        final long expectedLength = (long) TEST_BUFFER_SIZE * FLUSH_TIMES;
        assertEquals("Wrong file length in " + testFilePath, expectedLength,
                fs.getFileStatus(testFilePath).getLen());
    }

    /**
     * Same as {@link #testWriteHeavyBytesToFileSyncFlush()} but calls {@code flush()} instead of
     * {@code hsync()} while writes are outstanding, without sleeping between polls.
     */
    @Test
    public void testWriteHeavyBytesToFileAsyncFlush() throws Exception {
        final AzureBlobFileSystem fs = getFileSystem();
        final ExecutorService executor = Executors.newFixedThreadPool(WRITER_THREADS);
        final Path testFilePath = path(this.methodName.getMethodName());
        try {
            try (FSDataOutputStream stream = fs.create(testFilePath)) {
                final ArrayList<Future<Void>> pendingWrites =
                        submitConcurrentWrites(executor, stream, newRandomBytes(TEST_BUFFER_SIZE));
                boolean allDone = false;
                while (!allDone) {
                    allDone = true;
                    for (Future<Void> write : pendingWrites) {
                        if (!write.isDone()) {
                            stream.flush();
                            allDone = false;
                        }
                    }
                }
                Thread.sleep(THREAD_SLEEP_TIME);
            }
        } finally {
            // Always release the worker threads, including when the test fails part-way.
            executor.shutdownNow();
        }

        final long expectedLength = (long) TEST_BUFFER_SIZE * FLUSH_TIMES;
        assertEquals(expectedLength, fs.getFileStatus(testFilePath).getLen());
    }

    /** Runs {@link #testFlush(boolean)} with output-stream flush left enabled. */
    @Test
    public void testFlushWithOutputStreamFlushEnabled() throws Exception {
        testFlush(false);
    }

    /** Runs {@link #testFlush(boolean)} with output-stream flush disabled. */
    @Test
    public void testFlushWithOutputStreamFlushDisabled() throws Exception {
        testFlush(true);
    }

    /**
     * Writes a payload, waits for pending uploads, calls {@code flush()} and checks whether the
     * payload is readable.
     *
     * <p>The payload is expected to be readable when flush is not disabled or when the path is
     * an append-blob key; otherwise it is expected not to match.
     *
     * @param z value passed to {@code setDisableOutputStreamFlush}; {@code true} disables flush
     */
    private void testFlush(boolean z) throws Exception {
        final AzureBlobFileSystem fs = getFileSystem();
        fs.getAbfsStore().getAbfsConfiguration().setDisableOutputStreamFlush(z);
        final Path testFilePath = path(this.methodName.getMethodName());
        final byte[] payload = getRandomBytesArray();
        // The payload must be at least as large as the configured write buffer.
        assertTrue(fs.getAbfsStore().getAbfsConfiguration().getWriteBufferSize() <= payload.length);
        final boolean appendBlob = isAppendBlobPath(fs, testFilePath);

        try (FSDataOutputStream stream = fs.create(testFilePath)) {
            stream.write(payload);
            // NOTE(review): getWrappedStream() presumably needs a downcast to the ABFS output
            // stream type here; the cast looks lost in decompilation - confirm.
            stream.getWrappedStream().waitForPendingUploads();
            stream.flush();
            validate(fs.open(testFilePath), payload, !z || appendBlob);
        }
    }

    /** With flush enabled, data must be readable after {@code hflush()}. */
    @Test
    public void testHflushWithFlushEnabled() throws Exception {
        final AzureBlobFileSystem fs = getFileSystem();
        final byte[] payload = getRandomBytesArray();
        final Path testFilePath = path(UUID.randomUUID().toString());
        try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, payload, true)) {
            stream.hflush();
            validate(fs, testFilePath, payload, true);
        }
    }

    /**
     * With flush disabled, data is expected to be readable after {@code hflush()} only when the
     * path is an append-blob key.
     */
    @Test
    public void testHflushWithFlushDisabled() throws Exception {
        final AzureBlobFileSystem fs = getFileSystem();
        final byte[] payload = getRandomBytesArray();
        final Path testFilePath = path(this.methodName.getMethodName());
        final boolean appendBlob = isAppendBlobPath(fs, testFilePath);
        try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, payload, false)) {
            stream.hflush();
            validate(fs, testFilePath, payload, appendBlob);
        }
    }

    /** With flush enabled, data must be readable after {@code hsync()}. */
    @Test
    public void testHsyncWithFlushEnabled() throws Exception {
        final AzureBlobFileSystem fs = getFileSystem();
        final byte[] payload = getRandomBytesArray();
        final Path testFilePath = path(this.methodName.getMethodName());
        try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, payload, true)) {
            stream.hsync();
            validate(fs, testFilePath, payload, true);
        }
    }

    /**
     * Registers a {@link TracingHeaderValidator} on a stream of a file system configured with
     * append-blob directories, then writes and syncs a small payload.
     */
    @Test
    public void testTracingHeaderForAppendBlob() throws Exception {
        final Configuration config = new Configuration(getRawConfiguration());
        config.set("fs.azure.appendblob.directories", "abfss:/");
        config.set(TestConfigurationKeys.FS_AZURE_TEST_APPENDBLOB_ENABLED, "true");

        // newInstance() is declared to return FileSystem, so a downcast is required; the
        // dedicated instance is closed once the test is done.
        try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(config)) {
            final byte[] payload = newRandomBytes(10);
            try (FSDataOutputStream out = fs.create(new Path("/testFile"))) {
                // NOTE(review): both getWrappedStream() calls presumably need a downcast to the
                // ABFS output stream type; the casts look lost in decompilation - confirm.
                out.getWrappedStream().registerListener(new TracingHeaderValidator(
                        fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
                        fs.getFileSystemId(), FSOperationType.WRITE, false, 0,
                        out.getWrappedStream().getStreamID()));
                out.write(payload);
                out.hsync();
            }
        }
    }

    /** With flush disabled, the stream must not advertise any of the listed capabilities. */
    @Test
    public void testStreamCapabilitiesWithFlushDisabled() throws Exception {
        try (FSDataOutputStream stream = getStreamAfterWrite(getFileSystem(),
                path(this.methodName.getMethodName()), getRandomBytesArray(), false)) {
            ContractTestUtils.assertLacksStreamCapabilities(stream,
                    new String[]{"hflush", "hsync", "dropbehind", "in:readahead", "in:unbuffer"});
        }
    }

    /**
     * With flush enabled, the stream must advertise {@code hflush} and {@code hsync} and lack
     * the remaining listed capabilities.
     */
    @Test
    public void testStreamCapabilitiesWithFlushEnabled() throws Exception {
        try (FSDataOutputStream stream = getStreamAfterWrite(getFileSystem(),
                path(this.methodName.getMethodName()), getRandomBytesArray(), true)) {
            ContractTestUtils.assertHasStreamCapabilities(stream, new String[]{"hflush", "hsync"});
            ContractTestUtils.assertLacksStreamCapabilities(stream,
                    new String[]{"dropbehind", "in:readahead", "in:unbuffer"});
        }
    }

    /**
     * With flush disabled, data is expected to be readable after {@code hsync()} only when the
     * path is an append-blob key.
     */
    @Test
    public void testHsyncWithFlushDisabled() throws Exception {
        final AzureBlobFileSystem fs = getFileSystem();
        final byte[] payload = getRandomBytesArray();
        final Path testFilePath = path(this.methodName.getMethodName());
        final boolean appendBlob = isAppendBlobPath(fs, testFilePath);
        try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, payload, false)) {
            stream.hsync();
            validate(fs, testFilePath, payload, appendBlob);
        }
    }

    /** Returns {@link #TEST_FILE_LENGTH} freshly generated random bytes. */
    private byte[] getRandomBytesArray() {
        return newRandomBytes(TEST_FILE_LENGTH);
    }

    /**
     * Sets the enable-flush option on the file system's configuration, creates {@code path} and
     * writes {@code bArr} to it.
     *
     * @param azureBlobFileSystem file system to configure and write through
     * @param path file to create
     * @param bArr bytes to write
     * @param z value passed to {@code setEnableFlush}
     * @return the still-open stream; the caller is responsible for closing it
     * @throws IOException if creating or writing fails; the stream is closed before rethrowing
     */
    private FSDataOutputStream getStreamAfterWrite(AzureBlobFileSystem azureBlobFileSystem, Path path, byte[] bArr, boolean z) throws IOException {
        azureBlobFileSystem.getAbfsStore().getAbfsConfiguration().setEnableFlush(z);
        final FSDataOutputStream stream = azureBlobFileSystem.create(path);
        try {
            stream.write(bArr);
        } catch (IOException | RuntimeException writeFailure) {
            // The caller never receives the stream on this path, so release it here.
            try {
                stream.close();
            } catch (IOException | RuntimeException closeFailure) {
                writeFailure.addSuppressed(closeFailure);
            }
            throw writeFailure;
        }
        return stream;
    }

    /**
     * Reads up to {@code bArr.length} bytes from {@code inputStream}, compares them with
     * {@code bArr} and closes the stream.
     *
     * @param inputStream stream to read from; always closed by this method
     * @param bArr the bytes that were written
     * @param z {@code true} if the bytes read must equal {@code bArr}, {@code false} if they
     *     must differ
     * @throws IOException if reading or closing fails
     */
    private void validate(InputStream inputStream, byte[] bArr, boolean z) throws IOException {
        try (InputStream in = inputStream) {
            assertReadBack(in, bArr, z,
                    "Bytes read do not match bytes written.",
                    "Bytes read unexpectedly match bytes written.");
        }
    }

    /**
     * Opens {@code path}, reads up to {@code bArr.length} bytes and compares them with
     * {@code bArr}.
     *
     * @param fileSystem file system used to open the file
     * @param path file to read
     * @param bArr the bytes that were written
     * @param z {@code true} if the bytes read must equal {@code bArr}, {@code false} if they
     *     must differ
     * @throws IOException if opening, reading or closing fails
     */
    private void validate(FileSystem fileSystem, Path path, byte[] bArr, boolean z) throws IOException {
        final String uri = path.toUri().toString();
        try (FSDataInputStream in = fileSystem.open(path)) {
            assertReadBack(in, bArr, z,
                    String.format("Bytes read do not match bytes written to %1$s", uri),
                    String.format("Bytes read unexpectedly match bytes written to %1$s", uri));
        }
    }

    /**
     * Shared body of the {@code validate} overloads: fills a buffer of {@code expected.length}
     * bytes from {@code in} and asserts equality or inequality with {@code expected}.
     */
    private void assertReadBack(InputStream in, byte[] expected, boolean shouldMatch,
            String mismatchMessage, String unexpectedMatchMessage) throws IOException {
        final byte[] actual = new byte[expected.length];
        readUntilFullOrEof(in, actual);
        if (shouldMatch) {
            assertArrayEquals(mismatchMessage, expected, actual);
        } else {
            assertThat(unexpectedMatchMessage, actual, IsNot.not(IsEqual.equalTo(expected)));
        }
    }

    /**
     * Reads from {@code in} until {@code buffer} is full or the stream stops returning data.
     *
     * @return the number of bytes stored in {@code buffer}; if nothing was stored, the value of
     *     the first {@code read} call (so {@code -1} at end of stream)
     */
    private static int readUntilFullOrEof(InputStream in, byte[] buffer) throws IOException {
        int filled = 0;
        while (filled < buffer.length) {
            final int count = in.read(buffer, filled, buffer.length - filled);
            if (count <= 0) {
                // End of stream, or a zero-length read that would otherwise loop forever.
                return filled > 0 ? filled : count;
            }
            filled += count;
        }
        return filled;
    }

    /** Returns a new array of {@code length} random bytes. */
    private static byte[] newRandomBytes(int length) {
        final byte[] bytes = new byte[length];
        new Random().nextBytes(bytes);
        return bytes;
    }

    /** Returns whether the qualified form of {@code path} is an append-blob key of {@code fs}. */
    private static boolean isAppendBlobPath(AzureBlobFileSystem fs, Path path) {
        return fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(path).toString());
    }

    /**
     * Submits {@link #FLUSH_TIMES} tasks to {@code executor}, each writing {@code payload} to
     * {@code stream}, and returns their futures.
     */
    private static ArrayList<Future<Void>> submitConcurrentWrites(ExecutorService executor,
            final FSDataOutputStream stream, final byte[] payload) {
        final ArrayList<Future<Void>> futures = new ArrayList<>(FLUSH_TIMES);
        for (int task = 0; task < FLUSH_TIMES; task++) {
            futures.add(executor.submit(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    stream.write(payload);
                    return null;
                }
            }));
        }
        return futures;
    }
}
