package com.azure.core.util;

import com.azure.core.CoreTestUtils;
import com.azure.core.http.HttpHeaders;
import com.azure.core.http.HttpMethod;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.policy.FixedDelayOptions;
import com.azure.core.http.policy.RetryOptions;
import com.azure.core.http.rest.SimpleResponse;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.mocking.MockAsynchronousFileChannel;
import com.azure.core.util.mocking.MockFileChannel;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.Channels;
import java.nio.channels.CompletionHandler;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLockInterruptionException;
import java.nio.channels.NonWritableChannelException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.util.context.Context;

/* loaded from: input_file:com/azure/core/util/FluxUtilTest.class */
public class FluxUtilTest {
    @Test
    public void testCallWithContextGetSingle() {
        StepVerifier.create(getSingle().contextWrite(Context.of("FirstName", "Foo", "LastName", "Bar"))).assertNext(str -> {
            Assertions.assertEquals("Hello, Foo Bar", str);
        }).verifyComplete();
    }

    @Test
    public void testCallWithContextGetCollection() {
        StepVerifier.create(getCollection().contextWrite(Context.of("FirstName", "Foo", "LastName", "Bar"))).assertNext(str -> {
            Assertions.assertEquals("Hello,", str);
        }).assertNext(str2 -> {
            Assertions.assertEquals("Foo", str2);
        }).assertNext(str3 -> {
            Assertions.assertEquals("Bar", str3);
        }).verifyComplete();
    }

    @Test
    public void testCallWithDefaultContextGetSingle() {
        StepVerifier.create(getSingleWithContextAttributes().contextWrite(Context.of("FirstName", "Foo"))).assertNext(str -> {
            Assertions.assertEquals("Hello, Foo additionalContextValue", str);
        }).verifyComplete();
    }

    @Test
    public void toReactorContextNull() {
        Assertions.assertTrue(FluxUtil.toReactorContext((Context) null).isEmpty());
    }

    @Test
    public void toReactorContextContextNone() {
        Assertions.assertTrue(FluxUtil.toReactorContext(Context.NONE).isEmpty());
    }

    @Test
    public void toReactorContextCleansesNullValues() {
        Assertions.assertTrue(FluxUtil.toReactorContext(new Context("key", (Object) null)).isEmpty());
    }

    @Test
    public void toReactorContext() {
        Context context = new Context("key1", "value1");
        Context reactorContext = FluxUtil.toReactorContext(context);
        Assertions.assertEquals(1, reactorContext.size());
        Assertions.assertTrue(reactorContext.hasKey("key1"));
        Assertions.assertEquals("value1", reactorContext.get("key1"));
        Context reactorContext2 = FluxUtil.toReactorContext(context.addData("key2", "value2").addData("key1", "value3"));
        Assertions.assertEquals(2, reactorContext2.size());
        Assertions.assertTrue(reactorContext2.hasKey("key1"));
        Assertions.assertEquals("value3", reactorContext2.get("key1"));
        Assertions.assertTrue(reactorContext2.hasKey("key2"));
        Assertions.assertEquals("value2", reactorContext2.get("key2"));
    }

    @Test
    public void testIsFluxByteBufferInvalidType() {
        Assertions.assertFalse(FluxUtil.isFluxByteBuffer(Mono.class));
    }

    @Test
    public void testIsFluxByteBufferValidType() throws Exception {
        Assertions.assertTrue(FluxUtil.isFluxByteBuffer(FluxUtilTest.class.getMethod("mockReturnType", new Class[0]).getGenericReturnType()));
    }

    @Test
    public void testToMono() {
        String str = "some value";
        StepVerifier.create(FluxUtil.toMono(new SimpleResponse(new HttpRequest(HttpMethod.GET, "http://www.test.com"), 202, new HttpHeaders(), "some value"))).assertNext(str2 -> {
            Assertions.assertEquals(str2, str);
        }).verifyComplete();
    }

    @Test
    public void testMonoError() {
        StepVerifier.create(FluxUtil.monoError(new ClientLogger(FluxUtilTest.class), new RuntimeException("It is an error message"))).verifyErrorMessage("It is an error message");
    }

    @Test
    public void testFluxError() {
        StepVerifier.create(FluxUtil.fluxError(new ClientLogger(FluxUtilTest.class), new RuntimeException("It is an error message"))).verifyErrorMessage("It is an error message");
    }

    @Test
    public void testPageFluxError() {
        StepVerifier.create(FluxUtil.pagedFluxError(new ClientLogger(FluxUtilTest.class), new RuntimeException("It is an error message"))).verifyErrorMessage("It is an error message");
    }

