package org.apache.hadoop.hdfs.server.hightidenode;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.PolicyInfo;
import org.apache.hadoop.hdfs.protocol.ProtocolCompatible;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorDescriptor;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.StringUtils;

/* loaded from: input_file:org/apache/hadoop/hdfs/server/hightidenode/FileFixer.class */
public class FileFixer implements Runnable {
    public static final Log LOG = LogFactory.getLog("org.apache.hadoop.hdfs.hightide.FileFixer");
    private final Configuration conf;
    private int blockFixInterval;
    private int numThreads;
    private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
    private Collection<PolicyInfo> all;
    private PendingReplication filesBeingFixed;
    ThreadPoolExecutor executor;
    private volatile boolean running = true;
    List<PathToPolicy> pathToPolicy = new LinkedList();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hdfs/server/hightidenode/FileFixer$PathToPolicy.class */
    public static class PathToPolicy {
        String spath;
        PolicyInfo pinfo;

        PathToPolicy(Path path, PolicyInfo policyInfo) {
            this.spath = path.toString();
            this.pinfo = policyInfo;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hdfs/server/hightidenode/FileFixer$WorkItem.class */
    public static class WorkItem implements Runnable {
        Path badfile;
        LocatedBlock goodBlock;
        LocatedBlock badBlock;
        DistributedFileSystem destFs;
        Configuration conf;
        private static Random rand = new Random();

        WorkItem(Path path, LocatedBlock locatedBlock, LocatedBlock locatedBlock2, FileSystem fileSystem, Configuration configuration) {
            this.goodBlock = locatedBlock;
            this.badBlock = locatedBlock2;
            this.badfile = path;
            this.destFs = (DistributedFileSystem) fileSystem;
            this.conf = configuration;
        }

        @Override // java.lang.Runnable
        public void run() {
            String str = NodeBase.ROOT;
            try {
                DatanodeInfo[] datanodeReport = this.destFs.getClient().datanodeReport(FSConstants.DatanodeReportType.LIVE);
                DatanodeInfo datanodeInfo = datanodeReport[rand.nextInt(datanodeReport.length)];
                DatanodeInfo datanodeInfo2 = this.goodBlock.getLocations()[rand.nextInt(this.goodBlock.getLocations().length)];
                str = "File " + this.badfile + ": Copying block " + this.goodBlock.getBlock().getBlockName() + " from " + datanodeInfo2.getName() + " to block " + this.badBlock.getBlock().getBlockName() + " on " + datanodeInfo.getName();
                FileFixer.LOG.info(str);
                ClientDatanodeProtocol createClientDatanodeProtocolProxy = FileFixer.createClientDatanodeProtocolProxy(datanodeInfo2, this.conf);
                createClientDatanodeProtocolProxy.copyBlock(this.goodBlock.getBlock(), this.badBlock.getBlock(), datanodeInfo);
                RPC.stopProxy(createClientDatanodeProtocolProxy);
                HighTideNode.getMetrics().fixSuccessfullyStarted.inc();
            } catch (Throwable th) {
                HighTideNode.getMetrics().fixFailedDatanodeError.inc();
                FileFixer.LOG.error(StringUtils.stringifyException(th) + str + ". Failed to contact datanode.");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FileFixer(Configuration configuration) throws IOException {
        this.blockFixInterval = 60000;
        this.numThreads = 100;
        this.conf = configuration;
        this.blockFixInterval = configuration.getInt("hightide.blockfix.interval", this.blockFixInterval);
        this.numThreads = configuration.getInt("hightide.blockfix.numthreads", this.numThreads);
        this.executor = new ThreadPoolExecutor(this.numThreads, this.numThreads, THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, new LinkedBlockingQueue());
        this.filesBeingFixed = new PendingReplication(configuration.getInt("dfs.hightide.pending.timeout.sec", -1) * 1000);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setPolicyInfo(Collection<PolicyInfo> collection) throws IOException {
        this.all = collection;
        this.pathToPolicy.clear();
        for (PolicyInfo policyInfo : collection) {
            this.pathToPolicy.add(new PathToPolicy(policyInfo.getSrcPath(), policyInfo));
            Iterator<PolicyInfo.PathInfo> it = policyInfo.getDestPaths().iterator();
            while (it.hasNext()) {
                this.pathToPolicy.add(new PathToPolicy(it.next().rpath, policyInfo));
            }
        }
        Collections.sort(this.pathToPolicy, new Comparator<PathToPolicy>() { // from class: org.apache.hadoop.hdfs.server.hightidenode.FileFixer.1
            @Override // java.util.Comparator
            public int compare(PathToPolicy pathToPolicy, PathToPolicy pathToPolicy2) {
                return 0 - pathToPolicy.spath.compareTo(pathToPolicy2.spath);
            }
        });
    }

    @Override // java.lang.Runnable
    public void run() {
        while (this.running) {
            try {
                LOG.info("FileFixer continuing to run...");
                doFindFiles();
            } catch (Error e) {
                LOG.error("Exiting after encountering " + StringUtils.stringifyException(e));
                shutdown();
                throw e;
            } catch (Exception e2) {
                LOG.error(StringUtils.stringifyException(e2));
            }
            try {
                Thread.sleep(this.blockFixInterval);
            } catch (InterruptedException e3) {
                LOG.error("Encountering InturruptedException " + StringUtils.stringifyException(e3));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void shutdown() {
        this.running = false;
        this.filesBeingFixed.stop();
    }

    static FileSystem getFs(Configuration configuration, Path path) {
        try {
            return path.getFileSystem(configuration);
        } catch (Exception e) {
            LOG.warn("getFs: Unable to contact filesystem: " + path + " ignoring.... " + e);
            e.printStackTrace();
            return null;
        }
    }

    private void doFindFiles() throws IOException {
        HashSet<FileSystem> hashSet = new HashSet();
        HashSet<Path> hashSet2 = new HashSet();
        for (PolicyInfo policyInfo : this.all) {
            FileSystem fs = getFs(policyInfo.getConf(), policyInfo.getSrcPath());
            if (fs != null) {
                hashSet.add(fs);
            }
            Iterator<PolicyInfo.PathInfo> it = policyInfo.getDestPaths().iterator();
            while (it.hasNext()) {
                FileSystem fs2 = getFs(policyInfo.getConf(), it.next().rpath);
                if (fs2 != null) {
                    hashSet.add(fs2);
                }
            }
        }
        for (FileSystem fileSystem : hashSet) {
            if (!this.running) {
                break;
            }
            for (Path path : getCorruptFilesFromNamenode(fileSystem)) {
                if (this.filesBeingFixed.add(path)) {
                    hashSet2.add(path);
                }
            }
        }
        if (!hashSet2.isEmpty()) {
            LOG.info("Found " + hashSet2.size() + " corrupt files.");
        }
        for (Path path2 : hashSet2) {
            if (!this.running) {
                return;
            }
            try {
                fixFile(path2);
            } catch (IOException e) {
                LOG.error("Error while processing " + path2 + ": " + StringUtils.stringifyException(e));
            }
        }
    }

    private void fixFile(Path path) throws IOException {
        PolicyInfo policyInfo = null;
        String path2 = path.toString();
        LOG.info("File  = file to fix:" + path);
        Iterator<PathToPolicy> it = this.pathToPolicy.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            PathToPolicy next = it.next();
            if (path2.startsWith(next.spath)) {
                policyInfo = next.pinfo;
                break;
            }
        }
        if (policyInfo == null) {
            throw new IOException("Unable to find matching policy for " + path);
        }
        HighTideNode.getMetrics().fixAttempt.inc();
        Path path3 = path2.startsWith(policyInfo.getSrcPath().toString()) ? new Path(policyInfo.getDestPaths().get(0).rpath.toString() + path2.split(policyInfo.getSrcPath().toString())[1]) : new Path(policyInfo.getSrcPath().toString() + path2.split(policyInfo.getDestPaths().get(0).rpath.toString())[1]);
        DistributedFileSystem distributedFileSystem = (DistributedFileSystem) path3.getFileSystem(policyInfo.getConf());
        DistributedFileSystem distributedFileSystem2 = (DistributedFileSystem) path.getFileSystem(policyInfo.getConf());
        FileStatus fileStatus = distributedFileSystem.getFileStatus(path3);
        FileStatus fileStatus2 = distributedFileSystem2.getFileStatus(path);
        if (fileStatus.getModificationTime() != fileStatus2.getModificationTime()) {
            String str = "Unable to fix file " + path + " because src " + path3 + " has modification time as " + HighTideNode.dateForm.format(new Date(fileStatus.getModificationTime())) + " but destination " + path + " has modification time as " + HighTideNode.dateForm.format(new Date(fileStatus2.getModificationTime()));
            LOG.error(str);
            HighTideNode.getMetrics().fixFailedModTimeMismatch.inc();
            throw new IOException(str);
        }
        if (fileStatus.getBlockSize() != fileStatus2.getBlockSize()) {
            String str2 = "Unable to fix file " + path + " because src " + path3 + " has blocksize as " + fileStatus.getBlockSize() + " but destination " + path + " has blocksize as " + fileStatus2.getBlockSize();
            LOG.error(str2);
            HighTideNode.getMetrics().fixFailedBlockSizeMismatch.inc();
            throw new IOException(str2);
        }
        if (fileStatus.getLen() != fileStatus2.getLen()) {
            String str3 = "Unable to fix file " + path + " because src " + path3 + " has size as " + fileStatus.getLen() + " but destination " + path + " has size as " + fileStatus2.getLen();
            LOG.error(str3);
            HighTideNode.getMetrics().fixFailedFileLengthMismatch.inc();
            throw new IOException(str3);
        }
        List<LocatedBlock> corruptBlocksInFile = corruptBlocksInFile(distributedFileSystem2, path.toUri().getPath(), fileStatus2);
        List<LocatedBlock> locatedBlocks = distributedFileSystem.getClient().namenode.getBlockLocations(path3.toUri().getPath(), 0L, fileStatus.getLen()).getLocatedBlocks();
        for (LocatedBlock locatedBlock : corruptBlocksInFile) {
            LocatedBlock locatedBlock2 = null;
            Iterator<LocatedBlock> it2 = locatedBlocks.iterator();
            while (true) {
                if (!it2.hasNext()) {
                    break;
                }
                LocatedBlock next2 = it2.next();
                if (locatedBlock.getStartOffset() == next2.getStartOffset()) {
                    locatedBlock2 = next2;
                    break;
                }
            }
            if (locatedBlock2 == null || locatedBlock2.getLocations().length == 0) {
                String str4 = "Could not find a good block location for badBlock " + locatedBlock + " in file " + path;
                LOG.error(str4);
                HighTideNode.getMetrics().fixFailedNoGoodBlock.inc();
                throw new IOException(str4);
            }
            WorkItem workItem = new WorkItem(path, locatedBlock2, locatedBlock, distributedFileSystem2, this.conf);
            LOG.info("Queueing up block " + locatedBlock.getBlock().getBlockName() + " to be fixed from block " + locatedBlock2.getBlock().getBlockName());
            this.executor.execute(workItem);
        }
    }

    List<Path> getCorruptFilesFromNamenode(FileSystem fileSystem) throws IOException {
        if (!(fileSystem instanceof DistributedFileSystem)) {
            throw new IOException("Only DistributedFileSystem can be handled  by HighTide.");
        }
        DistributedFileSystem distributedFileSystem = (DistributedFileSystem) fileSystem;
        LinkedList linkedList = new LinkedList();
        try {
            LOG.info("Checking filesystem: " + distributedFileSystem.getUri());
            for (String str : DFSUtil.getCorruptFiles(distributedFileSystem)) {
                linkedList.add(new Path(str).makeQualified(fileSystem));
            }
            return linkedList;
        } catch (Exception e) {
            LOG.warn("getCorruptFilesFromNamenode: Unable to contact filesystem: " + fileSystem.getUri() + " ignoring..." + e);
            e.printStackTrace();
            return linkedList;
        }
    }

    List<LocatedBlock> corruptBlocksInFile(DistributedFileSystem distributedFileSystem, String str, FileStatus fileStatus) throws IOException {
        LinkedList linkedList = new LinkedList();
        for (LocatedBlock locatedBlock : distributedFileSystem.getClient().namenode.getBlockLocations(str, 0L, fileStatus.getLen()).getLocatedBlocks()) {
            if (locatedBlock.isCorrupt() || (locatedBlock.getLocations().length == 0 && locatedBlock.getBlockSize() > 0)) {
                LOG.info("Adding bad block for file " + str);
                linkedList.add(locatedBlock);
            }
        }
        return linkedList;
    }

    static ClientDatanodeProtocol createClientDatanodeProtocolProxy(DatanodeInfo datanodeInfo, Configuration configuration) throws IOException {
        InetSocketAddress createSocketAddr = NetUtils.createSocketAddr(datanodeInfo.getHost() + ValueAggregatorDescriptor.TYPE_SEPARATOR + datanodeInfo.getIpcPort());
        if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
            ClientDatanodeProtocol.LOG.info("ClientDatanodeProtocol addr=" + createSocketAddr);
        }
        try {
            return (ClientDatanodeProtocol) RPC.getProxy(ClientDatanodeProtocol.class, 5L, createSocketAddr, configuration);
        } catch (RPC.VersionMismatch e) {
            long clientVersion = e.getClientVersion();
            long serverVersion = e.getServerVersion();
            if (clientVersion <= serverVersion || ProtocolCompatible.isCompatibleClientDatanodeProtocol(clientVersion, serverVersion)) {
                return (ClientDatanodeProtocol) e.getProxy();
            }
            throw new RPC.VersionIncompatible(ClientDatanodeProtocol.class.getName(), clientVersion, serverVersion);
        }
    }
}
