package org.opendaylight.yangtools.util.concurrent;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
import org.opendaylight.yangtools.concepts.AbstractSimpleIdentifiable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/opendaylight/yangtools/util/concurrent/AbstractBatchingExecutor.class */
/**
 * Base class for executors which queue tasks per key and hand them to {@link #executeBatch} in batches.
 *
 * <p>Each key with pending tasks has at most one {@link DispatcherTask} registered in {@code dispatcherTasks}.
 * That task runs on the supplied {@link Executor}, repeatedly draining its queue and passing the drained items
 * to {@link #executeBatch} as a single batch. A dispatcher terminates once its queue has stayed empty for
 * {@code TASK_WAIT_NANOS}; the next submission for that key starts a fresh one.
 *
 * @param <K> type of the key identifying a worker
 * @param <T> type of the queued tasks
 */
public abstract class AbstractBatchingExecutor<K, T> extends AbstractSimpleIdentifiable<String> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractBatchingExecutor.class);

    /** Maximum time, in minutes, a submitter waits for room in a full queue before giving up. */
    private static final int MAX_NOTIFICATION_OFFER_MINUTES = 10;
    private static final long GIVE_UP_NANOS = TimeUnit.MINUTES.toNanos(MAX_NOTIFICATION_OFFER_MINUTES);
    /** How long an idle dispatcher waits for new items before it exits. */
    private static final long TASK_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(10);

    private final ConcurrentMap<K, AbstractBatchingExecutor<K, T>.DispatcherTask> dispatcherTasks;
    private final Executor executor;
    private final int maxQueueCapacity;

    /**
     * Per-key dispatcher: owns the queue of pending tasks for one key and feeds them to
     * {@link AbstractBatchingExecutor#executeBatch} in batches. All access to {@code queue} and {@code exiting}
     * on a published instance happens while holding {@code lock}.
     */
    public final class DispatcherTask implements Runnable {
        private final K key;
        // Set (under lock) once run() has decided to terminate; submitters must then use a new dispatcher.
        private boolean exiting;
        private final Lock lock = new ReentrantLock();
        private final Condition notEmpty = this.lock.newCondition();
        private final Condition notFull = this.lock.newCondition();
        private final Queue<T> queue = new ArrayDeque<>();

        /**
         * Creates a dispatcher and moves every non-null element of {@code it} into its queue. The capacity limit
         * is not applied here.
         *
         * @param k worker key, must not be null
         * @param it initial tasks; fully consumed by this constructor
         */
        DispatcherTask(K k, Iterator<T> it) {
            this.key = Objects.requireNonNull(k);
            while (it.hasNext()) {
                final T next = it.next();
                if (next != null) {
                    this.queue.add(next);
                }
            }
        }

        /**
         * Returns an iterator over the queued items. Does not take the lock; the enclosing class only calls this
         * on instances which lost the race to be registered and were therefore never handed to the executor.
         */
        Iterator<T> recoverItems() {
            return this.queue.iterator();
        }

        /** Returns the key this dispatcher serves. */
        public K key() {
            return this.key;
        }

        /** Returns the number of currently queued items, read under the lock. */
        public int size() {
            this.lock.lock();
            try {
                return this.queue.size();
            } finally {
                this.lock.unlock();
            }
        }

        /**
         * Offers the remaining elements of {@code it} to this dispatcher's queue, blocking while the queue is at
         * capacity.
         *
         * @param it tasks to enqueue
         * @return {@code true} if the caller is done (all items queued, or the give-up deadline expired and the
         *         remaining items were logged and dropped); {@code false} if this dispatcher is exiting and the
         *         caller must retry with a new one
         * @throws InterruptedException if interrupted while waiting for queue space
         */
        boolean submitTasks(Iterator<T> it) throws InterruptedException {
            final long deadline = System.nanoTime() + GIVE_UP_NANOS;

            this.lock.lock();
            try {
                // Acquiring the lock may have taken a while, so compute the remaining budget only now.
                long canWait = deadline - System.nanoTime();

                while (!this.exiting) {
                    final int available = AbstractBatchingExecutor.this.maxQueueCapacity - this.queue.size();
                    if (available <= 0) {
                        if (canWait <= 0) {
                            AbstractBatchingExecutor.LOG.warn("{}: Failed to offer tasks {} to the queue for worker {}. Exceeded maximum allowable time of {} minutes; the worker is likely in an unrecoverable state (deadlock or endless loop). ", AbstractBatchingExecutor.this.getIdentifier(), ImmutableList.copyOf(it), this.key, MAX_NOTIFICATION_OFFER_MINUTES);
                            return true;
                        }

                        canWait = this.notFull.awaitNanos(canWait);
                        continue;
                    }

                    for (int i = 0; i < available; i++) {
                        if (!it.hasNext()) {
                            this.notEmpty.signal();
                            return true;
                        }
                        this.queue.add(it.next());
                    }
                }
                return false;
            } finally {
                // Single release point: unlocking on the return paths as well would unlock twice and throw
                // IllegalMonitorStateException.
                this.lock.unlock();
            }
        }

        /**
         * Waits up to {@code TASK_WAIT_NANOS} for the queue to become non-empty. Must be called with the lock
         * held, as it awaits on a condition of that lock.
         *
         * @return {@code true} if the queue has items; {@code false} on timeout or interruption
         */
        private boolean waitForQueue() {
            long remaining = TASK_WAIT_NANOS;
            while (this.queue.isEmpty()) {
                if (remaining <= 0) {
                    return false;
                }
                try {
                    remaining = this.notEmpty.awaitNanos(remaining);
                } catch (InterruptedException e) {
                    AbstractBatchingExecutor.LOG.debug("{}: Interrupted trying to remove from {} worker's queue", AbstractBatchingExecutor.this.getIdentifier(), this.key);
                    // Preserve the interrupt for whoever owns this thread.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }

        /**
         * Drains the queue in batches until it stays empty for the wait period, then marks this dispatcher as
         * exiting and unregisters it.
         */
        @Override
        public void run() {
            try {
                while (true) {
                    final ImmutableList<T> tasks;

                    this.lock.lock();
                    try {
                        if (!waitForQueue()) {
                            this.exiting = true;
                            return;
                        }

                        // Take the whole queue as one batch and wake up any blocked submitters.
                        tasks = ImmutableList.copyOf(this.queue);
                        this.queue.clear();
                        this.notFull.signalAll();
                    } finally {
                        this.lock.unlock();
                    }

                    // Invoked without the lock so submitters can keep queueing while the batch executes.
                    invokeWorker(tasks);
                }
            } finally {
                // Unregister exactly once, when actually exiting (normally or not). Doing this per batch would
                // let a second dispatcher for the same key start while this one is still running.
                AbstractBatchingExecutor.this.dispatcherTasks.remove(this.key, this);
            }
        }

        /** Passes one batch to {@code executeBatch}, logging (not propagating) any {@link Exception}. */
        private void invokeWorker(ImmutableList<T> immutableList) {
            AbstractBatchingExecutor.LOG.debug("{}: Invoking worker {} with tasks: {}", AbstractBatchingExecutor.this.getIdentifier(), this.key, immutableList);
            try {
                AbstractBatchingExecutor.this.executeBatch(this.key, immutableList);
            } catch (Exception e) {
                AbstractBatchingExecutor.LOG.error("{}: Error invoking worker {} with {}", AbstractBatchingExecutor.this.getIdentifier(), this.key, immutableList, e);
            }
        }
    }

    /**
     * Creates a new executor.
     *
     * @param str identifier passed to the superclass and used as prefix in log messages
     * @param executor executor on which dispatcher tasks run, must not be null
     * @param i maximum number of items queued per key before submitters block, must be positive
     * @throws IllegalArgumentException if {@code i} is not positive
     */
    public AbstractBatchingExecutor(String str, Executor executor, int i) {
        super(str);
        this.dispatcherTasks = new ConcurrentHashMap<>();
        this.executor = Objects.requireNonNull(executor);
        Preconditions.checkArgument(i > 0, "Invalid maxQueueCapacity %s must be > 0", i);
        this.maxQueueCapacity = i;
    }

    /** Returns the per-key queue capacity supplied at construction. */
    public final int maxQueueCapacity() {
        return this.maxQueueCapacity;
    }

    /** Returns the executor supplied at construction. */
    public final Executor executor() {
        return this.executor;
    }

    /**
     * Submits a single task for the given key.
     *
     * @param k worker key
     * @param t task, must not be null
     */
    public final void submitTask(K k, T t) {
        submitTasks(k, Collections.singletonList(Objects.requireNonNull(t)));
    }

    /**
     * Submits tasks for the given key, starting a dispatcher for that key if none is registered. Does nothing
     * if either argument is null. If the calling thread is interrupted while waiting for queue space, a warning
     * is logged, the thread's interrupt status is restored and the method returns.
     *
     * @param k worker key
     * @param iterable tasks to queue
     */
    public final void submitTasks(K k, Iterable<T> iterable) {
        if (iterable == null || k == null) {
            return;
        }
        LOG.trace("{}: submitTasks for worker {}: {}", getIdentifier(), k, iterable);
        try {
            Iterator<T> it = iterable.iterator();
            while (true) {
                AbstractBatchingExecutor<K, T>.DispatcherTask existing = this.dispatcherTasks.get(k);
                if (existing == null) {
                    // No dispatcher yet: build one pre-loaded with the items and try to register it.
                    final AbstractBatchingExecutor<K, T>.DispatcherTask created = new DispatcherTask(k, it);
                    existing = this.dispatcherTasks.putIfAbsent(k, created);
                    if (existing == null) {
                        runTask(k, created);
                        break;
                    }
                    // Lost the race; the constructor consumed the iterator, so take the items back.
                    it = created.recoverItems();
                }

                if (existing.submitTasks(it)) {
                    break;
                }

                // The registered dispatcher is exiting: try to swap in a replacement holding the items.
                final AbstractBatchingExecutor<K, T>.DispatcherTask replacement = new DispatcherTask(k, it);
                if (this.dispatcherTasks.replace(k, existing, replacement)) {
                    runTask(k, replacement);
                    break;
                }
                it = replacement.recoverItems();
                LOG.debug("{}: retrying task queueing for {}", getIdentifier(), k);
            }
        } catch (InterruptedException e) {
            LOG.warn("{}: Interrupted trying to add to {} worker's queue", getIdentifier(), k);
            // Do not swallow the interrupt: let the caller observe it.
            Thread.currentThread().interrupt();
        }
        LOG.trace("{}: submitTasks done for worker {}", getIdentifier(), k);
    }

    /** Returns a stream over the currently registered dispatcher tasks. */
    public final Stream<AbstractBatchingExecutor<K, T>.DispatcherTask> streamTasks() {
        return this.dispatcherTasks.values().stream();
    }

    /**
     * Executes one batch of tasks for a key. Called from a dispatcher task without its queue lock held; any
     * {@link Exception} thrown is logged by the dispatcher and does not stop it.
     *
     * @param k worker key
     * @param immutableList tasks drained from the key's queue, in queue order
     * @throws Exception if the batch fails
     */
    abstract void executeBatch(K k, ImmutableList<T> immutableList) throws Exception;

    /** Hands a newly registered dispatcher to the executor. */
    private void runTask(K k, AbstractBatchingExecutor<K, T>.DispatcherTask dispatcherTask) {
        LOG.debug("{}: Submitting DispatcherTask for worker {}", getIdentifier(), k);
        this.executor.execute(dispatcherTask);
    }
}
