package io.datakernel.datastream;

import io.datakernel.csp.queue.ChannelBuffer;
import io.datakernel.promise.Promise;

/* loaded from: input_file:io/datakernel/datastream/StreamConsumerEndpoint.class */
public final class StreamConsumerEndpoint<T> extends AbstractStreamConsumer<T> implements StreamDataAcceptor<T> {
    public static final int DEFAULT_BUFFER_SIZE = 10;
    private final ChannelBuffer<T> buffer;
    static final /* synthetic */ boolean $assertionsDisabled;

    public StreamConsumerEndpoint() {
        this(0, 10);
    }

    public StreamConsumerEndpoint(int i) {
        this(0, i);
    }

    private StreamConsumerEndpoint(int i, int i2) {
        this.buffer = new ChannelBuffer<>(i, i2);
    }

    @Override // io.datakernel.datastream.AbstractStreamConsumer
    protected void onStarted() {
        getSupplier().resume(this);
    }

    @Override // io.datakernel.datastream.StreamDataAcceptor
    public void accept(T t) {
        if (!$assertionsDisabled && t == null) {
            throw new AssertionError();
        }
        try {
            this.buffer.add(t);
            if (this.buffer.isSaturated()) {
                getSupplier().suspend();
            }
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e2) {
            close(e2);
        }
    }

    @Override // io.datakernel.datastream.AbstractStreamConsumer
    protected Promise<Void> onEndOfStream() {
        return this.buffer.put((Object) null);
    }

    @Override // io.datakernel.datastream.AbstractStreamConsumer
    protected void onError(Throwable th) {
        this.buffer.close(th);
    }

    public Promise<T> take() {
        if (this.buffer.willBeExhausted()) {
            getSupplier().resume(this);
        }
        return this.buffer.take();
    }

    static {
        $assertionsDisabled = !StreamConsumerEndpoint.class.desiredAssertionStatus();
    }
}
