package io.vlingo.common.completes.sources;

import io.vlingo.common.Completes;
import io.vlingo.common.completes.LazySource;
import io.vlingo.common.completes.Sink;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/* loaded from: input_file:io/vlingo/common/completes/sources/InMemorySource.class */
/**
 * A {@link LazySource} that buffers emitted signals in memory until it is activated.
 *
 * <p>While the source is inactive, every outcome, error and completion signal is stored
 * in an internal queue. On {@link #activate()} the buffered signals are replayed to the
 * subscribed {@link Sink} in the order in which they were emitted; once active, signals
 * are handed to the subscriber directly.
 *
 * <p>NOTE(review): only the {@code active} flag is atomic; the queue and the subscriber
 * are plain fields. Confirm the intended threading model with the callers before relying
 * on this class from more than one thread.
 *
 * @param <Exposes> the type of outcome this source emits
 */
public class InMemorySource<Exposes> implements LazySource<Exposes> {
    /** Signals emitted before activation; set to {@code null} once a non-empty backlog has been replayed. */
    private Queue<Consumer<Sink<Exposes>>> queue = new ArrayDeque<>();

    /** The sink receiving signals; {@code null} until {@link #subscribe(Sink)} is called. */
    private Sink<Exposes> subscriber = null;

    /** {@code true} once {@link #activate()} has succeeded. */
    private final AtomicBoolean active = new AtomicBoolean(false);

    /**
     * Emits an outcome: delivered to the subscriber right away when the source is active,
     * buffered otherwise.
     *
     * @param exposes the outcome to emit
     */
    @Override // io.vlingo.common.completes.Source
    public void emitOutcome(Exposes exposes) {
        dispatch(sink -> sink.onOutcome(exposes));
    }

    /**
     * Emits an error: delivered to the subscriber right away when the source is active,
     * buffered otherwise.
     *
     * @param exc the error to emit
     */
    @Override // io.vlingo.common.completes.Source
    public void emitError(Exception exc) {
        dispatch(sink -> sink.onError(exc));
    }

    /**
     * Emits the completion signal: delivered to the subscriber right away when the source
     * is active, buffered otherwise.
     */
    @Override // io.vlingo.common.completes.Source
    public void emitCompletion() {
        dispatch(Sink::onCompletion);
    }

    /**
     * Registers the sink that will receive the signals of this source, replacing any
     * previously registered one.
     *
     * @param sink the sink to deliver signals to
     */
    @Override // io.vlingo.common.completes.Source
    public void subscribe(Sink<Exposes> sink) {
        this.subscriber = sink;
    }

    /**
     * Activates the source and replays every buffered signal to the subscriber in
     * emission order. Calling this method on an already active source does nothing.
     *
     * @throws UnsupportedOperationException if no subscriber has been registered yet
     */
    @Override // io.vlingo.common.completes.LazySource, io.vlingo.common.completes.Source
    public void activate() {
        if (this.subscriber == null) {
            throw new UnsupportedOperationException("Source must have a subscriber before being able to activate it.");
        }
        // Only the call that flips the flag may drain the backlog. This check must come
        // first: after a previous drain the queue reference is null and must not be touched.
        if (!this.active.compareAndSet(false, true)) {
            return;
        }
        if (this.queue.isEmpty()) {
            return;
        }
        this.queue.forEach(signal -> signal.accept(this.subscriber));
        // The backlog is never read again once the source is active; let it be collected.
        this.queue = null;
    }

    /**
     * Creates a source fed by the given {@link Completes}: the value passed to the
     * {@code andFinallyConsume} callback is emitted as an outcome, immediately followed
     * by the completion signal.
     *
     * @param completes the eventual value to expose through the source
     * @param <E> the type of the outcome
     * @return a new, not yet activated source bound to {@code completes}
     */
    public static <E> InMemorySource<E> fromCompletes(Completes<E> completes) {
        InMemorySource<E> source = new InMemorySource<>();
        completes.andFinallyConsume(outcome -> {
            source.emitOutcome(outcome);
            source.emitCompletion();
        });
        return source;
    }

    /**
     * Returns a diagnostic representation listing the buffered signals, the subscriber
     * and the activation flag.
     */
    @Override
    public String toString() {
        return "InMemorySource{queue=" + this.queue + ", subscriber=" + this.subscriber + ", active=" + this.active + '}';
    }

    /**
     * Hands the signal to the subscriber when the source is active, otherwise stores it
     * for replay on activation.
     */
    private void dispatch(Consumer<Sink<Exposes>> signal) {
        if (this.active.get()) {
            signal.accept(this.subscriber);
        } else {
            this.queue.add(signal);
        }
    }
}
