package org.apache.hadoop.hdfs.server.federation.router;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAccumulator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.ClientGSIContext;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.server.federation.FederationTestUtils;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MockResolver;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.RouterObserverReadConfiguredFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.RouterObserverReadProxyProvider;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

/* loaded from: input_file:org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.class */
public class TestObserverWithRouter {
    private static final int NUM_NAMESERVICES = 2;
    private static final String SKIP_BEFORE_EACH_CLUSTER_STARTUP = "SkipBeforeEachClusterStartup";
    private MiniRouterDFSCluster cluster;
    private MiniRouterDFSCluster.RouterContext routerContext;
    private FileSystem fileSystem;
    private static final String ROUTER_NS_ID = "router-service";
    private static final String AUTO_MSYNC_PERIOD_KEY_PREFIX = "dfs.client.failover.observer.auto-msync-period";

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.hadoop.hdfs.server.federation.router.TestObserverWithRouter$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$hadoop$hdfs$server$federation$router$TestObserverWithRouter$ConfigSetting = new int[ConfigSetting.values().length];

        static {
            try {
                $SwitchMap$org$apache$hadoop$hdfs$server$federation$router$TestObserverWithRouter$ConfigSetting[ConfigSetting.USE_NAMENODE_PROXY_FLAG.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$hadoop$hdfs$server$federation$router$TestObserverWithRouter$ConfigSetting[ConfigSetting.USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER.ordinal()] = TestObserverWithRouter.NUM_NAMESERVICES;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$hadoop$hdfs$server$federation$router$TestObserverWithRouter$ConfigSetting[ConfigSetting.USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter$ConfigSetting.class */
    public enum ConfigSetting {
        USE_NAMENODE_PROXY_FLAG,
        USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER,
        USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER
    }

    @BeforeEach
    void init(TestInfo testInfo) throws Exception {
        if (testInfo.getTags().contains(SKIP_BEFORE_EACH_CLUSTER_STARTUP)) {
            return;
        }
        startUpCluster(NUM_NAMESERVICES, null);
    }

    @AfterEach
    public void teardown() throws IOException {
        if (this.cluster != null) {
            this.cluster.shutdown();
            this.cluster = null;
        }
        this.routerContext = null;
        if (this.fileSystem != null) {
            this.fileSystem.close();
            this.fileSystem = null;
        }
    }

    public void startUpCluster(int i, Configuration configuration) throws Exception {
        int i2 = NUM_NAMESERVICES + i;
        Configuration configuration2 = new Configuration(false);
        configuration2.setBoolean("dfs.federation.router.observer.read.default", true);
        configuration2.setBoolean("dfs.ha.tail-edits.in-progress", true);
        configuration2.set("dfs.ha.tail-edits.period", "0ms");
        configuration2.setBoolean("dfs.namenode.state.context.enabled", true);
        if (configuration != null) {
            configuration.iterator().forEachRemaining(entry -> {
                configuration2.set((String) entry.getKey(), (String) entry.getValue());
            });
        }
        this.cluster = new MiniRouterDFSCluster(true, NUM_NAMESERVICES, i2);
        this.cluster.addNamenodeOverrides(configuration2);
        this.cluster.startCluster();
        if (this.cluster.isHighAvailability()) {
            for (String str : this.cluster.getNameservices()) {
                this.cluster.switchToActive(str, FederationTestUtils.NAMENODES[0]);
                this.cluster.switchToStandby(str, FederationTestUtils.NAMENODES[1]);
                for (int i3 = NUM_NAMESERVICES; i3 < i2; i3++) {
                    this.cluster.switchToObserver(str, FederationTestUtils.NAMENODES[i3]);
                }
            }
        }
        Configuration build = new RouterConfigBuilder().metrics().rpc().build();
        this.cluster.addRouterOverrides(configuration2);
        this.cluster.addRouterOverrides(build);
        this.cluster.startRouters();
        this.cluster.registerNamenodes();
        this.cluster.waitNamenodeRegistration();
        this.cluster.installMockLocations();
        this.cluster.waitActiveNamespaces();
        this.routerContext = this.cluster.getRandomRouter();
    }

    private Configuration getConfToEnableObserverReads(ConfigSetting configSetting) {
        Configuration configuration = new Configuration();
        switch (AnonymousClass1.$SwitchMap$org$apache$hadoop$hdfs$server$federation$router$TestObserverWithRouter$ConfigSetting[configSetting.ordinal()]) {
            case 1:
                configuration.setBoolean("dfs.client.rbf.observer.read.enable", true);
                break;
            case NUM_NAMESERVICES /* 2 */:
                configuration.set("dfs.client.failover.proxy.provider." + this.routerContext.getRouter().getRpcServerAddress().getHostName(), RouterObserverReadProxyProvider.class.getName());
                break;
            case 3:
                configuration.set("dfs.nameservices", ROUTER_NS_ID);
                configuration.set("dfs.ha.namenodes.router-service", "router1");
                configuration.set("dfs.namenode.rpc-address.router-service.router1", this.routerContext.getFileSystemURI().toString());
                DistributedFileSystem.setDefaultUri(configuration, "hdfs://router-service");
                configuration.set("dfs.client.failover.proxy.provider.router-service", RouterObserverReadConfiguredFailoverProxyProvider.class.getName());
                break;
            default:
                Assertions.fail("Unknown config setting: " + configSetting);
                break;
        }
        return configuration;
    }

    @EnumSource(ConfigSetting.class)
    @ParameterizedTest
    public void testObserverRead(ConfigSetting configSetting) throws Exception {
        this.fileSystem = this.routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
        internalTestObserverRead();
    }

    @Test
    public void testReadWithoutObserverClientConfigurations() throws Exception {
        this.fileSystem = this.routerContext.getFileSystem();
        Assert.assertThrows(AssertionError.class, this::internalTestObserverRead);
    }

    public void internalTestObserverRead() throws Exception {
        Assert.assertEquals("First namenode should be observer", ((FederationNamenodeContext) this.routerContext.getRouter().getNamenodeResolver().getNamenodesForNameserviceId(this.cluster.getNameservices().get(0), true).get(0)).getState(), FederationNamenodeServiceState.OBSERVER);
        Path path = new Path("/testFile");
        this.fileSystem.create(path).close();
        this.fileSystem.open(path).close();
        Assert.assertEquals("Two calls should be sent to active", 2L, this.routerContext.getRouter().getRpcServer().getRPCMetrics().getActiveProxyOps());
        Assert.assertEquals("One call should be sent to observer", 1L, this.routerContext.getRouter().getRpcServer().getRPCMetrics().getObserverProxyOps());
    }

    @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
    @EnumSource(ConfigSetting.class)
    @ParameterizedTest
    public void testObserverReadWithoutFederatedStatePropagation(ConfigSetting configSetting) throws Exception {
        Configuration configuration = new Configuration(false);
        configuration.setInt("dfs.federation.router.observer.federated.state.propagation.maxsize", 0);
        startUpCluster(NUM_NAMESERVICES, configuration);
        this.fileSystem = this.routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
        Assert.assertEquals("First namenode should be observer", ((FederationNamenodeContext) this.routerContext.getRouter().getNamenodeResolver().getNamenodesForNameserviceId(this.cluster.getNameservices().get(0), true).get(0)).getState(), FederationNamenodeServiceState.OBSERVER);
        Path path = new Path("/testFile");
        this.fileSystem.create(path).close();
        this.fileSystem.open(path).close();
        Assert.assertEquals("Three calls should be sent to active", 3L, this.routerContext.getRouter().getRpcServer().getRPCMetrics().getActiveProxyOps());
        Assert.assertEquals("No call should be sent to observer", 0L, this.routerContext.getRouter().getRpcServer().getRPCMetrics().getObserverProxyOps());
    }

    @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
    @EnumSource(ConfigSetting.class)
    @ParameterizedTest
    public void testDisablingObserverReadUsingNameserviceOverride(ConfigSetting configSetting) throws Exception {
        Configuration configuration = new Configuration(false);
        configuration.set("dfs.federation.router.observer.read.overrides", "ns0");
        startUpCluster(NUM_NAMESERVICES, configuration);
        this.fileSystem = this.routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
        Path path = new Path("/testFile");
        this.fileSystem.create(path).close();
        this.fileSystem.open(path).close();
        Assert.assertEquals("Three calls should be sent to active", 3L, this.routerContext.getRouter().getRpcServer().getRPCMetrics().getActiveProxyOps());
        Assert.assertEquals("Zero calls should be sent to observer", 0L, this.routerContext.getRouter().getRpcServer().getRPCMetrics().getObserverProxyOps());
    }

    @EnumSource(ConfigSetting.class)
    @ParameterizedTest
    public void testReadWhenObserverIsDown(ConfigSetting configSetting) throws Exception {
        this.fileSystem = this.routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
        Path path = new Path("/testFile1");
        this.fileSystem.create(path).close();
        Assert.assertNotEquals("No observer found", 3L, stopObserver(1));
        Assert.assertNotEquals("No observer found", 4L, stopObserver(1));
        this.fileSystem.open(path).close();
        Assert.assertEquals("Three calls should be sent to active", 3L, this.routerContext.getRouter().getRpcServer().getRPCMetrics().getActiveProxyOps());
        Assert.assertEquals("No call should send to observer", 0L, this.routerContext.getRouter().getRpcServer().getRPCMetrics().getObserverProxyOps());
    }

    @EnumSource(ConfigSetting.class)
    @ParameterizedTest
    public void testMultipleObserver(ConfigSetting configSetting) throws Exception {
        this.fileSystem = this.routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
        Path path = new Path("/testFile1");
        this.fileSystem.create(path).close();
        stopObserver(1);
        this.fileSystem.open(path).close();
        Assert.assertEquals("Two calls should be sent to active", 2L, this.routerContext.getRouter().getRpcServer().getRPCMetrics().getActiveProxyOps());
        Assert.assertEquals("Read should be success with another observer", 1L, this.routerContext.getRouter().getRpcServer().getRPCMetrics().getObserverProxyOps());
        stopObserver(1);
        this.fileSystem.open(path).close();
        Assert.assertEquals("One call should be sent to active", 2 + 1, this.routerContext.getRouter().getRpcServer().getRPCMetrics().getActiveProxyOps());
        Assert.assertEquals("No call should send to observer", 1 + 0, this.routerContext.getRouter().getRpcServer().getRPCMetrics().getObserverProxyOps());
    }

    private int stopObserver(int i) {
        int i2 = 0;
        while (i2 < this.cluster.getNamenodes().size()) {
            NameNode nameNode = this.cluster.getCluster().getNameNode(i2);
            if (nameNode != null && nameNode.isObserverState()) {
                this.cluster.getCluster().shutdownNameNode(i2);
                i--;
                if (i == 0) {
                    break;
                }
            }
            i2++;
        }
        return i2;
    }

    @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
    @Test
    public void testMultipleObserverRouter() throws Exception {
        StateStoreDFSCluster stateStoreDFSCluster = new StateStoreDFSCluster(true, 4, 4, TimeUnit.SECONDS.toMillis(5L), TimeUnit.SECONDS.toMillis(5L));
        Configuration build = new RouterConfigBuilder().stateStore().admin().rpc().enableLocalHeartbeat(true).heartbeat().build();
        StringBuilder sb = new StringBuilder();
        String str = stateStoreDFSCluster.getNameservices().get(0);
        MiniRouterDFSCluster.NamenodeContext namenodeContext = stateStoreDFSCluster.getNamenodes(str).get(1);
        build.set("dfs.nameservice.id", str);
        build.set("dfs.ha.namenode.id", namenodeContext.getNamenodeId());
        String str2 = stateStoreDFSCluster.getNameservices().get(1);
        Iterator<MiniRouterDFSCluster.NamenodeContext> it = stateStoreDFSCluster.getNamenodes(str2).iterator();
        while (it.hasNext()) {
            String confSuffix = it.next().getConfSuffix();
            if (sb.length() != 0) {
                sb.append(",");
            }
            sb.append(confSuffix);
        }
        build.set("dfs.federation.router.monitor.namenode", sb.toString());
        build.setBoolean("dfs.federation.router.observer.read.default", true);
        build.setBoolean("dfs.ha.tail-edits.in-progress", true);
        build.set("dfs.ha.tail-edits.period", "0ms");
        stateStoreDFSCluster.addNamenodeOverrides(build);
        stateStoreDFSCluster.addRouterOverrides(build);
        stateStoreDFSCluster.startCluster();
        if (stateStoreDFSCluster.isHighAvailability()) {
            for (String str3 : stateStoreDFSCluster.getNameservices()) {
                stateStoreDFSCluster.switchToActive(str3, FederationTestUtils.NAMENODES[0]);
                stateStoreDFSCluster.switchToStandby(str3, FederationTestUtils.NAMENODES[1]);
                for (int i = NUM_NAMESERVICES; i < 4; i++) {
                    stateStoreDFSCluster.switchToObserver(str3, FederationTestUtils.NAMENODES[i]);
                }
            }
        }
        stateStoreDFSCluster.startRouters();
        stateStoreDFSCluster.waitClusterUp();
        this.routerContext = stateStoreDFSCluster.getRandomRouter();
        MembershipNamenodeResolver namenodeResolver = this.routerContext.getRouter().getNamenodeResolver();
        namenodeResolver.loadCache(true);
        List namenodesForNameserviceId = namenodeResolver.getNamenodesForNameserviceId(str, true);
        List namenodesForNameserviceId2 = namenodeResolver.getNamenodesForNameserviceId(str2, true);
        Assert.assertEquals(((FederationNamenodeContext) namenodesForNameserviceId.get(0)).getState(), FederationNamenodeServiceState.OBSERVER);
        Assert.assertEquals(((FederationNamenodeContext) namenodesForNameserviceId.get(1)).getState(), FederationNamenodeServiceState.OBSERVER);
        Assert.assertNotEquals(((FederationNamenodeContext) namenodesForNameserviceId.get(0)).getNamenodeId(), ((FederationNamenodeContext) namenodesForNameserviceId.get(1)).getNamenodeId());
        Assert.assertEquals(((FederationNamenodeContext) namenodesForNameserviceId2.get(0)).getState(), FederationNamenodeServiceState.OBSERVER);
        stateStoreDFSCluster.shutdown();
    }

    @EnumSource(ConfigSetting.class)
    @ParameterizedTest
    public void testUnavailableObserverNN(ConfigSetting configSetting) throws Exception {
        this.fileSystem = this.routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
        stopObserver(NUM_NAMESERVICES);
        Path path = new Path("/testFile");
        this.fileSystem.create(path).close();
        this.fileSystem.open(path).close();
        Assert.assertEquals("Three calls should be send to active", 3L, this.routerContext.getRouter().getRpcServer().getRPCMetrics().getActiveProxyOps());
        boolean z = false;
        Iterator<String> it = this.cluster.getNameservices().iterator();
        while (it.hasNext()) {
            Iterator it2 = this.routerContext.getRouter().getNamenodeResolver().getNamenodesForNameserviceId(it.next(), false).iterator();
            while (it2.hasNext()) {
                if (FederationNamenodeServiceState.UNAVAILABLE == ((FederationNamenodeContext) it2.next()).getState()) {
                    z = true;
                }
            }
        }
        Assert.assertTrue("There must be unavailable namenodes", z);
    }

    @EnumSource(ConfigSetting.class)
    @ParameterizedTest
    public void testRouterMsync(ConfigSetting configSetting) throws Exception {
        this.fileSystem = this.routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
        this.fileSystem.create(new Path("/testFile")).close();
        Assert.assertEquals("Two calls should be sent to active", 2L, this.routerContext.getRouter().getRpcServer().getRPCMetrics().getActiveProxyOps());
        this.fileSystem.msync();
        Assert.assertEquals("Four calls should be sent to active", 4L, this.routerContext.getRouter().getRpcServer().getRPCMetrics().getActiveProxyOps());
    }

    @EnumSource(ConfigSetting.class)
    @ParameterizedTest
    public void testSingleRead(ConfigSetting configSetting) throws Exception {
        this.fileSystem = this.routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
        Assert.assertEquals("First namenode should be observer", ((FederationNamenodeContext) this.routerContext.getRouter().getNamenodeResolver().getNamenodesForNameserviceId(this.cluster.getNameservices().get(0), true).get(0)).getState(), FederationNamenodeServiceState.OBSERVER);
        this.fileSystem.listFiles(new Path("/"), false);
        this.fileSystem.close();
        Assert.assertEquals("Only one call should be sent to active", 1L, this.routerContext.getRouter().getRpcServer().getRPCMetrics().getActiveProxyOps());
        Assert.assertEquals("No calls should be sent to observer", 0L, this.routerContext.getRouter().getRpcServer().getRPCMetrics().getObserverProxyOps());
    }

    @Test
    public void testSingleReadUsingObserverReadProxyProvider() throws Exception {
        this.fileSystem = this.routerContext.getFileSystemWithObserverReadProxyProvider();
        Assert.assertEquals("First namenode should be observer", ((FederationNamenodeContext) this.routerContext.getRouter().getNamenodeResolver().getNamenodesForNameserviceId(this.cluster.getNameservices().get(0), true).get(0)).getState(), FederationNamenodeServiceState.OBSERVER);
        this.fileSystem.listFiles(new Path("/"), false);
        this.fileSystem.close();
        Assert.assertEquals("Two calls should be sent to active", 2L, this.routerContext.getRouter().getRpcServer().getRPCMetrics().getActiveProxyOps());
        Assert.assertEquals("One call should be sent to observer", 1L, this.routerContext.getRouter().getRpcServer().getRPCMetrics().getObserverProxyOps());
    }

    @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
    @Test
    public void testClientReceiveResponseState() {
        ClientGSIContext clientGSIContext = new ClientGSIContext();
        HashMap hashMap = new HashMap();
        hashMap.put("ns0", 10L);
        HdfsProtos.RouterFederatedStateProto.Builder newBuilder = HdfsProtos.RouterFederatedStateProto.newBuilder();
        newBuilder.getClass();
        hashMap.forEach((v1, v2) -> {
            r1.putNamespaceStateIds(v1, v2);
        });
        clientGSIContext.receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto.newBuilder().setCallId(1).setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS).setRouterFederatedState(newBuilder.build().toByteString()).build());
        HashMap hashMap2 = new HashMap();
        hashMap2.put("ns0", 8L);
        HdfsProtos.RouterFederatedStateProto.Builder newBuilder2 = HdfsProtos.RouterFederatedStateProto.newBuilder();
        newBuilder2.getClass();
        hashMap2.forEach((v1, v2) -> {
            r1.putNamespaceStateIds(v1, v2);
        });
        clientGSIContext.receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto.newBuilder().setRouterFederatedState(newBuilder2.build().toByteString()).setCallId(NUM_NAMESERVICES).setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS).build());
        Map routerFederatedStateMap = ClientGSIContext.getRouterFederatedStateMap(clientGSIContext.getRouterFederatedState());
        Assertions.assertEquals(1, routerFederatedStateMap.size());
        Assertions.assertEquals(10L, (Long) routerFederatedStateMap.get("ns0"));
    }

