package com.azure.core.implementation.util;

import com.azure.core.http.rest.Response;
import com.azure.core.util.Context;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.util.Map;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Operators;

/* loaded from: input_file:com/azure/core/implementation/util/FluxUtil.class */
public final class FluxUtil {
    /** Chunk size, in bytes (64 KiB), used by the file-reading overloads that take no explicit chunk size. */
    private static final int DEFAULT_CHUNK_SIZE = 64 * 1024;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/core/implementation/util/FluxUtil$ByteBufStreamFromFile.class */
    public static final class ByteBufStreamFromFile extends Flux<ByteBuf> {
        private final ByteBufAllocator alloc = ByteBufAllocator.DEFAULT;
        private final AsynchronousFileChannel fileChannel;
        private final int chunkSize;
        private final long offset;
        private final long length;

        /* loaded from: input_file:com/azure/core/implementation/util/FluxUtil$ByteBufStreamFromFile$FileReadSubscription.class */
        static final class FileReadSubscription implements Subscription, CompletionHandler<Integer, ByteBuf> {
            private static final int NOT_SET = -1;
            private static final long serialVersionUID = -6831808726875304256L;
            private final Subscriber<? super ByteBuf> subscriber;
            private volatile long position = -1;
            private final AsynchronousFileChannel fileChannel;
            private final ByteBufAllocator alloc;
            private final int chunkSize;
            private final long offset;
            private final long length;
            private volatile boolean done;
            private Throwable error;
            private volatile ByteBuf next;
            private volatile boolean cancelled;
            volatile int wip;
            volatile long requested;
            static final AtomicIntegerFieldUpdater<FileReadSubscription> WIP = AtomicIntegerFieldUpdater.newUpdater(FileReadSubscription.class, "wip");
            static final AtomicLongFieldUpdater<FileReadSubscription> REQUESTED = AtomicLongFieldUpdater.newUpdater(FileReadSubscription.class, "requested");

            FileReadSubscription(Subscriber<? super ByteBuf> subscriber, AsynchronousFileChannel asynchronousFileChannel, ByteBufAllocator byteBufAllocator, int i, long j, long j2) {
                this.subscriber = subscriber;
                this.fileChannel = asynchronousFileChannel;
                this.alloc = byteBufAllocator;
                this.chunkSize = i;
                this.offset = j;
                this.length = j2;
            }

            public void request(long j) {
                if (Operators.validate(j)) {
                    Operators.addCap(REQUESTED, this, j);
                    drain();
                }
            }

            public void cancel() {
                this.cancelled = true;
            }

            @Override // java.nio.channels.CompletionHandler
            public void completed(Integer num, ByteBuf byteBuf) {
                if (this.cancelled) {
                    return;
                }
                if (num.intValue() == NOT_SET) {
                    this.done = true;
                } else {
                    long j = this.position;
                    int min = Math.min(num.intValue(), maxRequired(j));
                    byteBuf.writerIndex(min);
                    long j2 = j + min;
                    this.position = j2;
                    this.next = byteBuf;
                    if (j2 >= this.offset + this.length) {
                        this.done = true;
                    }
                }
                drain();
            }

            @Override // java.nio.channels.CompletionHandler
            public void failed(Throwable th, ByteBuf byteBuf) {
                if (this.cancelled) {
                    return;
                }
                this.error = th;
                this.done = true;
                drain();
            }

            private void drain() {
                boolean z;
                if (WIP.getAndIncrement(this) != 0) {
                    return;
                }
                if (this.position == -1) {
                    this.position = this.offset;
                    doRead();
                }
                int i = 1;
                while (!this.cancelled) {
                    if (REQUESTED.get(this) > 0) {
                        boolean z2 = this.done;
                        ByteBuf byteBuf = this.next;
                        if (byteBuf != null) {
                            this.next = null;
                            this.subscriber.onNext(byteBuf);
                            z = true;
                        } else {
                            z = false;
                        }
                        if (z2) {
                            if (this.error != null) {
                                this.subscriber.onError(this.error);
                                return;
                            } else {
                                this.subscriber.onComplete();
                                return;
                            }
                        }
                        if (z) {
                            Operators.produced(REQUESTED, this, 1L);
                            doRead();
                        }
                    }
                    i = WIP.addAndGet(this, -i);
                    if (i == 0) {
                        return;
                    }
                }
            }

            private void doRead() {
                long j = this.position;
                int min = Math.min(this.chunkSize, maxRequired(j));
                ByteBuf buffer = this.alloc.buffer(min, min);
                this.fileChannel.read(buffer.nioBuffer(0, min), j, buffer, this);
            }

