package io.datakernel.stream.processor;

import io.datakernel.eventloop.Eventloop;
import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.AbstractStreamTransformer_M_1;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataReceiver;

/* loaded from: input_file:io/datakernel/stream/processor/AbstractStreamMemoryTransformer.class */
public abstract class AbstractStreamMemoryTransformer<I, S, O> extends AbstractStreamTransformer_M_1<O> implements StreamDataReceiver<I> {
    protected S state;

    /* loaded from: input_file:io/datakernel/stream/processor/AbstractStreamMemoryTransformer$Input.class */
    private final class Input extends AbstractStreamConsumer<I> {
        public Input(Eventloop eventloop) {
            super(eventloop);
        }

        @Override // io.datakernel.stream.StreamConsumer
        public StreamDataReceiver<I> getDataReceiver() {
            return AbstractStreamMemoryTransformer.this;
        }

        @Override // io.datakernel.stream.StreamConsumer
        public void onEndOfStream() {
            if (AbstractStreamMemoryTransformer.this.allUpstreamsEndOfStream()) {
                AbstractStreamMemoryTransformer.this.afterEndOfStream(AbstractStreamMemoryTransformer.this.state);
                AbstractStreamMemoryTransformer.this.state = null;
                AbstractStreamMemoryTransformer.this.produce();
            }
        }

        @Override // io.datakernel.stream.AbstractStreamConsumer, io.datakernel.stream.StreamConsumer
        public void onError(Exception exc) {
            this.upstreamProducer.closeWithError(exc);
            AbstractStreamMemoryTransformer.this.closeWithError(exc);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractStreamMemoryTransformer(Eventloop eventloop) {
        super(eventloop);
        this.state = newState();
    }

    protected abstract S newState();

    protected abstract void apply(S s, I i);

    protected abstract void afterEndOfStream(S s);

    public final StreamConsumer<I> newInput() {
        return addInput(new Input(this.eventloop));
    }

    @Override // io.datakernel.stream.AbstractStreamProducer
    protected void onProducerStarted() {
        if (this.inputs.isEmpty()) {
            sendEndOfStream();
        }
    }

    @Override // io.datakernel.stream.AbstractStreamProducer
    protected void onSuspended() {
    }

    @Override // io.datakernel.stream.AbstractStreamProducer
    protected void onResumed() {
        if (this.state == null) {
            resumeProduce();
        }
    }

    @Override // io.datakernel.stream.StreamDataReceiver
    public void onData(I i) {
        apply(this.state, i);
    }
}
