/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.stream.processor;

import io.datakernel.annotation.Nullable;
import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.bytebuf.ByteBufPool;
import io.datakernel.serializer.BufferSerializer;
import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.AbstractStreamProducer;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.StreamStatus;
import io.datakernel.stream.processor.StreamTransformer;
import io.datakernel.util.MemSize;
import io.datakernel.util.Preconditions;
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class StreamBinarySerializer<T>
implements StreamTransformer<T, ByteBuf> {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private static final ArrayIndexOutOfBoundsException OUT_OF_BOUNDS_EXCEPTION = new ArrayIndexOutOfBoundsException();
    public static final MemSize DEFAULT_INITIAL_BUFFER_SIZE = MemSize.kilobytes((long)16L);
    public static final MemSize MAX_SIZE_1 = MemSize.bytes((long)128L);
    public static final MemSize MAX_SIZE_2 = MemSize.kilobytes((long)16L);
    public static final MemSize MAX_SIZE_3;
    public static final MemSize MAX_SIZE;
    private final BufferSerializer<T> serializer;
    private MemSize initialBufferSize = DEFAULT_INITIAL_BUFFER_SIZE;
    private MemSize maxMessageSize = MAX_SIZE;
    private Duration autoFlushInterval;
    private boolean skipSerializationErrors = false;
    private Input input;
    private Output output;

    private StreamBinarySerializer(BufferSerializer<T> serializer) {
        this.serializer = serializer;
        this.rebuild();
    }

    private void rebuild() {
        if (this.output != null && this.output.outputBuf != null) {
            this.output.outputBuf.recycle();
        }
        this.input = new Input();
        this.output = new Output(this.serializer, this.initialBufferSize.toInt(), this.maxMessageSize.toInt(), this.autoFlushInterval, this.skipSerializationErrors);
    }

    public static <T> StreamBinarySerializer<T> create(BufferSerializer<T> serializer) {
        return new StreamBinarySerializer<T>(serializer);
    }

    public StreamBinarySerializer<T> withInitialBufferSize(MemSize bufferSize) {
        this.initialBufferSize = bufferSize;
        this.rebuild();
        return this;
    }

    public StreamBinarySerializer<T> withMaxMessageSize(MemSize maxMessageSize) {
        this.maxMessageSize = maxMessageSize;
        this.rebuild();
        return this;
    }

    public StreamBinarySerializer<T> withAutoFlushInterval(@Nullable Duration autoFlushInterval) {
        this.autoFlushInterval = autoFlushInterval;
        this.rebuild();
        return this;
    }

    public StreamBinarySerializer<T> withSkipSerializationErrors() {
        return this.withSkipSerializationErrors(true);
    }

    public StreamBinarySerializer<T> withSkipSerializationErrors(boolean skipSerializationErrors) {
        this.skipSerializationErrors = skipSerializationErrors;
        this.rebuild();
        return this;
    }

    @Override
    public StreamConsumer<T> getInput() {
        return this.input;
    }

    @Override
    public StreamProducer<ByteBuf> getOutput() {
        return this.output;
    }

    public void flush() {
        if (this.output.getStatus().isOpen() && this.output.getLastDataReceiver() != null) {
            this.output.flush();
        }
    }

    private static int varint32Size(int value) {
        if ((value & 0xFFFFFF80) == 0) {
            return 1;
        }
        if ((value & 0xFFFFC000) == 0) {
            return 2;
        }
        if ((value & 0xFFE00000) == 0) {
            return 3;
        }
        if ((value & 0xF0000000) == 0) {
            return 4;
        }
        return 5;
    }

    static {
        MAX_SIZE = MAX_SIZE_3 = MemSize.megabytes((long)2L);
    }

    private final class Output
    extends AbstractStreamProducer<ByteBuf>
    implements StreamDataReceiver<T> {
        private final BufferSerializer<T> serializer;
        private final int initialBufferSize;
        private final int maxMessageSize;
        private final int headerSize;
        private ByteBuf outputBuf = ByteBuf.empty();
        private int estimatedMessageSize;
        private final int autoFlushIntervalMillis;
        private boolean flushPosted;
        private final boolean skipSerializationErrors;

        public Output(BufferSerializer<T> serializer, int initialBufferSize, @Nullable int maxMessageSize, Duration autoFlushInterval, boolean skipSerializationErrors) {
            this.skipSerializationErrors = skipSerializationErrors;
            this.serializer = (BufferSerializer)Preconditions.checkNotNull(serializer);
            this.maxMessageSize = maxMessageSize;
            this.headerSize = StreamBinarySerializer.varint32Size(maxMessageSize - 1);
            this.estimatedMessageSize = 1;
            this.initialBufferSize = initialBufferSize;
            this.autoFlushIntervalMillis = autoFlushInterval == null ? -1 : (int)autoFlushInterval.toMillis();
        }

        @Override
        protected void onSuspended() {
            StreamBinarySerializer.this.input.getProducer().suspend();
        }

        @Override
        protected void produce() {
            if (StreamBinarySerializer.this.input.getStatus() != StreamStatus.END_OF_STREAM) {
                StreamBinarySerializer.this.input.getProducer().produce(this);
            } else {
                this.flushAndClose();
            }
        }

        @Override
        protected void onError(Throwable t) {
            StreamBinarySerializer.this.input.closeWithError(t);
        }

        private ByteBuf allocateBuffer() {
            return ByteBufPool.allocate((int)Math.max(this.initialBufferSize, this.headerSize + this.estimatedMessageSize + (this.estimatedMessageSize >>> 2)));
        }

        private void flush() {
            if (this.outputBuf.canRead()) {
                this.getLastDataReceiver().onData(this.outputBuf);
                this.estimatedMessageSize -= this.estimatedMessageSize >>> 8;
            } else {
                this.outputBuf.recycle();
            }
            this.outputBuf = ByteBuf.empty();
        }

        @Override
        public void onData(T item) {
            int positionItem;
            int positionBegin;
            while (true) {
                if (this.outputBuf.writeRemaining() < this.headerSize + this.estimatedMessageSize + (this.estimatedMessageSize >>> 2)) {
                    this.onFullBuffer();
                }
                positionBegin = this.outputBuf.writePosition();
                positionItem = positionBegin + this.headerSize;
                this.outputBuf.writePosition(positionItem);
                try {
                    this.serializer.serialize(this.outputBuf, item);
                }
                catch (ArrayIndexOutOfBoundsException e) {
                    this.onUnderEstimate(item, positionBegin);
                    continue;
                }
                catch (Exception e) {
                    this.onSerializationError(item, positionBegin, e);
                    return;
                }
                break;
            }
            int positionEnd = this.outputBuf.writePosition();
            int messageSize = positionEnd - positionItem;
            if (messageSize > this.estimatedMessageSize) {
                if (messageSize < this.maxMessageSize) {
                    this.estimatedMessageSize = messageSize;
                } else {
                    this.onMessageOverflow(item, positionBegin, messageSize);
                    return;
                }
            }
            this.writeSize(this.outputBuf.array(), positionBegin, messageSize);
        }

        private void writeSize(byte[] buf, int pos, int size) {
            if (this.headerSize == 1) {
                buf[pos] = (byte)size;
                return;
            }
            buf[pos] = (byte)(size & 0x7F | 0x80);
            size >>>= 7;
            if (this.headerSize == 2) {
                buf[pos + 1] = (byte)size;
                return;
            }
            assert (this.headerSize == 3);
            buf[pos + 1] = (byte)(size & 0x7F | 0x80);
            buf[pos + 2] = (byte)(size >>>= 7);
        }

        private void onFullBuffer() {
            this.flush();
            this.outputBuf = this.allocateBuffer();
            if (!this.flushPosted) {
                this.postFlush();
            }
        }

        private void onSerializationError(T value, int positionBegin, Exception e) {
            this.outputBuf.writePosition(positionBegin);
            this.handleSerializationError(e);
        }

        private void onMessageOverflow(T value, int positionBegin, int messageSize) {
            this.outputBuf.writePosition(positionBegin);
            this.handleSerializationError(OUT_OF_BOUNDS_EXCEPTION);
        }

        private void onUnderEstimate(T value, int positionBegin) {
            this.outputBuf.writePosition(positionBegin);
            int writeRemaining = this.outputBuf.writeRemaining();
            this.flush();
            this.outputBuf = ByteBufPool.allocate((int)Math.max(this.initialBufferSize, writeRemaining + (writeRemaining >>> 1) + 1));
        }

        private void handleSerializationError(Exception e) {
            if (this.skipSerializationErrors) {
                StreamBinarySerializer.this.logger.warn("Skipping serialization error in {}", (Object)this, (Object)e);
            } else {
                this.closeWithError(e);
            }
        }

        private void flushAndClose() {
            this.flush();
            StreamBinarySerializer.this.output.sendEndOfStream();
        }

        private void postFlush() {
            this.flushPosted = true;
            if (this.autoFlushIntervalMillis == -1) {
                return;
            }
            if (this.autoFlushIntervalMillis == 0) {
                this.eventloop.postLater(() -> {
                    this.flushPosted = false;
                    this.flush();
                });
            } else {
                this.eventloop.delayBackground((long)this.autoFlushIntervalMillis, () -> {
                    this.flushPosted = false;
                    this.flush();
                });
            }
        }

        @Override
        protected void cleanup() {
            if (this.outputBuf != null) {
                this.outputBuf.recycle();
                this.outputBuf = ByteBuf.empty();
            }
        }
    }

    private final class Input
    extends AbstractStreamConsumer<T> {
        private Input() {
        }

        @Override
        protected void onEndOfStream() {
            StreamBinarySerializer.this.output.flushAndClose();
        }

        @Override
        protected void onError(Throwable t) {
            StreamBinarySerializer.this.output.closeWithError(t);
        }
    }
}