            private int maxRequired(long j) {
                long j2 = (this.offset + this.length) - j;
                if (j2 <= 0) {
                    return 0;
                }
                int i = (int) j2;
                if (i < 0) {
                    return Integer.MAX_VALUE;
                }
                return i;
            }
        }

        ByteBufStreamFromFile(AsynchronousFileChannel asynchronousFileChannel, int i, long j, long j2) {
            this.fileChannel = asynchronousFileChannel;
            this.chunkSize = i;
            this.offset = j;
            this.length = j2;
        }

        public void subscribe(CoreSubscriber<? super ByteBuf> coreSubscriber) {
            coreSubscriber.onSubscribe(new FileReadSubscription(coreSubscriber, this.fileChannel, this.alloc, this.chunkSize, this.offset, this.length));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/core/implementation/util/FluxUtil$ByteBufToFileSubscriber.class */
    public static class ByteBufToFileSubscriber implements Subscriber<ByteBuf> {
        volatile boolean isWriting;
        volatile boolean isCompleted;
        volatile Subscription subscription;
        volatile long pos;
        AsynchronousFileChannel outFile;
        MonoSink<Void> emitter;
        CompletionHandler<Integer, Object> onWriteCompleted;

        private ByteBufToFileSubscriber(AsynchronousFileChannel asynchronousFileChannel, long j, MonoSink<Void> monoSink) {
            this.isWriting = false;
            this.isCompleted = false;
            this.onWriteCompleted = new CompletionHandler<Integer, Object>() { // from class: com.azure.core.implementation.util.FluxUtil.ByteBufToFileSubscriber.1
                @Override // java.nio.channels.CompletionHandler
                public void completed(Integer num, Object obj) {
                    ByteBufToFileSubscriber.this.isWriting = false;
                    if (ByteBufToFileSubscriber.this.isCompleted) {
                        ByteBufToFileSubscriber.this.emitter.success();
                    }
                    ByteBufToFileSubscriber.this.pos += num.intValue();
                    if (ByteBufToFileSubscriber.this.subscription != null) {
                        ByteBufToFileSubscriber.this.subscription.request(1L);
                    }
                }

                @Override // java.nio.channels.CompletionHandler
                public void failed(Throwable th, Object obj) {
                    if (ByteBufToFileSubscriber.this.subscription != null) {
                        ByteBufToFileSubscriber.this.subscription.cancel();
                    }
                    ByteBufToFileSubscriber.this.emitter.error(th);
                }
            };
            this.outFile = asynchronousFileChannel;
            this.pos = j;
            this.emitter = monoSink;
        }

        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1L);
        }

        public void onNext(ByteBuf byteBuf) {
            this.isWriting = true;
            this.outFile.write(byteBuf.nioBuffer(), this.pos, null, this.onWriteCompleted);
        }

        public void onError(Throwable th) {
            if (this.subscription != null) {
                this.subscription.cancel();
            }
            this.emitter.error(th);
        }

        public void onComplete() {
            this.isCompleted = true;
            if (this.isWriting) {
                return;
            }
            this.emitter.success();
        }
    }

    /**
     * Tells whether {@code type} is a {@link Flux} (or subtype) whose first type argument is
     * {@link ByteBuf} (or a subtype).
     *
     * @param type the type to inspect
     * @return {@code true} if both checks pass
     */
    public static boolean isFluxByteBuf(Type type) {
        if (!TypeUtil.isTypeOrSubTypeOf(type, Flux.class)) {
            return false;
        }
        // Only looked up once the outer type is known to be a Flux, as in a short-circuit '&&'.
        Type elementType = TypeUtil.getTypeArguments(type)[0];
        return TypeUtil.isTypeOrSubTypeOf(elementType, ByteBuf.class);
    }

    /**
     * Collects every {@link ByteBuf} of the stream into one byte array.
     *
     * @param flux the stream of buffers to collect
     * @param z when {@code true}, each chunk is retained before it is added to the aggregate and the
     *     aggregate is released through {@code Mono.using} cleanup; when {@code false}, chunks are added
     *     without retaining and nothing is released here
     * @return a Mono emitting the collected bytes, or completing empty when no readable bytes were collected
     */
    public static Mono<byte[]> collectBytesInByteBufStream(Flux<ByteBuf> flux, boolean z) {
        if (z) {
            return Mono.using(
                Unpooled::compositeBuffer,
                // The copy to byte[] is done inside the resource scope so it always reads the aggregate
                // before the cleanup below releases it, regardless of when Mono.using runs that cleanup.
                aggregate -> flux
                    .collect(() -> aggregate,
                        (target, chunk) -> target.addComponent(true, Unpooled.wrappedBuffer(chunk).retain()))
                    .filter(collected -> collected.isReadable())
                    .map(collected -> byteBufToArray(collected)),
                aggregate -> ReferenceCountUtil.release(aggregate));
        }
        return flux
            .collect(Unpooled::compositeBuffer,
                (target, chunk) -> target.addComponent(true, Unpooled.wrappedBuffer(chunk)))
            .filter(collected -> collected.isReadable())
            .map(collected -> byteBufToArray(collected));
    }

