package com.azure.core.implementation;

import com.azure.core.util.io.IOUtils;
import com.azure.core.util.mocking.MockAsynchronousFileChannel;
import com.azure.core.util.mocking.MockMonoSink;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousByteChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.test.StepVerifier;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

/* loaded from: input_file:com/azure/core/implementation/AsynchronousByteChannelWriteSubscriberTests.class */
public class AsynchronousByteChannelWriteSubscriberTests {

    /* loaded from: input_file:com/azure/core/implementation/AsynchronousByteChannelWriteSubscriberTests$CallTrackingSubscription.class */
    private static final class CallTrackingSubscription implements Subscription {
        private final AtomicInteger requestOneCalls;
        private final AtomicInteger cancelCalls;

        private CallTrackingSubscription() {
            this.requestOneCalls = new AtomicInteger();
            this.cancelCalls = new AtomicInteger();
        }

        public void request(long j) {
            if (j == 1) {
                this.requestOneCalls.incrementAndGet();
            }
        }

        public void cancel() {
            this.cancelCalls.incrementAndGet();
        }
    }

    /* loaded from: input_file:com/azure/core/implementation/AsynchronousByteChannelWriteSubscriberTests$WriteCountTrackingChannel.class */
    /**
     * {@link MockAsynchronousFileChannel} that counts positional writes.
     *
     * <p>Both the {@link CompletionHandler} overload and the {@link Future} overload bump the counter and
     * then delegate to the parent mock.</p>
     */
    private static class WriteCountTrackingChannel extends MockAsynchronousFileChannel {
        /** Number of write calls observed so far; starts at zero. */
        private int writeCount;

        /**
         * @param data byte array handed to the {@link MockAsynchronousFileChannel} constructor
         */
        WriteCountTrackingChannel(byte[] data) {
            super(data);
        }

        @Override
        public <A> void write(ByteBuffer src, long position, A attachment,
            CompletionHandler<Integer, ? super A> handler) {
            writeCount += 1;
            super.write(src, position, attachment, handler);
        }

        @Override
        public Future<Integer> write(ByteBuffer src, long position) {
            writeCount += 1;
            return super.write(src, position);
        }
    }

    /**
     * Verifies that when {@code onSubscribe} is called twice, the first subscription receives exactly one
     * {@code request(1)} and is not cancelled, while the second subscription is cancelled and never
     * receives {@code request(1)}.
     */
    @Test
    public void multipleSubscriptionsCancelsLaterSubscriptions() {
        // Channel stub: consumes everything remaining in the buffer and reports completion immediately.
        MockAsynchronousFileChannel fileChannel = new MockAsynchronousFileChannel() {
            @Override
            public <A> void write(ByteBuffer src, long position, A attachment,
                CompletionHandler<Integer, ? super A> handler) {
                int remaining = src.remaining();
                src.position(src.position() + remaining);
                handler.completed(remaining, attachment);
            }
        };

        // No sink is supplied in this scenario.
        AsynchronousByteChannelWriteSubscriber subscriber = new AsynchronousByteChannelWriteSubscriber(
            IOUtils.toAsynchronousByteChannel(fileChannel, 0L), (MonoSink<Void>) null);

        CallTrackingSubscription first = new CallTrackingSubscription();
        CallTrackingSubscription second = new CallTrackingSubscription();
        subscriber.onSubscribe(first);
        subscriber.onSubscribe(second);

        Assertions.assertEquals(1, first.requestOneCalls.get());
        Assertions.assertEquals(0, first.cancelCalls.get());
        Assertions.assertEquals(0, second.requestOneCalls.get());
        Assertions.assertEquals(1, second.cancelCalls.get());
    }

    /**
     * Verifies that a stream consisting of a single zero-length buffer completes successfully without any
     * write call reaching the channel.
     */
    @Test
    public void emptyBuffersNeverWrite() {
        Flux<ByteBuffer> data = Flux.create(sink -> {
            sink.next(ByteBuffer.allocate(0));
            sink.complete();
        });
        WriteCountTrackingChannel channel = new WriteCountTrackingChannel(new byte[4096]);

        // The explicit <Void> witness keeps the sink typed as MonoSink<Void> for the subscriber constructor.
        StepVerifier.create(Mono.<Void>create(sink -> data.subscribe(
            new AsynchronousByteChannelWriteSubscriber(IOUtils.toAsynchronousByteChannel(channel, 0L), sink))))
            .verifyComplete();

        Assertions.assertEquals(0, channel.writeCount);
    }

