package org.df4j.core.tasknode.messagestream;

import org.df4j.core.boundconnector.messagestream.StreamOutput;
import org.df4j.core.boundconnector.messagestream.StreamPublisher;
import org.df4j.core.boundconnector.messagestream.StreamSubscriber;

/* loaded from: input_file:org/df4j/core/tasknode/messagestream/StreamProcessor.class */
public abstract class StreamProcessor<M, R> extends Actor1<M> implements StreamPublisher<R> {
    protected final StreamOutput<R> output = new StreamOutput<>(this);

    @Override // org.df4j.core.boundconnector.messagestream.StreamPublisher
    public <S extends StreamSubscriber<? super R>> S subscribe(S s) {
        this.output.subscribe(s);
        return s;
    }

    @Override // org.df4j.core.tasknode.messagestream.Actor1
    protected void runAction(M m) {
        if (m == null) {
            this.output.complete();
        } else {
            this.output.post(process(m));
        }
    }

    protected abstract R process(M m);
}
