/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.stream;

import io.datakernel.annotation.Nullable;
import io.datakernel.async.SettableStage;
import io.datakernel.async.Stage;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.exception.ExpectedException;
import io.datakernel.stream.StreamCapability;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.StreamStatus;
import io.datakernel.util.Preconditions;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base implementation of {@link StreamProducer} that keeps track of the wired
 * consumer, the currently active {@link StreamDataReceiver}, the stream status
 * and the end-of-stream stage.
 *
 * <p>Subclasses normally override {@link #produce()} to emit items through
 * {@link #send(Object)} (or directly through the current data receiver) and must
 * implement {@link #onError(Throwable)}.
 *
 * <p>The instance captures the eventloop returned by
 * {@link Eventloop#getCurrentEventloop()} at construction time and posts its
 * deferred work ({@link #onStarted()}, {@link #produce()}, {@link #cleanup()}) to it.
 *
 * @param <T> type of the items sent to the consumer
 */
public abstract class AbstractStreamProducer<T> implements StreamProducer<T> {
    /** Logger named after the concrete (runtime) class of this producer. */
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /** Eventloop that was current when this producer was created. */
    protected final Eventloop eventloop = Eventloop.getCurrentEventloop();

    /** Eventloop tick at creation time; used by the late-binding check in {@link #setConsumer}. */
    private final long createTick = this.eventloop.getTick();

    private StreamConsumer<T> consumer;
    private StreamStatus status = StreamStatus.OPEN;
    private Throwable exception;
    private final SettableStage<Void> endOfStream = SettableStage.create();

    /** Receiver to produce into right now; {@code null} while suspended or closed. */
    private StreamDataReceiver<T> currentDataReceiver;

    /**
     * Most recently supplied receiver. Unlike {@link #currentDataReceiver} it is not
     * cleared by {@link #suspend()}; after closing it is replaced by a no-op receiver.
     */
    private StreamDataReceiver<T> lastDataReceiver;

    /** {@code true} while {@link #produce()} is being executed by the posted task. */
    private boolean producing;

    /** {@code true} while a task that will call {@link #produce()} is pending in the eventloop. */
    private boolean posted;

    private Object tag;

    /**
     * Wires this producer to the given consumer.
     *
     * <p>After the consumer is stored, {@link #onWired()} is invoked and this producer
     * subscribes to the consumer's end-of-stream stage so that a consumer failure
     * closes this producer with the same error.
     *
     * @param consumer consumer to bind; must not be {@code null}
     * @throws RuntimeException as thrown by {@link Preconditions} when the consumer is
     *         {@code null}, when a consumer is already set, or when the producer lacks
     *         {@link StreamCapability#LATE_BINDING} and is bound in a later eventloop
     *         tick than the one it was created in
     */
    @Override
    public final void setConsumer(StreamConsumer<T> consumer) {
        Preconditions.checkNotNull(consumer);
        Preconditions.checkState(this.consumer == null);
        Preconditions.checkState(
                getCapabilities().contains(StreamCapability.LATE_BINDING) || eventloop.getTick() == createTick,
                "StreamProducer %s does not have LATE_BINDING capabilities, it must be bound in the same tick when it is created. Alternatively, use .withLateBinding() modifier",
                this);
        this.consumer = consumer;
        onWired();
        consumer.getEndOfStream().whenException(this::closeWithError);
    }

    /**
     * Called from {@link #setConsumer} right after the consumer has been stored.
     * The default implementation posts {@link #onStarted()} to the eventloop.
     */
    protected void onWired() {
        eventloop.post(this::onStarted);
    }

    /**
     * Hook executed by the eventloop task posted from the default {@link #onWired()}.
     * Does nothing by default.
     */
    protected void onStarted() {
    }

    /**
     * @return {@code true} if a consumer has been set via {@link #setConsumer}
     */
    public boolean isWired() {
        return consumer != null;
    }

    /**
     * @return the wired consumer, or {@code null} if none has been set yet
     */
    @Nullable
    public final StreamConsumer<T> getConsumer() {
        return consumer;
    }

    /**
     * @return {@code true} if there is a current data receiver, i.e. the producer
     *         has been asked to produce and has not been suspended or closed since
     */
    public final boolean isReceiverReady() {
        return currentDataReceiver != null;
    }

    /**
     * Passes an item to the last supplied data receiver.
     *
     * <p>Note that this uses the <em>last</em> receiver, not the current one, so it keeps
     * working after {@link #suspend()}; after the stream is closed the item is dropped
     * by the no-op receiver installed on close. No receiver exists before the first
     * call to {@link #produce(StreamDataReceiver)}, so calling this method earlier
     * fails with a {@link NullPointerException}.
     *
     * @param item item to send
     */
    public final void send(T item) {
        lastDataReceiver.onData(item);
    }

    /**
     * @return the receiver to produce into, or {@code null} while suspended or closed
     */
    @Nullable
    public final StreamDataReceiver<T> getCurrentDataReceiver() {
        return currentDataReceiver;
    }

    /**
     * @return the most recently supplied receiver (a no-op receiver once the stream is
     *         closed), or {@code null} if production has never been requested
     */
    @Nullable
    public StreamDataReceiver<T> getLastDataReceiver() {
        return lastDataReceiver;
    }

    /**
     * Hook in which subclasses emit their data. It is invoked from the eventloop task
     * scheduled by the default {@link #onProduce}. Does nothing by default.
     */
    protected void produce() {
    }

    /**
     * Reacts to a (new) data receiver being supplied by scheduling {@link #produce()}.
     *
     * <p>Nothing is scheduled when {@link #produce()} is already running or a task is
     * already pending; in both cases the new receiver has already been stored by
     * {@link #produce(StreamDataReceiver)} and is visible through
     * {@link #getCurrentDataReceiver()}.
     *
     * @param dataReceiver the receiver that has just become current
     */
    protected void onProduce(StreamDataReceiver<T> dataReceiver) {
        if (producing || posted) {
            return;
        }
        posted = true;
        eventloop.post(() -> {
            posted = false;
            // The producer may have been suspended or closed before this task ran.
            if (!isReceiverReady()) {
                return;
            }
            producing = true;
            try {
                produce();
            } finally {
                // Reset even if produce() throws; otherwise every later
                // onProduce() call would return early and never schedule again.
                producing = false;
            }
        });
    }

    /**
     * Requests this producer to start (or continue) producing into the given receiver.
     *
     * <p>The call is ignored if the receiver is already the current one or if the
     * stream is closed. Otherwise the receiver becomes both the current and the last
     * receiver and {@link #onProduce} is invoked.
     *
     * @param dataReceiver receiver to produce into; must not be {@code null}
     */
    @Override
    public final void produce(StreamDataReceiver<T> dataReceiver) {
        if (logger.isTraceEnabled()) {
            logger.trace("Start producing: {}", this);
        }
        assert dataReceiver != null;
        if (currentDataReceiver == dataReceiver) {
            return;
        }
        if (status.isClosed()) {
            return;
        }
        currentDataReceiver = dataReceiver;
        lastDataReceiver = dataReceiver;
        onProduce(dataReceiver);
    }

    /**
     * Hook invoked by {@link #suspend()} after the current receiver has been cleared.
     * Does nothing by default.
     */
    protected void onSuspended() {
    }

    /**
     * Suspends production: clears the current data receiver and calls
     * {@link #onSuspended()}. Does nothing if there is no current receiver.
     */
    @Override
    public final void suspend() {
        if (logger.isTraceEnabled()) {
            logger.trace("Suspend producer: {}", this);
        }
        if (!isReceiverReady()) {
            return;
        }
        currentDataReceiver = null;
        onSuspended();
    }

    /**
     * Closes the stream normally. Ignored if the stream is already closed.
     *
     * <p>Sets the status to {@link StreamStatus#END_OF_STREAM}, clears the current
     * receiver, replaces the last receiver with a no-op one, posts {@link #cleanup()}
     * to the eventloop and completes the end-of-stream stage.
     */
    public final void sendEndOfStream() {
        if (status.isClosed()) {
            return;
        }
        status = StreamStatus.END_OF_STREAM;
        currentDataReceiver = null;
        // Items sent after the end of stream are silently discarded.
        lastDataReceiver = item -> {};
        eventloop.post(this::cleanup);
        endOfStream.set(null);
    }

    /**
     * Closes the stream with an error. Ignored if the stream is already closed.
     *
     * <p>Sets the status to {@link StreamStatus#CLOSED_WITH_ERROR}, clears the current
     * receiver, replaces the last receiver with a no-op one and remembers the error.
     * Errors that are not {@link ExpectedException}s are logged at WARN level. Then
     * {@link #onError(Throwable)} is called synchronously, {@link #cleanup()} is posted
     * to the eventloop and the end-of-stream stage is completed exceptionally.
     *
     * @param e the error that caused the stream to close
     */
    public final void closeWithError(Throwable e) {
        if (status.isClosed()) {
            return;
        }
        status = StreamStatus.CLOSED_WITH_ERROR;
        currentDataReceiver = null;
        // Items sent after the failure are silently discarded.
        lastDataReceiver = item -> {};
        exception = e;
        if (!(e instanceof ExpectedException) && logger.isWarnEnabled()) {
            logger.warn("StreamProducer {} closed with error {}", this, exception.toString());
        }
        onError(e);
        eventloop.post(this::cleanup);
        endOfStream.setException(e);
    }

    /**
     * Called synchronously from {@link #closeWithError(Throwable)} once the producer
     * state has been switched to closed-with-error.
     *
     * @param var1 the error the stream was closed with
     */
    protected abstract void onError(Throwable var1);

    /**
     * Hook posted to the eventloop when the stream is closed, either normally or with
     * an error. Does nothing by default.
     */
    protected void cleanup() {
    }

    /**
     * @return the current status of the stream
     */
    public final StreamStatus getStatus() {
        return status;
    }

    /**
     * @return the error passed to {@link #closeWithError(Throwable)}, or {@code null}
     *         if the stream has not been closed with an error
     */
    public final Throwable getException() {
        return exception;
    }

    /**
     * @return the stage that is completed by {@link #sendEndOfStream()} and completed
     *         exceptionally by {@link #closeWithError(Throwable)}
     */
    @Override
    public final Stage<Void> getEndOfStream() {
        return endOfStream;
    }

    /**
     * Builds a capability set consisting of the given capabilities plus, if a producer
     * is supplied, all capabilities of that producer.
     *
     * @param producer     producer whose capabilities are included; may be {@code null}
     * @param capability   first capability to include
     * @param capabilities further capabilities to include
     * @return a new mutable set containing the combined capabilities
     */
    protected static Set<StreamCapability> addCapabilities(@Nullable StreamProducer<?> producer, StreamCapability capability, StreamCapability ... capabilities) {
        EnumSet<StreamCapability> result = EnumSet.of(capability, capabilities);
        if (producer != null) {
            result.addAll(producer.getCapabilities());
        }
        return result;
    }

    /**
     * @return the capabilities of this producer; an empty set by default
     */
    @Override
    public Set<StreamCapability> getCapabilities() {
        return Collections.emptySet();
    }

    /**
     * @return the tag set via {@link #setTag(Object)}, or {@code null} if none
     */
    public final Object getTag() {
        return tag;
    }

    /**
     * Attaches an arbitrary tag to this producer; when non-null, it is used by
     * {@link #toString()}.
     *
     * @param tag the tag, may be {@code null}
     */
    public final void setTag(Object tag) {
        this.tag = tag;
    }

    /**
     * @return the tag's string representation if a tag is set, otherwise the default
     *         {@link Object#toString()} representation
     */
    @Override
    public String toString() {
        return tag != null ? tag.toString() : super.toString();
    }
}

