/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.stream;

import io.datakernel.async.SettableStage;
import io.datakernel.async.Stage;
import io.datakernel.stream.DataStreams;
import io.datakernel.stream.StreamCapability;
import io.datakernel.stream.StreamConsumerModifier;
import io.datakernel.stream.StreamConsumerWithResult;
import io.datakernel.stream.StreamConsumers;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.processor.StreamLateBinder;
import io.datakernel.util.Preconditions;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;
import java.util.function.Consumer;

public interface StreamConsumer<T> {
    public static final String LATE_BINDING_ERROR_MESSAGE = "StreamConsumer %s does not have LATE_BINDING capabilities, it must be bound in the same tick when it is created. Alternatively, use .withLateBinding() modifier";

    public void setProducer(StreamProducer<T> var1);

    public Stage<Void> getEndOfStream();

    public Set<StreamCapability> getCapabilities();

    default public <R> StreamConsumer<R> with(StreamConsumerModifier<T, R> modifier) {
        StreamConsumer consumer = this;
        return modifier.applyTo(consumer);
    }

    default public StreamConsumer<T> withLateBinding() {
        return this.getCapabilities().contains((Object)StreamCapability.LATE_BINDING) ? this : this.with(StreamLateBinder.create());
    }

    public static <T> StreamConsumer<T> idle() {
        return new StreamConsumers.IdleImpl();
    }

    public static <T> StreamConsumer<T> closingWithError(Throwable exception) {
        return new StreamConsumers.ClosingWithErrorImpl(exception);
    }

    public static <T> StreamConsumer<T> ofConsumer(Consumer<T> consumer) {
        return new StreamConsumers.OfConsumerImpl<T>(consumer);
    }

    public static <T> StreamConsumer<T> ofStage(Stage<StreamConsumer<T>> consumerStage) {
        StreamLateBinder lateBounder = StreamLateBinder.create();
        consumerStage.whenComplete((consumer, throwable) -> {
            if (throwable == null) {
                Preconditions.checkArgument((boolean)consumer.getCapabilities().contains((Object)StreamCapability.LATE_BINDING), (String)LATE_BINDING_ERROR_MESSAGE, (Object[])new Object[]{consumer});
                DataStreams.bind(lateBounder.getOutput(), consumer);
            } else {
                DataStreams.bind(lateBounder.getOutput(), StreamConsumer.closingWithError(throwable));
            }
        });
        return lateBounder.getInput();
    }

    default public <X> StreamConsumerWithResult<T, X> withResult(Stage<X> result) {
        final SettableStage safeEndOfStream = SettableStage.create();
        final SettableStage safeResult = SettableStage.create();
        this.getEndOfStream().whenComplete(($, throwable) -> {
            safeEndOfStream.trySet($, throwable);
            if (throwable != null) {
                safeResult.trySetException(throwable);
            }
        });
        result.post().whenComplete((arg_0, arg_1) -> ((SettableStage)safeResult).trySet(arg_0, arg_1));
        return new StreamConsumerWithResult<T, X>(){

            @Override
            public void setProducer(StreamProducer<T> producer) {
                StreamConsumer.this.setProducer(producer);
            }

            @Override
            public Stage<Void> getEndOfStream() {
                return safeEndOfStream;
            }

            @Override
            public Stage<X> getResult() {
                return safeResult;
            }

            @Override
            public Set<StreamCapability> getCapabilities() {
                return StreamConsumer.this.getCapabilities().contains((Object)StreamCapability.LATE_BINDING) ? EnumSet.of(StreamCapability.LATE_BINDING) : Collections.emptySet();
            }
        };
    }

    default public StreamConsumerWithResult<T, Void> withEndOfStreamAsResult() {
        final SettableStage safeEndOfStream = SettableStage.create();
        this.getEndOfStream().post().whenComplete((arg_0, arg_1) -> ((SettableStage)safeEndOfStream).trySet(arg_0, arg_1));
        return new StreamConsumerWithResult<T, Void>(){

            @Override
            public void setProducer(StreamProducer<T> producer) {
                StreamConsumer.this.setProducer(producer);
            }

            @Override
            public Stage<Void> getEndOfStream() {
                return safeEndOfStream;
            }

            @Override
            public Stage<Void> getResult() {
                return safeEndOfStream;
            }

            @Override
            public Set<StreamCapability> getCapabilities() {
                return StreamConsumer.this.getCapabilities().contains((Object)StreamCapability.LATE_BINDING) ? EnumSet.of(StreamCapability.LATE_BINDING) : Collections.emptySet();
            }
        };
    }
}

