package io.datakernel.datastream;

import io.datakernel.async.process.Cancellable;
import io.datakernel.common.Preconditions;
import io.datakernel.csp.AbstractChannelConsumer;
import io.datakernel.csp.ChannelConsumer;
import io.datakernel.datastream.StreamConsumers;
import io.datakernel.datastream.processor.StreamLateBinder;
import io.datakernel.datastream.processor.StreamTransformer;
import io.datakernel.promise.CompleteNullPromise;
import io.datakernel.promise.Promise;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

/* loaded from: input_file:io/datakernel/datastream/StreamConsumer.class */
public interface StreamConsumer<T> extends Cancellable {
    public static final String LATE_BINDING_ERROR_MESSAGE = "StreamConsumer %s does not have LATE_BINDING capabilities, it must be bound in the same tick when it is created. Alternatively, use .withLateBinding() modifier";

    void setSupplier(StreamSupplier<T> streamSupplier);

    Promise<Void> getAcknowledgement();

    Set<StreamCapability> getCapabilities();

    static <T> StreamConsumer<T> idle() {
        return new StreamConsumers.Idle();
    }

    static <T> StreamConsumer<T> skip() {
        return new StreamConsumers.Skip();
    }

    @Deprecated
    static <T> StreamConsumer<T> of(Consumer<T> consumer) {
        return new StreamConsumers.OfConsumerImpl(consumer);
    }

    static <T> StreamConsumer<T> closingWithError(Throwable th) {
        return new StreamConsumers.ClosingWithErrorImpl(th);
    }

    static <T> StreamConsumer<T> ofChannelConsumer(ChannelConsumer<T> channelConsumer) {
        return new StreamConsumers.OfChannelConsumerImpl(channelConsumer);
    }

    static <T> StreamConsumer<T> ofSupplier(Function<StreamSupplier<T>, Promise<Void>> function) {
        StreamTransformer identity = StreamTransformer.identity();
        CompleteNullPromise completeNullPromise = (Promise) function.apply(identity.getOutput());
        StreamConsumer<T> streamConsumer = (StreamConsumer<T>) identity.getInput();
        return completeNullPromise == Promise.complete() ? streamConsumer : streamConsumer.withAcknowledgement(promise -> {
            return promise.both(completeNullPromise);
        });
    }

    default <R> R transformWith(StreamConsumerTransformer<T, R> streamConsumerTransformer) {
        return streamConsumerTransformer.transform(this);
    }

    default StreamConsumer<T> withLateBinding() {
        return getCapabilities().contains(StreamCapability.LATE_BINDING) ? this : (StreamConsumer) transformWith(StreamLateBinder.create());
    }

    default ChannelConsumer<T> asSerialConsumer() {
        final StreamSupplierEndpoint streamSupplierEndpoint = new StreamSupplierEndpoint();
        streamSupplierEndpoint.streamTo(this);
        return new AbstractChannelConsumer<T>(this) { // from class: io.datakernel.datastream.StreamConsumer.1
            static final /* synthetic */ boolean $assertionsDisabled;

            protected Promise<Void> doAccept(T t) {
                if (t != null) {
                    return streamSupplierEndpoint.put(t);
                }
                if ($assertionsDisabled || streamSupplierEndpoint.getConsumer() != null) {
                    return streamSupplierEndpoint.put(null).both(streamSupplierEndpoint.getConsumer().getAcknowledgement());
                }
                throw new AssertionError();
            }

            static {
                $assertionsDisabled = !StreamConsumer.class.desiredAssertionStatus();
            }
        };
    }

    static <T> StreamConsumer<T> ofPromise(Promise<? extends StreamConsumer<T>> promise) {
        if (promise.isResult()) {
            return (StreamConsumer) promise.getResult();
        }
        StreamLateBinder create = StreamLateBinder.create();
        promise.whenComplete((streamConsumer, th) -> {
            if (th != null) {
                create.getOutput().streamTo(closingWithError(th));
            } else {
                Preconditions.checkArgument(streamConsumer.getCapabilities().contains(StreamCapability.LATE_BINDING), LATE_BINDING_ERROR_MESSAGE, new Object[]{streamConsumer});
                create.getOutput().streamTo(streamConsumer);
            }
        });
        return create.getInput();
    }

    default StreamConsumer<T> withAcknowledgement(Function<Promise<Void>, Promise<Void>> function) {
        Promise<Void> acknowledgement = getAcknowledgement();
        final Promise<Void> apply = function.apply(acknowledgement);
        return acknowledgement == apply ? this : new ForwardingStreamConsumer<T>(this) { // from class: io.datakernel.datastream.StreamConsumer.2
            @Override // io.datakernel.datastream.ForwardingStreamConsumer, io.datakernel.datastream.StreamConsumer
            public Promise<Void> getAcknowledgement() {
                return apply;
            }
        };
    }
}