    /**
     * Verifies that when sixteen buffers are emitted followed by completion, the resulting {@code Mono}
     * completes and the channel has seen one write per emitted buffer.
     */
    @Test
    public void allOnNextEmissionsAreHandledBeforeOnComplete() {
        final int bufferCount = 16;
        final int bufferSize = 8192;

        // The channel is sized to hold every emitted buffer (16 * 8192 = 131072 bytes).
        WriteCountTrackingChannel channel = new WriteCountTrackingChannel(new byte[bufferCount * bufferSize]);
        byte[] payload = new byte[bufferSize];
        ThreadLocalRandom.current().nextBytes(payload);

        Flux<ByteBuffer> data = Flux.create(sink -> {
            for (int i = 0; i < bufferCount; i++) {
                // A fresh wrapper per emission so each buffer has its own position/limit.
                sink.next(ByteBuffer.wrap(payload));
            }
            sink.complete();
        });

        // The explicit <Void> witness keeps the sink typed as MonoSink<Void> for the subscriber constructor.
        StepVerifier.create(Mono.<Void>create(sink -> data.subscribe(
            new AsynchronousByteChannelWriteSubscriber(IOUtils.toAsynchronousByteChannel(channel, 0L), sink))))
            .verifyComplete();

        Assertions.assertEquals(bufferCount, channel.writeCount);
    }

    /**
     * Verifies that when sixteen buffers are emitted followed by an {@link IOException}, the resulting
     * {@code Mono} fails with {@link IOException} and the channel has seen one write per emitted buffer.
     */
    @Test
    public void allOnNextEmissionsAreHandledBeforeOnError() {
        final int bufferCount = 16;
        final int bufferSize = 8192;

        // The channel is sized to hold every emitted buffer (16 * 8192 = 131072 bytes).
        WriteCountTrackingChannel channel = new WriteCountTrackingChannel(new byte[bufferCount * bufferSize]);
        byte[] payload = new byte[bufferSize];
        ThreadLocalRandom.current().nextBytes(payload);

        Flux<ByteBuffer> data = Flux.create(sink -> {
            for (int i = 0; i < bufferCount; i++) {
                // A fresh wrapper per emission so each buffer has its own position/limit.
                sink.next(ByteBuffer.wrap(payload));
            }
            sink.error(new IOException());
        });

        // The explicit <Void> witness keeps the sink typed as MonoSink<Void> for the subscriber constructor.
        StepVerifier.create(Mono.<Void>create(sink -> data.subscribe(
            new AsynchronousByteChannelWriteSubscriber(IOUtils.toAsynchronousByteChannel(channel, 0L), sink))))
            .verifyError(IOException.class);

        Assertions.assertEquals(bufferCount, channel.writeCount);
    }

    /**
     * Verifies that the bytes written to the channel match the expected content once the stream completes.
     *
     * @param flux the buffers to write
     * @param bArr the bytes expected in the channel's backing array after all writes
     */
    @ParameterizedTest
    @MethodSource("writtenDataValidationSupplier")
    public void writtenDataValidation(Flux<ByteBuffer> flux, byte[] bArr) {
        // Backing array handed to the channel; same length as the expected data.
        byte[] written = new byte[bArr.length];
        WriteCountTrackingChannel channel = new WriteCountTrackingChannel(written);

        // The explicit <Void> witness keeps the sink typed as MonoSink<Void> for the subscriber constructor.
        StepVerifier.create(Mono.<Void>create(sink -> flux.subscribe(
            new AsynchronousByteChannelWriteSubscriber(IOUtils.toAsynchronousByteChannel(channel, 0L), sink))))
            .verifyComplete();

        Assertions.assertArrayEquals(bArr, written);
    }

