package org.apache.distributedlog.client.routing;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.twitter.finagle.NoBrokersAvailableException;
import com.twitter.finagle.stats.StatsReceiver;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.distributedlog.client.routing.RoutingService;
import org.apache.distributedlog.client.routing.ServerSetWatcher;
import org.apache.distributedlog.service.DLSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/client/routing/ServerSetRoutingService.class */
class ServerSetRoutingService extends Thread implements RoutingService {
    private static final Logger logger = LoggerFactory.getLogger(ServerSetRoutingService.class);
    private final ServerSetWatcher serverSetWatcher;
    private final Set<SocketAddress> hostSet;
    private List<SocketAddress> hostList;
    private final HashFunction hasher;
    private final AtomicReference<ImmutableSet<DLSocketAddress>> serverSetChange;
    private final CountDownLatch changeLatch;
    protected final CopyOnWriteArraySet<RoutingService.RoutingListener> listeners;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/distributedlog/client/routing/ServerSetRoutingService$HostComparator.class */
    public static class HostComparator implements Comparator<SocketAddress> {
        private static final HostComparator INSTANCE = new HostComparator();

        private HostComparator() {
        }

        @Override // java.util.Comparator
        public int compare(SocketAddress socketAddress, SocketAddress socketAddress2) {
            return socketAddress.toString().compareTo(socketAddress2.toString());
        }
    }

    /* loaded from: input_file:org/apache/distributedlog/client/routing/ServerSetRoutingService$ServerSetRoutingServiceBuilder.class */
    static class ServerSetRoutingServiceBuilder implements RoutingService.Builder {
        private ServerSetWatcher serverSetWatcher;

        private ServerSetRoutingServiceBuilder() {
        }

        public ServerSetRoutingServiceBuilder serverSetWatcher(ServerSetWatcher serverSetWatcher) {
            this.serverSetWatcher = serverSetWatcher;
            return this;
        }

        @Override // org.apache.distributedlog.client.routing.RoutingService.Builder
        public RoutingService.Builder statsReceiver(StatsReceiver statsReceiver) {
            return this;
        }

        @Override // org.apache.distributedlog.client.routing.RoutingService.Builder
        public RoutingService build() {
            Preconditions.checkNotNull(this.serverSetWatcher, "No serverset watcher provided.");
            return new ServerSetRoutingService(this.serverSetWatcher);
        }
    }

