package io.datakernel.datastream.processor;

import io.datakernel.datastream.AbstractStreamConsumer;
import io.datakernel.datastream.AbstractStreamSupplier;
import io.datakernel.datastream.StreamConsumer;
import io.datakernel.datastream.StreamDataAcceptor;
import io.datakernel.datastream.StreamSupplier;
import io.datakernel.promise.Promise;
import java.util.function.Predicate;

/* loaded from: input_file:io/datakernel/datastream/processor/StreamFilter.class */
public final class StreamFilter<T> implements StreamTransformer<T, T> {
    public static final Predicate<Object> ALWAYS_TRUE = obj -> {
        return true;
    };
    private final StreamFilter<T>.Input input = new Input();
    private final StreamFilter<T>.Output output = new Output();
    private final Predicate<T> predicate;

    /* loaded from: input_file:io/datakernel/datastream/processor/StreamFilter$Input.class */
    protected final class Input extends AbstractStreamConsumer<T> {
        protected Input() {
        }

        @Override // io.datakernel.datastream.AbstractStreamConsumer
        protected Promise<Void> onEndOfStream() {
            return StreamFilter.this.output.sendEndOfStream();
        }

        @Override // io.datakernel.datastream.AbstractStreamConsumer
        protected void onError(Throwable th) {
            StreamFilter.this.output.close(th);
        }
    }

    /* loaded from: input_file:io/datakernel/datastream/processor/StreamFilter$Output.class */
    protected final class Output extends AbstractStreamSupplier<T> {
        protected Output() {
        }

        @Override // io.datakernel.datastream.AbstractStreamSupplier
        protected void onSuspended() {
            StreamFilter.this.input.getSupplier().suspend();
        }

        @Override // io.datakernel.datastream.AbstractStreamSupplier
        protected void onError(Throwable th) {
            StreamFilter.this.input.close(th);
        }

        @Override // io.datakernel.datastream.AbstractStreamSupplier
        protected void onProduce(StreamDataAcceptor<T> streamDataAcceptor) {
            if (StreamFilter.this.predicate.equals(StreamFilter.ALWAYS_TRUE)) {
                StreamFilter.this.input.getSupplier().resume(streamDataAcceptor);
            } else {
                Predicate predicate = StreamFilter.this.predicate;
                StreamFilter.this.input.getSupplier().resume(obj -> {
                    if (predicate.test(obj)) {
                        streamDataAcceptor.accept(obj);
                    }
                });
            }
        }
    }

    private StreamFilter(Predicate<T> predicate) {
        this.predicate = predicate;
    }

    @Override // io.datakernel.datastream.StreamInput
    public StreamConsumer<T> getInput() {
        return this.input;
    }

    @Override // io.datakernel.datastream.StreamOutput
    public StreamSupplier<T> getOutput() {
        return this.output;
    }

    public static <T> StreamFilter<T> create(Predicate<T> predicate) {
        return new StreamFilter<>(predicate);
    }
}
