package org.iworkz.genesis.vertx.common.stream;

import io.vertx.core.streams.ReadStream;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.UUID;

/* loaded from: input_file:org/iworkz/genesis/vertx/common/stream/CompositeReadStream.class */
/**
 * A read stream that forwards the items of several member {@link AsyncReadStream}s to a single
 * consumer.
 *
 * <p>At most {@code queueSize} member streams are kept in the "queued" set at any time. Members
 * that become available while the queued set is full are paused and parked in a "pending" queue;
 * each time a member ends, the oldest pending member (if any) is moved into the queued set. Once
 * the queued set is empty and no announced subscription is outstanding, {@code handleEnd(null)}
 * is invoked.
 *
 * <p>Locking as visible in this class: {@code queuedStreams} guards the queued set and
 * {@code membersPaused}; {@code pendingStreams} guards the pending queue and the mutations of
 * {@code subscribedStreamIds}.
 *
 * @param <T> type of the items emitted by the member streams
 */
public class CompositeReadStream<T> extends AbstractAsyncReadStream<T> {
    private static final int DEFAULT_QUEUE_SIZE = 20;

    /** Maximum number of member streams held in {@link #queuedStreams} at the same time. */
    private final int queueSize;

    /*
     * Last positive amount passed to fetch(long), decremented once per forwarded item.
     * NOTE(review): this value is never read in this class, and it is written under two different
     * monitors (queuedStreams in fetch, this in invokeHandleItem) - confirm whether it is still
     * needed.
     */
    private long fetchAmount = Long.MAX_VALUE;

    /** Ids handed out by {@link #subscribeMemberStream()} whose stream has not become available yet. */
    private final Queue<UUID> subscribedStreamIds = new LinkedList<>();

    /** {@code true} until the first resume; while set, newly queued members are not resumed. */
    private boolean membersPaused = true;

    /** Available members waiting for a free slot; final because it is also used as a monitor. */
    private final Queue<AsyncReadStream<T>> pendingStreams = new LinkedList<>();

    /** Members currently occupying a slot; final because it is also used as a monitor. */
    private final Queue<AsyncReadStream<T>> queuedStreams = new LinkedList<>();

    /** Creates a composite stream with the default limit of {@value #DEFAULT_QUEUE_SIZE} queued members. */
    public CompositeReadStream() {
        this(DEFAULT_QUEUE_SIZE);
    }

    /**
     * Creates a composite stream with an explicit limit of concurrently queued members.
     *
     * @param queueSize maximum number of member streams in the queued set
     */
    public CompositeReadStream(int queueSize) {
        this.queueSize = queueSize;
    }

    /**
     * Registers a new subscription id and adds the given member stream under it.
     *
     * @param memberStream the stream whose items should be forwarded
     * @return this composite stream
     */
    public AsyncReadStream<T> addMemberStream(AsyncReadStream<T> memberStream) {
        return addMemberStream(subscribeMemberStream(), memberStream);
    }

    /**
     * Adds a member stream for a subscription id.
     *
     * <p>When the member reports itself available it is either placed in the queued set (and
     * resumed unless the members are paused) or, if the queued set is full, paused and parked in
     * the pending queue. In both cases the subscription id is removed and the ready promise is
     * completed. A failure of the availability future is passed to {@code fail}.
     *
     * @param subscriptionId id to remove from the outstanding subscriptions once the member is available
     * @param memberStream the stream whose items should be forwarded
     * @return this composite stream
     */
    public AsyncReadStream<T> addMemberStream(UUID subscriptionId, AsyncReadStream<T> memberStream) {
        memberStream.available().map(availableStream -> {
            boolean queued = false;
            synchronized (this.queuedStreams) {
                if (this.queuedStreams.size() < this.queueSize) {
                    this.queuedStreams.add(memberStream);
                    queued = true;
                    resumeMemberStream(memberStream);
                }
            }
            synchronized (this.pendingStreams) {
                if (!queued) {
                    // No free slot: hold the member back until another member ends.
                    this.pendingStreams.add(memberStream);
                    memberStream.pause();
                }
                this.subscribedStreamIds.remove(subscriptionId);
            }
            this.readyPromise.tryComplete(this);
            return availableStream;
        }).onFailure(this::fail);

        memberStream.endHandler(ignored -> {
            // Take the oldest pending member (if any) so it can use the slot that is being freed.
            AsyncReadStream<T> promoted = null;
            synchronized (this.pendingStreams) {
                if (!this.pendingStreams.isEmpty()) {
                    promoted = this.pendingStreams.remove();
                }
            }
            synchronized (this.queuedStreams) {
                this.queuedStreams.remove(memberStream);
                if (promoted != null) {
                    this.queuedStreams.add(promoted);
                    resumeMemberStream(promoted);
                }
                // NOTE(review): subscribedStreamIds is mutated under the pendingStreams monitor but
                // read here under the queuedStreams monitor - confirm this is intended.
                if (this.subscribedStreamIds.isEmpty() && allEnded()) {
                    handleEnd(null);
                }
            }
        });

        memberStream.exceptionHandler(this::handleException);
        return this;
    }

