package io.pravega.common.util;

import io.pravega.common.Exceptions;
import io.pravega.shaded.com.google.common.annotations.VisibleForTesting;
import io.pravega.shaded.com.google.common.base.Preconditions;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.concurrent.GuardedBy;

/* loaded from: input_file:io/pravega/common/util/AbstractDrainingQueue.class */
public abstract class AbstractDrainingQueue<T> {

    @GuardedBy("lock")
    private CompletableFuture<Queue<T>> pendingTake;

    @GuardedBy("lock")
    private boolean closed;
    private final Object lock = new Object();

    public Queue<T> close() {
        CompletableFuture<Queue<T>> completableFuture = null;
        ArrayDeque arrayDeque = new ArrayDeque();
        synchronized (this.lock) {
            if (!this.closed) {
                this.closed = true;
                completableFuture = this.pendingTake;
                this.pendingTake = null;
                while (true) {
                    int size = size();
                    if (size <= 0) {
                        break;
                    }
                    arrayDeque.addAll(fetch(size));
                }
            }
        }
        if (completableFuture != null) {
            completableFuture.cancel(true);
        }
        return arrayDeque;
    }

    public void cancelPendingTake() {
        CompletableFuture<Queue<T>> completableFuture;
        synchronized (this.lock) {
            completableFuture = this.pendingTake;
            this.pendingTake = null;
        }
        if (completableFuture != null) {
            completableFuture.cancel(true);
        }
    }

    public void add(T t) {
        CompletableFuture<Queue<T>> completableFuture;
        Queue<T> queue = null;
        synchronized (this.lock) {
            Exceptions.checkNotClosed(this.closed, this);
            addInternal(t);
            completableFuture = this.pendingTake;
            this.pendingTake = null;
            if (completableFuture != null) {
                queue = fetch(size());
            }
        }
        if (completableFuture != null) {
            completableFuture.complete(queue);
        }
    }

    public Queue<T> poll(int i) {
        Queue<T> fetch;
        synchronized (this.lock) {
            Exceptions.checkNotClosed(this.closed, this);
            Preconditions.checkState(this.pendingTake == null, "Cannot call poll() when there is a pending take() request.");
            fetch = fetch(i);
        }
        return fetch;
    }

    public CompletableFuture<Queue<T>> take(int i) {
        synchronized (this.lock) {
            Exceptions.checkNotClosed(this.closed, this);
            Preconditions.checkState(this.pendingTake == null, "Cannot have more than one concurrent pending take() request.");
            Queue<T> fetch = fetch(i);
            if (fetch.size() > 0) {
                return CompletableFuture.completedFuture(fetch);
            }
            this.pendingTake = newTakeResult();
            return this.pendingTake;
        }
    }

    public CompletableFuture<Queue<T>> take(int i, Duration duration, ScheduledExecutorService scheduledExecutorService) {
        CompletableFuture<Queue<T>> take = take(i);
        if (!take.isDone()) {
            ScheduledFuture<?> schedule = scheduledExecutorService.schedule(() -> {
                synchronized (this.lock) {
                    if (this.pendingTake == take) {
                        this.pendingTake = null;
                        take.completeExceptionally(new TimeoutException());
                    }
                }
            }, duration.toMillis(), TimeUnit.MILLISECONDS);
            take.whenComplete((queue, th) -> {
                schedule.cancel(true);
            });
        }
        return take;
    }

    public T peek() {
        T peekInternal;
        synchronized (this.lock) {
            peekInternal = peekInternal();
        }
        return peekInternal;
    }

    public int size() {
        int sizeInternal;
        synchronized (this.lock) {
            sizeInternal = sizeInternal();
        }
        return sizeInternal;
    }

    protected abstract void addInternal(T t);

    protected abstract int sizeInternal();

    protected abstract T peekInternal();

    protected abstract Queue<T> fetch(int i);

    @VisibleForTesting
    protected CompletableFuture<Queue<T>> newTakeResult() {
        return new CompletableFuture<>();
    }
}
