package codes.rafael.springstreaminterop.webflux;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.web.reactive.function.BodyExtractor;
import reactor.core.publisher.Mono;

/* loaded from: input_file:codes/rafael/springstreaminterop/webflux/StreamBodyExtractors.class */
public class StreamBodyExtractors {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:codes/rafael/springstreaminterop/webflux/StreamBodyExtractors$FlowBufferInputStream.class */
    public static class FlowBufferInputStream extends InputStream implements Subscriber<DataBuffer> {
        private static final Object END = new Object();
        private final int maximumMemorySize;
        private final boolean failFast;

        @Nullable
        private Subscription subscription;
        private final AtomicBoolean closed = new AtomicBoolean();
        private final AtomicInteger buffered = new AtomicInteger();

        @Nullable
        private InputStreamWithSize current = new InputStreamWithSize(0, InputStream.nullInputStream());
        private final BlockingQueue<Object> backlog = new LinkedBlockingDeque();

        FlowBufferInputStream(int i, boolean z) {
            this.maximumMemorySize = i;
            this.failFast = z;
        }

        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            if (this.closed.get()) {
                subscription.cancel();
            } else {
                subscription.request(1L);
            }
        }

        public void onNext(DataBuffer dataBuffer) {
            if (this.closed.get()) {
                DataBufferUtils.release(dataBuffer);
                return;
            }
            int readableByteCount = dataBuffer.readableByteCount();
            if (this.buffered.addAndGet(readableByteCount) < this.maximumMemorySize) {
                this.subscription.request(1L);
            }
            this.backlog.add(new InputStreamWithSize(readableByteCount, dataBuffer.asInputStream(true)));
            if (this.closed.get()) {
                DataBufferUtils.release(dataBuffer);
            }
        }

        public void onError(Throwable th) {
            if (this.failFast) {
                while (true) {
                    Object poll = this.backlog.poll();
                    if (poll == null) {
                        break;
                    }
                    if (poll instanceof InputStreamWithSize) {
                        try {
                            ((InputStreamWithSize) poll).inputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                }
            }
            this.backlog.add(th);
        }

        public void onComplete() {
            this.backlog.add(END);
        }

        private boolean forward() throws IOException {
            this.current.inputStream.close();
            try {
                Object take = this.backlog.take();
                if (take == END) {
                    this.current = null;
                    return true;
                }
                if (take instanceof RuntimeException) {
                    close();
                    throw ((RuntimeException) take);
                }
                if (take instanceof IOException) {
                    close();
                    throw ((IOException) take);
                }
                if (take instanceof Throwable) {
                    close();
                    throw new IllegalStateException((Throwable) take);
                }
                if (this.buffered.addAndGet(-this.current.size) < this.maximumMemorySize) {
                    this.subscription.request(1L);
                }
                this.current = (InputStreamWithSize) take;
                return false;
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }

        @Override // java.io.InputStream
        public int read() throws IOException {
            if (this.closed.get()) {
                throw new IOException("closed");
            }
            if (this.current == null) {
                return -1;
            }
            do {
                int read = this.current.inputStream.read();
                if (read != -1) {
                    return read;
                }
            } while (!forward());
            return -1;
        }

        @Override // java.io.InputStream
        public int read(byte[] bArr, int i, int i2) throws IOException {
            Objects.checkFromIndexSize(i, i2, bArr.length);
            if (this.closed.get()) {
                throw new IOException("closed");
            }
            if (this.current == null) {
                return -1;
            }
            int i3 = 0;
            do {
                int read = this.current.inputStream.read(bArr, i + i3, i2 - i3);
                if (read != -1) {
                    i3 += read;
                } else {
                    if (this.backlog.isEmpty()) {
                        return i3;
                    }
                    if (forward()) {
                        if (i3 == 0) {
                            return -1;
                        }
                        return i3;
                    }
                }
            } while (i3 < i2);
            return i3;
        }

        @Override // java.io.InputStream
        public int available() throws IOException {
            if (this.closed.get()) {
                throw new IOException("closed");
            }
            if (this.current == null) {
                return 0;
            }
            int available = this.current.inputStream.available();
            for (Object obj : this.backlog) {
                if (!(obj instanceof InputStreamWithSize)) {
                    break;
                }
                available += ((InputStreamWithSize) obj).inputStream.available();
            }
            return available;
        }

        @Override // java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            if (this.closed.compareAndSet(false, true)) {
                if (this.subscription != null) {
                    this.subscription.cancel();
                }
                IOException iOException = null;
                if (this.current != null) {
                    try {
                        this.current.inputStream.close();
                    } catch (IOException e) {
                        iOException = e;
                    }
                }
                for (Object obj : this.backlog) {
                    if (obj instanceof InputStreamWithSize) {
                        try {
                            ((InputStreamWithSize) obj).inputStream.close();
                        } catch (IOException e2) {
                            if (iOException == null) {
                                iOException = e2;
                            } else {
                                iOException.addSuppressed(e2);
                            }
                        }
                    }
                }
                if (iOException != null) {
                    throw iOException;
                }
            }
        }
    }

    @FunctionalInterface
    /* loaded from: input_file:codes/rafael/springstreaminterop/webflux/StreamBodyExtractors$InputStreamMapper.class */
    public interface InputStreamMapper<T> {
        T apply(InputStream inputStream) throws IOException;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:codes/rafael/springstreaminterop/webflux/StreamBodyExtractors$InputStreamWithSize.class */
    public static class InputStreamWithSize {
        final int size;
        final InputStream inputStream;

        InputStreamWithSize(int i, InputStream inputStream) {
            this.size = i;
            this.inputStream = inputStream;
        }
    }

    public static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(InputStreamMapper<T> inputStreamMapper) {
        return toMono(inputStreamMapper, true);
    }

    public static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(InputStreamMapper<T> inputStreamMapper, boolean z) {
        return toMono(inputStreamMapper, z, 262144);
    }

    public static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(InputStreamMapper<T> inputStreamMapper, boolean z, int i) {
        Assert.notNull(inputStreamMapper, "'streamMapper' must not be null");
        Assert.isTrue(i > 0, "'maximumMemorySize' must be positive");
        return (reactiveHttpInputMessage, context) -> {
            try {
                FlowBufferInputStream flowBufferInputStream = new FlowBufferInputStream(i, z);
                try {
                    reactiveHttpInputMessage.getBody().subscribe(flowBufferInputStream);
                    Mono just = Mono.just(inputStreamMapper.apply(flowBufferInputStream));
                    flowBufferInputStream.close();
                    return just;
                } finally {
                }
            } catch (Throwable th) {
                return Mono.error(th);
            }
        };
    }

    public static BodyExtractor<Mono<Void>, ReactiveHttpInputMessage> toMono(Supplier<? extends OutputStream> supplier) {
        Assert.notNull(supplier, "'streamSupplier' must not be null");
        return (reactiveHttpInputMessage, context) -> {
            try {
                OutputStream outputStream = (OutputStream) supplier.get();
                try {
                    DataBufferUtils.write(reactiveHttpInputMessage.getBody(), outputStream).blockLast();
                    Mono empty = Mono.empty();
                    if (outputStream != null) {
                        outputStream.close();
                    }
                    return empty;
                } finally {
                }
            } catch (Throwable th) {
                return Mono.error(th);
            }
        };
    }
}
