package codes.rafael.springstreaminterop.webflux;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.util.Assert;
import org.springframework.web.reactive.function.BodyInserter;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:codes/rafael/springstreaminterop/webflux/StreamBodyInserters.class */
public class StreamBodyInserters {

    @FunctionalInterface
    /* loaded from: input_file:codes/rafael/springstreaminterop/webflux/StreamBodyInserters$FromOutputStream.class */
    public interface FromOutputStream {
        void accept(OutputStream outputStream) throws IOException;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:codes/rafael/springstreaminterop/webflux/StreamBodyInserters$WriterOutputStream.class */
    public static class WriterOutputStream extends OutputStream {
        private final DataBufferFactory factory;
        private final Sinks.Many<DataBuffer> sink;
        private final AtomicBoolean closed = new AtomicBoolean();

        private WriterOutputStream(DataBufferFactory dataBufferFactory, Sinks.Many<DataBuffer> many) {
            this.factory = dataBufferFactory;
            this.sink = many;
        }

        @Override // java.io.OutputStream
        public void write(int i) throws IOException {
            if (this.closed.get()) {
                throw new IOException("closed");
            }
            DataBuffer allocateBuffer = this.factory.allocateBuffer(1);
            allocateBuffer.write((byte) (i & 255));
            this.sink.tryEmitNext(allocateBuffer).orThrow();
        }

        @Override // java.io.OutputStream
        public void write(byte[] bArr, int i, int i2) throws IOException {
            if (this.closed.get()) {
                throw new IOException("closed");
            }
            DataBuffer allocateBuffer = this.factory.allocateBuffer(i2);
            allocateBuffer.write(bArr, i, i2);
            this.sink.tryEmitNext(allocateBuffer).orThrow();
        }

        @Override // java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            if (this.closed.compareAndSet(false, true)) {
                this.sink.tryEmitComplete().orThrow();
            }
        }
    }

    public static BodyInserter<Void, ReactiveHttpOutputMessage> fromOutputStream(FromOutputStream fromOutputStream) {
        Assert.notNull(fromOutputStream, "'publisher' must not be null");
        return (reactiveHttpOutputMessage, context) -> {
            Sinks.Many onBackpressureBuffer = Sinks.many().unicast().onBackpressureBuffer();
            Mono writeWith = reactiveHttpOutputMessage.writeWith(onBackpressureBuffer.asFlux());
            WriterOutputStream writerOutputStream = new WriterOutputStream(reactiveHttpOutputMessage.bufferFactory(), onBackpressureBuffer);
            try {
                fromOutputStream.accept(writerOutputStream);
            } catch (Throwable th) {
                onBackpressureBuffer.emitError(th, Sinks.EmitFailureHandler.FAIL_FAST);
            }
            writerOutputStream.close();
            return writeWith;
        };
    }

    public static BodyInserter<Void, ReactiveHttpOutputMessage> fromInputStream(Supplier<? extends InputStream> supplier) {
        return fromInputStream(supplier, 8192);
    }

    public static BodyInserter<Void, ReactiveHttpOutputMessage> fromInputStream(Supplier<? extends InputStream> supplier, int i) {
        Assert.notNull(supplier, "'streamSupplier' must not be null");
        Assert.state(i > 0, "'chunkSize' must be a positive number");
        return (reactiveHttpOutputMessage, context) -> {
            Sinks.Many onBackpressureBuffer = Sinks.many().unicast().onBackpressureBuffer();
            DataBufferFactory bufferFactory = reactiveHttpOutputMessage.bufferFactory();
            Mono writeWith = reactiveHttpOutputMessage.writeWith(onBackpressureBuffer.asFlux());
            try {
                InputStream inputStream = (InputStream) supplier.get();
                if (inputStream == null) {
                    onBackpressureBuffer.emitError(new NullPointerException("inputStream"), Sinks.EmitFailureHandler.FAIL_FAST);
                } else {
                    try {
                        byte[] bArr = new byte[i];
                        while (true) {
                            int read = inputStream.read(bArr);
                            if (read == -1) {
                                break;
                            }
                            if (read != 0) {
                                byte[] bArr2 = new byte[read];
                                System.arraycopy(bArr, 0, bArr2, 0, read);
                                onBackpressureBuffer.emitNext(bufferFactory.wrap(bArr2), Sinks.EmitFailureHandler.FAIL_FAST);
                            }
                        }
                        onBackpressureBuffer.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
                        if (inputStream != null) {
                            inputStream.close();
                        }
                    } finally {
                    }
                }
            } catch (Throwable th) {
                onBackpressureBuffer.emitError(th, Sinks.EmitFailureHandler.FAIL_FAST);
            }
            return writeWith;
        };
    }
}
