package io.datakernel.datastream.csp;

import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.bytebuf.ByteBufPool;
import io.datakernel.common.MemSize;
import io.datakernel.common.Preconditions;
import io.datakernel.common.Utils;
import io.datakernel.csp.ChannelConsumer;
import io.datakernel.csp.ChannelOutput;
import io.datakernel.datastream.AbstractStreamConsumer;
import io.datakernel.datastream.StreamDataAcceptor;
import io.datakernel.promise.Promise;
import io.datakernel.serializer.BinarySerializer;
import java.time.Duration;
import java.util.ArrayDeque;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/datastream/csp/ChannelSerializer.class */
public final class ChannelSerializer<T> extends AbstractStreamConsumer<T> implements WithStreamToChannel<ChannelSerializer<T>, T, ByteBuf> {
    private static final Logger logger = LoggerFactory.getLogger(ChannelSerializer.class);
    // Preallocated exception reported when an encoded message reaches the configured maximum size.
    private static final ArrayIndexOutOfBoundsException OUT_OF_BOUNDS_EXCEPTION = new ArrayIndexOutOfBoundsException("Message overflow");
    // Default lower bound for the capacity of every buffer allocated by the serializer.
    public static final MemSize DEFAULT_INITIAL_BUFFER_SIZE = MemSize.kilobytes(16);
    // Largest max-message sizes whose length prefix (varint32Size(max - 1)) fits in 1, 2 and 3 bytes respectively.
    public static final MemSize MAX_SIZE_1 = MemSize.bytes(128);
    public static final MemSize MAX_SIZE_2 = MemSize.kilobytes(16);
    public static final MemSize MAX_SIZE_3 = MemSize.megabytes(2);
    // Default maximum message size; writeSize() only knows how to emit prefixes of up to 3 bytes.
    public static final MemSize MAX_SIZE = MAX_SIZE_3;
    // Serializer handed to every Input created by rebuild().
    private final BinarySerializer<T> serializer;

    // Auto-flush interval; null is translated to -1 inside Input, which disables scheduled flushes.
    @Nullable
    private Duration autoFlushInterval;
    // Current data acceptor; recreated by rebuild() whenever a setting changes.
    private ChannelSerializer<T>.Input input;
    // Downstream consumer, assigned through the ChannelOutput returned by getOutput().
    private ChannelConsumer<ByteBuf> output;
    // True while a write to `output` is pending (set and cleared in doFlush()).
    private boolean flushing;
    private MemSize initialBufferSize = DEFAULT_INITIAL_BUFFER_SIZE;
    private MemSize maxMessageSize = MAX_SIZE;
    // When true, serialization failures are logged and skipped instead of closing the stream.
    private boolean skipSerializationErrors = false;
    // Filled buffers waiting to be written to `output`, in FIFO order.
    private final ArrayDeque<ByteBuf> bufs = new ArrayDeque<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/datakernel/datastream/csp/ChannelSerializer$Input.class */
    public final class Input implements StreamDataAcceptor<T> {
        // Encoder applied to every accepted item.
        private final BinarySerializer<T> serializer;
        // Buffer currently being filled; starts out as the empty buffer.
        private ByteBuf buf = ByteBuf.empty();
        // Running size estimate: raised to the largest encoded size seen in accept(),
        // reduced by 1/256 each time a non-empty buffer is flushed.
        private int estimatedMessageSize = 1;
        // Number of bytes reserved in front of each message for its length prefix.
        private final int headerSize;
        // Exclusive upper bound on the encoded size of a single message.
        private final int maxMessageSize;
        // Minimum capacity of every buffer allocated by this acceptor.
        private final int initialBufferSize;
        // Auto-flush delay in milliseconds; -1 disables scheduled flushes, 0 posts the flush without a delay.
        private final int autoFlushIntervalMillis;
        // Set by postFlush(); cleared by the scheduled flush task before it runs flush().
        private boolean flushPosted;
        // When true, serialization failures are logged and skipped instead of closing the stream.
        private final boolean skipSerializationErrors;
        // Decompiler rendering of the compiler-generated flag that backs `assert` statements.
        static final /* synthetic */ boolean $assertionsDisabled;

