package io.datakernel.stream.processor;

import com.google.common.base.Preconditions;
import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.bytebuf.ByteBufPool;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.serializer.BufferSerializer;
import io.datakernel.serializer.SerializationOutputBuffer;
import io.datakernel.stream.AbstractStreamTransformer_1_1_Stateless;
import io.datakernel.stream.StreamDataReceiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/stream/processor/StreamBinarySerializer.class */
public final class StreamBinarySerializer<T> extends AbstractStreamTransformer_1_1_Stateless<T, ByteBuf> implements StreamSerializer<T>, StreamDataReceiver<T>, StreamBinarySerializerMBean {
    private static final Logger logger;
    private static final ArrayIndexOutOfBoundsException OUT_OF_BOUNDS_EXCEPTION;
    public static final int MAX_SIZE_1_BYTE = 127;
    public static final int MAX_SIZE_2_BYTE = 16383;
    public static final int MAX_SIZE_3_BYTE = 2097151;
    public static final int MAX_SIZE = 2097151;
    private static final int MAX_HEADER_BYTES = 3;
    private final BufferSerializer<T> serializer;
    private final int defaultBufferSize;
    private final int maxMessageSize;
    private final int headerSize;
    private ByteBuf byteBuf;
    private final SerializationOutputBuffer outputBuffer;
    private int estimatedMessageSize;
    private final int flushDelayMillis;
    private boolean flushPosted;
    private final boolean skipSerializationErrors;
    private int serializationErrors;
    private int jmxItems;
    private long jmxBytes;
    private int jmxBufs;
    static final /* synthetic */ boolean $assertionsDisabled;

    public StreamBinarySerializer(Eventloop eventloop, BufferSerializer<T> bufferSerializer, int i, int i2, int i3, boolean z) {
        super(eventloop);
        this.outputBuffer = new SerializationOutputBuffer();
        Preconditions.checkArgument(i2 > 0 && i2 <= 2097151, "maxMessageSize must be in [4B..2MB) range, got %s", new Object[]{Integer.valueOf(i2)});
        Preconditions.checkArgument(i > 0, "defaultBufferSize must be positive value, got %s", new Object[]{Integer.valueOf(i)});
        this.skipSerializationErrors = z;
        this.serializer = (BufferSerializer) Preconditions.checkNotNull(bufferSerializer);
        this.maxMessageSize = i2;
        this.headerSize = varint32Size(i2);
        this.estimatedMessageSize = 1;
        this.defaultBufferSize = i;
        this.flushDelayMillis = i3;
        allocateBuffer();
    }

    public static int varint32Size(int i) {
        if ((i & (-128)) == 0) {
            return 1;
        }
        if ((i & (-16384)) == 0) {
            return 2;
        }
        if ((i & (-2097152)) == 0) {
            return 3;
        }
        return (i & (-268435456)) == 0 ? 4 : 5;
    }

    private void allocateBuffer() {
        this.byteBuf = ByteBufPool.allocate(Math.max(this.defaultBufferSize, this.headerSize + this.estimatedMessageSize));
        this.outputBuffer.set(this.byteBuf.array(), 0);
    }

    private void flushBuffer(StreamDataReceiver<ByteBuf> streamDataReceiver) {
        this.byteBuf.position(0);
        int position = this.outputBuffer.position();
        if (position != 0) {
            this.byteBuf.limit(position);
            this.jmxBytes += position;
            this.jmxBufs++;
            if (this.status <= 1) {
                streamDataReceiver.onData(this.byteBuf);
            }
        } else {
            this.byteBuf.recycle();
        }
        allocateBuffer();
    }

    private void ensureSize(int i) {
        if (this.outputBuffer.remaining() < i) {
            flushBuffer(this.downstreamDataReceiver);
        }
    }

    private void writeSize(byte[] bArr, int i, int i2) {
        if (this.headerSize == 1) {
            bArr[i] = (byte) i2;
            return;
        }
        bArr[i] = (byte) ((i2 & MAX_SIZE_1_BYTE) | 128);
        int i3 = i2 >>> 7;
        if (this.headerSize == 2) {
            bArr[i + 1] = (byte) i3;
        } else {
            if (!$assertionsDisabled && this.headerSize != 3) {
                throw new AssertionError();
            }
            bArr[i + 1] = (byte) ((i3 & MAX_SIZE_1_BYTE) | 128);
            bArr[i + 2] = (byte) (i3 >>> 7);
        }
    }

