package io.micronaut.http.netty.body;

import io.micronaut.buffer.netty.NettyByteBufferFactory;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.io.buffer.ByteBuffer;
import io.micronaut.http.netty.body.JsonCounter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import java.io.IOException;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/**
 * Re-chunks a stream of Netty {@link ByteBuf}s according to the regions reported by a
 * {@link JsonCounter}: every region the counter flushes is emitted downstream as one
 * wrapped {@link ByteBuffer}, regardless of how the bytes were split across input chunks.
 *
 * <p>Bytes that the counter reports as still being buffered at the end of an input chunk are
 * kept in {@link #singleBuffer} / {@link #compositeBuffer} until a later chunk completes the
 * region or the input ends, at which point whatever is pending is emitted as a final item.
 * Presumably each flushed region is one complete JSON value; that is decided by
 * {@link JsonCounter} and is not visible here.
 *
 * <p>The instance holds mutable state (the counter and the pending buffers).
 * NOTE(review): this looks intended for a single {@link #process(Flux)} call per instance
 * with no concurrent access; confirm against callers.
 *
 * <p>NOTE(review): pending buffers are released via {@code doOnTerminate}, which Reactor
 * documents as firing on completion or error only. A downstream cancellation therefore does
 * not trigger that cleanup here; confirm whether that case is handled elsewhere.
 */
@Internal
final class JsonChunkedProcessor {
    final JsonCounter counter = new JsonCounter();

    /** Pending data when only one piece has been buffered so far; {@code null} otherwise. */
    private ByteBuf singleBuffer;

    /** Pending data once more than one piece has been buffered; {@code null} otherwise. */
    private CompositeByteBuf compositeBuffer;

    /**
     * Transforms the input chunks into one emitted buffer per region flushed by the counter.
     *
     * <p>Input chunks are handled strictly one after another ({@code concatMap}); each chunk is
     * released once it has been processed. After the input completes, any data still pending
     * is emitted as a last item.
     *
     * @param flux the incoming chunks; each one is released by this method after processing
     * @return the re-chunked output; an {@link IOException} raised while counting is
     *         delivered as an error signal
     */
    public Flux<ByteBuffer<?>> process(Flux<ByteBuf> flux) {
        Flux<ByteBuffer<?>> perChunk = flux.concatMap(this::processChunk);
        Flux<ByteBuffer<?>> remainder = Flux.create(this::emitRemainder);
        return Flux.concat(perChunk, remainder).doOnTerminate(this::releaseBuffers);
    }

    /**
     * Builds the publisher that runs the counter over a single input chunk.
     *
     * @param chunk the input chunk; released when the publisher's work is done, whether it
     *              succeeded, signalled an {@link IOException}, or threw
     * @return a publisher emitting every region completed by this chunk
     */
    private Flux<ByteBuffer<?>> processChunk(ByteBuf chunk) {
        return Flux.create(sink -> {
            try {
                countLoop(sink, chunk);
                sink.complete();
            } catch (IOException e) {
                sink.error(e);
            } finally {
                // Any part of the chunk that must outlive this call was retained in countLoop.
                chunk.release();
            }
        });
    }

    /**
     * Emits whatever is still pending once the input has completed, then completes the sink.
     * Any failure is forwarded as an error signal instead of being thrown.
     *
     * @param sink the sink of the trailing publisher
     */
    private void emitRemainder(FluxSink<ByteBuffer<?>> sink) {
        try {
            complete(sink);
            sink.complete();
        } catch (Throwable t) {
            sink.error(t);
        }
    }

    /** Releases and forgets any pending buffer that was never handed downstream. */
    private void releaseBuffers() {
        if (singleBuffer != null) {
            singleBuffer.release();
            singleBuffer = null;
        }
        if (compositeBuffer != null) {
            compositeBuffer.release();
            compositeBuffer = null;
        }
    }

    /**
     * Feeds one chunk to the counter, emitting every region it flushes and retaining the
     * unfinished tail, if any.
     *
     * @param fluxSink receives one item per flushed region
     * @param byteBuf  the chunk to consume; its reader index is modified. The caller keeps
     *                 ownership of its own reference, since every piece kept here is retained
     * @throws IOException propagated from the counter
     */
    private void countLoop(FluxSink<? super ByteBuffer<?>> fluxSink, ByteBuf byteBuf) throws IOException {
        // Counter position at which this chunk begins.
        final long chunkStart = counter.position();
        // Subtracting this offset turns a counter position into an index within byteBuf.
        final long positionToIndex = chunkStart - byteBuf.readerIndex();

        while (byteBuf.isReadable()) {
            counter.feed(byteBuf);
            JsonCounter.BufferRegion region = counter.pollFlushedRegion();
            if (region != null) {
                // A region may begin before this chunk; only the part inside this chunk is
                // sliced here (the earlier part is presumably already in the pending buffer).
                long from = Math.max(chunkStart, region.start());
                int sliceIndex = Math.toIntExact(from - positionToIndex);
                int sliceLength = Math.toIntExact(region.end() - from);
                buffer(byteBuf.retainedSlice(sliceIndex, sliceLength));
                flush(fluxSink);
            }
        }

        if (counter.isBuffering()) {
            // Rewind so that the readable bytes are exactly the unfinished tail, then keep an
            // extra reference because the caller releases its own once this method returns.
            long from = Math.max(chunkStart, counter.bufferStart());
            byteBuf.readerIndex(Math.toIntExact(from - positionToIndex));
            buffer(byteBuf.retain());
        }
    }

    /**
     * Appends a piece to the pending data, taking over the caller's reference to it. The first
     * piece is stored as-is; a composite buffer is only created once a second piece arrives.
     *
     * @param byteBuf the piece to append
     */
    private void buffer(ByteBuf byteBuf) {
        if (singleBuffer == null && compositeBuffer == null) {
            singleBuffer = byteBuf;
            return;
        }
        if (compositeBuffer == null) {
            // Second piece: promote the single pending buffer into a composite.
            compositeBuffer = byteBuf.alloc().compositeBuffer();
            compositeBuffer.addComponent(true, singleBuffer);
            singleBuffer = null;
        }
        compositeBuffer.addComponent(true, byteBuf);
    }

    /**
     * Emits the pending data as one item and clears the pending state, so the emitted buffer
     * is no longer touched by {@link #releaseBuffers()}. Callers must ensure something is
     * pending.
     *
     * @param fluxSink the sink to emit to
     */
    private void flush(FluxSink<? super ByteBuffer<?>> fluxSink) {
        ByteBuf pending = compositeBuffer != null ? compositeBuffer : singleBuffer;
        fluxSink.next(NettyByteBufferFactory.DEFAULT.wrap(pending));
        compositeBuffer = null;
        singleBuffer = null;
    }

    /**
     * Emits the pending data, if there is any; does nothing otherwise.
     *
     * @param fluxSink the sink to emit to
     */
    private void complete(FluxSink<? super ByteBuffer<?>> fluxSink) {
        if (singleBuffer != null || compositeBuffer != null) {
            flush(fluxSink);
        }
    }
}
