package reactor.ipc.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufHolder;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.Objects;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.ByteBufMono;

/* loaded from: input_file:BOOT-INF/lib/reactor-netty-0.7.8.RELEASE.jar:reactor/ipc/netty/ByteBufFlux.class */
/**
 * A {@link Flux} of {@link ByteBuf} decorated with convenience conversions
 * (to {@code byte[]}, {@link ByteBuffer}, {@link String}, {@link InputStream})
 * and an aggregation operator.
 *
 * <p>Instances are created through the static factories {@link #fromInbound(Publisher)}
 * and {@link #fromPath(Path)} (and their overloads).
 */
public final class ByteBufFlux extends FluxOperator<ByteBuf, ByteBuf> {

    /** Default maximum number of bytes read from a file per emitted buffer (512 KiB). */
    static final int MAX_CHUNK_SIZE = 512 * 1024;

    /**
     * Maps an inbound object to a {@link ByteBuf}: a {@code ByteBuf} is returned as is,
     * a {@link ByteBufHolder} is unwrapped through {@link ByteBufHolder#content()}, and
     * anything else is rejected with an {@link IllegalArgumentException}.
     */
    static final Function<Object, ByteBuf> bytebufExtractor = obj -> {
        if (obj instanceof ByteBuf) {
            return (ByteBuf) obj;
        }
        if (obj instanceof ByteBufHolder) {
            return ((ByteBufHolder) obj).content();
        }
        throw new IllegalArgumentException("Object " + obj + " of type " + obj.getClass() + " cannot be converted to ByteBuf");
    };

    /** Allocator used by {@link #aggregate()} to create the composite buffer. */
    final ByteBufAllocator alloc;

    /**
     * Wraps the given source.
     *
     * @param flux the upstream sequence of buffers
     * @param byteBufAllocator the allocator retained for {@link #aggregate()} and {@link #retain()}
     */
    ByteBufFlux(Flux<ByteBuf> flux, ByteBufAllocator byteBufAllocator) {
        super(flux);
        this.alloc = byteBufAllocator;
    }

    /**
     * Decorates a publisher of {@link ByteBuf} or {@link ByteBufHolder} items, using
     * {@link ByteBufAllocator#DEFAULT}.
     *
     * @param publisher the source to decorate
     * @return a new {@link ByteBufFlux}
     */
    public static ByteBufFlux fromInbound(Publisher<?> publisher) {
        return fromInbound(publisher, ByteBufAllocator.DEFAULT);
    }

    /**
     * Decorates a publisher of {@link ByteBuf} or {@link ByteBufHolder} items. Each item is
     * converted with {@link #bytebufExtractor}; an item of any other type fails the
     * conversion with an {@link IllegalArgumentException}.
     *
     * @param publisher the source to decorate
     * @param byteBufAllocator the allocator associated with the returned flux
     * @return a new {@link ByteBufFlux}
     * @throws NullPointerException if {@code byteBufAllocator} is {@code null}
     */
    public static ByteBufFlux fromInbound(Publisher<?> publisher, ByteBufAllocator byteBufAllocator) {
        Objects.requireNonNull(byteBufAllocator, "allocator");
        return new ByteBufFlux(Flux.from(publisher).map(bytebufExtractor), byteBufAllocator);
    }

    /**
     * Reads the file at {@code path} in chunks of at most {@link #MAX_CHUNK_SIZE} bytes,
     * using {@link ByteBufAllocator#DEFAULT}.
     *
     * @param path the file to read
     * @return a new {@link ByteBufFlux} emitting the file content
     */
    public static ByteBufFlux fromPath(Path path) {
        return fromPath(path, MAX_CHUNK_SIZE);
    }

    /**
     * Reads the file at {@code path} in chunks of at most {@code i} bytes, using
     * {@link ByteBufAllocator#DEFAULT}.
     *
     * @param path the file to read
     * @param i the maximum number of bytes per emitted buffer; must be at least 1
     * @return a new {@link ByteBufFlux} emitting the file content
     */
    public static ByteBufFlux fromPath(Path path, int i) {
        return fromPath(path, i, ByteBufAllocator.DEFAULT);
    }

    /**
     * Reads the file at {@code path} in chunks of at most {@link #MAX_CHUNK_SIZE} bytes.
     *
     * @param path the file to read
     * @param byteBufAllocator the allocator used for the emitted buffers
     * @return a new {@link ByteBufFlux} emitting the file content
     */
    public static ByteBufFlux fromPath(Path path, ByteBufAllocator byteBufAllocator) {
        return fromPath(path, MAX_CHUNK_SIZE, byteBufAllocator);
    }

