package io.vertx.grpc.common.impl;

import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.StreamResetException;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.StreamBase;
import io.vertx.core.streams.impl.InboundBuffer;
import io.vertx.grpc.common.GrpcError;
import io.vertx.grpc.common.GrpcMessage;
import io.vertx.grpc.common.GrpcReadStream;
import io.vertx.grpc.common.impl.GrpcReadStreamBase;
import java.util.function.BiConsumer;
import java.util.stream.Collector;

/* loaded from: input_file:io/vertx/grpc/common/impl/GrpcReadStreamBase.class */
/**
 * Base class for gRPC read streams: consumes raw {@link Buffer} chunks from an underlying
 * {@link ReadStream}, splits them into length-prefixed frames and delivers each frame as a
 * {@link GrpcMessage} through an {@link InboundBuffer} queue.
 *
 * <p>Each frame read by {@link #handle(Buffer)} is laid out as one flag byte, a 4-byte length
 * and then that many payload bytes. Messages are queued rather than dispatched directly so that
 * {@code pause}/{@code resume}/{@code fetch} on this stream act on the queue, while the underlying
 * stream is paused whenever the queue refuses more data and resumed from the queue's drain handler.
 *
 * @param <S> the concrete stream type, returned by the fluent methods
 * @param <T> the item type exposed by {@link GrpcReadStream}
 */
public abstract class GrpcReadStreamBase<S extends GrpcReadStreamBase<S, T>, T> implements GrpcReadStream<T>, Handler<Buffer> {
    /**
     * Marker written to the queue when the underlying stream ends. It is recognised by identity
     * in the queue handler, so its accessors are never meant to be read.
     */
    static final GrpcMessage END_SENTINEL = new GrpcMessage() { // from class: io.vertx.grpc.common.impl.GrpcReadStreamBase.1
        @Override // io.vertx.grpc.common.GrpcMessage
        public String encoding() {
            return null;
        }

        @Override // io.vertx.grpc.common.GrpcMessage
        public Buffer payload() {
            return null;
        }
    };

    /** Size of the frame header: one flag byte followed by a 4-byte length. */
    private static final int FRAME_HEADER_LENGTH = 5;

    /** Context used to create the queue and the promises returned by {@link #last()} and {@link #collecting(Collector)}. */
    protected final ContextInternal context;
    /** Encoding reported for frames whose flag byte is 1; may be {@code null}. */
    private final String encoding;
    /** Source of raw bytes. */
    private final ReadStream<Buffer> stream;
    /** Decoded messages (and the end marker) waiting to be dispatched. */
    private final InboundBuffer<GrpcMessage> queue;
    /** Bytes received so far that do not yet form a complete frame; {@code null} when empty. */
    private Buffer buffer;
    private Handler<GrpcError> errorHandler;
    private Handler<Throwable> exceptionHandler;
    private Handler<GrpcMessage> messageHandler;
    private Handler<Void> endHandler;

    /**
     * Creates the stream without attaching any handler; call {@link #init()} to start consuming.
     *
     * @param context    the context, which must be a {@link ContextInternal}
     * @param readStream the underlying stream of raw bytes
     * @param str        the encoding to report for frames flagged with 1, or {@code null}
     */
    protected GrpcReadStreamBase(Context context, ReadStream<Buffer> readStream, String str) {
        this.context = (ContextInternal) context;
        this.encoding = str;
        this.stream = readStream;
        this.queue = new InboundBuffer<>(context);
    }

    /**
     * Attaches this object to the underlying stream and wires the queue:
     * chunks go to {@link #handle(Buffer)}, end-of-stream enqueues {@link #END_SENTINEL},
     * stream failures go to {@link #handleReset(long)} or {@link #handleException(Throwable)},
     * and the queue dispatches to {@link #handleMessage(GrpcMessage)} / {@link #handleEnd()}.
     */
    public void init() {
        this.stream.handler(this);
        // The end marker travels through the queue so it is seen after every queued message.
        this.stream.endHandler(ignored -> {
            this.queue.write(END_SENTINEL);
        });
        this.stream.exceptionHandler(failure -> {
            if (failure instanceof StreamResetException) {
                handleReset(((StreamResetException) failure).getCode());
            } else {
                handleException(failure);
            }
        });
        // Counterpart of the stream.pause() issued in handle(Buffer).
        this.queue.drainHandler(ignored -> {
            this.stream.resume();
        });
        this.queue.handler(queued -> {
            if (queued == END_SENTINEL) {
                handleEnd();
            } else {
                handleMessage(queued);
            }
        });
    }