        /**
         * Creates an acceptor that encodes items into length-prefixed messages.
         *
         * @param binarySerializer serializer used to encode each item; must not be null
         * @param i                minimum capacity, in bytes, of every buffer that gets allocated
         * @param i2               exclusive upper bound, in bytes, on the encoded size of one message;
         *                         also determines how many bytes are reserved for the length prefix
         * @param duration         auto-flush interval, or {@code null} to disable scheduled flushes
         * @param z                {@code true} to log and skip serialization errors instead of closing the stream
         */
        public Input(BinarySerializer<T> binarySerializer, int i, int i2, @Nullable Duration duration, boolean z) {
            this.skipSerializationErrors = z;
            this.serializer = (BinarySerializer) Preconditions.checkNotNull(binarySerializer);
            this.maxMessageSize = i2;
            // The prefix must be able to hold the largest permitted size, which is (max - 1).
            this.headerSize = ChannelSerializer.varint32Size(i2 - 1);
            this.initialBufferSize = i;
            // Clamp rather than blindly narrowing: a plain (int) cast wraps for intervals longer than
            // Integer.MAX_VALUE milliseconds (~24.8 days) and could produce a negative delay or even
            // the -1 "disabled" sentinel by accident.
            this.autoFlushIntervalMillis = duration == null ? -1 : (int) Math.min(duration.toMillis(), Integer.MAX_VALUE);
        }

        /**
         * Encodes one item into the current buffer as a length-prefixed message.
         *
         * <p>Space for the prefix is reserved first, the payload is encoded right after it, and the
         * prefix is filled in once the payload size is known. If the serializer runs past the end of
         * the buffer the attempt is rolled back and retried with a larger buffer.
         *
         * @param t item to serialize
         */
        @Override // io.datakernel.datastream.StreamDataAcceptor
        public void accept(T t) {
            int headerPos;
            int dataPos;
            for (;;) {
                // Make sure there is room for the prefix plus the estimate with 25% slack.
                int required = headerSize + estimatedMessageSize + (estimatedMessageSize >>> 2);
                if (buf.writeRemaining() < required) {
                    onFullBuffer();
                }
                headerPos = buf.tail();
                dataPos = headerPos + headerSize;
                buf.tail(dataPos);
                try {
                    int end = serializer.encode(buf.array(), buf.tail(), t);
                    buf.tail(end);
                    break;
                } catch (ArrayIndexOutOfBoundsException overflow) {
                    // Buffer was too small for this item: roll back, enlarge, try again.
                    onUnderEstimate(headerPos);
                } catch (Exception failure) {
                    onSerializationError(headerPos, failure);
                    return;
                }
            }
            int dataSize = buf.tail() - dataPos;
            if (dataSize > estimatedMessageSize) {
                if (dataSize >= maxMessageSize) {
                    onMessageOverflow(headerPos);
                    return;
                }
                estimatedMessageSize = dataSize;
            }
            writeSize(buf.array(), headerPos, dataSize);
        }

        /**
         * Writes the message length as a fixed-width prefix of {@code headerSize} bytes, seven
         * payload bits per byte with the high bit marking every byte except the last.
         *
         * @param bytes destination array
         * @param pos   index of the first prefix byte
         * @param size  message length to store
         */
        private void writeSize(byte[] bytes, int pos, int size) {
            if (headerSize == 1) {
                bytes[pos] = (byte) size;
                return;
            }
            bytes[pos] = (byte) ((size & 0x7F) | 0x80);
            int high = size >>> 7;
            if (headerSize != 2) {
                if (!$assertionsDisabled && headerSize != 3) {
                    throw new AssertionError();
                }
                bytes[pos + 1] = (byte) ((high & 0x7F) | 0x80);
                bytes[pos + 2] = (byte) (high >>> 7);
                return;
            }
            bytes[pos + 1] = (byte) high;
        }

        /**
         * Allocates a pooled buffer large enough for one estimated message (prefix plus estimate
         * with 25% slack), but never smaller than the configured initial buffer size.
         */
        private ByteBuf allocateBuffer() {
            int needed = headerSize + estimatedMessageSize + (estimatedMessageSize >>> 2);
            int capacity = Math.max(initialBufferSize, needed);
            return ByteBufPool.allocate(capacity);
        }

        /**
         * Hands the current buffer over for writing, starts a fresh one and makes sure a flush
         * has been scheduled.
         */
        private void onFullBuffer() {
            flush();
            buf = allocateBuffer();
            if (!flushPosted) {
                postFlush();
            }
        }

        /**
         * Recovers from an encode attempt that ran out of space: rolls the buffer back to the start
         * of the failed message, flushes what was already written and allocates a buffer about 1.5x
         * the space that proved insufficient.
         *
         * @param messageStart buffer position at which the failed message began
         */
        private void onUnderEstimate(int messageStart) {
            buf.tail(messageStart);
            int available = buf.writeRemaining();
            flush();
            int grown = available + (available >>> 1) + 1;
            buf = ByteBufPool.allocate(Math.max(initialBufferSize, grown));
        }

        /**
         * Discards a message whose encoded size reached the maximum and reports it using the shared
         * overflow exception.
         *
         * @param messageStart buffer position at which the oversized message began
         */
        private void onMessageOverflow(int messageStart) {
            // Same rollback-then-report sequence as any other serialization failure.
            onSerializationError(messageStart, ChannelSerializer.OUT_OF_BOUNDS_EXCEPTION);
        }