    @Test
    public void testWriteFile() throws Exception {
        byte[] bytes = "test".getBytes(StandardCharsets.UTF_8);
        Flux just = Flux.just(new ByteBuffer[]{ByteBuffer.wrap(bytes, 0, 2), ByteBuffer.wrap(bytes, 2, 2)});
        File createFileIfNotExist = createFileIfNotExist();
        FileOutputStream fileOutputStream = new FileOutputStream(createFileIfNotExist);
        try {
            fileOutputStream.write("hello there".getBytes(StandardCharsets.UTF_8));
            fileOutputStream.close();
            AsynchronousFileChannel open = AsynchronousFileChannel.open(createFileIfNotExist.toPath(), StandardOpenOption.WRITE);
            try {
                FluxUtil.writeFile(just, open).block();
                CoreTestUtils.assertArraysEqual(Files.readAllBytes(createFileIfNotExist.toPath()), "testo there".getBytes(StandardCharsets.UTF_8));
                if (open != null) {
                    open.close();
                }
            } catch (Throwable th) {
                if (open != null) {
                    try {
                        open.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            try {
                fileOutputStream.close();
            } catch (Throwable th4) {
                th3.addSuppressed(th4);
            }
            throw th3;
        }
    }

    @Test
    public void testWriteFileWithPosition() throws Exception {
        byte[] bytes = "test".getBytes(StandardCharsets.UTF_8);
        Flux just = Flux.just(new ByteBuffer[]{ByteBuffer.wrap(bytes, 0, 2), ByteBuffer.wrap(bytes, 2, 2)});
        File createFileIfNotExist = createFileIfNotExist();
        FileOutputStream fileOutputStream = new FileOutputStream(createFileIfNotExist);
        try {
            fileOutputStream.write("hello there".getBytes(StandardCharsets.UTF_8));
            fileOutputStream.close();
            AsynchronousFileChannel open = AsynchronousFileChannel.open(createFileIfNotExist.toPath(), StandardOpenOption.WRITE);
            try {
                FluxUtil.writeFile(just, open, 6L).block();
                CoreTestUtils.assertArraysEqual(Files.readAllBytes(createFileIfNotExist.toPath()), "hello teste".getBytes(StandardCharsets.UTF_8));
                if (open != null) {
                    open.close();
                }
            } catch (Throwable th) {
                if (open != null) {
                    try {
                        open.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            try {
                fileOutputStream.close();
            } catch (Throwable th4) {
                th3.addSuppressed(th4);
            }
            throw th3;
        }
    }

    @Test
    public void testWriteWritableChannel() {
        byte[] bytes = "test".getBytes(StandardCharsets.UTF_8);
        Flux just = Flux.just(new ByteBuffer[]{ByteBuffer.wrap(bytes, 0, 2), ByteBuffer.wrap(bytes, 2, 2)});
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        FluxUtil.writeToWritableByteChannel(just, Channels.newChannel(byteArrayOutputStream)).block();
        CoreTestUtils.assertArraysEqual(byteArrayOutputStream.toByteArray(), bytes);
    }

    @MethodSource({"writeFileDoesNotSwallowErrorSupplier"})
    @ParameterizedTest
    public void writeFileDoesNotSwallowError(Flux<ByteBuffer> flux, AsynchronousFileChannel asynchronousFileChannel, Class<? extends Throwable> cls) {
        StepVerifier.create(Flux.using(() -> {
            return asynchronousFileChannel;
        }, asynchronousFileChannel2 -> {
            return FluxUtil.writeFile(flux, asynchronousFileChannel2);
        }, asynchronousFileChannel3 -> {
            try {
                asynchronousFileChannel3.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        })).expectError(cls).verify(Duration.ofSeconds(30L));
    }

    private static Stream<Arguments> writeFileDoesNotSwallowErrorSupplier() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{Flux.just(ByteBuffer.allocate(1)), new MockAsynchronousFileChannel(new byte[4096]) { // from class: com.azure.core.util.FluxUtilTest.1
            @Override // com.azure.core.util.mocking.MockAsynchronousFileChannel, java.nio.channels.AsynchronousFileChannel
            public Future<Integer> write(ByteBuffer byteBuffer, long j) {
                return new CompletableFuture<Integer>() { // from class: com.azure.core.util.FluxUtilTest.1.1
                    @Override // java.util.concurrent.CompletableFuture, java.util.concurrent.Future
                    public Integer get() throws ExecutionException {
                        throw new ExecutionException(new NonWritableChannelException());
                    }
                };
            }

            @Override // com.azure.core.util.mocking.MockAsynchronousFileChannel, java.nio.channels.AsynchronousFileChannel
            public <A> void write(ByteBuffer byteBuffer, long j, A a, CompletionHandler<Integer, ? super A> completionHandler) {
                completionHandler.failed(new NonWritableChannelException(), a);
            }
        }, NonWritableChannelException.class}), Arguments.of(new Object[]{Flux.generate(() -> {
            return 0;
        }, (num, synchronousSink) -> {
            if (num.intValue() == 10) {
                synchronousSink.error(new IOException());
                return num;
            }
            synchronousSink.next(ByteBuffer.allocate(16));
            return Integer.valueOf(num.intValue() + 1);
        }), new MockAsynchronousFileChannel() { // from class: com.azure.core.util.FluxUtilTest.2
            @Override // com.azure.core.util.mocking.MockAsynchronousFileChannel, java.nio.channels.AsynchronousFileChannel
            public <A> void write(ByteBuffer byteBuffer, long j, A a, CompletionHandler<Integer, ? super A> completionHandler) {
                int remaining = byteBuffer.remaining();
                byteBuffer.position(byteBuffer.position() + remaining);
                completionHandler.completed(Integer.valueOf(remaining), a);
            }
        }, IOException.class}), Arguments.of(new Object[]{Flux.just(ByteBuffer.allocate(1)), new MockAsynchronousFileChannel(new byte[4096]) { // from class: com.azure.core.util.FluxUtilTest.3
            @Override // com.azure.core.util.mocking.MockAsynchronousFileChannel, java.nio.channels.AsynchronousFileChannel
            public Future<Integer> write(ByteBuffer byteBuffer, long j) {
                return new CompletableFuture<Integer>() { // from class: com.azure.core.util.FluxUtilTest.3.1
                    @Override // java.util.concurrent.CompletableFuture, java.util.concurrent.Future
                    public Integer get() throws ExecutionException {
                        throw new ExecutionException(new FileLockInterruptionException());
                    }
                };
            }

            @Override // com.azure.core.util.mocking.MockAsynchronousFileChannel, java.nio.channels.AsynchronousFileChannel
            public <A> void write(ByteBuffer byteBuffer, long j, A a, CompletionHandler<Integer, ? super A> completionHandler) {
                completionHandler.failed(new FileLockInterruptionException(), a);
            }
        }, FileLockInterruptionException.class})});
    }

    @Test
    public void writingRetriableStreamThatFails() throws IOException {
        byte[] bArr = new byte[1048576];
        CoreTestUtils.fillArray(bArr);
        AtomicInteger atomicInteger = new AtomicInteger();
        Flux createRetriableDownloadFlux = FluxUtil.createRetriableDownloadFlux(() -> {
            return generateStream(bArr, 0L, atomicInteger);
        }, (th, l) -> {
            return generateStream(bArr, l.longValue(), atomicInteger);
        }, new RetryOptions(new FixedDelayOptions(5, Duration.ofMillis(1L))), 0L);
        Path createTempFile = Files.createTempFile("writingRetriableStreamThatFails" + UUID.randomUUID(), ".txt", new FileAttribute[0]);
        createTempFile.toFile().deleteOnExit();
        StepVerifier.create(Flux.using(() -> {
            return AsynchronousFileChannel.open(createTempFile, StandardOpenOption.WRITE);
        }, asynchronousFileChannel -> {
            return FluxUtil.writeFile(createRetriableDownloadFlux, asynchronousFileChannel);
        }, asynchronousFileChannel2 -> {
            try {
                asynchronousFileChannel2.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        })).expectComplete().verify(Duration.ofSeconds(60L));
        CoreTestUtils.assertArraysEqual(bArr, Files.readAllBytes(createTempFile));
    }

    private Flux<ByteBuffer> generateStream(byte[] bArr, long j, AtomicInteger atomicInteger) {
        long[] jArr = {j};
        return Flux.push(fluxSink -> {
            while (jArr[0] != bArr.length) {
                if (Math.random() < 0.05d && atomicInteger.getAndIncrement() < 5) {
                    fluxSink.error(new IOException());
                    return;
                } else {
                    int min = (int) Math.min(16384L, bArr.length - jArr[0]);
                    fluxSink.next(ByteBuffer.wrap(bArr, (int) jArr[0], min));
                    jArr[0] = jArr[0] + min;
                }
            }
            fluxSink.complete();
        });
    }

    @Test
    public void readFile() throws IOException {
        byte[] bArr = new byte[10485760];
        CoreTestUtils.fillArray(bArr);
        MockAsynchronousFileChannel mockAsynchronousFileChannel = new MockAsynchronousFileChannel(bArr, bArr.length);
        try {
            StepVerifier.create(FluxUtil.collectBytesInByteBufferStream(FluxUtil.readFile(mockAsynchronousFileChannel), bArr.length)).assertNext(bArr2 -> {
                CoreTestUtils.assertArraysEqual(bArr, bArr2);
            }).verifyComplete();
            if (mockAsynchronousFileChannel != null) {
                mockAsynchronousFileChannel.close();
            }
        } catch (Throwable th) {
            if (mockAsynchronousFileChannel != null) {
                try {
                    mockAsynchronousFileChannel.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @MethodSource({"toFluxByteBufferSupplier"})
    @ParameterizedTest
    public void toFluxByteBuffer(InputStream inputStream, Integer num, byte[] bArr) {
        Flux fluxByteBuffer = num == null ? FluxUtil.toFluxByteBuffer(inputStream) : FluxUtil.toFluxByteBuffer(inputStream, num.intValue());
        if (inputStream == null || bArr.length == 0) {
            StepVerifier.create(fluxByteBuffer).verifyComplete();
            return;
        }
        AtomicLong atomicLong = new AtomicLong((long) Math.ceil(bArr.length / (num == null ? 4096 : num.intValue())));
        ByteBuffer allocate = ByteBuffer.allocate(bArr.length);
        StepVerifier.create(FluxUtil.collectBytesInByteBufferStream(fluxByteBuffer)).thenRequest(atomicLong.get()).thenConsumeWhile(bArr2 -> {
            allocate.put(bArr2, allocate.position(), bArr2.length);
            if (atomicLong.decrementAndGet() != -1) {
                return true;
            }
            CoreTestUtils.assertArraysEqual(bArr, allocate.array());
            return false;
        }).verifyComplete();
    }

    private static Stream<Arguments> toFluxByteBufferSupplier() {
        byte[] bArr = new byte[0];
        byte[] bArr2 = new byte[4096];
        byte[] bArr3 = new byte[8193];
        CoreTestUtils.fillArray(bArr2);
        CoreTestUtils.fillArray(bArr3);
        return Stream.of((Object[]) new Arguments[]{Arguments.arguments(new Object[]{null, null, bArr}), Arguments.arguments(new Object[]{new ByteArrayInputStream(bArr), null, bArr}), Arguments.arguments(new Object[]{new ByteArrayInputStream(bArr2), null, bArr2}), Arguments.arguments(new Object[]{new ByteArrayInputStream(bArr3), null, bArr3}), Arguments.arguments(new Object[]{new ByteArrayInputStream(bArr2), 8192, bArr2}), Arguments.arguments(new Object[]{new ByteArrayInputStream(bArr2), 2048, bArr2}), Arguments.arguments(new Object[]{new ByteArrayInputStream(bArr3), 5432, bArr3})});
    }

    @Test
    public void toFluxByteBufferMultipleSubscriptions() {
        byte[] bArr = new byte[4096];
        CoreTestUtils.fillArray(bArr);
        Flux fluxByteBuffer = FluxUtil.toFluxByteBuffer(new ByteArrayInputStream(bArr));
        StepVerifier.create(FluxUtil.collectBytesInByteBufferStream(fluxByteBuffer)).assertNext(bArr2 -> {
            CoreTestUtils.assertArraysEqual(bArr, bArr2);
        }).verifyComplete();
        StepVerifier.create(FluxUtil.collectBytesInByteBufferStream(fluxByteBuffer)).assertNext(bArr3 -> {
            CoreTestUtils.assertArraysEqual(new byte[0], bArr3);
        }).verifyComplete();
    }

    @Test
    public void illegalToFluxByteBufferChunkSize() {
        StepVerifier.create(FluxUtil.toFluxByteBuffer((InputStream) null, 0)).verifyError(IllegalArgumentException.class);
        StepVerifier.create(FluxUtil.toFluxByteBuffer((InputStream) null, -1)).verifyError(IllegalArgumentException.class);
    }

    @Test
    public void toFluxByteBufferSinkException() {
        StepVerifier.create(FluxUtil.toFluxByteBuffer(new ByteArrayInputStream(new byte[0]) { // from class: com.azure.core.util.FluxUtilTest.4
            @Override // java.io.ByteArrayInputStream, java.io.InputStream
            public synchronized int read(byte[] bArr, int i, int i2) {
                throw new IllegalStateException("error");
            }
        })).verifyError(IllegalStateException.class);
    }

    @Test
    @DisabledOnOs({OS.WINDOWS})
    public void toFluxByteBufferFileInputStreamChannelCloses() throws IOException {
        final AtomicInteger atomicInteger = new AtomicInteger();
        final AtomicInteger atomicInteger2 = new AtomicInteger();
        final AtomicInteger atomicInteger3 = new AtomicInteger();
        final MockFileChannel mockFileChannel = new MockFileChannel() { // from class: com.azure.core.util.FluxUtilTest.5
            @Override // com.azure.core.util.mocking.MockFileChannel, java.nio.channels.spi.AbstractInterruptibleChannel
            public void implCloseChannel() {
                atomicInteger3.incrementAndGet();
            }

            @Override // com.azure.core.util.mocking.MockFileChannel, java.nio.channels.FileChannel, java.nio.channels.SeekableByteChannel
            public long position() {
                atomicInteger.incrementAndGet();
                return 0L;
            }

            @Override // com.azure.core.util.mocking.MockFileChannel, java.nio.channels.FileChannel, java.nio.channels.SeekableByteChannel
            public long size() {
                atomicInteger2.incrementAndGet();
                return 0L;
            }
        };
        final AtomicInteger atomicInteger4 = new AtomicInteger();
        StepVerifier.create(FluxUtil.toFluxByteBuffer(new FileInputStream(new FileDescriptor()) { // from class: com.azure.core.util.FluxUtilTest.6
            @Override // java.io.FileInputStream
            public FileChannel getChannel() {
                atomicInteger4.incrementAndGet();
                return mockFileChannel;
            }
        })).verifyComplete();
        Assertions.assertEquals(1, atomicInteger4.get());
        Assertions.assertEquals(1, atomicInteger.get());
        Assertions.assertEquals(1, atomicInteger2.get());
        Assertions.assertEquals(1, atomicInteger3.get());
    }

    @RepeatedTest(10)
    public void ensureFileInputStreamFileCanBeDeletedAsConversionToFluxByteBuffer() throws IOException {
        Path createTempFile = Files.createTempFile("canBeDeleted" + CoreUtils.randomUuid(), "", new FileAttribute[0]);
        Files.write(createTempFile, "some random data".getBytes(StandardCharsets.UTF_8), new OpenOption[0]);
        FileInputStream fileInputStream = new FileInputStream(createTempFile.toFile());
        StepVerifier.create(FluxUtil.toFluxByteBuffer(fileInputStream).then(Mono.create(monoSink -> {
            try {
                fileInputStream.close();
                Files.delete(createTempFile);
                monoSink.success();
            } catch (IOException e) {
                monoSink.error(e);
            }
        }))).verifyComplete();
        Assertions.assertTrue(Files.notExists(createTempFile, new LinkOption[0]));
    }

    public Flux<ByteBuffer> mockReturnType() {
        return Flux.just(ByteBuffer.wrap(new byte[0]));
    }

    private Mono<String> getSingle() {
        return FluxUtil.withContext(this::serviceCallSingle);
    }

    private Flux<String> getCollection() {
        return FluxUtil.fluxContext(this::serviceCallCollection);
    }

    private Mono<String> getSingleWithContextAttributes() {
        return FluxUtil.withContext(this::serviceCallWithContextMetadata, Collections.singletonMap("additionalContextKey", "additionalContextValue"));
    }

    private Mono<String> serviceCallSingle(Context context) {
        return Mono.just("Hello, " + context.getData("FirstName").orElse("Stranger") + " " + context.getData("LastName").orElse(""));
    }

    private Flux<String> serviceCallCollection(Context context) {
        return Flux.just(("Hello, " + context.getData("FirstName").orElse("Stranger") + " " + context.getData("LastName").orElse("")).split(" "));
    }

    private Mono<String> serviceCallWithContextMetadata(Context context) {
        return Mono.just("Hello, " + context.getData("FirstName").orElse("Stranger") + " " + context.getData("additionalContextKey").orElse("Not found"));
    }

    private File createFileIfNotExist() {
        String uuid = UUID.randomUUID().toString();
        File file = new File("target");
        if (!file.exists() && !file.getParentFile().mkdirs()) {
            throw new RuntimeException("Unable to create directories: " + file.getAbsolutePath());
        }
        try {
            return Files.createTempFile(file.toPath(), uuid, "", new FileAttribute[0]).toFile();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
