package org.apache.hadoop.hdfs.server.hightidenode;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HighTideProtocol;
import org.apache.hadoop.hdfs.protocol.PolicyInfo;
import org.apache.hadoop.hdfs.server.hightidenode.metrics.HighTideNodeMetrics;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
import org.xml.sax.SAXException;

/* loaded from: input_file:org/apache/hadoop/hdfs/server/hightidenode/HighTideNode.class */
public class HighTideNode implements HighTideProtocol {
    public static final Log LOG;
    public static final long SLEEP_TIME = 10000;
    public static final int DEFAULT_PORT = 60100;
    public static final String HIGHTIDE_FULLSYNC_INTERVAL = "hightide.fullsync.interval.seconds";
    public static final long HIGHTIDE_FULLSYNC_INTERVAL_DEFAULT = 3600;
    public static final SimpleDateFormat dateForm;
    private Server server;
    private ConfigManager configMgr;
    private Configuration conf;
    protected boolean initialized;
    protected volatile boolean running;
    static HighTideNodeMetrics myMetrics;
    private InetSocketAddress serverAddress = null;
    private boolean stopRequested = false;
    Daemon triggerThread = null;
    public FileFixer fileFixer = null;
    Daemon fileFixerThread = null;

    /* loaded from: input_file:org/apache/hadoop/hdfs/server/hightidenode/HighTideNode$StartupOption.class */
    public enum StartupOption {
        TEST("-test"),
        REGULAR("-regular");

        private String name;

        StartupOption(String str) {
            this.name = null;
            this.name = str;
        }