        /**
         * Rolls the buffer back to the start of the message that failed to encode and reports the failure.
         *
         * @param messageStart buffer position at which the failed message began
         * @param cause        exception thrown by the serializer
         */
        private void onSerializationError(int messageStart, Exception cause) {
            buf.tail(messageStart);
            handleSerializationError(cause);
        }

        /**
         * Applies the configured error policy: closes the enclosing serializer with the error, or
         * logs a warning and carries on when errors are being skipped.
         *
         * @param exc the serialization failure
         */
        private void handleSerializationError(Exception exc) {
            if (!skipSerializationErrors) {
                ChannelSerializer.this.close(exc);
                return;
            }
            ChannelSerializer.logger.warn("Skipping serialization error in {}", this, exc);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Moves the current buffer to the outgoing queue (or recycles it when it holds no data),
         * resets the working buffer to the empty buffer and kicks the write loop.
         *
         * <p>If another buffer is already queued, the supplier is suspended first so that data does
         * not pile up faster than the output drains it.
         */
        public void flush() {
            // onError() replaces buf with the result of Utils.nullify(...) (presumably null), yet a
            // flush task scheduled earlier by postFlush() can still fire afterwards. Without this
            // guard that late task would fail with a NullPointerException on buf.canRead().
            if (this.buf == null) {
                return;
            }
            if (this.buf.canRead()) {
                if (!ChannelSerializer.this.bufs.isEmpty()) {
                    ChannelSerializer.this.getSupplier().suspend();
                }
                ChannelSerializer.this.bufs.add(this.buf);
                // Let the estimate decay slowly so one large message does not inflate it forever.
                this.estimatedMessageSize -= this.estimatedMessageSize >>> 8;
            } else {
                this.buf.recycle();
            }
            this.buf = ByteBuf.empty();
            ChannelSerializer.this.doFlush();
        }

        /**
         * Marks a flush as posted and, unless auto-flush is disabled (-1), schedules it on the
         * event loop: posted without delay for an interval of 0, otherwise delayed by the interval.
         */
        private void postFlush() {
            flushPosted = true;
            if (autoFlushIntervalMillis == -1) {
                return;
            }
            if (autoFlushIntervalMillis != 0) {
                ChannelSerializer.this.eventloop.delayBackground(autoFlushIntervalMillis, () -> {
                    flushPosted = false;
                    flush();
                });
                return;
            }
            ChannelSerializer.this.eventloop.postLater(() -> {
                flushPosted = false;
                flush();
            });
        }

        // Decompiler rendering of the compiler-generated assertion flag: true when assertions
        // are NOT enabled for ChannelSerializer, which makes the check in writeSize() a no-op.
        static {
            $assertionsDisabled = !ChannelSerializer.class.desiredAssertionStatus();
        }
    }

    /**
     * Creates a serializer with default settings; use {@link #create} to obtain an instance.
     *
     * @param binarySerializer serializer used to encode every item
     */
    private ChannelSerializer(BinarySerializer<T> binarySerializer) {
        serializer = binarySerializer;
        // Build the initial acceptor from the default field values.
        rebuild();
    }

    /**
     * Replaces the data acceptor with a new one built from the current settings, recycling the
     * buffer held by the previous acceptor, if any.
     */
    private void rebuild() {
        ByteBuf stale = input == null ? null : input.buf;
        if (stale != null) {
            stale.recycle();
        }
        input = new Input(
                serializer,
                initialBufferSize.toInt(),
                maxMessageSize.toInt(),
                autoFlushInterval,
                skipSerializationErrors);
    }

    /**
     * Creates a serializer with default settings.
     *
     * @param binarySerializer serializer used to encode every item
     * @param <T>              type of the items being serialized
     * @return a new {@code ChannelSerializer}
     */
    public static <T> ChannelSerializer<T> create(BinarySerializer<T> binarySerializer) {
        return new ChannelSerializer<T>(binarySerializer);
    }

    /**
     * Sets the minimum capacity of the buffers this serializer allocates and rebuilds the acceptor.
     *
     * @param memSize initial buffer size
     * @return this serializer, for chaining
     */
    public ChannelSerializer<T> withInitialBufferSize(MemSize memSize) {
        initialBufferSize = memSize;
        rebuild();
        return this;
    }

