package io.activej.crdt.util;

import io.activej.datastream.consumer.AbstractStreamConsumer;
import io.activej.datastream.consumer.StreamConsumer;
import io.activej.datastream.processor.transformer.StreamTransformer;
import io.activej.datastream.supplier.AbstractStreamSupplier;
import io.activej.datastream.supplier.StreamSupplier;
import io.activej.promise.Promise;
import io.activej.reactor.ImplicitlyReactive;
import java.util.Objects;
import java.util.function.UnaryOperator;

/* loaded from: input_file:io/activej/crdt/util/StreamAckTransformer.class */
public final class StreamAckTransformer<T> extends ImplicitlyReactive implements StreamTransformer<T, T> {
    private final StreamAckTransformer<T>.Input input = new Input();
    private final StreamAckTransformer<T>.Output output = new Output();

    /* loaded from: input_file:io/activej/crdt/util/StreamAckTransformer$Input.class */
    public final class Input extends AbstractStreamConsumer<T> {
        public Input() {
        }

        protected void onStarted() {
            StreamAckTransformer.this.input.resume(StreamAckTransformer.this.output.getDataAcceptor());
        }

        protected void onEndOfStream() {
            StreamAckTransformer.this.output.sendEndOfStream();
        }
    }

    /* loaded from: input_file:io/activej/crdt/util/StreamAckTransformer$Output.class */
    public final class Output extends AbstractStreamSupplier<T> {
        public Output() {
        }

        protected void onResumed() {
            StreamAckTransformer.this.input.resume(StreamAckTransformer.this.output.getDataAcceptor());
        }

        protected void onSuspended() {
            StreamAckTransformer.this.input.suspend();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamAckTransformer(UnaryOperator<Promise<Void>> unaryOperator) {
        Promise acknowledgement = this.input.getAcknowledgement();
        StreamAckTransformer<T>.Output output = this.output;
        Objects.requireNonNull(output);
        acknowledgement.whenException(output::closeEx);
        Promise promise = (Promise) unaryOperator.apply(((StreamAckTransformer<T>.Output) this.output).getAcknowledgement());
        StreamAckTransformer<T>.Input input = this.input;
        Objects.requireNonNull(input);
        Promise whenResult = promise.whenResult(input::acknowledge);
        StreamAckTransformer<T>.Input input2 = this.input;
        Objects.requireNonNull(input2);
        whenResult.whenException(input2::closeEx);
    }

    public StreamConsumer<T> getInput() {
        return this.input;
    }

    public StreamSupplier<T> getOutput() {
        return this.output;
    }
}
