package alluxio.master.file.activesync;

import alluxio.AlluxioURI;
import alluxio.SyncInfo;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.heartbeat.HeartbeatExecutor;
import alluxio.master.file.FileSystemMaster;
import alluxio.master.file.meta.MountTable;
import alluxio.master.file.meta.options.MountInfo;
import alluxio.resource.CloseableResource;
import alluxio.retry.RetryUtils;
import alluxio.underfs.UfsManager;
import alluxio.underfs.UnderFileSystem;
import alluxio.util.LogUtils;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:alluxio/master/file/activesync/ActiveSyncer.class */
public class ActiveSyncer implements HeartbeatExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(ActiveSyncer.class);
    private final FileSystemMaster mFileSystemMaster;
    private final ActiveSyncManager mSyncManager;
    private final MountTable mMountTable;
    private final long mMountId;
    private final AlluxioURI mMountUri;
    private final Queue<CompletableFuture<?>> mSyncTasks = new LinkedBlockingQueue(32);

    public ActiveSyncer(FileSystemMaster fileSystemMaster, ActiveSyncManager activeSyncManager, MountTable mountTable, long j) {
        this.mFileSystemMaster = fileSystemMaster;
        this.mSyncManager = activeSyncManager;
        this.mMountTable = mountTable;
        this.mMountId = j;
        this.mMountUri = ((MountInfo) Objects.requireNonNull(this.mMountTable.getMountInfo(this.mMountId))).getAlluxioUri();
    }

    public void heartbeat(long j) {
        LOG.debug("start sync heartbeat for {} with mount id {}", this.mMountUri, Long.valueOf(this.mMountId));
        this.mSyncTasks.removeIf((v0) -> {
            return v0.isDone();
        });
        List<AlluxioURI> filterList = this.mSyncManager.getFilterList(this.mMountId);
        if (filterList == null || filterList.isEmpty()) {
            return;
        }
        try {
            CloseableResource acquireUfsResource = ((UfsManager.UfsClient) Objects.requireNonNull(this.mMountTable.getUfsClient(this.mMountId))).acquireUfsResource();
            Throwable th = null;
            try {
                UnderFileSystem underFileSystem = (UnderFileSystem) acquireUfsResource.get();
                if (!underFileSystem.supportsActiveSync()) {
                    if (acquireUfsResource != null) {
                        if (0 == 0) {
                            acquireUfsResource.close();
                            return;
                        }
                        try {
                            acquireUfsResource.close();
                            return;
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                            return;
                        }
                    }
                    return;
                }
                SyncInfo activeSyncInfo = underFileSystem.getActiveSyncInfo();
                Set<AlluxioURI> syncPoints = activeSyncInfo.getSyncPoints();
                CompletableFuture[] completableFutureArr = new CompletableFuture[syncPoints.size()];
                int i = 0;
                for (AlluxioURI alluxioURI : syncPoints) {
                    completableFutureArr[i] = CompletableFuture.supplyAsync(() -> {
                        processSyncPoint(alluxioURI, activeSyncInfo);
                        return Long.valueOf(activeSyncInfo.getTxId());
                    }, this.mSyncManager.getExecutor());
                    i++;
                }
                CompletableFuture<Void> thenRunAsync = CompletableFuture.allOf(completableFutureArr).thenRunAsync(() -> {
                    this.mFileSystemMaster.recordActiveSyncTxid(activeSyncInfo.getTxId(), this.mMountId);
                }, (Executor) this.mSyncManager.getExecutor());
                int i2 = 0;
                while (!this.mSyncTasks.offer(thenRunAsync) && !Thread.currentThread().isInterrupted()) {
                    for (CompletableFuture<?> completableFuture : this.mSyncTasks) {
                        try {
                            completableFuture.get(Configuration.getMs(PropertyKey.MASTER_UFS_ACTIVE_SYNC_INTERVAL) / this.mSyncTasks.size(), TimeUnit.MILLISECONDS);
                            this.mSyncTasks.remove(completableFuture);
                            break;
                        } catch (InterruptedException | ExecutionException e) {
                            LogUtils.warnWithException(LOG, "Failed while waiting on task to add new task to head of queue", new Object[]{e});
                            if (Thread.currentThread().isInterrupted()) {
                                Thread.currentThread().interrupt();
                            }
                            i2++;
                        } catch (TimeoutException e2) {
                            LOG.trace("sync task did not complete during heartbeat. Attempt: {}", Integer.valueOf(i2));
                            i2++;
                        }
                    }
                }
                if (acquireUfsResource != null) {
                    if (0 != 0) {
                        try {
                            acquireUfsResource.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        acquireUfsResource.close();
                    }
                }
                return;
            } catch (Throwable th4) {
                if (acquireUfsResource != null) {
                    if (0 != 0) {
                        try {
                            acquireUfsResource.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        acquireUfsResource.close();
                    }
                }
                throw th4;
            }
        } catch (IOException e3) {
            LOG.warn("IOException " + Throwables.getStackTraceAsString(e3));
        }
        LOG.warn("IOException " + Throwables.getStackTraceAsString(e3));
    }

    public void close() {
        Iterator<CompletableFuture<?>> it = this.mSyncTasks.iterator();
        while (it.hasNext()) {
            it.next().cancel(true);
        }
    }

    private void processSyncPoint(AlluxioURI alluxioURI, SyncInfo syncInfo) {
        AlluxioURI uri = this.mMountTable.reverseResolve(alluxioURI).getUri();
        if (uri == null) {
            LOG.warn("Unable to reverse resolve ufsUri {}", alluxioURI);
            return;
        }
        try {
            if (syncInfo.isForceSync()) {
                LOG.debug("force full sync {}", alluxioURI);
                RetryUtils.retry("Full Sync", () -> {
                    this.mFileSystemMaster.activeSyncMetadata(uri, null, this.mSyncManager.getExecutor());
                }, this.mSyncManager.getRetryPolicy());
            } else {
                LOG.debug("incremental sync {}", alluxioURI);
                RetryUtils.retry("Incremental Sync", () -> {
                    this.mFileSystemMaster.activeSyncMetadata(uri, (Collection) syncInfo.getChangedFiles(alluxioURI).stream().map(alluxioURI2 -> {
                        return ((MountTable.ReverseResolution) Objects.requireNonNull(this.mMountTable.reverseResolve(alluxioURI2))).getUri();
                    }).collect(Collectors.toSet()), this.mSyncManager.getExecutor());
                }, this.mSyncManager.getRetryPolicy());
            }
        } catch (IOException e) {
            LOG.warn("Failed to submit active sync job to master: ufsUri {}, syncPoint {} ", new Object[]{alluxioURI, uri, e});
        }
    }
}
