package io.datakernel.datastream;

import io.datakernel.common.Preconditions;
import io.datakernel.common.exception.ExpectedException;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.promise.Promise;
import io.datakernel.promise.SettablePromise;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/datastream/AbstractStreamConsumer.class */
public abstract class AbstractStreamConsumer<T> implements StreamConsumer<T> {
    private StreamSupplier<T> supplier;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    protected final Eventloop eventloop = Eventloop.getCurrentEventloop();
    private final long createTick = this.eventloop.tick();
    private final SettablePromise<Void> endOfStream = new SettablePromise<>();
    private final SettablePromise<Void> acknowledgement = new SettablePromise<>();

    @Override // io.datakernel.datastream.StreamConsumer
    public final void setSupplier(StreamSupplier<T> streamSupplier) {
        Preconditions.checkNotNull(streamSupplier);
        Preconditions.checkState(this.supplier == null, "Supplier has already been set");
        Preconditions.checkState(getCapabilities().contains(StreamCapability.LATE_BINDING) || this.eventloop.tick() == this.createTick, StreamConsumer.LATE_BINDING_ERROR_MESSAGE, new Object[]{this});
        this.supplier = streamSupplier;
        onWired();
        streamSupplier.getEndOfStream().whenComplete(this.endOfStream).whenException(this::close).post().whenResult(r4 -> {
            onEndOfStream().whenException(this::close).post().whenResult(r3 -> {
                acknowledge();
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onWired() {
        this.eventloop.post(this::onStarted);
    }

    protected void onStarted() {
    }

    public boolean isWired() {
        return this.supplier != null;
    }

    public final StreamSupplier<T> getSupplier() {
        return this.supplier;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void acknowledge() {
        if (this.acknowledgement.isComplete()) {
            return;
        }
        this.acknowledgement.set((Object) null);
        this.eventloop.post(this::cleanup);
    }

    protected abstract Promise<Void> onEndOfStream();

    public final void close(@NotNull Throwable th) {
        if (this.acknowledgement.isComplete()) {
            return;
        }
        this.acknowledgement.setException(th);
        if (!(th instanceof ExpectedException) && this.logger.isWarnEnabled()) {
            this.logger.warn("StreamConsumer {} closed with error {}", this, th.toString());
        }
        onError(th);
        this.eventloop.post(this::cleanup);
    }

    protected abstract void onError(Throwable th);

    protected void cleanup() {
    }

    public Promise<Void> getEndOfStream() {
        return this.endOfStream;
    }

    @Override // io.datakernel.datastream.StreamConsumer
    public final Promise<Void> getAcknowledgement() {
        return this.acknowledgement;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static Set<StreamCapability> addCapabilities(@Nullable StreamConsumer<?> streamConsumer, StreamCapability streamCapability, StreamCapability... streamCapabilityArr) {
        EnumSet of = EnumSet.of(streamCapability, streamCapabilityArr);
        if (streamConsumer != null) {
            of.addAll(streamConsumer.getCapabilities());
        }
        return of;
    }

    @Override // io.datakernel.datastream.StreamConsumer
    public Set<StreamCapability> getCapabilities() {
        return Collections.emptySet();
    }
}
