package io.servicetalk.encoding.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.internal.PlatformDependent;
import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.buffer.api.BufferAllocator;
import io.servicetalk.buffer.api.CompositeBuffer;
import io.servicetalk.buffer.api.ReadOnlyBufferAllocators;
import io.servicetalk.buffer.netty.BufferUtils;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.encoding.api.CodecDecodingException;
import io.servicetalk.encoding.api.CodecEncodingException;
import io.servicetalk.encoding.api.ContentCodec;
import java.util.Iterator;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/servicetalk/encoding/netty/NettyChannelContentCodec.class */
public final class NettyChannelContentCodec extends AbstractContentCodec {
    /** Logger used when releasing an embedded channel fails; assigned in the static initializer. */
    private static final Logger LOGGER;
    /**
     * Sentinel buffer appended to the source by the streaming {@code encode}. The streaming subscriber
     * recognises it by reference identity (not by content) and reacts by closing the channel and draining
     * whatever output is still queued.
     */
    private static final Buffer END_OF_STREAM;
    /**
     * Threshold, in total readable bytes, used when several queued chunks are drained at once: at or below
     * this size the chunks are copied into one new buffer, above it they are added to a composite buffer.
     */
    private static final int MAX_SIZE_FOR_MERGED_BUFFER = 1024;
    /** Provides the encoder installed in the embedded channel created for each encode operation. */
    private final Supplier<MessageToByteEncoder<ByteBuf>> encoderSupplier;
    /** Provides the decoder installed in the embedded channel created for each decode operation. */
    private final Supplier<ByteToMessageDecoder> decoderSupplier;
    /** {@code true} when assertions are disabled for this class; computed once in the static initializer. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a codec whose encode and decode operations run Netty handlers inside an {@link EmbeddedChannel}.
     *
     * @param charSequence value forwarded unchanged to the superclass constructor
     *                     (presumably the codec name - confirm against {@code AbstractContentCodec})
     * @param supplier     queried for an encoder each time an encode operation starts
     * @param supplier2    queried for a decoder each time a decode operation starts
     */
    public NettyChannelContentCodec(
            CharSequence charSequence,
            Supplier<MessageToByteEncoder<ByteBuf>> supplier,
            Supplier<ByteToMessageDecoder> supplier2) {
        super(charSequence);
        decoderSupplier = supplier2;
        encoderSupplier = supplier;
    }

    /**
     * Encodes the readable bytes of {@code buffer} in one pass through a freshly created embedded channel.
     *
     * <p>The input is written outbound, the channel is closed via {@link #preparePendingData(EmbeddedChannel)}
     * (presumably so the encoder can emit any trailing bytes), and everything queued on the outbound side is
     * drained into a single result buffer.
     *
     * @param buffer          the data to encode; must have at least one readable byte
     * @param bufferAllocator allocator used for the channel and for assembling the result; must not be null
     * @return the encoded output
     * @throws NullPointerException   if {@code bufferAllocator} is null
     * @throws CodecEncodingException if {@code buffer} has no readable bytes, if the encoder produced no
     *                                output, or (wrapping the cause) if any other failure occurs
     */
    public Buffer encode(Buffer buffer, BufferAllocator bufferAllocator) {
        Objects.requireNonNull(bufferAllocator);
        if (buffer.readableBytes() == 0) {
            throw new CodecEncodingException(this, "No data to encode.");
        }
        final EmbeddedChannel channel = newEmbeddedChannel(this.encoderSupplier.get(), bufferAllocator);
        try {
            channel.writeOutbound(BufferUtils.extractByteBufOrCreate(buffer));
            preparePendingData(channel);
            final Buffer encoded = drainChannelQueueToSingleBuffer(channel.outboundMessages(), bufferAllocator);
            if (encoded == null) {
                throw new CodecEncodingException(this, "Not enough data to produce an encoded output");
            }
            cleanup(channel);
            return encoded;
        } catch (CodecEncodingException e) {
            // Already a codec failure with a specific message: propagate as-is. The two catch clauses must be
            // siblings of the same try, otherwise this rethrow would be caught again and wrapped below.
            throw e;
        } catch (Throwable t) {
            throw wrapEncodingException(this, t);
        } finally {
            // Best-effort release on every path, including after a failed cleanup above.
            safeCleanup(channel);
        }
    }