    /**
     * Installs the item handler on a member and resumes it, unless the members are currently
     * paused. Within this class it is only called while holding the {@code queuedStreams} monitor.
     *
     * @param asyncReadStream the member stream to start
     */
    protected void resumeMemberStream(AsyncReadStream<T> asyncReadStream) {
        if (!this.membersPaused) {
            asyncReadStream.handler(this::invokeHandleItem);
            asyncReadStream.resume();
        }
    }

    /**
     * Announces a member stream that will be added later, so the composite does not end before
     * that member has become available.
     *
     * @return the random id to pass to {@link #addMemberStream(UUID, AsyncReadStream)}
     */
    public UUID subscribeMemberStream() {
        UUID subscriptionId = UUID.randomUUID();
        synchronized (this.pendingStreams) {
            this.subscribedStreamIds.add(subscriptionId);
        }
        return subscriptionId;
    }

    /**
     * Tells whether the queued set is empty.
     *
     * @return {@code true} if no member stream is currently queued
     */
    protected boolean allEnded() {
        return this.queuedStreams.isEmpty();
    }

    /**
     * Marks the members as paused and pauses every queued member stream.
     *
     * @return this composite stream
     */
    public ReadStream<T> pause() {
        synchronized (this.queuedStreams) {
            this.membersPaused = true;
            for (AsyncReadStream<T> member : this.queuedStreams) {
                member.pause();
            }
        }
        return this;
    }

    /**
     * Resumes this stream, clears the paused flag, and installs the item handler on and resumes
     * every queued member stream.
     *
     * @return this composite stream
     */
    @Override // org.iworkz.genesis.vertx.common.stream.AbstractAsyncReadStream
    /* renamed from: resume */
    public CompositeReadStream<T> mo14resume() {
        synchronized (this.queuedStreams) {
            super.mo14resume();
            this.membersPaused = false;
            for (AsyncReadStream<T> member : this.queuedStreams) {
                member.handler(this::invokeHandleItem);
                member.resume();
            }
        }
        return this;
    }

    /**
     * Forwards a fetch request to every queued member stream. A positive amount is also recorded
     * in {@code fetchAmount}; the amount is forwarded to the members unchanged either way.
     *
     * @param j number of items requested from each queued member
     * @return this composite stream
     */
    public ReadStream<T> fetch(long j) {
        synchronized (this.queuedStreams) {
            if (j > 0) {
                this.fetchAmount = j;
            }
            for (AsyncReadStream<T> member : this.queuedStreams) {
                member.fetch(j);
            }
        }
        return this;
    }

    /**
     * Item handler installed on every member stream: decrements {@code fetchAmount} and passes the
     * item to {@code handleSourceItem}. Any exception is routed to {@code handleException}. The
     * method is synchronized on this instance, so items are handed over one at a time.
     *
     * @param t the item emitted by a member stream
     */
    protected synchronized void invokeHandleItem(T t) {
        try {
            this.fetchAmount--;
            handleSourceItem(t);
        } catch (Exception e) {
            handleException(e);
        }
    }
}