    /**
     * Splits the readable region of a buffer into retained slices of at most {@code i} bytes.
     *
     * <p>The reader index is captured per subscription; the writer index is re-read for every slice.
     * Each emitted slice has been {@code retain()}ed.
     *
     * @param byteBuf the buffer to split
     * @param i maximum size of each slice in bytes; must be positive
     * @return a Flux emitting the slices in order
     * @throws IllegalArgumentException if {@code i} is not positive (a zero chunk size would otherwise
     *     produce an endless stream of empty slices)
     */
    public static Flux<ByteBuf> split(ByteBuf byteBuf, int i) {
        if (i <= 0) {
            throw new IllegalArgumentException("Chunk size must be positive but was " + i);
        }
        return Flux.generate(byteBuf::readerIndex, (index, sink) -> {
            int end = byteBuf.writerIndex();
            if (index >= end) {
                sink.complete();
                return end;
            }
            int sliceLength = Math.min(end - index, i);
            sink.next(byteBuf.slice(index, sliceLength).retain());
            return index + sliceLength;
        });
    }

    /**
     * Copies the readable bytes of a buffer into a new array without moving the buffer's indices.
     *
     * @param byteBuf the buffer to copy from
     * @return a new array holding the bytes between the reader and writer index
     */
    public static byte[] byteBufToArray(ByteBuf byteBuf) {
        final int start = byteBuf.readerIndex();
        final byte[] copy = new byte[byteBuf.readableBytes()];
        // Absolute read: leaves readerIndex untouched.
        byteBuf.getBytes(start, copy);
        return copy;
    }

    /**
     * Collects every {@link ByteBuf} of the stream into a single composite buffer.
     *
     * @param flux the stream of buffers to collect
     * @param z when {@code true}, each chunk is retained and added to a composite that is created and
     *     released through {@code Mono.using}; when {@code false}, chunks are wrapped and added without
     *     retaining and nothing is released here
     * @return a Mono emitting the composite buffer, or completing empty when it has no readable bytes
     */
    public static Mono<ByteBuf> collectByteBufStream(Flux<ByteBuf> flux, boolean z) {
        if (z) {
            // NOTE(review): the composite emitted downstream is the same object the cleanup releases, so
            // it is presumably only usable until this Mono terminates - confirm against callers.
            return Mono.using(
                () -> Unpooled.compositeBuffer(),
                aggregate -> flux
                    .reduce(aggregate, (target, chunk) -> target.addComponent(chunk.retain()))
                    // This addComponent overload is called without the writer-index flag, so the
                    // writer index is moved to the end explicitly.
                    .doOnNext(merged -> merged.writerIndex(merged.capacity()))
                    .filter(merged -> merged.isReadable()),
                aggregate -> aggregate.release());
        }
        return flux
            .collect(Unpooled::compositeBuffer,
                (target, chunk) -> target.addComponent(true, Unpooled.wrappedBuffer(chunk)))
            .filter(merged -> merged.isReadable())
            // Identity map that widens the element type to ByteBuf.
            .<ByteBuf>map(merged -> merged);
    }

    /**
     * Writes the stream to the channel starting at file position 0.
     *
     * @param flux the buffers to write
     * @param asynchronousFileChannel the channel to write to
     * @return the Mono produced by the three-argument overload
     */
    public static Mono<Void> bytebufStreamToFile(Flux<ByteBuf> flux, AsynchronousFileChannel asynchronousFileChannel) {
        final long startOfFile = 0L;
        return bytebufStreamToFile(flux, asynchronousFileChannel, startOfFile);
    }

    /**
     * Writes the stream to the channel starting at the given file position.
     *
     * <p>Each subscription to the returned Mono subscribes a new {@link ByteBufToFileSubscriber} to
     * {@code flux}; that subscriber signals the Mono's sink.
     *
     * @param flux the buffers to write
     * @param asynchronousFileChannel the channel to write to
     * @param j file position at which the first buffer is written
     * @return a Mono signalled by the file-writing subscriber
     */
    public static Mono<Void> bytebufStreamToFile(Flux<ByteBuf> flux, AsynchronousFileChannel asynchronousFileChannel, long j) {
        return Mono.create(monoSink -> {
            ByteBufToFileSubscriber fileWriter = new ByteBufToFileSubscriber(asynchronousFileChannel, j, monoSink);
            flux.subscribe(fileWriter);
        });
    }

