/*
 * Decompiled with CFR 0.152.
 */
package org.finos.tracdap.common.concurrent.flow;

import java.util.Iterator;
import java.util.concurrent.Flow;
import java.util.stream.Stream;
import org.finos.tracdap.common.exception.ETracInternal;

public class SourcePublisher<T>
implements Flow.Publisher<T> {
    private final Iterator<T> source;
    private final AutoCloseable closeable;
    private boolean done = false;

    public SourcePublisher(Iterable<T> source) {
        this.source = source.iterator();
        this.closeable = null;
    }

    public SourcePublisher(Stream<T> source) {
        this.source = source.iterator();
        this.closeable = source;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        Subscription subscription = new Subscription(subscriber);
        subscriber.onSubscribe(subscription);
    }

    private class Subscription
    implements Flow.Subscription {
        Flow.Subscriber<? super T> subscriber;

        public Subscription(Flow.Subscriber<? super T> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            try {
                int i = 0;
                while ((long)i < n && SourcePublisher.this.source.hasNext()) {
                    this.subscriber.onNext(SourcePublisher.this.source.next());
                    ++i;
                }
                if (!SourcePublisher.this.source.hasNext() && !SourcePublisher.this.done) {
                    this.subscriber.onComplete();
                    SourcePublisher.this.done = true;
                }
            }
            catch (Exception e) {
                this.subscriber.onError(e);
                SourcePublisher.this.done = true;
            }
        }

        @Override
        public void cancel() {
            if (SourcePublisher.this.closeable != null) {
                try {
                    SourcePublisher.this.closeable.close();
                }
                catch (Exception e) {
                    throw new ETracInternal(e.getMessage(), e);
                }
            }
        }
    }
}