    /**
     * Sets the exclusive upper bound on the encoded size of a single message and rebuilds the acceptor.
     *
     * <p>The bound also determines the width of the length prefix. The prefix writer only supports
     * widths of 1 to 3 bytes, so values above {@link #MAX_SIZE} (or non-positive values, which would
     * yield a 5-byte width) are rejected here instead of silently producing a corrupted stream.
     *
     * @param memSize maximum message size; must be positive and not greater than {@link #MAX_SIZE}
     * @return this serializer, for chaining
     * @throws IllegalArgumentException if {@code memSize} is outside the supported range
     */
    public ChannelSerializer<T> withMaxMessageSize(MemSize memSize) {
        int bytes = memSize.toInt();
        int limit = MAX_SIZE.toInt();
        if (bytes <= 0 || bytes > limit) {
            throw new IllegalArgumentException(
                    "Max message size must be in range (0, " + limit + "] bytes, got " + bytes);
        }
        this.maxMessageSize = memSize;
        rebuild();
        return this;
    }

    /**
     * Sets the auto-flush interval and rebuilds the acceptor.
     *
     * @param duration interval between scheduled flushes, or {@code null} to disable them
     * @return this serializer, for chaining
     */
    public ChannelSerializer<T> withAutoFlushInterval(@Nullable Duration duration) {
        autoFlushInterval = duration;
        rebuild();
        return this;
    }

    /**
     * Enables skipping of serialization errors; shorthand for
     * {@code withSkipSerializationErrors(true)}.
     *
     * @return this serializer, for chaining
     */
    public ChannelSerializer<T> withSkipSerializationErrors() {
        return withSkipSerializationErrors(true);
    }

    /**
     * Chooses how serialization errors are handled and rebuilds the acceptor.
     *
     * @param z {@code true} to log and skip failing items, {@code false} to close the stream on failure
     * @return this serializer, for chaining
     */
    public ChannelSerializer<T> withSkipSerializationErrors(boolean z) {
        skipSerializationErrors = z;
        rebuild();
        return this;
    }

    /**
     * Returns the output endpoint of this serializer; the consumer bound through it receives the
     * serialized buffers.
     */
    public ChannelOutput<ByteBuf> getOutput() {
        return consumer -> this.output = consumer;
    }

    /** Asks the supplier to start delivering items to the current acceptor. */
    @Override // io.datakernel.datastream.AbstractStreamConsumer
    protected void onStarted() {
        getSupplier().resume(input);
    }

    /**
     * Flushes whatever is still buffered once the supplier has finished.
     *
     * @return the acknowledgement promise of this consumer
     */
    @Override // io.datakernel.datastream.AbstractStreamConsumer
    protected Promise<Void> onEndOfStream() {
        input.flush();
        return getAcknowledgement();
    }

    /**
     * Releases every buffer still owned by this serializer and propagates the error downstream.
     *
     * @param th the error that closed this consumer
     */
    @Override // io.datakernel.datastream.AbstractStreamConsumer
    protected void onError(Throwable th) {
        // Queued buffers have not been handed to the output yet, so they are still ours to
        // release; merely clearing the queue would leak them from the pool.
        for (ByteBuf queued : this.bufs) {
            queued.recycle();
        }
        this.bufs.clear();
        ((Input) this.input).buf = (ByteBuf) Utils.nullify(((Input) this.input).buf, (v0) -> {
            v0.recycle();
        });
        // The output is only assigned once a consumer is bound via getOutput(); an error that
        // arrives before that must not be masked by a NullPointerException here.
        if (this.output != null) {
            this.output.close(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Drives the write loop: sends queued buffers to the output one at a time, resumes the
     * supplier once the queue is drained, and after end of stream sends the terminating
     * {@code null} item and acknowledges.
     *
     * <p>Does nothing while a previous write is still pending; the completion callback of that
     * write re-enters this method.
     */
    public void doFlush() {
        if (this.flushing) {
            return;
        }
        if (!this.bufs.isEmpty()) {
            this.flushing = true;
            this.output.accept(this.bufs.poll()).whenComplete((r4, th) -> {
                if (th != null) {
                    close(th);
                } else {
                    this.flushing = false;
                    doFlush();
                }
            });
        } else if (!getEndOfStream().isResult()) {
            getSupplier().resume(this.input);
        } else {
            // `flushing` stays set on purpose: nothing may be written after the end marker.
            this.flushing = true;
            // Handle failure as well as success; otherwise a failed end-of-stream write would be
            // swallowed and this consumer would be neither acknowledged nor closed.
            this.output.accept((ByteBuf) null).whenComplete((ignored, th) -> {
                if (th != null) {
                    close(th);
                } else {
                    acknowledge();
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the number of bytes (1 to 5) needed to encode {@code i} as a 32-bit varint with
     * seven payload bits per byte. Negative values are treated as unsigned and need 5 bytes.
     *
     * @param i value to measure
     * @return encoded length in bytes
     */
    public static int varint32Size(int i) {
        if ((i >>> 7) == 0) {
            return 1;
        }
        if ((i >>> 14) == 0) {
            return 2;
        }
        if ((i >>> 21) == 0) {
            return 3;
        }
        if ((i >>> 28) == 0) {
            return 4;
        }
        return 5;
    }
}
