/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.bookie.rackawareness;

import java.lang.reflect.Field;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.pulsar.bookie.rackawareness.ConfigurationStringUtil;
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.DefaultBookieAddressResolver;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.RackChangeNotifier;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.exceptions.Code;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.AbstractDNSToSwitchMapping;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.BookieId;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.BookieNode;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.configuration.Configuration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * DNS-to-switch mapping that answers rack lookups from the bookie rack configuration stored
 * under {@link #BOOKIE_INFO_ROOT_PATH} in a {@link MetadataStore}.
 *
 * <p>The mapping keeps two derived views of that configuration: a copy keyed by the resolved
 * socket address of every bookie, and a flat host-name / IP to {@link BookieInfo} map. Both are
 * rebuilt whenever the metadata path changes or the set of writable bookies changes, and a
 * registered rack-aware placement policy is notified about metadata changes.
 *
 * <p>Reads and rebuilds of the derived views go through methods synchronized on this instance.
 */
public class BookieRackAffinityMapping
extends AbstractDNSToSwitchMapping
implements RackChangeNotifier {
    private static final Logger LOG = LoggerFactory.getLogger(BookieRackAffinityMapping.class);
    public static final String BOOKIE_INFO_ROOT_PATH = "/bookies";
    public static final String METADATA_STORE_INSTANCE = "METADATA_STORE_INSTANCE";
    private MetadataCache<BookiesRackConfiguration> bookieMappingCache = null;
    private ITopologyAwareEnsemblePlacementPolicy<BookieNode> rackawarePolicy = null;
    // Bookie ids seen at the previous update; merged with the new ids when notifying the policy
    // so that removed bookies are reported as well.
    private List<BookieId> bookieAddressListLastTime = new ArrayList<BookieId>();
    // Rack configuration re-keyed by the resolved socket address of each bookie.
    private BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration();
    // Host name and host IP of each resolved bookie mapped to its rack info.
    private Map<String, BookieInfo> bookieInfoMap = new HashMap<String, BookieInfo>();

    /**
     * Obtains the metadata store used to read the bookie rack configuration.
     *
     * <p>If the configuration carries an object under {@link #METADATA_STORE_INSTANCE}, that
     * instance is returned. Otherwise a new store is created from {@code metadataServiceUri}
     * (with the {@code metadata-store:} prefix stripped and {@code ;} replaced by {@code ,}) or,
     * when that is blank, from {@code zkServers}, using {@code zkTimeout} as the session timeout.
     *
     * @param conf the BookKeeper client configuration
     * @return the configured or newly created metadata store
     * @throws MetadataException if the metadata store cannot be created
     * @throws RuntimeException if {@code METADATA_STORE_INSTANCE} holds an object of the wrong
     *         type, or none of the supported settings is present
     * @throws NumberFormatException if {@code zkTimeout} is missing or not an integer
     */
    public static MetadataStore createMetadataStore(Configuration conf) throws MetadataException {
        Object storeProperty = conf.getProperty(METADATA_STORE_INSTANCE);
        if (storeProperty != null) {
            if (!(storeProperty instanceof MetadataStore)) {
                throw new RuntimeException("METADATA_STORE_INSTANCE is not an instance of MetadataStore");
            }
            return (MetadataStore) storeProperty;
        }

        String url;
        String metadataServiceUri = ConfigurationStringUtil.castToString(conf.getProperty("metadataServiceUri"));
        if (StringUtils.isNotBlank(metadataServiceUri)) {
            try {
                url = metadataServiceUri.replaceFirst("metadata-store:", "").replace(";", ",");
            } catch (Exception e) {
                throw new MetadataException(Code.METADATA_SERVICE_ERROR, (Throwable) e);
            }
        } else {
            String zkServers = ConfigurationStringUtil.castToString(conf.getProperty("zkServers"));
            if (StringUtils.isBlank(zkServers)) {
                String errorMsg = String.format("Neither %s configuration set in the BK client configuration nor metadataServiceUri/zkServers set in bk server configuration", METADATA_STORE_INSTANCE);
                throw new RuntimeException(errorMsg);
            }
            url = zkServers;
        }

        try {
            int zkTimeout = parseZkTimeout(conf.getProperty("zkTimeout"));
            return MetadataStoreExtended.create(url, MetadataStoreConfig.builder().sessionTimeoutMillis(zkTimeout).build());
        } catch (MetadataStoreException e) {
            throw new MetadataException(Code.METADATA_SERVICE_ERROR, (Throwable) e);
        }
    }

    /**
     * Converts the raw {@code zkTimeout} property to an int, accepting either a numeric object
     * or its string form so that a non-String property value does not fail with a
     * {@link ClassCastException}.
     */
    private static int parseZkTimeout(Object rawTimeout) {
        if (rawTimeout instanceof Number) {
            return ((Number) rawTimeout).intValue();
        }
        if (rawTimeout == null) {
            throw new NumberFormatException("zkTimeout is not set in the BookKeeper configuration");
        }
        return Integer.parseInt(rawTimeout.toString().trim());
    }

    /**
     * Stores the configuration, connects to the metadata store, loads the current rack
     * configuration, and starts listening for metadata and writable-bookie changes.
     *
     * @param conf the BookKeeper client configuration
     * @throws RuntimeException if the metadata store cannot be created or the initial rack
     *         configuration cannot be read; the underlying failure is attached as the cause
     */
    @Override
    public synchronized void setConf(Configuration conf) {
        super.setConf(conf);
        try {
            MetadataStore store = BookieRackAffinityMapping.createMetadataStore(conf);
            this.bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
            store.registerListener(this::handleUpdates);
            BookiesRackConfiguration storedRacks = this.bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get().orElseGet(BookiesRackConfiguration::new);
            this.updateRacksWithHost(storedRacks);
            this.watchAvailableBookies();

            // Start from an empty list so that a repeated setConf call does not accumulate
            // duplicate ids.
            List<BookieId> initialBookies = new ArrayList<BookieId>();
            for (Map<String, BookieInfo> bookieMapping : this.racksWithHost.values()) {
                for (String address : bookieMapping.keySet()) {
                    initialBookies.add(BookieId.parse(address));
                }
            }
            this.bookieAddressListLastTime = initialBookies;
            if (LOG.isDebugEnabled()) {
                LOG.debug("BookieRackAffinityMapping init, bookieAddressListLastTime {}", this.bookieAddressListLastTime);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("METADATA_STORE_INSTANCE failed to init BookieId list", e);
        } catch (ExecutionException | MetadataException e) {
            throw new RuntimeException("METADATA_STORE_INSTANCE failed to init BookieId list", e);
        }
    }

    /**
     * Registers a writable-bookies watcher that refreshes the resolved rack views whenever the
     * set of writable bookies changes.
     *
     * <p>The registration client is read reflectively from the private
     * {@code registrationClient} field of {@link DefaultBookieAddressResolver}; nothing is
     * registered when a different resolver implementation is in use.
     */
    private void watchAvailableBookies() {
        BookieAddressResolver bookieAddressResolver = this.getBookieAddressResolver();
        if (!(bookieAddressResolver instanceof DefaultBookieAddressResolver)) {
            return;
        }
        try {
            Field field = DefaultBookieAddressResolver.class.getDeclaredField("registrationClient");
            field.setAccessible(true);
            RegistrationClient registrationClient = (RegistrationClient) field.get(bookieAddressResolver);
            registrationClient.watchWritableBookies(versioned -> {
                // Refresh asynchronously: the callback thread is never blocked on the cache
                // read, and the fields are only replaced inside the synchronized
                // updateRacksWithHost.
                this.bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
                        .thenApply(optRacks -> optRacks.orElseGet(BookiesRackConfiguration::new))
                        .thenAccept(this::updateRacksWithHost)
                        .exceptionally(ex -> {
                            LOG.error("Failed to update rack info. ", ex);
                            return null;
                        });
            });
        } catch (IllegalAccessException | NoSuchFieldException e) {
            LOG.error("Failed watch available bookies.", (Throwable) e);
        }
    }

    /**
     * Rebuilds {@code racksWithHost} and {@code bookieInfoMap} from the given rack configuration
     * by resolving every bookie id to its socket address.
     *
     * <p>Bookies that cannot be resolved, or all bookies while no address resolver is available,
     * are left out of the rebuilt views.
     *
     * @param racks the rack configuration keyed by bookie id
     */
    private synchronized void updateRacksWithHost(BookiesRackConfiguration racks) {
        BookiesRackConfiguration newRacksWithHost = new BookiesRackConfiguration();
        Map<String, BookieInfo> newBookieInfoMap = new HashMap<String, BookieInfo>();
        racks.forEach((group, bookies) -> bookies.forEach((addr, bi) -> {
            try {
                BookieId bookieId = BookieId.parse(addr);
                BookieAddressResolver addressResolver = this.getBookieAddressResolver();
                if (addressResolver == null) {
                    LOG.warn("Bookie address resolver not yet initialized, skipping resolution");
                    return;
                }
                BookieSocketAddress bsa = addressResolver.resolve(bookieId);
                newRacksWithHost.updateBookie(group, bsa.toString(), bi);
                String hostname = bsa.getSocketAddress().getHostName();
                newBookieInfoMap.put(hostname, bi);
                InetAddress address = bsa.getSocketAddress().getAddress();
                if (null != address) {
                    String hostIp = address.getHostAddress();
                    if (null != hostIp) {
                        newBookieInfoMap.put(hostIp, bi);
                    }
                } else {
                    LOG.info("Network address for {} is unresolvable yet.", addr);
                }
            } catch (BookieAddressResolver.BookieIdNotResolvedException e) {
                LOG.info("Network address for {} is unresolvable yet. error is {}", addr, (Object) e);
            }
        }));
        this.racksWithHost = newRacksWithHost;
        this.bookieInfoMap = newBookieInfoMap;
    }

    /**
     * Resolves each given bookie address to its rack.
     *
     * @param bookieAddressList addresses (host name, IP, or resolved socket address) to look up
     * @return a list of the same size holding the rack of each address, or {@code null} at the
     *         positions where no usable rack is known
     */
    @Override
    public synchronized List<String> resolve(List<String> bookieAddressList) {
        List<String> racks = new ArrayList<String>(bookieAddressList.size());
        for (String bookieAddress : bookieAddressList) {
            racks.add(this.getRack(bookieAddress));
        }
        return racks;
    }

    /**
     * Looks up the rack of a single address, first in the host/IP map and then in the
     * address-keyed rack configuration.
     *
     * @return the rack with a leading {@code /}, or {@code null} when the bookie is unknown or
     *         its rack is empty or just {@code /}
     */
    private String getRack(String bookieAddress) {
        BookieInfo bi = this.bookieInfoMap.get(bookieAddress);
        if (bi == null) {
            bi = this.racksWithHost.getBookie(bookieAddress).orElse(null);
        }
        if (bi == null || StringUtils.isEmpty(bi.getRack()) || bi.getRack().trim().equals("/")) {
            return null;
        }
        String rack = bi.getRack();
        if (!rack.startsWith("/")) {
            rack = "/" + rack;
        }
        return rack;
    }

    @Override
    public String toString() {
        return "zk based bookie rack affinity mapping";
    }

    /** Intentionally a no-op in this implementation. */
    @Override
    public void reloadCachedMappings() {
    }

    /**
     * Metadata store listener: on a change of {@link #BOOKIE_INFO_ROOT_PATH}, reloads the rack
     * configuration, rebuilds the resolved views, and notifies the registered rack-aware policy
     * with the union of the previously known and the currently configured bookie ids.
     *
     * @param n the metadata store notification
     */
    private void handleUpdates(Notification n) {
        if (!n.getPath().equals(BOOKIE_INFO_ROOT_PATH)) {
            return;
        }
        this.bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).thenAccept(optVal -> {
            synchronized (this) {
                LOG.info("Bookie rack info updated to {}. Notifying rackaware policy.", optVal);
                this.updateRacksWithHost(optVal.orElseGet(BookiesRackConfiguration::new));
                List<BookieId> bookieAddressList = new ArrayList<BookieId>();
                for (Map<String, BookieInfo> bookieMapping : optVal.map(Map::values).orElse(Collections.emptyList())) {
                    for (String addr : bookieMapping.keySet()) {
                        bookieAddressList.add(BookieId.parse(addr));
                    }
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Bookies with rack update from {} to {}", this.bookieAddressListLastTime, bookieAddressList);
                }
                // Report both current and previously known bookies so that the policy also
                // re-evaluates bookies that were removed from the configuration.
                HashSet<BookieId> bookieIdSet = new HashSet<BookieId>(bookieAddressList);
                bookieIdSet.addAll(this.bookieAddressListLastTime);
                this.bookieAddressListLastTime = bookieAddressList;
                if (this.rackawarePolicy != null) {
                    this.rackawarePolicy.onBookieRackChange(new ArrayList<BookieId>(bookieIdSet));
                }
            }
        }).exceptionally(ex -> {
            // Without this handler a failed cache read or a malformed bookie id would be
            // dropped silently by the future chain.
            LOG.error("Failed to update rack info. ", ex);
            return null;
        });
    }

    /**
     * Registers the placement policy to notify when the bookie rack configuration changes.
     *
     * @param rackawarePolicy the policy to notify
     */
    @Override
    public void registerRackChangeListener(ITopologyAwareEnsemblePlacementPolicy<BookieNode> rackawarePolicy) {
        this.rackawarePolicy = rackawarePolicy;
    }
}