    /**
     * Encodes a region of {@code buffer}.
     *
     * <p>The region starts {@code i} bytes after the current reader index and is {@code i2} bytes long.
     * The source's reader index is advanced past both the skipped prefix and the region before the slice is
     * handed to {@link #encode(Buffer, BufferAllocator)}.
     *
     * @param buffer          the source buffer
     * @param i               offset of the region relative to the reader index
     * @param i2              length of the region in bytes
     * @param bufferAllocator allocator passed through to the single-buffer encode
     * @return the encoded region
     */
    public Buffer encode(Buffer buffer, int i, int i2, BufferAllocator bufferAllocator) {
        final int regionStart = buffer.readerIndex() + i;
        final Buffer region = buffer.slice(regionStart, i2);
        final int consumed = i + i2;
        buffer.skipBytes(consumed);
        return encode(region, bufferAllocator);
    }

    /**
     * Encodes a stream of buffers.
     *
     * <p>The {@code END_OF_STREAM} sentinel is concatenated after the source so the subscriber knows when to
     * close the embedded channel and drain what is still queued. For every regular item the buffer is written
     * outbound and any available output is forwarded downstream; when an item yields no output, one more
     * item is requested from upstream so downstream demand is not lost.
     *
     * @param publisher       the source of buffers to encode; must not be null
     * @param bufferAllocator allocator used for the channel and for assembling output; must not be null
     * @return a publisher emitting the encoded buffers
     * @throws NullPointerException if either argument is null
     */
    public Publisher<Buffer> encode(Publisher<Buffer> publisher, BufferAllocator bufferAllocator) {
        Objects.requireNonNull(publisher);
        Objects.requireNonNull(bufferAllocator);
        return publisher.concat(Single.succeeded(END_OF_STREAM)).liftSync(subscriber ->
            new PublisherSource.Subscriber<Buffer>() {
                // One encoder and one channel per subscription; the encoder must be created first.
                private final MessageToByteEncoder<ByteBuf> encoder = encoderSupplier.get();
                private final EmbeddedChannel channel = newEmbeddedChannel(encoder, bufferAllocator);

                @Nullable
                PublisherSource.Subscription subscription;

                @Override
                public void onSubscribe(PublisherSource.Subscription subscription) {
                    this.subscription = subscription;
                    subscriber.onSubscribe(subscription);
                }

                @Override
                public void onNext(Buffer buffer) {
                    assert subscription != null;
                    if (!channel.isOpen()) {
                        throw new IllegalStateException("Stream encoder previously closed but more input arrived");
                    }
                    if (buffer == null) {
                        throw new CodecEncodingException(NettyChannelContentCodec.this, "Cannot encode null values");
                    }
                    try {
                        // Identity comparison: only the sentinel instance marks the end of the stream.
                        if (buffer == END_OF_STREAM) {
                            preparePendingData(channel);
                            final Buffer remaining =
                                    drainChannelQueueToSingleBuffer(channel.outboundMessages(), bufferAllocator);
                            if (remaining != null) {
                                subscriber.onNext(remaining);
                            }
                            return;
                        }
                        channel.writeOutbound(BufferUtils.extractByteBufOrCreate(buffer));
                        final Buffer encoded =
                                drainChannelQueueToSingleBuffer(channel.outboundMessages(), bufferAllocator);
                        if (encoded != null && encoded.readableBytes() > 0) {
                            subscriber.onNext(encoded);
                        } else {
                            // Nothing to emit for this item: replace the consumed demand.
                            subscription.request(1L);
                        }
                    } catch (Throwable th) {
                        throw wrapEncodingException(NettyChannelContentCodec.this, th);
                    }
                }

                @Override
                public void onError(Throwable th) {
                    safeCleanup(channel);
                    subscriber.onError(th);
                }

                @Override
                public void onComplete() {
                    try {
                        cleanup(channel);
                    } catch (Throwable th) {
                        subscriber.onError(wrapEncodingException(NettyChannelContentCodec.this, th));
                        return;
                    }
                    // Kept outside the try so that a failure thrown by the downstream onComplete is never
                    // followed by a second terminal signal (onError) to the same subscriber.
                    subscriber.onComplete();
                }
            });
    }