    /**
     * Reads the file at {@code path} in chunks of at most {@code i} bytes. The file is
     * opened once per subscriber and closed when that subscription completes, fails or
     * is cancelled.
     *
     * @param path the file to read
     * @param i the maximum number of bytes per emitted buffer; must be at least 1
     * @param byteBufAllocator the allocator used for the emitted buffers
     * @return a new {@link ByteBufFlux} emitting the file content
     * @throws NullPointerException if {@code path} or {@code byteBufAllocator} is {@code null}
     * @throws IllegalArgumentException if {@code i} is lower than 1
     */
    public static ByteBufFlux fromPath(Path path, int i, ByteBufAllocator byteBufAllocator) {
        Objects.requireNonNull(path, "path");
        Objects.requireNonNull(byteBufAllocator, "allocator");
        if (i < 1) {
            throw new IllegalArgumentException("chunk size must be strictly positive, was: " + i);
        }
        return new ByteBufFlux(Flux.generate(
                () -> FileChannel.open(path),
                (channel, sink) -> {
                    ByteBuf chunk = byteBufAllocator.buffer();
                    try {
                        if (chunk.writeBytes(channel, i) < 0) {
                            // End of file: nothing was read, so the buffer is not handed downstream.
                            chunk.release();
                            sink.complete();
                        } else {
                            sink.next(chunk);
                        }
                    } catch (IOException e) {
                        chunk.release();
                        sink.error(e);
                    }
                    return channel;
                },
                ByteBufFlux::closeQuietly), byteBufAllocator);
    }

    /**
     * Closes the channel opened by {@link #fromPath(Path, int, ByteBufAllocator)} once the
     * generated sequence has ended.
     */
    private static void closeQuietly(FileChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // Best effort: the sequence has already terminated or been cancelled, so there
            // is no subscriber left to report a close failure to.
        }
    }

    /**
     * Converts each buffer with {@link ByteBuf#nioBuffer()}.
     *
     * @return a {@link Flux} of {@link ByteBuffer}
     */
    public final Flux<ByteBuffer> asByteBuffer() {
        return map(ByteBuf::nioBuffer);
    }

    /**
     * Copies the readable bytes of each buffer into a new array sized to
     * {@link ByteBuf#readableBytes()}.
     *
     * @return a {@link Flux} of byte arrays
     */
    public final Flux<byte[]> asByteArray() {
        return map(buf -> {
            byte[] bytes = new byte[buf.readableBytes()];
            buf.readBytes(bytes);
            return bytes;
        });
    }

    /**
     * Wraps each buffer in a {@code ByteBufMono.ReleasingInputStream}.
     *
     * @return a {@link Flux} of {@link InputStream}
     */
    public Flux<InputStream> asInputStream() {
        return map(ByteBufMono.ReleasingInputStream::new);
    }

    /**
     * Decodes each buffer to a {@link String} using the platform default charset
     * ({@link Charset#defaultCharset()}).
     *
     * @return a {@link Flux} of {@link String}
     */
    public final Flux<String> asString() {
        return asString(Charset.defaultCharset());
    }

    /**
     * Decodes each buffer to a {@link String} using the given charset.
     *
     * @param charset the charset used to decode the buffers
     * @return a {@link Flux} of {@link String}
     */
    public final Flux<String> asString(Charset charset) {
        return map(buf -> buf.toString(charset));
    }

    /**
     * Accumulates every emitted buffer into one composite buffer obtained from the
     * allocator of this flux. Each incoming buffer is retained before being added as a
     * component. The result is emitted only if it is readable, and the composite is
     * released through the {@code Mono.using} cleanup callback.
     *
     * @return a {@link ByteBufMono} of the aggregated content
     */
    public ByteBufMono aggregate() {
        ByteBufAllocator allocator = this.alloc;
        Objects.requireNonNull(allocator);
        return Mono.using(allocator::compositeBuffer,
                composite -> reduce(composite, (acc, next) -> acc.addComponent(next.retain()))
                        // Components are added without moving the writer index, so it is set
                        // to the full capacity once the reduction is done.
                        .doOnNext(aggregated -> aggregated.writerIndex(aggregated.capacity()))
                        .filter(ByteBuf::isReadable),
                ByteBuf::release).as(ByteBufMono::new);
    }

    /**
     * Not supported.
     *
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    public ByteBufMono multicast() {
        throw new UnsupportedOperationException("Not yet implemented");
    }

    /**
     * Calls {@link ByteBuf#retain()} on each buffer as it is emitted.
     *
     * @return a new {@link ByteBufFlux} sharing the allocator of this one
     */
    public ByteBufFlux retain() {
        return new ByteBufFlux(doOnNext(ByteBuf::retain), this.alloc);
    }

    /** Delegates the subscription directly to the wrapped source. */
    @Override
    public void subscribe(CoreSubscriber<? super ByteBuf> coreSubscriber) {
        this.source.subscribe(coreSubscriber);
    }
}