        public String getName() {
            return this.name;
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hdfs/server/hightidenode/HighTideNode$Statistics.class */
    public static class Statistics {
        long numProcessedBlocks;
        long processedSize;

        public void clear() {
            this.numProcessedBlocks = 0L;
            this.processedSize = 0L;
        }

        public String toString() {
            return " numProcessedBlocks = " + this.numProcessedBlocks + " processedSize = " + this.processedSize;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hdfs/server/hightidenode/HighTideNode$TriggerMonitor.class */
    public class TriggerMonitor implements Runnable {
        private Map<String, Long> scanTimes = new HashMap();
        private Map<String, DirectoryTraversal> scanState = new HashMap();

        TriggerMonitor() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (HighTideNode.this.running) {
                try {
                    HighTideNode.this.doFullSync();
                } catch (IOException e) {
                    HighTideNode.LOG.info("Exception in doFullSync. " + StringUtils.stringifyException(e));
                }
                try {
                    Thread.sleep(HighTideNode.this.conf.getLong(HighTideNode.HIGHTIDE_FULLSYNC_INTERVAL, HighTideNode.HIGHTIDE_FULLSYNC_INTERVAL_DEFAULT) * 1000);
                } catch (InterruptedException e2) {
                    HighTideNode.LOG.info("InterrupedException in TriggerMonitor.run.");
                }
            }
        }
    }

    HighTideNode(Configuration configuration) throws IOException {
        try {
            initialize(configuration);
        } catch (IOException e) {
            LOG.error(StringUtils.stringifyException(e));
            stop();
            throw e;
        } catch (Exception e2) {
            LOG.error(StringUtils.stringifyException(e2));
            stop();
            throw new IOException(e2);
        }
    }

    @Override // org.apache.hadoop.ipc.VersionedProtocol
    public long getProtocolVersion(String str, long j) throws IOException {
        if (str.equals(HighTideProtocol.class.getName())) {
            return 1L;
        }
        throw new IOException("Unknown protocol to hightide node: " + str);
    }

    @Override // org.apache.hadoop.ipc.VersionedProtocol
    public ProtocolSignature getProtocolSignature(String str, long j, int i) throws IOException {
        return ProtocolSignature.getProtocolSignature(this, str, j, i);
    }

    public void join() {
        try {
            if (this.server != null) {
                this.server.join();
            }
            if (this.triggerThread != null) {
                this.triggerThread.join();
            }
            if (this.fileFixerThread != null) {
                this.fileFixerThread.join();
            }
        } catch (InterruptedException e) {
        }
    }

    public void stop() {
        if (this.stopRequested) {
            return;
        }
        this.stopRequested = true;
        this.running = false;
        if (this.server != null) {
            this.server.stop();
        }
        if (this.triggerThread != null) {
            this.triggerThread.interrupt();
        }
        if (this.fileFixer != null) {
            this.fileFixer.shutdown();
        }
        if (this.fileFixerThread != null) {
            this.fileFixerThread.interrupt();
        }
        if (myMetrics != null) {
            myMetrics.shutdown();
        }
    }

    private static InetSocketAddress getAddress(String str) {
        return NetUtils.createSocketAddr(str);
    }

    public static InetSocketAddress getAddress(Configuration configuration) {
        String str = configuration.get("hightidenode.server.address");
        if (str == null) {
            str = "localhost:60100";
        }
        return getAddress(str);
    }

    public InetSocketAddress getListenerAddress() {
        return this.server.getListenerAddress();
    }

    private void initialize(Configuration configuration) throws IOException, SAXException, InterruptedException, HighTideConfigurationException, ClassNotFoundException, ParserConfigurationException {
        this.conf = configuration;
        InetSocketAddress address = getAddress(configuration);
        int i = configuration.getInt("fs.hightidenodenode.handler.count", 10);
        this.configMgr = new ConfigManager(configuration);
        this.configMgr.reloadConfigsIfNecessary();
        this.configMgr.startReload();
        myMetrics = new HighTideNodeMetrics(configuration, this);
        this.server = RPC.getServer(this, address.getHostName(), address.getPort(), i, false, configuration);
        this.serverAddress = this.server.getListenerAddress();
        LOG.info("HighTideNode up at: " + this.serverAddress);
        this.initialized = true;
        this.running = true;
        this.server.start();
        this.fileFixer = new FileFixer(configuration);
        this.fileFixerThread = new Daemon(this.fileFixer);
        this.fileFixer.setPolicyInfo(this.configMgr.getAllPolicies());
        this.fileFixerThread.start();
        this.triggerThread = new Daemon(new TriggerMonitor());
        this.triggerThread.start();
    }

    void doFullSync() throws IOException {
        Iterator<PolicyInfo> it = this.configMgr.getAllPolicies().iterator();
        while (it.hasNext()) {
            doFullSync(it.next());
        }
    }

    void doFullSync(PolicyInfo policyInfo) throws IOException {
        Path srcPath = policyInfo.getSrcPath();
        LOG.info("Starting fullsync of srcPath " + srcPath);
        FileSystem fileSystem = srcPath.getFileSystem(policyInfo.getConf());
        int parseInt = Integer.parseInt(policyInfo.getProperty("replication"));
        long parseLong = Long.parseLong(policyInfo.getProperty("modTimePeriod"));
        long now = now();
        ArrayList arrayList = new ArrayList();
        arrayList.add(fileSystem.getFileStatus(srcPath));
        DirectoryTraversal directoryTraversal = new DirectoryTraversal(fileSystem, arrayList);
        while (true) {
            FileStatus nextFile = directoryTraversal.getNextFile();
            if (nextFile == null) {
                LOG.info("Completed fullsync of srcPath " + srcPath);
                return;
            }
            if (nextFile.getReplication() != parseInt && nextFile.getModificationTime() + parseLong <= now) {
                srcPath = nextFile.getPath();
                String str = srcPath.toString().split(policyInfo.getSrcPath().toString())[1];
                boolean z = true;
                for (PolicyInfo.PathInfo pathInfo : policyInfo.getDestPaths()) {
                    Path path = new Path(pathInfo.getPath().toString() + str);
                    LOG.debug("Comparing " + srcPath + " with " + path);
                    int parseInt2 = Integer.parseInt(pathInfo.getProperty("replication"));
                    FileSystem fileSystem2 = path.getFileSystem(policyInfo.getConf());
                    FileStatus fileStatus = null;
                    try {
                        fileStatus = fileSystem2.getFileStatus(path);
                    } catch (FileNotFoundException e) {
                        z = false;
                    } catch (IOException e2) {
                        z = false;
                        LOG.info("Unable to locate matching file in destination " + path + StringUtils.stringifyException(e2) + ". Ignoring...");
                    }
                    LOG.info("Matching " + srcPath + " with " + path);
                    getMetrics().filesMatched.inc();
                    if (fileStatus.getModificationTime() != nextFile.getModificationTime() || fileStatus.getBlockSize() != nextFile.getBlockSize() || fileStatus.getLen() != nextFile.getLen()) {
                        z = false;
                        break;
                    } else if (fileStatus.getReplication() > parseInt2) {
                        getMetrics().filesChanged.inc();
                        long len = fileStatus.getLen() * (fileStatus.getReplication() - parseInt2);
                        LOG.info("Changing replication of dest " + path + " from " + ((int) fileStatus.getReplication()) + " to " + parseInt2);
                        fileSystem2.setReplication(fileStatus.getPath(), (short) parseInt2);
                        getMetrics().savedSize.set(len + getMetrics().savedSize.get());
                    }
                }
                if (z && nextFile.getReplication() > parseInt) {
                    LOG.info("Changing replication of source " + srcPath + " from " + ((int) nextFile.getReplication()) + " to " + parseInt);
                    getMetrics().filesChanged.inc();
                    long len2 = nextFile.getLen() * (nextFile.getReplication() - parseInt);
                    fileSystem.setReplication(nextFile.getPath(), (short) parseInt);
                    getMetrics().savedSize.set(len2 + getMetrics().savedSize.get());
                }
            }
        }
    }

    void shutdown() throws IOException, InterruptedException {
        this.configMgr.stopReload();
        this.fileFixer.shutdown();
        this.fileFixerThread.interrupt();
        this.server.stop();
    }

    @Override // org.apache.hadoop.hdfs.protocol.HighTideProtocol
    public PolicyInfo[] getAllPolicies() throws IOException {
        Collection<PolicyInfo> allPolicies = this.configMgr.getAllPolicies();
        return (PolicyInfo[]) allPolicies.toArray(new PolicyInfo[allPolicies.size()]);
    }

    public static HighTideNodeMetrics getMetrics() {
        return myMetrics;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static long now() {
        return System.currentTimeMillis();
    }

    private static void printUsage() {
        System.err.println("Usage: java HighTideNode ");
    }

    private static StartupOption parseArguments(String[] strArr) {
        int length = strArr == null ? 0 : strArr.length;
        StartupOption startupOption = StartupOption.REGULAR;
        for (int i = 0; i < length; i++) {
            String str = strArr[i];
        }
        return startupOption;
    }

    private static void setStartupOption(Configuration configuration, StartupOption startupOption) {
        configuration.set("fs.hightidenodenode.startup", startupOption.toString());
    }

    public static HighTideNode createHighTideNode(String[] strArr, Configuration configuration) throws IOException {
        if (configuration == null) {
            configuration = new Configuration();
        }
        StartupOption parseArguments = parseArguments(strArr);
        if (parseArguments == null) {
            printUsage();
            return null;
        }
        setStartupOption(configuration, parseArguments);
        return new HighTideNode(configuration);
    }

    public static void main(String[] strArr) throws Exception {
        try {
            StringUtils.startupShutdownMessage(HighTideNode.class, strArr, LOG);
            HighTideNode createHighTideNode = createHighTideNode(strArr, null);
            if (createHighTideNode != null) {
                createHighTideNode.join();
            }
        } catch (Throwable th) {
            LOG.error(StringUtils.stringifyException(th));
            System.exit(-1);
        }
    }

    static {
        Configuration.addDefaultResource("hdfs-default.xml");
        Configuration.addDefaultResource("hdfs-site.xml");
        LOG = LogFactory.getLog("org.apache.hadoop.hightidenode.HighTideNode");
        dateForm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    }
}