    /**
     * Converts the Reactor subscriber context to an Azure {@link Context} and passes it to the given
     * function.
     *
     * @param function produces the Mono to run from the converted context
     * @param <T> element type of the resulting Mono
     * @return the Mono obtained by flat-mapping the converted context through {@code function}
     */
    public static <T> Mono<T> withContext(Function<Context, Mono<T>> function) {
        Mono<Context> azureContext = Mono.subscriberContext()
            .map(reactorContext -> toAzureContext(reactorContext));
        return azureContext.flatMap(function);
    }

    /**
     * Wraps the value of a response in a Mono.
     *
     * @param response the response whose value is extracted
     * @param <T> type of the response value
     * @return the result of {@code Mono.justOrEmpty} applied to the response value
     */
    public static <T> Mono<T> toMono(Response<T> response) {
        final T value = response.value();
        return Mono.justOrEmpty(value);
    }

    /**
     * Converts the Reactor subscriber context to an Azure {@link Context} and passes it to the given
     * function.
     *
     * @param function produces the Flux to run from the converted context
     * @param <T> element type of the resulting Flux
     * @return the Flux obtained by flat-mapping the converted context through {@code function}
     */
    public static <T> Flux<T> fluxContext(Function<Context, Flux<T>> function) {
        Mono<Context> azureContext = Mono.subscriberContext()
            .map(reactorContext -> toAzureContext(reactorContext));
        return azureContext.flatMapMany(function);
    }

    /**
     * Copies the entries of a Reactor context into an Azure {@link Context}.
     *
     * @param context the Reactor context to convert
     * @return {@link Context#NONE} when the Reactor context has no entries, otherwise a context built from
     *     all of its key/value pairs
     */
    private static Context toAzureContext(reactor.util.context.Context context) {
        // Typed map instead of a raw Map plus unchecked cast.
        Map<Object, Object> entries = context.stream()
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        return ImplUtils.isNullOrEmpty(entries) ? Context.NONE : Context.of(entries);
    }

    /**
     * Creates a Flux that reads a region of a file in chunks.
     *
     * @param asynchronousFileChannel the channel to read from
     * @param i maximum size of each emitted buffer in bytes; must be positive
     * @param j file position of the first byte to read
     * @param j2 number of bytes to read starting at {@code j}
     * @return a Flux emitting the file region as buffers
     * @throws IllegalArgumentException if {@code i} is not positive; a zero chunk size would make every
     *     read zero-length so the stream could never advance
     */
    public static Flux<ByteBuf> byteBufStreamFromFile(AsynchronousFileChannel asynchronousFileChannel, int i, long j, long j2) {
        if (i <= 0) {
            throw new IllegalArgumentException("Chunk size must be positive but was " + i);
        }
        return new ByteBufStreamFromFile(asynchronousFileChannel, i, j, j2);
    }

    /**
     * Creates a Flux that reads a region of a file using the default chunk size.
     *
     * @param asynchronousFileChannel the channel to read from
     * @param j file position of the first byte to read
     * @param j2 number of bytes to read starting at {@code j}
     * @return a Flux emitting the file region as buffers
     */
    public static Flux<ByteBuf> byteBufStreamFromFile(AsynchronousFileChannel asynchronousFileChannel, long j, long j2) {
        final int chunkSize = DEFAULT_CHUNK_SIZE;
        return byteBufStreamFromFile(asynchronousFileChannel, chunkSize, j, j2);
    }

    /**
     * Creates a Flux that reads the whole file, from position 0 to its current size, using the default
     * chunk size.
     *
     * @param asynchronousFileChannel the channel to read from
     * @return a Flux emitting the file content, or a Flux that fails with the {@link IOException} raised
     *     while querying the file size
     */
    public static Flux<ByteBuf> byteBufStreamFromFile(AsynchronousFileChannel asynchronousFileChannel) {
        final long fileSize;
        try {
            fileSize = asynchronousFileChannel.size();
        } catch (IOException sizeFailure) {
            // Surface the failure through the reactive pipeline instead of throwing.
            return Flux.error(sizeFailure);
        }
        return byteBufStreamFromFile(asynchronousFileChannel, DEFAULT_CHUNK_SIZE, 0L, fileSize);
    }

    /** Not instantiable: this class only hosts static helpers and nested types. */
    private FluxUtil() {
        // intentionally empty
    }
}