    /**
     * Decodes the readable bytes of {@code buffer} in one pass through a freshly created embedded channel.
     *
     * <p>The input is written inbound and everything the decoder queued on the inbound side is drained into
     * a single result buffer.
     *
     * @param buffer          the data to decode; must have at least one readable byte
     * @param bufferAllocator allocator used for the channel and for assembling the result; must not be null
     * @return the decoded output
     * @throws NullPointerException   if {@code bufferAllocator} is null
     * @throws CodecDecodingException if {@code buffer} has no readable bytes, if the decoder produced no
     *                                output, or (wrapping the cause) if any other failure occurs
     */
    public Buffer decode(Buffer buffer, BufferAllocator bufferAllocator) {
        Objects.requireNonNull(bufferAllocator);
        if (buffer.readableBytes() == 0) {
            // A decode failure must be reported with the decoding exception type and message.
            throw new CodecDecodingException(this, "No data to decode.");
        }
        final EmbeddedChannel channel = newEmbeddedChannel(this.decoderSupplier.get(), bufferAllocator);
        try {
            channel.writeInbound(BufferUtils.extractByteBufOrCreate(buffer));
            final Buffer decoded = drainChannelQueueToSingleBuffer(channel.inboundMessages(), bufferAllocator);
            if (decoded == null) {
                throw new CodecDecodingException(this, "Not enough data to decode.");
            }
            cleanup(channel);
            return decoded;
        } catch (CodecDecodingException e) {
            // Already a codec failure with a specific message: propagate without wrapping.
            throw e;
        } catch (Throwable t) {
            throw wrapDecodingException(this, t);
        } finally {
            // Best-effort release on every path, including after a failed cleanup above.
            safeCleanup(channel);
        }
    }

    /**
     * Decodes a region of {@code buffer}.
     *
     * <p>The region starts {@code i} bytes after the current reader index and is {@code i2} bytes long.
     * The source's reader index is advanced past both the skipped prefix and the region before the slice is
     * handed to {@link #decode(Buffer, BufferAllocator)}.
     *
     * @param buffer          the source buffer
     * @param i               offset of the region relative to the reader index
     * @param i2              length of the region in bytes
     * @param bufferAllocator allocator passed through to the single-buffer decode
     * @return the decoded region
     */
    public Buffer decode(Buffer buffer, int i, int i2, BufferAllocator bufferAllocator) {
        final int regionStart = buffer.readerIndex() + i;
        final Buffer region = buffer.slice(regionStart, i2);
        final int consumed = i + i2;
        buffer.skipBytes(consumed);
        return decode(region, bufferAllocator);
    }