    static ServerSetRoutingServiceBuilder newServerSetRoutingServiceBuilder() {
        return new ServerSetRoutingServiceBuilder();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ServerSetRoutingService(ServerSetWatcher serverSetWatcher) {
        super("ServerSetRoutingService");
        this.hostSet = new HashSet();
        this.hostList = new ArrayList();
        this.hasher = Hashing.md5();
        this.serverSetChange = new AtomicReference<>(null);
        this.changeLatch = new CountDownLatch(1);
        this.listeners = new CopyOnWriteArraySet<>();
        this.serverSetWatcher = serverSetWatcher;
    }

    public Set<SocketAddress> getHosts() {
        ImmutableSet copyOf;
        synchronized (this.hostSet) {
            copyOf = ImmutableSet.copyOf(this.hostSet);
        }
        return copyOf;
    }

    public void startService() {
        start();
        try {
            if (!this.changeLatch.await(1L, TimeUnit.MINUTES)) {
                logger.warn("No serverset change received in 1 minute.");
            }
        } catch (InterruptedException e) {
            logger.warn("Interrupted waiting first serverset change : ", e);
        }
        logger.info("{} Routing Service Started.", getClass().getSimpleName());
    }

    public void stopService() {
        Thread.currentThread().interrupt();
        try {
            join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted on waiting serverset routing service to finish : ", e);
        }
        logger.info("{} Routing Service Stopped.", getClass().getSimpleName());
    }

    public RoutingService registerListener(RoutingService.RoutingListener routingListener) {
        this.listeners.add(routingListener);
        return this;
    }

    public RoutingService unregisterListener(RoutingService.RoutingListener routingListener) {
        this.listeners.remove(routingListener);
        return this;
    }

    public SocketAddress getHost(String str, RoutingService.RoutingContext routingContext) throws NoBrokersAvailableException {
        SocketAddress socketAddress = null;
        synchronized (this.hostSet) {
            if (0 != this.hostList.size()) {
                int asInt = this.hasher.hashUnencodedChars(str).asInt();
                int signSafeMod = signSafeMod(asInt, this.hostList.size());
                socketAddress = this.hostList.get(signSafeMod);
                if (routingContext.isTriedHost(socketAddress)) {
                    ArrayList arrayList = new ArrayList(this.hostList);
                    arrayList.remove(signSafeMod);
                    int signSafeMod2 = signSafeMod(asInt, arrayList.size());
                    socketAddress = (SocketAddress) arrayList.get(signSafeMod2);
                    int i = signSafeMod2;
                    while (true) {
                        if (!routingContext.isTriedHost(socketAddress)) {
                            break;
                        }
                        i = (i + 1) % arrayList.size();
                        if (i == signSafeMod2) {
                            socketAddress = null;
                            break;
                        }
                        socketAddress = (SocketAddress) arrayList.get(i);
                    }
                }
            }
        }
        if (null == socketAddress) {
            throw new NoBrokersAvailableException("No host is available.");
        }
        return socketAddress;
    }

    public void removeHost(SocketAddress socketAddress, Throwable th) {
        synchronized (this.hostSet) {
            if (this.hostSet.remove(socketAddress)) {
                logger.info("Node {} left due to : ", socketAddress, th);
            }
            this.hostList = new ArrayList(this.hostSet);
            Collections.sort(this.hostList, HostComparator.INSTANCE);
            logger.info("Host list becomes : {}.", this.hostList);
        }
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            this.serverSetWatcher.watch(new ServerSetWatcher.ServerSetMonitor() { // from class: org.apache.distributedlog.client.routing.ServerSetRoutingService.1
                @Override // org.apache.distributedlog.client.routing.ServerSetWatcher.ServerSetMonitor
                public void onChange(ImmutableSet<DLSocketAddress> immutableSet) {
                    ImmutableSet<DLSocketAddress> immutableSet2;
                    if (null != ((ImmutableSet) ServerSetRoutingService.this.serverSetChange.getAndSet(immutableSet))) {
                        return;
                    }
                    do {
                        immutableSet2 = (ImmutableSet) ServerSetRoutingService.this.serverSetChange.get();
                        ServerSetRoutingService.this.performServerSetChange(immutableSet2);
                        ServerSetRoutingService.this.changeLatch.countDown();
                    } while (!ServerSetRoutingService.this.serverSetChange.compareAndSet(immutableSet2, null));
                }
            });
        } catch (Exception e) {
            logger.error("Fail to monitor server set : ", e);
            Runtime.getRuntime().exit(-1);
        }
    }

    protected synchronized void performServerSetChange(ImmutableSet<DLSocketAddress> immutableSet) {
        ImmutableSet<SocketAddress> immutableCopy;
        ImmutableSet<SocketAddress> immutableCopy2;
        HashSet hashSet = new HashSet();
        UnmodifiableIterator it = immutableSet.iterator();
        while (it.hasNext()) {
            hashSet.add(((DLSocketAddress) it.next()).getSocketAddress());
        }
        synchronized (this.hostSet) {
            immutableCopy = Sets.difference(this.hostSet, hashSet).immutableCopy();
            immutableCopy2 = Sets.difference(hashSet, this.hostSet).immutableCopy();
            for (SocketAddress socketAddress : immutableCopy) {
                if (this.hostSet.remove(socketAddress)) {
                    logger.info("Node {} left.", socketAddress);
                }
            }
            for (SocketAddress socketAddress2 : immutableCopy2) {
                if (this.hostSet.add(socketAddress2)) {
                    logger.info("Node {} joined.", socketAddress2);
                }
            }
        }
        for (SocketAddress socketAddress3 : immutableCopy) {
            Iterator<RoutingService.RoutingListener> it2 = this.listeners.iterator();
            while (it2.hasNext()) {
                it2.next().onServerLeft(socketAddress3);
            }
        }
        for (SocketAddress socketAddress4 : immutableCopy2) {
            Iterator<RoutingService.RoutingListener> it3 = this.listeners.iterator();
            while (it3.hasNext()) {
                it3.next().onServerJoin(socketAddress4);
            }
        }
        synchronized (this.hostSet) {
            this.hostList = new ArrayList(this.hostSet);
            Collections.sort(this.hostList, HostComparator.INSTANCE);
            logger.info("Host list becomes : {}.", this.hostList);
        }
    }

    static int signSafeMod(long j, int i) {
        int i2 = (int) (j % i);
        if (i2 < 0) {
            i2 += i;
        }
        return i2;
    }
}
