package io.yupiik.fusion.http.server.impl.flow;

import jakarta.servlet.ReadListener;
import jakarta.servlet.ServletInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:io/yupiik/fusion/http/server/impl/flow/ServletInputStreamSubscription.class */
public class ServletInputStreamSubscription implements Flow.Subscription, ReadListener {
    private static final Logger LOGGER = Logger.getLogger(ServletInputStreamSubscription.class.getName());
    private static final int DEFAULT_BUFFER_SIZE = 8192;
    private final ServletInputStream inputStream;
    private final ReadableByteChannel channel;
    private final Flow.Subscriber<? super ByteBuffer> downstream;
    private volatile boolean cancelled = false;
    private volatile long requested = 0;

    public ServletInputStreamSubscription(ServletInputStream servletInputStream, Flow.Subscriber<? super ByteBuffer> subscriber) {
        this.inputStream = servletInputStream;
        this.channel = Channels.newChannel((InputStream) servletInputStream);
        this.downstream = subscriber;
        servletInputStream.setReadListener(this);
    }

    private void readIfPossible() {
        ByteBuffer allocate;
        int read;
        if (this.cancelled || this.requested == 0) {
            return;
        }
        try {
            for (long j = this.requested; j > 0; j--) {
                if (this.inputStream.isReady() && (read = this.channel.read((allocate = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE)))) > 0) {
                    allocate.position(0).limit(read);
                    this.downstream.onNext(allocate);
                }
            }
        } catch (IOException | RuntimeException e) {
            onError(e);
        }
    }

    public synchronized void onDataAvailable() {
        readIfPossible();
    }

    public synchronized void onAllDataRead() {
        if (this.cancelled) {
            return;
        }
        readIfPossible();
        if (this.cancelled) {
            return;
        }
        this.downstream.onComplete();
        doClose();
    }

    @Override // java.util.concurrent.Flow.Subscription
    public synchronized void request(long j) {
        if (j <= 0) {
            throw new IllegalArgumentException("Invalid request: " + j + ", should be > 0");
        }
        if (this.cancelled) {
            return;
        }
        this.requested += j;
        readIfPossible();
    }

    public synchronized void onError(Throwable th) {
        Logger logger = LOGGER;
        Level level = Level.SEVERE;
        Objects.requireNonNull(th);
        logger.log(level, th, th::getMessage);
        if (this.cancelled) {
            return;
        }
        doClose();
        this.downstream.onError(th);
        this.cancelled = true;
    }

    private void doClose() {
        if (this.channel.isOpen()) {
            try {
                this.channel.close();
            } catch (IOException e) {
                Logger logger = LOGGER;
                Level level = Level.SEVERE;
                Objects.requireNonNull(e);
                logger.log(level, e, e::getMessage);
            }
        }
    }

    @Override // java.util.concurrent.Flow.Subscription
    public synchronized void cancel() {
        this.cancelled = true;
    }
}