    /**
     * Receives a chunk of raw bytes, extracts every complete frame now available and queues one
     * {@link GrpcMessage} per frame. Bytes belonging to an incomplete trailing frame are retained
     * for the next call.
     *
     * @param buffer the chunk just received from the underlying stream
     * @throws UnsupportedOperationException if a frame has its flag byte set to 1 but this stream
     *                                       was created without an encoding
     */
    public void handle(Buffer buffer) {
        if (this.buffer == null) {
            this.buffer = buffer;
        } else {
            this.buffer.appendBuffer(buffer);
        }
        int offset = 0;
        boolean queueRefused = false;
        while (offset + FRAME_HEADER_LENGTH <= this.buffer.length()) {
            // NOTE(review): getInt is signed, so a length prefix with the high bit set yields a
            // negative payloadLength here - confirm how such a frame should be rejected.
            int payloadLength = this.buffer.getInt(offset + 1);
            int payloadStart = offset + FRAME_HEADER_LENGTH;
            int payloadEnd = payloadStart + payloadLength;
            if (payloadEnd > this.buffer.length()) {
                // Header is complete but the payload is not: wait for more data.
                break;
            }
            boolean flagged = this.buffer.getByte(offset) == 1;
            if (flagged && this.encoding == null) {
                throw new UnsupportedOperationException("Handle me");
            }
            String messageEncoding = flagged ? this.encoding : "identity";
            GrpcMessage message = GrpcMessage.message(messageEncoding, this.buffer.slice(payloadStart, payloadEnd));
            // Keep decoding even after a refusal; the upstream is paused once, after the loop.
            queueRefused |= !this.queue.write(message);
            offset = payloadEnd;
        }
        if (queueRefused) {
            this.stream.pause();
        }
        // Keep only the unconsumed tail, or drop the buffer when everything was consumed.
        if (offset < this.buffer.length()) {
            this.buffer = this.buffer.getBuffer(offset, this.buffer.length());
        } else {
            this.buffer = null;
        }
    }

    /**
     * Pauses delivery from the message queue.
     *
     * @return this stream
     */
    @Override // io.vertx.grpc.common.GrpcReadStream
    /* renamed from: pause */
    public S mo6pause() {
        this.queue.pause();
        return self();
    }

    /**
     * Resumes delivery from the message queue.
     *
     * @return this stream
     */
    @Override // io.vertx.grpc.common.GrpcReadStream
    /* renamed from: resume */
    public S mo5resume() {
        this.queue.resume();
        return self();
    }

    /**
     * Forwards a fetch request to the message queue.
     *
     * @param j the amount passed to the queue's {@code fetch}
     * @return this stream
     */
    @Override // io.vertx.grpc.common.GrpcReadStream
    /* renamed from: fetch */
    public S mo4fetch(long j) {
        this.queue.fetch(j);
        return self();
    }

    /**
     * Sets the handler invoked by {@link #handleReset(long)}.
     *
     * @param handler the handler, or {@code null} to clear it
     * @return this stream
     */
    @Override // io.vertx.grpc.common.GrpcReadStream
    public S errorHandler(Handler<GrpcError> handler) {
        this.errorHandler = handler;
        return self();
    }