    /**
     * Supplies the arguments for {@code writtenDataValidation}: three random chunks of 4196, 16392 and
     * 131136 bytes emitted in three different orders, each paired with the concatenation of the chunks in
     * that same order.
     *
     * @return a stream of (flux of buffers, expected bytes) argument pairs
     */
    private static Stream<Arguments> writtenDataValidationSupplier() {
        byte[] small = randomBytes(4196);
        byte[] medium = randomBytes(16392);
        byte[] large = randomBytes(131136);

        return Stream.of(
            Arguments.of(fluxOfChunks(small, medium, large), concatChunks(small, medium, large)),
            Arguments.of(fluxOfChunks(large, medium, small), concatChunks(large, medium, small)),
            Arguments.of(fluxOfChunks(small, large, medium), concatChunks(small, large, medium)));
    }

    /** Creates a byte array of the given length filled with random bytes. */
    private static byte[] randomBytes(int length) {
        byte[] bytes = new byte[length];
        ThreadLocalRandom.current().nextBytes(bytes);
        return bytes;
    }

    /** Creates a flux that emits one freshly wrapped buffer per chunk, in order, and then completes. */
    private static Flux<ByteBuffer> fluxOfChunks(byte[]... chunks) {
        return Flux.<ByteBuffer>create(sink -> {
            for (byte[] chunk : chunks) {
                sink.next(ByteBuffer.wrap(chunk));
            }
            sink.complete();
        });
    }

    /** Concatenates the chunks, in order, into a single new array. */
    private static byte[] concatChunks(byte[]... chunks) {
        int total = 0;
        for (byte[] chunk : chunks) {
            total += chunk.length;
        }

        byte[] combined = new byte[total];
        int offset = 0;
        for (byte[] chunk : chunks) {
            System.arraycopy(chunk, 0, combined, offset, chunk.length);
            offset += chunk.length;
        }
        return combined;
    }

    /**
     * Verifies that a second {@code onError} signal is handed to the {@code reactor.onErrorDropped.local}
     * consumer found in the sink's context, and that the subscription is cancelled exactly once.
     */
    @Test
    public void onErrorAfterErrorIsDropped() {
        AtomicInteger droppedCount = new AtomicInteger();
        AtomicReference<Throwable> droppedError = new AtomicReference<>();

        // A lambda needs a functional-interface target type, so it cannot be passed straight to
        // Context.of's Object parameter; declare it as a Consumer first.
        Consumer<Throwable> onErrorDropped = error -> {
            droppedCount.incrementAndGet();
            droppedError.set(error);
        };
        Context context = Context.of("reactor.onErrorDropped.local", onErrorDropped);

        // Sink whose context carries the dropped-error hook.
        MockMonoSink<Void> sink = new MockMonoSink<Void>() {
            @Override
            public ContextView contextView() {
                return context;
            }
        };

        CallTrackingSubscription subscription = new CallTrackingSubscription();
        AsynchronousByteChannelWriteSubscriber subscriber
            = new AsynchronousByteChannelWriteSubscriber((AsynchronousByteChannel) null, sink);

        subscriber.onSubscribe(subscription);
        subscriber.onError(new IOException());
        subscriber.onError(new IllegalStateException());

        Assertions.assertEquals(1, subscription.cancelCalls.get());
        Assertions.assertEquals(1, droppedCount.get());
        // The second error is the one expected to be dropped.
        Assertions.assertInstanceOf(IllegalStateException.class, droppedError.get());
    }

    /**
     * Verifies that calling {@code onComplete} twice results in the sink's {@code success()} being invoked
     * only once.
     */
    @Test
    public void onCompleteAfterOnCompleteIsIgnored() {
        final AtomicInteger successCalls = new AtomicInteger();

        // Sink that only counts success signals.
        MockMonoSink<Void> countingSink = new MockMonoSink<Void>() {
            @Override
            public void success() {
                successCalls.incrementAndGet();
            }
        };

        AsynchronousByteChannelWriteSubscriber subscriber
            = new AsynchronousByteChannelWriteSubscriber((AsynchronousByteChannel) null, countingSink);

        subscriber.onComplete();
        subscriber.onComplete();

        Assertions.assertEquals(1, successCalls.get());
    }

