package org.infinispan.rest.stream;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.reactivex.rxjava3.subscribers.DefaultSubscriber;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import org.infinispan.commons.marshall.WrappedByteArray;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;

/* loaded from: input_file:org/infinispan/rest/stream/CacheChunkedStream.class */
public abstract class CacheChunkedStream<T> {
    protected static final Log logger = LogFactory.getLog(CacheChunkedStream.class);
    private static final int CHUNK_SIZE = 8192;
    protected final Publisher<T> publisher;

    /* loaded from: input_file:org/infinispan/rest/stream/CacheChunkedStream$ByteBufSubscriber.class */
    static abstract class ByteBufSubscriber<T> extends DefaultSubscriber<T> {
        protected final ChannelHandlerContext ctx;
        protected final ByteBufAllocator allocator;
        protected final GenericFutureListener<Future<Void>> ERROR_LISTENER = future -> {
            try {
                future.get();
            } catch (Throwable th) {
                onError(th);
            }
        };
        private boolean firstEntry = true;
        private ByteBuf pendingBuffer;
        static final /* synthetic */ boolean $assertionsDisabled;

        /* JADX INFO: Access modifiers changed from: protected */
        public ByteBufSubscriber(ChannelHandlerContext channelHandlerContext, ByteBufAllocator byteBufAllocator) {
            this.ctx = (ChannelHandlerContext) Objects.requireNonNull(channelHandlerContext);
            this.allocator = (ByteBufAllocator) Objects.requireNonNull(byteBufAllocator);
        }

        protected ByteBuf newByteBuf() {
            return this.allocator.buffer(2048);
        }

        protected void onStart() {
            ByteBuf newByteBuf = newByteBuf();
            newByteBuf.writeByte(91);
            this.pendingBuffer = newByteBuf;
            request(1L);
        }

        public void onNext(T t) {
            ByteBuf byteBuf = this.pendingBuffer;
            if (this.firstEntry) {
                this.firstEntry = false;
            } else {
                byteBuf.writeByte(44);
            }
            writeItem(t, byteBuf);
            if (byteBuf.writerIndex() > CacheChunkedStream.CHUNK_SIZE) {
                writeToContext(byteBuf, false).addListener(future -> {
                    try {
                        future.get();
                        request(1L);
                    } catch (Throwable th) {
                        onError(th);
                    }
                });
                this.pendingBuffer = newByteBuf();
            } else {
                if (!$assertionsDisabled && byteBuf.writableBytes() <= 0) {
                    throw new AssertionError();
                }
                request(1L);
            }
        }

        abstract void writeItem(T t, ByteBuf byteBuf);

        public void onError(Throwable th) {
            CacheChunkedStream.logger.error("Error encountered while streaming cache chunk", th);
            if (this.pendingBuffer != null) {
                this.pendingBuffer.release();
                this.pendingBuffer = null;
            }
            cancel();
            this.ctx.close();
        }

        public void onComplete() {
            ByteBuf byteBuf = this.pendingBuffer;
            byteBuf.writeByte(93);
            writeToContext(byteBuf, true).addListener(this.ERROR_LISTENER);
            this.pendingBuffer = null;
        }

        ChannelFuture writeToContext(ByteBuf byteBuf, boolean z) {
            ChannelFuture write = this.ctx.write(new DefaultHttpContent(byteBuf));
            if (z) {
                write = this.ctx.write(LastHttpContent.EMPTY_LAST_CONTENT);
            }
            this.ctx.flush();
            return write;
        }

        static {
            $assertionsDisabled = !CacheChunkedStream.class.desiredAssertionStatus();
        }
    }

    public CacheChunkedStream(Publisher<T> publisher) {
        this.publisher = publisher;
    }

    public static byte[] readContentAsBytes(Object obj) {
        return obj instanceof byte[] ? (byte[]) obj : obj instanceof WrappedByteArray ? ((WrappedByteArray) obj).getBytes() : obj.toString().getBytes(StandardCharsets.UTF_8);
    }

    public abstract void subscribe(ChannelHandlerContext channelHandlerContext);
}