    @Override // io.datakernel.stream.StreamDataReceiver
    public void onData(T t) {
        if (!$assertionsDisabled) {
            int i = this.jmxItems;
            int i2 = this.jmxItems + 1;
            this.jmxItems = i2;
            if (i == i2) {
                throw new AssertionError();
            }
        }
        while (true) {
            ensureSize(this.headerSize + this.estimatedMessageSize);
            int position = this.outputBuffer.position();
            int i3 = position + this.headerSize;
            try {
                this.outputBuffer.position(i3);
                this.serializer.serialize(this.outputBuffer, t);
                int position2 = this.outputBuffer.position() - i3;
                if (!$assertionsDisabled && position2 == 0) {
                    throw new AssertionError();
                }
                if (position2 > this.maxMessageSize) {
                    onSerializationError(OUT_OF_BOUNDS_EXCEPTION);
                    return;
                }
                writeSize(this.outputBuffer.array(), position, position2);
                int i4 = position2 + (position2 >>> 2);
                if (i4 > this.estimatedMessageSize) {
                    this.estimatedMessageSize = i4;
                } else {
                    this.estimatedMessageSize -= this.estimatedMessageSize >>> 8;
                }
                if (this.flushPosted) {
                    return;
                }
                postFlush();
                return;
            } catch (ArrayIndexOutOfBoundsException e) {
                this.outputBuffer.position(position);
                int length = this.outputBuffer.array().length - i3;
                if (length >= this.maxMessageSize) {
                    onSerializationError(e);
                    return;
                }
                this.estimatedMessageSize = length + 1 + (length >>> 1);
            } catch (Exception e2) {
                onSerializationError(e2);
                return;
            }
        }
    }

    private void onSerializationError(Exception exc) {
        this.serializationErrors++;
        if (this.skipSerializationErrors) {
            logger.warn("Skipping serialization error in {} : {}", this, exc.toString());
        } else {
            closeWithError(exc);
        }
    }

    @Override // io.datakernel.stream.StreamConsumer
    public StreamDataReceiver<T> getDataReceiver() {
        return this;
    }

    @Override // io.datakernel.stream.AbstractStreamTransformer_1_1_Stateless, io.datakernel.stream.StreamConsumer
    public void onEndOfStream() {
        flushBuffer(this.downstreamDataReceiver);
        this.byteBuf.recycle();
        this.byteBuf = null;
        this.outputBuffer.set((byte[]) null, 0);
        logger.trace("endOfStream {}, upstream: {}", this, this.upstreamProducer);
        sendEndOfStream();
    }

    @Override // io.datakernel.stream.processor.StreamSerializer
    public void flush() {
        flushBuffer(this.downstreamDataReceiver);
        this.flushPosted = false;
    }

    private void postFlush() {
        this.flushPosted = true;
        if (this.flushDelayMillis == 0) {
            this.eventloop.postLater(new Runnable() { // from class: io.datakernel.stream.processor.StreamBinarySerializer.1
                @Override // java.lang.Runnable
                public void run() {
                    if (StreamBinarySerializer.this.status < 2) {
                        StreamBinarySerializer.this.flush();
                    }
                }
            });
        } else {
            this.eventloop.scheduleBackground(this.eventloop.currentTimeMillis() + this.flushDelayMillis, new Runnable() { // from class: io.datakernel.stream.processor.StreamBinarySerializer.2
                @Override // java.lang.Runnable
                public void run() {
                    if (StreamBinarySerializer.this.status < 2) {
                        StreamBinarySerializer.this.flush();
                    }
                }
            });
        }
    }

    @Override // io.datakernel.stream.AbstractStreamTransformer_1_1, io.datakernel.stream.AbstractStreamProducer
    public void onClosed() {
        super.onClosed();
        recycleBufs();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.datakernel.stream.AbstractStreamTransformer_1_1, io.datakernel.stream.AbstractStreamProducer
    public void onClosedWithError(Exception exc) {
        super.onClosedWithError(exc);
        recycleBufs();
    }

    private void recycleBufs() {
        if (this.byteBuf != null) {
            this.byteBuf.recycle();
            this.byteBuf = null;
        }
    }

    @Override // io.datakernel.stream.processor.StreamBinarySerializerMBean
    public int getItems() {
        return this.jmxItems;
    }

    @Override // io.datakernel.stream.processor.StreamBinarySerializerMBean
    public int getBufs() {
        return this.jmxBufs;
    }

    @Override // io.datakernel.stream.processor.StreamBinarySerializerMBean
    public long getBytes() {
        return this.jmxBytes;
    }

    @Override // io.datakernel.stream.processor.StreamBinarySerializerMBean
    public int getSerializationErrors() {
        return this.serializationErrors;
    }

    @Override // io.datakernel.stream.AbstractStreamTransformer_1_1, io.datakernel.stream.AbstractStreamProducer
    public String toString() {
        boolean z = false;
        if (!$assertionsDisabled) {
            z = true;
            if (1 == 0) {
                throw new AssertionError();
            }
        }
        return '{' + super.toString() + (this.serializationErrors != 0 ? "serializationErrors:" + this.serializationErrors : "") + " items:" + (z ? "" + this.jmxItems : "?") + " bufs:" + this.jmxBufs + " bytes:" + this.jmxBytes + '}';
    }

    static {
        $assertionsDisabled = !StreamBinarySerializer.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(StreamBinarySerializer.class);
        OUT_OF_BOUNDS_EXCEPTION = new ArrayIndexOutOfBoundsException();
    }
}
