/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.stream.processor;

import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.AbstractStreamProducer;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.processor.StreamTransformer;
import java.util.function.Predicate;

public final class StreamFilter<T>
implements StreamTransformer<T, T> {
    public static final Predicate<Object> ALWAYS_TRUE = t -> true;
    private final Input input;
    private final Output output;
    private final Predicate<T> predicate;

    private StreamFilter(Predicate<T> predicate) {
        this.predicate = predicate;
        this.input = new Input();
        this.output = new Output();
    }

    @Override
    public StreamConsumer<T> getInput() {
        return this.input;
    }

    @Override
    public StreamProducer<T> getOutput() {
        return this.output;
    }

    public static <T> StreamFilter<T> create(Predicate<T> predicate) {
        return new StreamFilter<T>(predicate);
    }

    protected final class Output
    extends AbstractStreamProducer<T> {
        protected Output() {
        }

        @Override
        protected void onSuspended() {
            StreamFilter.this.input.getProducer().suspend();
        }

        @Override
        protected void onError(Throwable t) {
            StreamFilter.this.input.closeWithError(t);
        }

        @Override
        protected void onProduce(StreamDataReceiver<T> dataReceiver) {
            if (StreamFilter.this.predicate.equals(ALWAYS_TRUE)) {
                StreamFilter.this.input.getProducer().produce(dataReceiver);
            } else {
                Predicate predicate = StreamFilter.this.predicate;
                StreamFilter.this.input.getProducer().produce(item -> {
                    if (predicate.test(item)) {
                        dataReceiver.onData(item);
                    }
                });
            }
        }
    }

    protected final class Input
    extends AbstractStreamConsumer<T> {
        protected Input() {
        }

        @Override
        protected void onEndOfStream() {
            StreamFilter.this.output.sendEndOfStream();
        }

        @Override
        protected void onError(Throwable t) {
            StreamFilter.this.output.closeWithError(t);
        }
    }
}

