package ch.squaredesk.nova.comm;

import ch.squaredesk.nova.tuples.Pair;
import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/* loaded from: input_file:ch/squaredesk/nova/comm/BackpressuredStreamFromAsyncSource.class */
public class BackpressuredStreamFromAsyncSource<MessageType> {
    private static final int DEFAULT_MESSAGE_BUFFER_SIZE = 1;
    private final BlockingQueue<MessageType> queue;
    private final Runnable closeAction;
    private final AtomicBoolean shutdown;

    public BackpressuredStreamFromAsyncSource() {
        this(DEFAULT_MESSAGE_BUFFER_SIZE, null);
    }

    public BackpressuredStreamFromAsyncSource(Runnable runnable) {
        this(DEFAULT_MESSAGE_BUFFER_SIZE, runnable);
    }

    private BackpressuredStreamFromAsyncSource(int i, Runnable runnable) {
        this.shutdown = new AtomicBoolean(false);
        this.queue = new DisruptorBlockingQueue(i);
        this.closeAction = runnable;
    }

    public void onNext(MessageType messagetype) {
        if (this.shutdown.get()) {
            throw new IllegalStateException("Stream closed");
        }
        try {
            this.queue.put(messagetype);
        } catch (InterruptedException e) {
        }
    }

    public void onComplete() {
        this.shutdown.set(true);
    }

    public Flowable<MessageType> toFlowable() {
        return Flowable.generate(() -> {
            return new Pair(this.queue, this.shutdown);
        }, (pair, emitter) -> {
            Object obj = null;
            while (!((AtomicBoolean) pair._2).get() && obj == null) {
                try {
                    obj = ((BlockingQueue) pair._1).poll(100L, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    emitter.onComplete();
                    return;
                }
            }
            if (obj != null) {
                emitter.onNext(obj);
            } else {
                emitter.onComplete();
            }
        }, pair2 -> {
            if (this.closeAction != null) {
                this.closeAction.run();
            }
        }).subscribeOn(Schedulers.io());
    }
}