    /**
     * Decodes a stream of buffers.
     *
     * <p>Every item is written inbound to an embedded channel created for the subscription and any available
     * decoded output is forwarded downstream; when an item yields no output, one more item is requested
     * from upstream so downstream demand is not lost.
     *
     * @param publisher       the source of buffers to decode; must not be null
     * @param bufferAllocator allocator used for the channel and for assembling output; must not be null
     * @return a publisher emitting the decoded buffers
     * @throws NullPointerException if either argument is null
     */
    public Publisher<Buffer> decode(Publisher<Buffer> publisher, BufferAllocator bufferAllocator) {
        Objects.requireNonNull(publisher);
        Objects.requireNonNull(bufferAllocator);
        return publisher.liftSync(subscriber ->
            new PublisherSource.Subscriber<Buffer>() {
                // One decoder and one channel per subscription; the decoder must be created first.
                private final ByteToMessageDecoder decoder = decoderSupplier.get();
                private final EmbeddedChannel channel = newEmbeddedChannel(decoder, bufferAllocator);

                @Nullable
                PublisherSource.Subscription subscription;

                @Override
                public void onSubscribe(PublisherSource.Subscription subscription) {
                    this.subscription = subscription;
                    subscriber.onSubscribe(subscription);
                }

                @Override
                public void onNext(@Nullable Buffer buffer) {
                    assert subscription != null;
                    if (!channel.isOpen()) {
                        throw new CodecDecodingException(NettyChannelContentCodec.this,
                                "Stream decoder previously closed but more input arrived");
                    }
                    if (buffer == null) {
                        throw new CodecDecodingException(NettyChannelContentCodec.this, "Cannot decode null values");
                    }
                    try {
                        channel.writeInbound(BufferUtils.extractByteBufOrCreate(buffer));
                        final Buffer decoded =
                                drainChannelQueueToSingleBuffer(channel.inboundMessages(), bufferAllocator);
                        if (decoded != null && decoded.readableBytes() > 0) {
                            subscriber.onNext(decoded);
                        } else {
                            // Nothing to emit for this item: replace the consumed demand.
                            subscription.request(1L);
                        }
                    } catch (Throwable th) {
                        throw wrapDecodingException(NettyChannelContentCodec.this, th);
                    }
                }

                @Override
                public void onError(Throwable th) {
                    safeCleanup(channel);
                    subscriber.onError(th);
                }

                @Override
                public void onComplete() {
                    try {
                        cleanup(channel);
                    } catch (Throwable th) {
                        subscriber.onError(wrapDecodingException(NettyChannelContentCodec.this, th));
                        return;
                    }
                    // Kept outside the try so that a failure thrown by the downstream onComplete is never
                    // followed by a second terminal signal (onError) to the same subscriber.
                    subscriber.onComplete();
                }
            });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds an {@link EmbeddedChannel} containing the given handler and configures it to allocate through
     * the Netty allocator that {@link BufferUtils#getByteBufAllocator} returns for {@code bufferAllocator}.
     *
     * @param channelHandler  the only handler installed in the new channel
     * @param bufferAllocator allocator whose Netty counterpart is set on the channel configuration
     * @return the configured channel
     */
    public EmbeddedChannel newEmbeddedChannel(ChannelHandler channelHandler, BufferAllocator bufferAllocator) {
        final EmbeddedChannel channel = new EmbeddedChannel(channelHandler);
        channel.config().setAllocator(BufferUtils.getByteBufAllocator(bufferAllocator));
        return channel;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Closes {@code embeddedChannel}, waits for the close to finish and then rethrows any exception the
     * channel recorded while processing.
     *
     * <p>Checked failures from waiting on the close future are rethrown unchanged through
     * {@link PlatformDependent#throwException(Throwable)}. When the wait is interrupted, the thread's
     * interrupt status is restored first so code further up the stack can still observe the interruption.
     *
     * @param embeddedChannel the channel to close
     */
    public static void preparePendingData(EmbeddedChannel embeddedChannel) {
        try {
            embeddedChannel.close().sync().get();
            embeddedChannel.checkException();
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again before propagating.
            Thread.currentThread().interrupt();
            PlatformDependent.throwException(e);
        } catch (ExecutionException e) {
            PlatformDependent.throwException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Removes every queued {@link ByteBuf} from {@code queue} and returns them as one {@link Buffer}.
     *
     * <ul>
     *   <li>empty queue: {@code null};</li>
     *   <li>exactly one element: that element wrapped as a buffer;</li>
     *   <li>several elements totalling at most {@code MAX_SIZE_FOR_MERGED_BUFFER} readable bytes: their
     *       contents written into a newly allocated buffer;</li>
     *   <li>otherwise: a composite buffer with one component per element.</li>
     * </ul>
     *
     * @param queue           the channel message queue; every element is cast to {@link ByteBuf}
     * @param bufferAllocator allocator for the merged or composite result
     * @return the drained content, or {@code null} when the queue was empty
     */
    @Nullable
    public static Buffer drainChannelQueueToSingleBuffer(Queue<Object> queue, BufferAllocator bufferAllocator) {
        if (queue.isEmpty()) {
            return null;
        }
        if (queue.size() == 1) {
            return BufferUtils.newBufferFrom((ByteBuf) queue.poll());
        }

        // First pass (non-destructive): measure the total size and count the chunks.
        int totalReadable = 0;
        int chunkCount = 0;
        for (Object queued : queue) {
            totalReadable += ((ByteBuf) queued).readableBytes();
            chunkCount++;
        }

        ByteBuf chunk;
        if (totalReadable > MAX_SIZE_FOR_MERGED_BUFFER) {
            final CompositeBuffer composite = bufferAllocator.newCompositeBuffer(chunkCount);
            while ((chunk = (ByteBuf) queue.poll()) != null) {
                composite.addBuffer(BufferUtils.newBufferFrom(chunk));
            }
            return composite;
        }

        // Small payload: copy everything into one buffer.
        final Buffer merged = bufferAllocator.newBuffer();
        while ((chunk = (ByteBuf) queue.poll()) != null) {
            merged.writeBytes(BufferUtils.newBufferFrom(chunk));
        }
        return merged;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Finishes {@code embeddedChannel} and releases everything it still holds.
     *
     * <p>When assertions are enabled for this class, an {@link AssertionError} is thrown if
     * {@code finishAndReleaseAll()} returns {@code true}.
     *
     * @param embeddedChannel the channel to finish
     */
    public static void cleanup(EmbeddedChannel embeddedChannel) {
        final boolean reportedLeftovers = embeddedChannel.finishAndReleaseAll();
        if (reportedLeftovers && !$assertionsDisabled) {
            throw new AssertionError();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Runs {@link #cleanup(EmbeddedChannel)} without letting ordinary failures escape: any throwable other
     * than an {@link AssertionError} is logged at error level and suppressed, while assertion failures are
     * rethrown.
     *
     * @param embeddedChannel the channel to finish
     */
    public static void safeCleanup(EmbeddedChannel embeddedChannel) {
        try {
            cleanup(embeddedChannel);
        } catch (Throwable failure) {
            if (failure instanceof AssertionError) {
                throw (AssertionError) failure;
            }
            LOGGER.error("Error while closing embedded channel", failure);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Wraps an arbitrary failure raised while encoding.
     *
     * @param contentCodec the codec reported as the origin of the failure
     * @param th           the original failure, kept as the cause
     * @return a new {@link CodecEncodingException} with a generic message and {@code th} as cause
     */
    public static CodecEncodingException wrapEncodingException(ContentCodec contentCodec, Throwable th) {
        final String message = "Unexpected exception during encoding";
        return new CodecEncodingException(contentCodec, message, th);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Wraps an arbitrary failure raised while decoding.
     *
     * @param contentCodec the codec reported as the origin of the failure
     * @param th           the original failure, kept as the cause
     * @return a new {@link CodecDecodingException} with a generic message and {@code th} as cause
     */
    public static CodecDecodingException wrapDecodingException(ContentCodec contentCodec, Throwable th) {
        final String message = "Unexpected exception during decoding";
        return new CodecDecodingException(contentCodec, message, th);
    }

    // Initializes the class-level constants; the three assignments are independent of one another.
    static {
        LOGGER = LoggerFactory.getLogger(NettyChannelContentCodec.class);
        // Read-only one-character marker; it is only ever compared by reference.
        END_OF_STREAM = ReadOnlyBufferAllocators.DEFAULT_RO_ALLOCATOR.fromAscii(" ");
        $assertionsDisabled = !NettyChannelContentCodec.class.desiredAssertionStatus();
    }
}
