package io.activej.datastream.processor;

import io.activej.datastream.AbstractStreamConsumer;
import io.activej.datastream.AbstractStreamSupplier;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.StreamSupplier;
import io.activej.promise.Promise;
import java.util.Objects;
import java.util.function.Predicate;

/* loaded from: input_file:io/activej/datastream/processor/StreamFilter.class */
/**
 * A {@link StreamTransformer} that passes to its output only those items of its input
 * for which the configured {@link Predicate} returns {@code true}; all other items are dropped.
 *
 * <p>Instances are obtained through {@link #create(Predicate)}.
 *
 * @param <T> type of the items flowing through the filter
 */
public final class StreamFilter<T> implements StreamTransformer<T, T> {
    /** Decides, per item, whether the item is forwarded to the output. Never {@code null}. */
    private final Predicate<T> predicate;
    /** Consumer side of the filter; exposed through {@link #getInput()}. */
    private final Input input = new Input();
    /** Supplier side of the filter; exposed through {@link #getOutput()}. */
    private final Output output = new Output();

    /**
     * Consumer side of the filter. Re-evaluates the data flow when it starts and
     * propagates end-of-stream to the output.
     *
     * <p>NOTE(review): the decompiler reports this class was declared {@code private} in the
     * original source; the widened modifier is kept so the visible interface is unchanged.
     */
    public final class Input extends AbstractStreamConsumer<T> {
        private Input() {
        }

        /** Re-evaluates whether the input should be resumed or suspended. */
        @Override
        protected void onStarted() {
            StreamFilter.this.sync();
        }

        /** Forwards end-of-stream from the input to the output. */
        @Override
        protected void onEndOfStream() {
            StreamFilter.this.output.sendEndOfStream();
        }
    }

    /**
     * Supplier side of the filter. Every resume/suspend notification re-evaluates the data flow.
     *
     * <p>NOTE(review): the decompiler reports this class was declared {@code private} in the
     * original source; the widened modifier is kept so the visible interface is unchanged.
     */
    public final class Output extends AbstractStreamSupplier<T> {
        private Output() {
        }

        /** Re-evaluates the data flow after the output has been resumed. */
        @Override
        protected void onResumed() {
            StreamFilter.this.sync();
        }

        /** Re-evaluates the data flow after the output has been suspended. */
        @Override
        protected void onSuspended() {
            StreamFilter.this.sync();
        }
    }

    /**
     * Stores the predicate and links the acknowledgements of both sides to each other.
     *
     * @param predicate test applied to every incoming item
     * @throws NullPointerException if {@code predicate} is {@code null}
     */
    private StreamFilter(Predicate<T> predicate) {
        // Fail fast: otherwise a null predicate would only blow up when the first item is tested.
        this.predicate = Objects.requireNonNull(predicate, "predicate");

        // An exceptional input acknowledgement closes the output with the same exception.
        input.getAcknowledgement()
                .whenException(output::closeEx);

        // A completed output acknowledgement acknowledges the input;
        // an exceptional one closes the input with the same exception.
        output.getAcknowledgement()
                .whenResult(input::acknowledge)
                .whenException(input::closeEx);
    }

    /**
     * Creates a filter that forwards only the items accepted by {@code predicate}.
     *
     * @param predicate test applied to every incoming item; must not be {@code null}
     * @param <T>       type of the items flowing through the filter
     * @return a new filter instance
     * @throws NullPointerException if {@code predicate} is {@code null}
     */
    public static <T> StreamFilter<T> create(Predicate<T> predicate) {
        return new StreamFilter<>(predicate);
    }

    /**
     * Returns the consumer side of this filter.
     *
     * @return the consumer into which items to be filtered are streamed
     */
    @Override
    public StreamConsumer<T> getInput() {
        return input;
    }

    /**
     * Returns the supplier side of this filter.
     *
     * @return the supplier from which the accepted items are streamed
     */
    @Override
    public StreamSupplier<T> getOutput() {
        return output;
    }

    /**
     * Aligns the input with the current state of the output: when the output has no data
     * acceptor the input is suspended, otherwise the input is resumed with an acceptor that
     * forwards only the items accepted by the predicate.
     *
     * <p>NOTE(review): the decompiler reports this method was declared {@code private} in the
     * original source; the widened modifier is kept so the visible interface is unchanged.
     */
    public void sync() {
        StreamDataAcceptor<T> dataAcceptor = output.getDataAcceptor();
        if (dataAcceptor == null) {
            input.suspend();
            return;
        }
        // Capture the predicate in a local so the per-item lambda does not go through `this`.
        Predicate<T> filter = predicate;
        input.resume(item -> {
            if (filter.test(item)) {
                dataAcceptor.accept(item);
            }
        });
    }
}