    /**
     * Sets the handler invoked by {@link #handleException(Throwable)}.
     *
     * @param handler the handler, or {@code null} to clear it
     * @return this stream
     */
    @Override // io.vertx.grpc.common.GrpcReadStream
    public S exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return self();
    }

    /**
     * Sets the handler invoked by {@link #handleMessage(GrpcMessage)}.
     *
     * @param handler the handler, or {@code null} to clear it
     * @return this stream
     */
    @Override // io.vertx.grpc.common.GrpcReadStream
    public S messageHandler(Handler<GrpcMessage> handler) {
        this.messageHandler = handler;
        return self();
    }

    /**
     * Sets the handler invoked by {@link #handleEnd()}.
     *
     * @param handler the handler, or {@code null} to clear it
     * @return this stream
     */
    @Override // io.vertx.grpc.common.GrpcReadStream
    public S endHandler(Handler<Void> handler) {
        this.endHandler = handler;
        return self();
    }

    /**
     * Called when the underlying stream fails with a {@link StreamResetException}. The code is
     * translated with {@link GrpcError#mapHttp2ErrorCode(long)} and, when a translation exists,
     * passed to the error handler. Nothing happens when no error handler is set.
     *
     * @param j the code carried by the reset exception
     */
    protected void handleReset(long j) {
        Handler<GrpcError> handler = this.errorHandler;
        if (handler == null) {
            return;
        }
        GrpcError error = GrpcError.mapHttp2ErrorCode(j);
        if (error != null) {
            handler.handle(error);
        }
    }

    /**
     * Called when the underlying stream fails with anything other than a reset; forwards the
     * failure to the exception handler when one is set.
     *
     * @param th the failure
     */
    protected void handleException(Throwable th) {
        Handler<Throwable> handler = this.exceptionHandler;
        if (handler != null) {
            handler.handle(th);
        }
    }

    /** Called when the end marker leaves the queue; notifies the end handler when one is set. */
    protected void handleEnd() {
        Handler<Void> handler = this.endHandler;
        if (handler != null) {
            handler.handle(null);
        }
    }

    /**
     * Called for every message leaving the queue; forwards it to the message handler when one is set.
     *
     * @param grpcMessage the decoded message
     */
    protected void handleMessage(GrpcMessage grpcMessage) {
        Handler<GrpcMessage> handler = this.messageHandler;
        if (handler != null) {
            handler.handle(grpcMessage);
        }
    }

    /**
     * Consumes the stream and returns a future for the last item seen.
     *
     * <p>This replaces the item, end and exception handlers of this stream. The future completes
     * with {@code null} when the stream ends without any item, and fails when the exception
     * handler is invoked first.
     *
     * @return a future completed with the most recent item once the stream ends
     */
    @Override // io.vertx.grpc.common.GrpcReadStream
    @SuppressWarnings("unchecked") // the slot only ever holds values delivered as T
    public Future<T> last() {
        PromiseInternal<T> promise = this.context.promise();
        // One-element array used as a mutable slot the lambdas can write to.
        Object[] latest = new Object[1];
        m7handler((Handler<T>) item -> {
            latest[0] = item;
        });
        endHandler(ignored -> {
            promise.tryComplete((T) latest[0]);
        });
        exceptionHandler(failure -> {
            promise.tryFail(failure);
        });
        return promise.future();
    }

    /**
     * Consumes the stream and folds every item with the given collector.
     *
     * <p>This replaces the item, end and exception handlers of this stream. The collector's
     * finisher is applied when the stream ends; the future fails when the exception handler is
     * invoked first.
     *
     * @param collector the collector supplying the container, accumulator and finisher
     * @param <R>       the result type
     * @param <C>       the intermediate container type
     * @return a future completed with the finished result once the stream ends
     */
    @Override // io.vertx.grpc.common.GrpcReadStream
    public <R, C> Future<R> collecting(Collector<T, C, R> collector) {
        PromiseInternal<R> promise = this.context.promise();
        C container = collector.supplier().get();
        BiConsumer<C, T> accumulator = collector.accumulator();
        m7handler((Handler<T>) item -> {
            accumulator.accept(container, item);
        });
        endHandler(ignored -> {
            promise.tryComplete(collector.finisher().apply(container));
        });
        exceptionHandler(promise::tryFail);
        return promise.future();
    }

    /** Single place for the unchecked self-cast needed by the fluent methods. */
    @SuppressWarnings("unchecked")
    private S self() {
        return (S) this;
    }

    // The methods below are marked bridge/synthetic by the decompiler: they only re-expose the
    // typed handler setters above under the erased return types of the implemented interfaces.

    @Override // io.vertx.grpc.common.GrpcReadStream
    public /* bridge */ /* synthetic */ GrpcReadStream endHandler(Handler handler) {
        return endHandler((Handler<Void>) handler);
    }

    @Override // io.vertx.grpc.common.GrpcReadStream
    public /* bridge */ /* synthetic */ GrpcReadStream exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    @Override // io.vertx.grpc.common.GrpcReadStream
    public /* bridge */ /* synthetic */ GrpcReadStream errorHandler(Handler handler) {
        return errorHandler((Handler<GrpcError>) handler);
    }

    @Override // io.vertx.grpc.common.GrpcReadStream
    public /* bridge */ /* synthetic */ GrpcReadStream messageHandler(Handler handler) {
        return messageHandler((Handler<GrpcMessage>) handler);
    }

    @Override // io.vertx.grpc.common.GrpcReadStream
    /* renamed from: endHandler */
    public /* bridge */ /* synthetic */ ReadStream mo3endHandler(Handler handler) {
        return endHandler((Handler<Void>) handler);
    }

    @Override // io.vertx.grpc.common.GrpcReadStream
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ ReadStream mo8exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    @Override // io.vertx.grpc.common.GrpcReadStream
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ StreamBase mo9exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
