package org.onosproject.store.consistent.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import org.onlab.util.SharedExecutors;
import org.onosproject.store.consistent.impl.MeteringAgent;
import org.onosproject.store.consistent.impl.StateMachineUpdate;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.Serializer;

/* loaded from: input_file:org/onosproject/store/consistent/impl/DefaultDistributedQueue.class */
public class DefaultDistributedQueue<E> implements DistributedQueue<E> {
    private final String name;
    private final Database database;
    private final Serializer serializer;
    private final Set<CompletableFuture<E>> pendingFutures = Sets.newIdentityHashSet();
    private static final String PRIMITIVE_NAME = "distributedQueue";
    private static final String SIZE = "size";
    private static final String PUSH = "push";
    private static final String POP = "pop";
    private static final String PEEK = "peek";
    private static final String ERROR_NULL_ENTRY = "Null entries are not allowed";
    private final MeteringAgent monitor;

    public DefaultDistributedQueue(String str, Database database, Serializer serializer, boolean z) {
        this.name = (String) Preconditions.checkNotNull(str, "queue name cannot be null");
        this.database = (Database) Preconditions.checkNotNull(database, "database cannot be null");
        this.serializer = (Serializer) Preconditions.checkNotNull(serializer, "serializer cannot be null");
        this.monitor = new MeteringAgent(PRIMITIVE_NAME, str, z);
        this.database.registerConsumer(stateMachineUpdate -> {
            SharedExecutors.getSingleThreadExecutor().execute(() -> {
                if (stateMachineUpdate.target() == StateMachineUpdate.Target.QUEUE_PUSH && ((String) ((List) stateMachineUpdate.input()).get(0)).equals(str)) {
                    tryPoll();
                }
            });
        });
    }

    public long size() {
        MeteringAgent.Context startTimer = this.monitor.startTimer(SIZE);
        return ((Long) Futures.getUnchecked(this.database.queueSize(this.name).whenComplete((l, th) -> {
            startTimer.stop(th);
        }))).longValue();
    }

    public void push(E e) {
        Preconditions.checkNotNull(e, ERROR_NULL_ENTRY);
        MeteringAgent.Context startTimer = this.monitor.startTimer(PUSH);
        Futures.getUnchecked(this.database.queuePush(this.name, this.serializer.encode(e)).whenComplete((r4, th) -> {
            startTimer.stop(th);
        }));
    }

    public CompletableFuture<E> pop() {
        MeteringAgent.Context startTimer = this.monitor.startTimer(POP);
        return (CompletableFuture<E>) this.database.queuePop(this.name).whenComplete((bArr, th) -> {
            startTimer.stop(th);
        }).thenCompose(bArr2 -> {
            if (bArr2 != null) {
                return CompletableFuture.completedFuture(this.serializer.decode(bArr2));
            }
            CompletableFuture<E> completableFuture = new CompletableFuture<>();
            this.pendingFutures.add(completableFuture);
            return completableFuture;
        });
    }

    public E peek() {
        MeteringAgent.Context startTimer = this.monitor.startTimer(PEEK);
        return (E) Futures.getUnchecked(this.database.queuePeek(this.name).thenApply(bArr -> {
            if (bArr != null) {
                return this.serializer.decode(bArr);
            }
            return null;
        }).whenComplete((BiConsumer<? super U, ? super Throwable>) (obj, th) -> {
            startTimer.stop(th);
        }));
    }

    public String name() {
        return this.name;
    }

    protected void tryPoll() {
        HashSet newHashSet = Sets.newHashSet();
        for (CompletableFuture completableFuture : this.pendingFutures) {
            Object unchecked = Futures.getUnchecked(this.database.queuePop(this.name).thenApply(bArr -> {
                if (bArr != null) {
                    return this.serializer.decode(bArr);
                }
                return null;
            }));
            if (unchecked == null) {
                break;
            }
            completableFuture.complete(unchecked);
            newHashSet.add(completableFuture);
        }
        this.pendingFutures.removeAll(newHashSet);
    }
}
