/*
 * Decompiled with CFR 0.152.
 */
package org.finos.tracdap.common.concurrent.flow;

import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import org.finos.tracdap.common.exception.EUnexpected;

public class FutureResultPublisher<T>
implements Flow.Publisher<T> {
    private final AtomicReference<ResultState> state = new AtomicReference<ResultState>(new ResultState());
    private Flow.Subscriber<? super T> subscriber;

    public FutureResultPublisher(CompletionStage<T> source) {
        source.whenComplete(this::acceptResult);
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        this.subscriber = subscriber;
        Subscription subscription = new Subscription();
        subscriber.onSubscribe(subscription);
    }

    private void acceptResult(T result, Throwable error) {
        ResultState prior = this.state.getAndUpdate(s1 -> {
            ResultState s2 = s1.clone();
            s2.result = result;
            s2.error = error;
            return s2;
        });
        if (prior.requested && !prior.cancelled) {
            if (error == null) {
                this.subscriber.onNext(result);
                this.subscriber.onComplete();
            } else {
                Throwable completionError = error instanceof CompletionException ? error : new CompletionException(error.getMessage(), error);
                this.subscriber.onError(completionError);
            }
        }
    }

    private class ResultState
    implements Cloneable {
        boolean requested;
        boolean cancelled;
        T result;
        Throwable error;

        private ResultState() {
        }

        public ResultState clone() {
            try {
                return (ResultState)super.clone();
            }
            catch (CloneNotSupportedException ex) {
                throw new EUnexpected();
            }
        }
    }

    private class Subscription
    implements Flow.Subscription {
        private Subscription() {
        }

        @Override
        public void request(long n) {
            ResultState priorState = FutureResultPublisher.this.state.getAndUpdate(s1 -> {
                ResultState s2 = s1.clone();
                s2.requested = true;
                return s2;
            });
            if (!priorState.requested && !priorState.cancelled) {
                if (priorState.result != null) {
                    FutureResultPublisher.this.subscriber.onNext(priorState.result);
                    FutureResultPublisher.this.subscriber.onComplete();
                }
                if (priorState.error != null) {
                    FutureResultPublisher.this.subscriber.onError(priorState.error);
                }
            }
        }

        @Override
        public void cancel() {
            FutureResultPublisher.this.state.getAndUpdate(s1 -> {
                ResultState s2 = s1.clone();
                s2.cancelled = true;
                return s2;
            });
        }
    }
}

