/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.stream;

import io.datakernel.async.SettableStage;
import io.datakernel.async.Stage;
import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.StreamCapability;
import io.datakernel.stream.StreamConsumerWithResult;
import java.util.EnumSet;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collector;

/**
 * Stream consumer that folds every received item into a
 * {@link Collector} and publishes the collector's finished value
 * through a {@link Stage}.
 *
 * @param <T> type of the items received from the producer
 * @param <A> type of the collector's intermediate accumulation container
 * @param <R> type of the value produced by the collector's finisher
 */
public final class StreamConsumerToCollector<T, A, R> extends AbstractStreamConsumer<T>
        implements StreamConsumerWithResult<T, R> {

    /** Collector supplying the container, the folding step and the finisher. */
    private final Collector<T, A, R> collector;

    /** Stage completed with the finished value, or with the error passed to {@link #onError}. */
    private final SettableStage<R> resultStage = SettableStage.create();

    /** Accumulation container; {@code null} until {@link #onStarted} runs and again after end of stream. */
    private A accumulator;

    /**
     * Creates a consumer that accumulates items with the given collector.
     *
     * @param collector collector used to create, fill and finish the container
     */
    public StreamConsumerToCollector(Collector<T, A, R> collector) {
        this.collector = collector;
    }

    /**
     * Creates a new accumulation container from the collector's supplier and
     * asks the producer to deliver items into a receiver that folds each one
     * into that container.
     */
    @Override
    protected void onStarted() {
        accumulator = collector.supplier().get();
        BiConsumer<A, T> folder = collector.accumulator();
        // The field is read on every item rather than captured once.
        getProducer().produce(element -> folder.accept(accumulator, element));
    }

    /**
     * Applies the collector's finisher to the container, drops the reference
     * to the container and completes the result stage with the finished value.
     */
    @Override
    protected void onEndOfStream() {
        R finished = collector.finisher().apply(accumulator);
        accumulator = null;
        resultStage.set(finished);
    }

    /**
     * Completes the result stage exceptionally; the accumulator field is left untouched.
     *
     * @param t the error to propagate to the result stage
     */
    @Override
    protected void onError(Throwable t) {
        resultStage.setException(t);
    }

    /**
     * Returns the stage that receives the finished value or the error.
     *
     * @return the result stage of this consumer
     */
    @Override
    public Stage<R> getResult() {
        return resultStage;
    }

    /**
     * Returns the current accumulation container.
     *
     * @return the container, or {@code null} if {@link #onStarted} has not run
     *         yet or {@link #onEndOfStream} has already cleared it
     */
    public A getAccumulator() {
        return accumulator;
    }

    /**
     * Reports {@link StreamCapability#LATE_BINDING} as the only capability.
     *
     * @return a freshly created set containing {@code LATE_BINDING}
     */
    @Override
    public Set<StreamCapability> getCapabilities() {
        EnumSet<StreamCapability> capabilities = EnumSet.noneOf(StreamCapability.class);
        capabilities.add(StreamCapability.LATE_BINDING);
        return capabilities;
    }
}