    @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
    @Test
    public void testRouterResponseHeaderState() {
        RouterStateIdContext routerStateIdContext = new RouterStateIdContext(new Configuration());
        ConcurrentHashMap namespaceIdMap = routerStateIdContext.getNamespaceIdMap();
        namespaceIdMap.put("ns0", new LongAccumulator(Math::max, 10L));
        namespaceIdMap.put("ns1", new LongAccumulator(Math::max, 100L));
        namespaceIdMap.put("ns2", new LongAccumulator(Math::max, Long.MIN_VALUE));
        HashMap hashMap = new HashMap();
        hashMap.put("ns0", 10L);
        hashMap.put("ns2", 100L);
        hashMap.put("ns3", Long.MIN_VALUE);
        HdfsProtos.RouterFederatedStateProto.Builder newBuilder = HdfsProtos.RouterFederatedStateProto.newBuilder();
        newBuilder.getClass();
        hashMap.forEach((v1, v2) -> {
            r1.putNamespaceStateIds(v1, v2);
        });
        RpcHeaderProtos.RpcResponseHeaderProto.Builder routerFederatedState = RpcHeaderProtos.RpcResponseHeaderProto.newBuilder().setCallId(1).setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS).setRouterFederatedState(newBuilder.build().toByteString());
        routerStateIdContext.updateResponseState(routerFederatedState);
        Map routerFederatedStateMap = RouterStateIdContext.getRouterFederatedStateMap(routerFederatedState.build().getRouterFederatedState());
        Assertions.assertEquals(NUM_NAMESERVICES, routerFederatedStateMap.size());
        Assertions.assertEquals(10L, (Long) routerFederatedStateMap.get("ns0"));
        Assertions.assertEquals(100L, (Long) routerFederatedStateMap.get("ns1"));
    }

    @EnumSource(ConfigSetting.class)
    @ParameterizedTest
    public void testStateIdProgressionInRouter(ConfigSetting configSetting) throws Exception {
        Path path = new Path("/");
        this.fileSystem = this.routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
        RouterStateIdContext routerStateIdContext = this.routerContext.getRouterRpcServer().getRouterStateIdContext();
        for (int i = 0; i < 10; i++) {
            this.fileSystem.create(new Path(path, "file" + i)).close();
        }
        Assert.assertEquals("Router's shared should have progressed.", 21L, routerStateIdContext.getNamespaceStateId("ns0").get());
    }

    @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
    @EnumSource(ConfigSetting.class)
    @ParameterizedTest
    public void testSharedStateInRouterStateIdContext(ConfigSetting configSetting) throws Exception {
        Path path = new Path("/");
        Configuration configuration = new Configuration(false);
        configuration.setLong("dfs.federation.router.connection.pool.clean.ms", 1000L);
        configuration.setLong("dfs.federation.router.connection.clean.ms", 1000 / 10);
        startUpCluster(1, configuration);
        this.fileSystem = this.routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
        RouterStateIdContext routerStateIdContext = this.routerContext.getRouterRpcServer().getRouterStateIdContext();
        this.fileSystem.listStatus(path);
        this.fileSystem.listStatus(path);
        LongAccumulator namespaceStateId = routerStateIdContext.getNamespaceStateId("ns0");
        Thread.sleep(1000 * 2);
        this.fileSystem.listStatus(path);
        this.fileSystem.close();
        LongAccumulator namespaceStateId2 = routerStateIdContext.getNamespaceStateId("ns0");
        long activeProxyOps = this.routerContext.getRouter().getRpcServer().getRPCMetrics().getActiveProxyOps();
        long observerProxyOps = this.routerContext.getRouter().getRpcServer().getRPCMetrics().getObserverProxyOps();
        Assert.assertEquals("One call should be sent to active", 1L, activeProxyOps);
        Assert.assertEquals("Two calls should be sent to observer", 2L, observerProxyOps);
        Assertions.assertSame(namespaceStateId, namespaceStateId2, "The same object should be used in the shared RouterStateIdContext");
    }

    @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
    @EnumSource(ConfigSetting.class)
    @ParameterizedTest
    public void testRouterStateIdContextCleanup(ConfigSetting configSetting) throws Exception {
        Path path = new Path("/");
        long millis = TimeUnit.SECONDS.toMillis(1L);
        Configuration configuration = new Configuration(false);
        configuration.setLong("dfs.federation.router.store.membership.expiration", millis);
        startUpCluster(1, configuration);
        this.fileSystem = this.routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
        RouterStateIdContext routerStateIdContext = this.routerContext.getRouterRpcServer().getRouterStateIdContext();
        this.fileSystem.listStatus(path);
        List namespaces = routerStateIdContext.getNamespaces();
        this.fileSystem.close();
        MockResolver mockResolver = (MockResolver) this.routerContext.getRouter().getNamenodeResolver();
        mockResolver.cleanRegistrations();
        mockResolver.setDisableRegistration(true);
        Thread.sleep(millis * 2);
        List namespaces2 = routerStateIdContext.getNamespaces();
        Assert.assertEquals(1L, namespaces.size());
        Assert.assertEquals("ns0", namespaces.get(0));
        Assert.assertTrue(namespaces2.isEmpty());
    }

    @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
    @EnumSource(ConfigSetting.class)
    @ParameterizedTest
    public void testPeriodicStateRefreshUsingActiveNamenode(ConfigSetting configSetting) throws Exception {
        Path path = new Path("/");
        Configuration configuration = new Configuration(false);
        configuration.set("dfs.federation.router.observer.state.id.refresh.period", "500ms");
        configuration.set("dfs.ha.tail-edits.period", "3s");
        startUpCluster(1, configuration);
        this.fileSystem = this.routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
        this.fileSystem.listStatus(path);
        int length = this.fileSystem.listStatus(path).length;
        DFSClient client = this.cluster.getNamenodes("ns0").stream().filter(namenodeContext -> {
            return namenodeContext.getNamenode().isActiveState();
        }).findFirst().orElseThrow(() -> {
            return new IllegalStateException("No active namenode.");
        }).getClient();
        for (int i = 0; i < 10; i++) {
            client.mkdirs("/dir" + i, (FsPermission) null, false);
        }
        client.close();
        GenericTestUtils.waitFor(() -> {
            return Boolean.valueOf(!this.routerContext.getRouterRpcClient().isNamespaceStateIdFresh("ns0"));
        }, 100L, 10000L, "Timeout: Namespace state was never considered stale.");
        Assert.assertEquals("List-status should show newly created directories.", length + 10, this.fileSystem.listStatus(path).length);
    }

    @EnumSource(ConfigSetting.class)
    @ParameterizedTest
    public void testAutoMsyncEqualsZero(ConfigSetting configSetting) throws Exception {
        Configuration confToEnableObserverReads = getConfToEnableObserverReads(configSetting);
        confToEnableObserverReads.setLong("dfs.client.failover.observer.auto-msync-period." + (configSetting == ConfigSetting.USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER ? ROUTER_NS_ID : this.routerContext.getRouter().getRpcServerAddress().getHostName()), 0L);
        this.fileSystem = this.routerContext.getFileSystem(confToEnableObserverReads);
        Assert.assertEquals("First namenode should be observer", ((FederationNamenodeContext) this.routerContext.getRouter().getNamenodeResolver().getNamenodesForNameserviceId(this.cluster.getNameservices().get(0), true).get(0)).getState(), FederationNamenodeServiceState.OBSERVER);
        Path path = new Path("/");
        for (int i = 0; i < 15; i++) {
            this.fileSystem.listFiles(path, false);
        }
        this.fileSystem.close();
        long activeProxyOps = this.routerContext.getRouter().getRpcServer().getRPCMetrics().getActiveProxyOps();
        long observerProxyOps = this.routerContext.getRouter().getRpcServer().getRPCMetrics().getObserverProxyOps();
        switch (AnonymousClass1.$SwitchMap$org$apache$hadoop$hdfs$server$federation$router$TestObserverWithRouter$ConfigSetting[configSetting.ordinal()]) {
            case 1:
                Assert.assertEquals("Calls sent to the active", 1L, activeProxyOps);
                Assert.assertEquals("Reads sent to observer", 15 - 1, observerProxyOps);
                return;
            case NUM_NAMESERVICES /* 2 */:
            case 3:
                Assert.assertEquals("Msyncs sent to the active namenodes", NUM_NAMESERVICES * 15, activeProxyOps);
                Assert.assertEquals("Reads sent to observer", 15, observerProxyOps);
                return;
            default:
                Assertions.fail("Unknown config setting: " + configSetting);
                return;
        }
    }

    @EnumSource(ConfigSetting.class)
    @ParameterizedTest
    public void testAutoMsyncNonZero(ConfigSetting configSetting) throws Exception {
        Configuration confToEnableObserverReads = getConfToEnableObserverReads(configSetting);
        confToEnableObserverReads.setLong("dfs.client.failover.observer.auto-msync-period." + (configSetting == ConfigSetting.USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER ? ROUTER_NS_ID : this.routerContext.getRouter().getRpcServerAddress().getHostName()), 3000L);
        this.fileSystem = this.routerContext.getFileSystem(confToEnableObserverReads);
        Assert.assertEquals("First namenode should be observer", ((FederationNamenodeContext) this.routerContext.getRouter().getNamenodeResolver().getNamenodesForNameserviceId(this.cluster.getNameservices().get(0), true).get(0)).getState(), FederationNamenodeServiceState.OBSERVER);
        Path path = new Path("/");
        this.fileSystem.listFiles(path, false);
        this.fileSystem.listFiles(path, false);
        Thread.sleep(5000L);
        this.fileSystem.listFiles(path, false);
        this.fileSystem.close();
        long activeProxyOps = this.routerContext.getRouter().getRpcServer().getRPCMetrics().getActiveProxyOps();
        long observerProxyOps = this.routerContext.getRouter().getRpcServer().getRPCMetrics().getObserverProxyOps();
        switch (AnonymousClass1.$SwitchMap$org$apache$hadoop$hdfs$server$federation$router$TestObserverWithRouter$ConfigSetting[configSetting.ordinal()]) {
            case 1:
                Assert.assertEquals("Calls sent to the active", 1L, activeProxyOps);
                Assert.assertEquals("Reads sent to observer", 2L, observerProxyOps);
                return;
            case NUM_NAMESERVICES /* 2 */:
            case 3:
                Assert.assertEquals("Msyncs sent to the active namenodes", 4L, activeProxyOps);
                Assert.assertEquals("Reads sent to observer", 3L, observerProxyOps);
                return;
            default:
                Assertions.fail("Unknown config setting: " + configSetting);
                return;
        }
    }

    @EnumSource(ConfigSetting.class)
    @ParameterizedTest
    public void testThatWriteDoesntBypassNeedForMsync(ConfigSetting configSetting) throws Exception {
        Configuration confToEnableObserverReads = getConfToEnableObserverReads(configSetting);
        confToEnableObserverReads.setLong("dfs.client.failover.observer.auto-msync-period." + (configSetting == ConfigSetting.USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER ? ROUTER_NS_ID : this.routerContext.getRouter().getRpcServerAddress().getHostName()), 3000L);
        this.fileSystem = this.routerContext.getFileSystem(confToEnableObserverReads);
        Assert.assertEquals("First namenode should be observer", ((FederationNamenodeContext) this.routerContext.getRouter().getNamenodeResolver().getNamenodesForNameserviceId(this.cluster.getNameservices().get(0), true).get(0)).getState(), FederationNamenodeServiceState.OBSERVER);
        Path path = new Path("/");
        this.fileSystem.listFiles(path, false);
        Thread.sleep(5000L);
        this.fileSystem.mkdirs(new Path(path, "mkdirLocation"));
        this.fileSystem.listFiles(path, false);
        this.fileSystem.close();
        long activeProxyOps = this.routerContext.getRouter().getRpcServer().getRPCMetrics().getActiveProxyOps();
        long observerProxyOps = this.routerContext.getRouter().getRpcServer().getRPCMetrics().getObserverProxyOps();
        switch (AnonymousClass1.$SwitchMap$org$apache$hadoop$hdfs$server$federation$router$TestObserverWithRouter$ConfigSetting[configSetting.ordinal()]) {
            case 1:
                Assert.assertEquals("Calls sent to the active namenodes", 2L, activeProxyOps);
                Assert.assertEquals("Read sent to observer", 1L, observerProxyOps);
                return;
            case NUM_NAMESERVICES /* 2 */:
            case 3:
                Assert.assertEquals("Calls sent to the active namenodes", 5L, activeProxyOps);
                Assert.assertEquals("Reads sent to observer", 2L, observerProxyOps);
                return;
            default:
                Assertions.fail("Unknown config setting: " + configSetting);
                return;
        }
    }

    @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
    @EnumSource(ConfigSetting.class)
    @ParameterizedTest
    public void testMsyncOnlyToNamespaceWithObserver(ConfigSetting configSetting) throws Exception {
        Configuration configuration = new Configuration(false);
        configuration.set("dfs.federation.router.observer.read.overrides", "ns0");
        startUpCluster(1, configuration);
        this.fileSystem = this.routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
        this.fileSystem.msync();
        Assert.assertEquals("Only one call to the namespace with an observer", 1L, this.routerContext.getRouter().getRpcServer().getRPCMetrics().getActiveProxyOps());
    }

    @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
    @EnumSource(ConfigSetting.class)
    @ParameterizedTest
    public void testMsyncWithNoNamespacesEligibleForCRS(ConfigSetting configSetting) throws Exception {
        Configuration configuration = new Configuration(false);
        configuration.setBoolean("dfs.federation.router.observer.read.default", false);
        startUpCluster(1, configuration);
        this.fileSystem = this.routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
        this.fileSystem.msync();
        Assert.assertEquals("No calls to any namespace", 0L, this.routerContext.getRouter().getRpcServer().getRPCMetrics().getActiveProxyOps());
    }
}
