package alluxio.master.file.mdsync;

import alluxio.collections.ConcurrentHashSet;
import alluxio.exception.runtime.InternalRuntimeException;
import alluxio.master.file.mdsync.DefaultSyncProcess;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.resource.CloseableResource;
import alluxio.underfs.UfsClient;
import alluxio.underfs.UfsLoadResult;
import alluxio.util.logging.SamplingLogger;
import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:alluxio/master/file/mdsync/LoadRequestExecutor.class */
public class LoadRequestExecutor implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(LoadRequestExecutor.class);
    /**
     * Sampling wrapper around {@link #LOG}, used for the periodic status line in
     * {@code runNextLoadTask}. NOTE(review): 5000 is presumably the minimum interval
     * in milliseconds between emitted messages -- confirm against SamplingLogger.
     */
    private static final Logger SAMPLING_LOG = new SamplingLogger(LOG, 5000);
    /**
     * Number of load "tickets" still available. Starts at {@link #mMaxRunning}, is
     * decremented when a load request is picked up for execution and incremented
     * again by {@code releaseRunning}.
     */
    private final AtomicInteger mRemainingTickets;
    /** Initial number of tickets, i.e. the upper bound on concurrently running loads. */
    private final int mMaxRunning;
    /** Executor that successful load results are handed to; closed together with this object. */
    private final LoadResultExecutor mResultExecutor;
    /** Registered path loader tasks, keyed by task id. */
    private final Map<Long, PathLoaderTask> mPathLoaderTasks = new ConcurrentHashMap<>();
    /**
     * Ids of tasks that are currently considered to have pending loads. Used to avoid
     * enqueueing the same id twice in {@link #mPathLoaderTaskQueue}.
     */
    private final Set<Long> mPathLoaderTasksWithPendingLoads = new ConcurrentHashSet<>();
    /** Task ids waiting to be asked for their next load request; re-appended at the tail after each poll. */
    private final ConcurrentLinkedDeque<Long> mPathLoaderTaskQueue = new ConcurrentLinkedDeque<>();
    /** Load requests fetched from tasks that are waiting for a free ticket. */
    private final BlockingQueue<LoadRequest> mLoadRequests = new LinkedBlockingQueue<>();
    /**
     * Requests that already hold a ticket but were delayed by their task's rate limiter.
     * No comparator is supplied, so the queue relies on the natural ordering of
     * {@code RateLimitedRequest} (presumably earliest-ready first -- confirm).
     */
    private final PriorityQueue<RateLimitedRequest> mRateLimited = new PriorityQueue<>();
    /**
     * Background thread that repeatedly calls {@code runNextLoadTask()} until it is
     * interrupted. It is started from the constructor and interrupted by {@code close()}.
     */
    private final Thread mExecutor = new Thread(() -> {
        while (!Thread.interrupted()) {
            try {
                runNextLoadTask();
            } catch (InterruptedException e) {
                // Interruption is the shutdown signal. Leave the loop (instead of
                // returning) so the exit is logged on this path as well.
                break;
            }
        }
        LOG.info("Load request runner thread exiting");
    }, "LoadRequestRunner");

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the executor, starts its runner thread and registers its gauges.
     *
     * @param i the number of load tickets, i.e. the maximum number of loads that may
     *          be running at the same time
     * @param loadResultExecutor the executor that load results are forwarded to
     */
    public LoadRequestExecutor(int i, LoadResultExecutor loadResultExecutor) {
        // All state read by the runner thread must be assigned before it is started.
        mResultExecutor = loadResultExecutor;
        mRemainingTickets = new AtomicInteger(i);
        mMaxRunning = i;

        mExecutor.start();
        registerMetrics();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers a path loader task and schedules it to be asked for load requests.
     *
     * <p>The task is given a callback that invokes {@link #hasNewLoadTask(long)} with
     * its id, then it is stored, its id is enqueued and marked as having pending
     * loads, and any thread waiting on this object's monitor is woken up.
     *
     * @param pathLoaderTask the task to register
     */
    public synchronized void addPathLoaderTask(PathLoaderTask pathLoaderTask) {
        final long taskId = pathLoaderTask.getTaskInfo().getId();
        pathLoaderTask.runOnPendingLoad(() -> hasNewLoadTask(taskId));

        mPathLoaderTasks.put(taskId, pathLoaderTask);
        mPathLoaderTaskQueue.add(taskId);
        mPathLoaderTasksWithPendingLoads.add(taskId);

        notifyAll();
    }

    /**
     * Signals that the task with the given id has a new load available.
     *
     * <p>If the id is not already marked as having pending loads, it is appended to
     * the task queue, marked, and waiting threads are notified. Otherwise this is a
     * no-op.
     *
     * @param j the id of the task that has a new load
     */
    synchronized void hasNewLoadTask(long j) {
        final Long taskId = j;
        if (!mPathLoaderTasksWithPendingLoads.contains(taskId)) {
            mPathLoaderTaskQueue.add(taskId);
            mPathLoaderTasksWithPendingLoads.add(taskId);
            notifyAll();
        }
    }

    /**
     * Handles a failed load: records the failure reason in the task's stats, gives
     * the ticket back, and then reports the error to the load request.
     *
     * @param loadRequest the request whose load failed
     * @param th the failure
     */
    private void onLoadError(LoadRequest loadRequest, Throwable th) {
        // A missing mount point gets its own reason; everything else counts as a UFS I/O failure.
        final SyncFailReason reason = (th instanceof DefaultSyncProcess.MountPointNotFoundRuntimeException)
                ? SyncFailReason.LOADING_MOUNT_POINT_DOES_NOT_EXIST
                : SyncFailReason.LOADING_UFS_IO_FAILURE;
        loadRequest.getTaskInfo().getStats().reportSyncFailReason(loadRequest, null, reason, th);

        releaseRunning();
        loadRequest.onError(th);
    }

    /**
     * Handles a successful UFS listing for a load request.
     *
     * <p>The raw UFS result is first passed to the task's sync process. If that
     * yields a {@link LoadResult} and the owning path loader task is still
     * registered, the result is forwarded to the result executor; the ticket is
     * released from the first callback passed to it. Otherwise the ticket is
     * released immediately.
     *
     * @param loadRequest the request that was executed
     * @param ufsLoadResult the listing returned by the UFS client
     */
    private void processLoadResult(LoadRequest loadRequest, UfsLoadResult ufsLoadResult) {
        Optional<LoadResult> onReceiveLoadRequestOutput = loadRequest.getTaskInfo().getMdSync()
                .onReceiveLoadRequestOutput(loadRequest.getBaseTaskId(), loadRequest.getLoadRequestId(), ufsLoadResult);
        synchronized (this) {
            if (this.mPathLoaderTasks.get(Long.valueOf(loadRequest.getBaseTaskId())) == null
                    || !onReceiveLoadRequestOutput.isPresent()) {
                // Nothing will be processed, so the ticket is returned right away.
                releaseRunning();
                if (onReceiveLoadRequestOutput.isPresent()) {
                    // A result exists, so this branch was taken because the task is no longer registered.
                    LOG.debug("Got a load result for id {} with no corresponding path loader task",
                            Long.valueOf(loadRequest.getBaseTaskId()));
                }
            } else {
                LoadResult loadResult = onReceiveLoadRequestOutput.get();
                Runnable onProcessStart = () -> {
                    releaseRunning();
                    loadResult.getTaskInfo().getStats().mProcessStarted.incrementAndGet();
                };
                Consumer<SyncProcessResult> onProcessComplete = syncProcessResult -> {
                    loadResult.getTaskInfo().getStats().mProcessCompleted.incrementAndGet();
                    loadResult.onProcessComplete(syncProcessResult);
                };
                this.mResultExecutor.processLoadResult(
                        loadResult, onProcessStart, onProcessComplete, loadResult::onProcessError);
            }
        }
    }

    /**
     * Runs one iteration of the load request runner.
     *
     * <p>Under this object's monitor, waits until either a queued load request can
     * be started (a request is queued and a ticket is free) or the head of the
     * rate-limited queue is ready. While waiting, and as long as tickets remain,
     * task ids are polled from the task queue and asked for their next load request.
     * Afterwards one request is executed: a ready rate-limited request if there is
     * one, otherwise the next queued load request.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void runNextLoadTask() throws InterruptedException {
        synchronized (this) {
            while (true) {
                if ((!this.mLoadRequests.isEmpty() && this.mRemainingTickets.get() != 0)
                        || (!this.mRateLimited.isEmpty() && this.mRateLimited.peek().isReady())) {
                    // There is something that can be run right now.
                    break;
                }
                if (this.mRemainingTickets.get() <= 0 || this.mPathLoaderTaskQueue.isEmpty()) {
                    // Nothing can be fetched: either no tickets are left or no task is queued.
                    long j = 0;
                    if (!this.mRateLimited.isEmpty()) {
                        j = this.mRateLimited.peek().getWaitTime();
                        if (j <= 0) {
                            break;
                        }
                    }
                    if (j == 0) {
                        // No rate-limited request pending: block until notified.
                        wait();
                    } else if (j >= 1000000) {
                        // Only waits of at least 1,000,000 ns are slept; for shorter
                        // waits the loop simply re-evaluates immediately.
                        TimeUnit.NANOSECONDS.timedWait(this, j);
                    }
                } else {
                    Long poll = this.mPathLoaderTaskQueue.poll();
                    if (poll != null) {
                        checkNextLoad(poll.longValue());
                    }
                }
            }
        }
        SAMPLING_LOG.info("Concurrent running ufs load tasks {}, tasks with pending load requests {}, rate limited pending requests {}", new Object[]{Integer.valueOf(this.mMaxRunning - this.mRemainingTickets.get()), Integer.valueOf(this.mPathLoaderTasks.size()), Integer.valueOf(this.mRateLimited.size())});
        if (!this.mRateLimited.isEmpty() && this.mRateLimited.peek().isReady()) {
            // This request took its ticket when it was first dequeued below.
            RateLimitedRequest remove = this.mRateLimited.remove();
            runTask(remove.mTask, remove.mLoadRequest);
            return;
        }
        LoadRequest take = this.mLoadRequests.take();
        PathLoaderTask pathLoaderTask = this.mPathLoaderTasks.get(Long.valueOf(take.getBaseTaskId()));
        if (pathLoaderTask == null) {
            // The second placeholder is the task id, so log the base task id
            // rather than repeating the load request id.
            LOG.debug("Got load request {} with task id {} with no corresponding task", Long.valueOf(take.getLoadRequestId()), Long.valueOf(take.getBaseTaskId()));
            return;
        }
        Preconditions.checkState(this.mRemainingTickets.decrementAndGet() >= 0);
        Optional<Long> acquire = pathLoaderTask.getRateLimiter().acquire();
        if (acquire.isPresent()) {
            // The rate limiter returned a value: park the request instead of running it now.
            this.mRateLimited.add(new RateLimitedRequest(pathLoaderTask, take, acquire.get().longValue()));
        } else {
            runTask(pathLoaderTask, take);
        }
    }

    /**
     * Returns one ticket to the pool and wakes up every thread waiting on this
     * object's monitor.
     */
    private void releaseRunning() {
        synchronized (this) {
            mRemainingTickets.incrementAndGet();
            notifyAll();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Unregisters the path loader task with the given id.
     *
     * @param j the id of the completed task
     */
    public synchronized void onTaskComplete(long j) {
        final Long completedId = j;
        mPathLoaderTasks.remove(completedId);
    }

    /**
     * Starts the asynchronous UFS listing for a load request.
     *
     * <p>A UFS client is obtained from the task and closed again when this method
     * leaves the try block, whether or not the call succeeded. The listing result is
     * delivered to {@link #processLoadResult} and listing failures to
     * {@link #onLoadError}. Anything thrown while obtaining the client, issuing the
     * call or closing the client is also routed to {@link #onLoadError}.
     *
     * @param pathLoaderTask the task that owns the request and supplies the UFS client
     * @param loadRequest the request to execute
     */
    private void runTask(PathLoaderTask pathLoaderTask, LoadRequest loadRequest) {
        try (CloseableResource<UfsClient> client = pathLoaderTask.getClient()) {
            // The start-after marker is only passed on the first load of a task.
            String startAfter = null;
            if (loadRequest.isFirstLoad()) {
                startAfter = loadRequest.getTaskInfo().getStartAfter();
            }
            client.get().performListingAsync(
                    loadRequest.getLoadPath().getPath(),
                    loadRequest.getContinuationToken(),
                    startAfter,
                    loadRequest.getDescendantType(),
                    loadRequest.isFirstLoad(),
                    ufsLoadResult -> processLoadResult(loadRequest, ufsLoadResult),
                    error -> onLoadError(loadRequest, error));
        } catch (Throwable t) {
            onLoadError(loadRequest, t);
        }
    }

    /**
     * Asks the task with the given id for its next load request and queues it.
     *
     * <p>If the task is unknown or complete, it is dropped from both the task map
     * and the pending set. If it has no next request, it is only removed from the
     * pending set. Otherwise the request is queued for execution and the task id is
     * appended to the tail of the task queue so it is visited again later.
     *
     * @param j the id of the task to check
     */
    private void checkNextLoad(long j) {
        final Long taskId = j;
        final PathLoaderTask task = mPathLoaderTasks.get(taskId);

        final boolean taskGone = task == null || task.isComplete();
        if (taskGone) {
            mPathLoaderTasks.remove(taskId);
            mPathLoaderTasksWithPendingLoads.remove(taskId);
            return;
        }

        final Optional<LoadRequest> pending = task.getNext();
        if (pending.isPresent()) {
            try {
                mLoadRequests.put(pending.get());
                mPathLoaderTaskQueue.addLast(taskId);
            } catch (InterruptedException e) {
                throw new InternalRuntimeException("Not expected to block here", e);
            }
        } else {
            mPathLoaderTasksWithPendingLoads.remove(taskId);
        }
    }

    /**
     * Stops the runner thread and closes the result executor.
     *
     * <p>The runner thread is interrupted and given up to 5000 ms to terminate. If
     * the calling thread is itself interrupted while waiting, the result executor is
     * still closed and the caller's interrupt status is restored afterwards.
     *
     * @throws IOException if closing the result executor fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.mExecutor.interrupt();
        boolean interrupted = false;
        try {
            this.mExecutor.join(5000L);
        } catch (InterruptedException e) {
            interrupted = true;
            LOG.debug("Interrupted while waiting for load request runner to terminate");
        }
        try {
            this.mResultExecutor.close();
        } finally {
            if (interrupted) {
                // Re-assert the interrupt only after cleanup so that closing the
                // result executor is not affected by the flag.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Registers the two gauges exposed by this executor, unless gauges with the same
     * names already exist: the total number of pending loads across all registered
     * tasks, and the number of tickets currently in use.
     */
    private void registerMetrics() {
        MetricsSystem.registerGaugeIfAbsent(
                MetricsSystem.getMetricName(MetricKey.MASTER_METADATA_SYNC_QUEUED_LOADS.getName()),
                () -> {
                    synchronized (this) {
                        int pending = 0;
                        for (PathLoaderTask task : mPathLoaderTasks.values()) {
                            pending += task.getPendingLoadCount();
                        }
                        return pending;
                    }
                });
        MetricsSystem.registerGaugeIfAbsent(
                MetricsSystem.getMetricName(MetricKey.MASTER_METADATA_SYNC_RUNNING_LOADS.getName()),
                () -> mMaxRunning - mRemainingTickets.get());
    }
}