    /**
     * Verifies that a buffer emitted after the completion signal is handed, exactly once, to the
     * {@code reactor.onNextDropped.local} consumer registered in the stream's context, and that the
     * resulting {@code Mono} still completes.
     */
    @Test
    public void onNextAfterOnCompleteIsDropped() {
        AtomicInteger droppedCount = new AtomicInteger();
        AtomicReference<Object> droppedValue = new AtomicReference<>();
        Consumer<Object> onNextDropped = value -> {
            droppedCount.incrementAndGet();
            droppedValue.set(value);
        };

        ByteBuffer lateBuffer = ByteBuffer.wrap(new byte[4096]);
        // Emit the buffer only after completing so that it has to be dropped.
        Flux<ByteBuffer> data = Flux.<ByteBuffer>create(sink -> {
            sink.complete();
            sink.next(lateBuffer);
        }).contextWrite(Context.of("reactor.onNextDropped.local", onNextDropped));

        // The explicit <Void> witness keeps the sink typed as MonoSink<Void> for the subscriber constructor.
        StepVerifier.create(Mono.<Void>create(sink -> data.subscribe(
            new AsynchronousByteChannelWriteSubscriber((AsynchronousByteChannel) null, sink))))
            .verifyComplete();

        Assertions.assertEquals(1, droppedCount.get());
        Assertions.assertSame(lateBuffer, droppedValue.get());
    }

    /**
     * Verifies that a buffer emitted after an error signal is handed, exactly once, to the
     * {@code reactor.onNextDropped.local} consumer registered in the stream's context, and that the
     * resulting {@code Mono} fails with {@link IOException}.
     */
    @Test
    public void onNextAfterOnErrorIsDropped() {
        AtomicInteger droppedCount = new AtomicInteger();
        AtomicReference<Object> droppedValue = new AtomicReference<>();
        Consumer<Object> onNextDropped = value -> {
            droppedCount.incrementAndGet();
            droppedValue.set(value);
        };

        ByteBuffer lateBuffer = ByteBuffer.wrap(new byte[4096]);
        // Emit the buffer only after erroring so that it has to be dropped.
        Flux<ByteBuffer> data = Flux.<ByteBuffer>create(sink -> {
            sink.error(new IOException());
            sink.next(lateBuffer);
        }).contextWrite(Context.of("reactor.onNextDropped.local", onNextDropped));

        // The explicit <Void> witness keeps the sink typed as MonoSink<Void> for the subscriber constructor.
        StepVerifier.create(Mono.<Void>create(sink -> data.subscribe(
            new AsynchronousByteChannelWriteSubscriber((AsynchronousByteChannel) null, sink))))
            .verifyError(IOException.class);

        Assertions.assertEquals(1, droppedCount.get());
        Assertions.assertSame(lateBuffer, droppedValue.get());
    }

    /**
     * Verifies that when every channel write fails with an {@link IOException}, feeding one buffer and
     * then completing leads to exactly one {@code error} signal on the sink and no {@code success} signal.
     */
    @Test
    public void errorWritingDuringOnCompleteResultsInOnError() {
        final AtomicInteger successCalls = new AtomicInteger();
        final AtomicInteger errorCalls = new AtomicInteger();

        // Sink that only counts terminal signals.
        MockMonoSink<Void> countingSink = new MockMonoSink<Void>() {
            @Override
            public void success() {
                successCalls.incrementAndGet();
            }

            @Override
            public void error(Throwable throwable) {
                errorCalls.incrementAndGet();
            }
        };

        // Channel whose writes always fail, through either write overload.
        WriteCountTrackingChannel failingChannel = new WriteCountTrackingChannel(new byte[4096]) {
            @Override
            public <A> void write(ByteBuffer src, long position, A attachment,
                CompletionHandler<Integer, ? super A> handler) {
                handler.failed(new IOException(), attachment);
            }

            @Override
            public Future<Integer> write(ByteBuffer src, long position) {
                // Future whose no-arg get() always throws an ExecutionException wrapping an IOException.
                return new CompletableFuture<Integer>() {
                    @Override
                    public Integer get() throws InterruptedException, ExecutionException {
                        throw new ExecutionException(new IOException());
                    }
                };
            }
        };

        AsynchronousByteChannelWriteSubscriber subscriber = new AsynchronousByteChannelWriteSubscriber(
            IOUtils.toAsynchronousByteChannel(failingChannel, 0L), countingSink);

        subscriber.onSubscribe(new CallTrackingSubscription());
        subscriber.onNext(ByteBuffer.wrap(new byte[4096]));
        subscriber.onComplete();

        Assertions.assertEquals(1, errorCalls.get());
        Assertions.assertEquals(0, successCalls.get());
    }
}
