/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.stream.processor;

import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.bytebuf.ByteBufQueue;
import io.datakernel.exception.ParseException;
import io.datakernel.exception.TruncatedDataException;
import io.datakernel.serializer.BufferSerializer;
import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.AbstractStreamProducer;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.StreamStatus;
import io.datakernel.stream.processor.StreamTransformer;

public final class StreamBinaryDeserializer<T>
implements StreamTransformer<ByteBuf, T> {
    public static final ParseException HEADER_SIZE_EXCEPTION = new ParseException("Header size is too large");
    public static final ParseException DESERIALIZED_SIZE_EXCEPTION = new ParseException("Deserialized size != parsed data size");
    private final BufferSerializer<T> valueSerializer;
    private Input input;
    private Output output;

    private StreamBinaryDeserializer(BufferSerializer<T> valueSerializer) {
        this.valueSerializer = valueSerializer;
        this.input = new Input();
        this.output = new Output(valueSerializer);
    }

    public static <T> StreamBinaryDeserializer<T> create(BufferSerializer<T> valueSerializer) {
        return new StreamBinaryDeserializer<T>(valueSerializer);
    }

    @Override
    public StreamConsumer<ByteBuf> getInput() {
        return this.input;
    }

    @Override
    public StreamProducer<T> getOutput() {
        return this.output;
    }

    private static int tryPeekSize(ByteBufQueue queue) throws ParseException {
        assert (queue.hasRemaining());
        int dataSize = 0;
        int headerSize = 0;
        int b = queue.peekByte();
        if (b >= 0) {
            dataSize = b;
            headerSize = 1;
        } else if (queue.hasRemainingBytes(2)) {
            dataSize = b & 0x7F;
            b = queue.peekByte(1);
            if (b >= 0) {
                dataSize |= b << 7;
                headerSize = 2;
            } else if (queue.hasRemainingBytes(3)) {
                dataSize |= (b & 0x7F) << 7;
                b = queue.peekByte(2);
                if (b >= 0) {
                    dataSize |= b << 14;
                    headerSize = 3;
                } else {
                    throw HEADER_SIZE_EXCEPTION;
                }
            }
        }
        return (headerSize << 24) + dataSize;
    }

    private final class Output
    extends AbstractStreamProducer<T>
    implements StreamDataReceiver<ByteBuf> {
        private final ByteBufQueue queue = ByteBufQueue.create();
        private final BufferSerializer<T> valueSerializer;

        private Output(BufferSerializer<T> valueSerializer) {
            this.valueSerializer = valueSerializer;
        }

        @Override
        public void onData(ByteBuf buf) {
            this.queue.add(buf);
            this.produce();
        }

        @Override
        protected void onWired() {
            super.onWired();
        }

        @Override
        protected void onSuspended() {
            StreamBinaryDeserializer.this.input.getProducer().suspend();
        }

        @Override
        protected void produce() {
            block7: {
                try {
                    while (this.isReceiverReady() && this.queue.hasRemaining()) {
                        Object item;
                        int dataSize = StreamBinaryDeserializer.tryPeekSize(this.queue);
                        int headerSize = dataSize >>> 24;
                        int size = headerSize + (dataSize & 0xFFFFFF);
                        if (headerSize == 0 || !this.queue.hasRemainingBytes(size)) break;
                        ByteBuf buf = this.queue.takeExactSize(size);
                        buf.moveReadPosition(headerSize);
                        try {
                            item = this.valueSerializer.deserialize(buf);
                        }
                        catch (Exception e) {
                            throw new ParseException("Deserialization error", (Throwable)e);
                        }
                        if (buf.canRead()) {
                            throw DESERIALIZED_SIZE_EXCEPTION;
                        }
                        buf.recycle();
                        this.send(item);
                    }
                    if (!this.isReceiverReady()) break block7;
                    StreamBinaryDeserializer.this.input.getProducer().produce(this);
                    if (StreamBinaryDeserializer.this.input.getStatus() != StreamStatus.END_OF_STREAM) break block7;
                    if (this.queue.isEmpty()) {
                        StreamBinaryDeserializer.this.output.sendEndOfStream();
                        break block7;
                    }
                    throw new TruncatedDataException(String.format("Truncated serialized data stream, %s : %s", this, this.queue));
                }
                catch (ParseException e) {
                    this.closeWithError(e);
                }
            }
        }

        @Override
        protected void onError(Throwable t) {
            StreamBinaryDeserializer.this.input.closeWithError(t);
        }

        @Override
        protected void cleanup() {
            this.queue.clear();
        }
    }

    private final class Input
    extends AbstractStreamConsumer<ByteBuf> {
        private Input() {
        }

        @Override
        protected void onEndOfStream() {
            StreamBinaryDeserializer.this.output.produce();
        }

        @Override
        protected void onError(Throwable t) {
            StreamBinaryDeserializer.this.output.closeWithError(t);
        }
    }
}

