/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.zookeeper;

import java.net.InetAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.pulsar.functions.runtime.shaded.com.google.api.client.util.Strings;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.RackChangeNotifier;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.AbstractDNSToSwitchMapping;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.BookieId;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.BookieNode;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.configuration.Configuration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.data.Stat;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperCacheListener;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZkBookieRackAffinityMapping
extends AbstractDNSToSwitchMapping
implements ZooKeeperCacheListener<BookiesRackConfiguration>,
RackChangeNotifier {
    private static final Logger LOG = LoggerFactory.getLogger(ZkBookieRackAffinityMapping.class);
    public static final String BOOKIE_INFO_ROOT_PATH = "/bookies";
    public static final String ZK_DATA_CACHE_BK_RACK_CONF_INSTANCE = "zk_data_cache_bk_rack_conf_instance";
    private static final String ZK_LEDGERS_DEFAULT_ROOT_PATH = "/ledgers";
    private ZooKeeperDataCache<BookiesRackConfiguration> bookieMappingCache = null;
    private ITopologyAwareEnsemblePlacementPolicy<BookieNode> rackawarePolicy = null;
    private static final ObjectMapper jsonMapper = ObjectMapperFactory.create();
    private volatile BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration();
    private volatile Map<String, BookieInfo> bookieInfoMap = new HashMap<String, BookieInfo>();

    @Override
    public void setConf(Configuration conf) {
        super.setConf(conf);
        if (conf.getProperty(ZK_DATA_CACHE_BK_RACK_CONF_INSTANCE) != null) {
            this.bookieMappingCache = (ZooKeeperDataCache)conf.getProperty(ZK_DATA_CACHE_BK_RACK_CONF_INSTANCE);
            this.bookieMappingCache.registerListener(this);
        } else {
            this.bookieMappingCache = this.getAndSetZkCache(conf);
            conf.setProperty(ZK_DATA_CACHE_BK_RACK_CONF_INSTANCE, this.bookieMappingCache);
        }
    }

    private void updateRacksWithHost(BookiesRackConfiguration racks) {
        BookiesRackConfiguration newRacksWithHost = new BookiesRackConfiguration();
        HashMap<String, BookieInfo> newBookieInfoMap = new HashMap<String, BookieInfo>();
        racks.forEach((group, bookies) -> bookies.forEach((addr, bi) -> {
            try {
                BookieId bookieId = BookieId.parse(addr);
                BookieAddressResolver addressResolver = this.getBookieAddressResolver();
                if (addressResolver == null) {
                    LOG.warn("Bookie address resolver not yet initialized, skipping resolution");
                } else {
                    BookieSocketAddress bsa = addressResolver.resolve(bookieId);
                    newRacksWithHost.updateBookie((String)group, bsa.toString(), (BookieInfo)bi);
                    String hostname = bsa.getSocketAddress().getHostName();
                    newBookieInfoMap.put(hostname, (BookieInfo)bi);
                    InetAddress address = bsa.getSocketAddress().getAddress();
                    if (null != address) {
                        String hostIp = address.getHostAddress();
                        if (null != hostIp) {
                            newBookieInfoMap.put(hostIp, (BookieInfo)bi);
                        }
                    } else {
                        LOG.info("Network address for {} is unresolvable yet.", addr);
                    }
                }
            }
            catch (BookieAddressResolver.BookieIdNotResolvedException e) {
                LOG.info("Network address for {} is unresolvable yet. error is {}", addr, (Object)e);
            }
        }));
        this.racksWithHost = newRacksWithHost;
        this.bookieInfoMap = newBookieInfoMap;
    }

    private ZooKeeperDataCache<BookiesRackConfiguration> getAndSetZkCache(Configuration conf) {
        ZooKeeperCache zkCache = null;
        if (conf.getProperty("zk_cache_instance") != null) {
            zkCache = (ZooKeeperCache)conf.getProperty("zk_cache_instance");
        } else if (conf instanceof ClientConfiguration) {
            int zkTimeout = ((ClientConfiguration)conf).getZkTimeout();
            try {
                String zkServers;
                String metadataServiceUriStr = ((ClientConfiguration)conf).getMetadataServiceUri();
                URI metadataServiceUri = URI.create(metadataServiceUriStr);
                String ledgersRootPath = metadataServiceUri.getPath();
                if (ZK_LEDGERS_DEFAULT_ROOT_PATH.equals(ledgersRootPath)) {
                    zkServers = ZKMetadataDriverBase.getZKServersFromServiceUri(metadataServiceUri);
                } else {
                    int zkLedgerRootIndex = ledgersRootPath.lastIndexOf("/");
                    zkServers = ZKMetadataDriverBase.getZKServersFromServiceUri(metadataServiceUri) + ledgersRootPath.substring(0, zkLedgerRootIndex);
                }
                ZooKeeperClient zkClient = ZooKeeperClient.newBuilder().connectString(zkServers).sessionTimeoutMs(zkTimeout).build();
                zkCache = new ZooKeeperCache("bookies-racks", zkClient, (int)TimeUnit.MILLISECONDS.toSeconds(zkTimeout)){};
                conf.addProperty("zk_cache_instance", zkCache);
            }
            catch (Exception e) {
                LOG.error("Error creating zookeeper client", (Throwable)e);
            }
        } else {
            LOG.error("No zk configurations available");
        }
        ZooKeeperDataCache<BookiesRackConfiguration> zkDataCache = this.getZkBookieRackMappingCache(zkCache);
        zkDataCache.registerListener(this);
        return zkDataCache;
    }

    private ZooKeeperDataCache<BookiesRackConfiguration> getZkBookieRackMappingCache(ZooKeeperCache zkCache) {
        return new ZooKeeperDataCache<BookiesRackConfiguration>(zkCache){

            @Override
            public BookiesRackConfiguration deserialize(String key, byte[] content) throws Exception {
                LOG.info("Reloading the bookie rack affinity mapping cache.");
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Loading the bookie mappings with bookie info data: {}", (Object)new String(content));
                }
                BookiesRackConfiguration racks = jsonMapper.readValue(content, BookiesRackConfiguration.class);
                return racks;
            }
        };
    }

    @Override
    public List<String> resolve(List<String> bookieAddressList) {
        ArrayList<String> racks = new ArrayList<String>(bookieAddressList.size());
        for (String bookieAddress : bookieAddressList) {
            racks.add(this.getRack(bookieAddress));
        }
        return racks;
    }

    private String getRack(String bookieAddress) {
        try {
            Optional<BookiesRackConfiguration> racks = this.bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH);
            this.updateRacksWithHost(racks.orElseGet(BookiesRackConfiguration::new));
            if (!racks.isPresent()) {
                return null;
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        BookieInfo bi = this.bookieInfoMap.get(bookieAddress);
        if (bi == null) {
            Optional<BookieInfo> biOpt = this.racksWithHost.getBookie(bookieAddress);
            if (biOpt.isPresent()) {
                bi = biOpt.get();
            } else {
                this.updateRacksWithHost(this.racksWithHost);
                bi = this.bookieInfoMap.get(bookieAddress);
            }
        }
        if (bi != null && !Strings.isNullOrEmpty(bi.getRack()) && !bi.getRack().trim().equals("/")) {
            String rack = bi.getRack();
            if (!rack.startsWith("/")) {
                rack = "/" + rack;
            }
            return rack;
        }
        return null;
    }

    public String toString() {
        return "zk based bookie rack affinity mapping";
    }

    @Override
    public void reloadCachedMappings() {
    }

    @Override
    public void onUpdate(String path, BookiesRackConfiguration data, Stat stat) {
        if (this.rackawarePolicy != null) {
            LOG.info("Bookie rack info updated to {}. Notifying rackaware policy.", (Object)data.toString());
            ArrayList<BookieId> bookieAddressList = new ArrayList<BookieId>();
            for (Map bookieMapping : data.values()) {
                for (String addr : bookieMapping.keySet()) {
                    bookieAddressList.add(BookieId.parse(addr));
                }
            }
            this.rackawarePolicy.onBookieRackChange(bookieAddressList);
        }
    }

    @Override
    public void registerRackChangeListener(ITopologyAwareEnsemblePlacementPolicy<BookieNode> rackawarePolicy) {
        this.rackawarePolicy = rackawarePolicy;
    }
}

