package io.rsocket.routing.broker.locator;

import io.netty.util.concurrent.FastThreadLocal;
import io.rsocket.RSocket;
import io.rsocket.routing.broker.RSocketIndex;
import io.rsocket.routing.broker.RoutingTable;
import io.rsocket.routing.broker.loadbalance.LoadBalancer;
import io.rsocket.routing.broker.rsocket.ConnectingRSocket;
import io.rsocket.routing.common.Id;
import io.rsocket.routing.common.Tags;
import io.rsocket.routing.frames.BrokerInfo;
import io.rsocket.routing.frames.RouteJoin;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/rsocket/routing/broker/locator/RemoteRSocketLocator.class */
/**
 * {@link RSocketLocator} that resolves a set of {@link Tags} to an {@link RSocket} by combining
 * the RSockets found in the local {@link RSocketIndex} with RSockets obtained for remote brokers
 * that the {@link RoutingTable} reports as matching the tags.
 *
 * <p>When no candidate is found at lookup time, a {@link ConnectingRSocket} is returned that
 * repeats the lookup once the routing table emits a join event for the tags.
 */
public class RemoteRSocketLocator implements RSocketLocator {
    private static final Logger logger = LoggerFactory.getLogger(RemoteRSocketLocator.class);

    /** Per-thread scratch list reused by {@link #members(Tags)} to collect candidate RSockets. */
    private static final FastThreadLocal<List<RSocket>> MEMBERS = new FastThreadLocal<List<RSocket>>() {
        @Override
        protected List<RSocket> initialValue() {
            return new ArrayList<>();
        }
    };

    /** Per-thread scratch set of remote broker ids already added during one lookup. */
    private static final FastThreadLocal<Set<Id>> FOUND = new FastThreadLocal<Set<Id>>() {
        @Override
        protected Set<Id> initialValue() {
            return new HashSet<>();
        }
    };

    private final Id brokerId;
    private final RoutingTable routingTable;
    private final RSocketIndex rSocketIndex;
    private final LoadBalancer.Factory loadBalancerFactory;
    private final Function<BrokerInfo, RSocket> brokerInfoRSocketFunction;

    /**
     * Creates a locator.
     *
     * @param id id of this broker; routes whose broker id equals it are not resolved remotely
     * @param routingTable source of route lookups and join events
     * @param rSocketIndex index queried for RSockets matching the tags
     * @param factory supplies the {@link LoadBalancer} used to pick one candidate per tag set
     * @param function maps a remote broker's {@link BrokerInfo} to the RSocket used to reach it
     */
    public RemoteRSocketLocator(Id id, RoutingTable routingTable, RSocketIndex rSocketIndex, LoadBalancer.Factory factory, Function<BrokerInfo, RSocket> function) {
        this.brokerId = id;
        this.routingTable = routingTable;
        this.rSocketIndex = rSocketIndex;
        this.loadBalancerFactory = factory;
        this.brokerInfoRSocketFunction = function;
    }

    /**
     * Collects every candidate RSocket for the given tags: index matches first, followed by one
     * RSocket per distinct remote broker found in the routing table.
     *
     * <p>NOTE(review): the returned list is the calling thread's reusable scratch list and is
     * cleared by the next call on the same thread. This presumes the load balancer consumes it
     * before then - confirm against the {@link LoadBalancer} implementations.
     *
     * @param tags tags to resolve; must not be {@code null} or empty
     * @return the (possibly empty) candidate list, never {@code null}
     * @throws IllegalArgumentException if {@code tags} is {@code null} or empty
     */
    private List<RSocket> members(Tags tags) {
        if (tags == null || tags.isEmpty()) {
            throw new IllegalArgumentException("tags may not be empty");
        }

        List<RSocket> candidates = MEMBERS.get();
        candidates.clear();
        List<RSocket> indexed = this.rSocketIndex.query(tags);
        if (indexed != null && !indexed.isEmpty()) {
            candidates.addAll(indexed);
        }

        Set<Id> seenBrokers = FOUND.get();
        seenBrokers.clear();
        Iterator<RouteJoin> routes = this.routingTable.find(tags).iterator();
        while (routes.hasNext()) {
            Id remoteBrokerId = routes.next().getBrokerId();
            // Skip routes owned by this broker; Set.add doubles as the "first time seen" check
            // so each remote broker contributes exactly one RSocket.
            if (!Objects.equals(this.brokerId, remoteBrokerId) && seenBrokers.add(remoteBrokerId)) {
                candidates.add(this.brokerInfoRSocketFunction.apply(BrokerInfo.from(remoteBrokerId).build()));
            }
        }
        return candidates;
    }

    /**
     * Resolves the tags to an RSocket.
     *
     * @param tags tags to resolve; must not be {@code null} or empty
     * @return a load-balanced choice among the current candidates, or a {@link ConnectingRSocket}
     *     when there are none yet
     * @throws IllegalArgumentException if {@code tags} is {@code null} or empty
     */
    @Override
    public Mono<RSocket> apply(Tags tags) {
        List<RSocket> members = members(tags);
        if (members.isEmpty()) {
            return Mono.just(connectingRSocket(tags));
        }
        return loadbalance(members, tags);
    }

    /**
     * Builds a {@link ConnectingRSocket} backed by a deferred lookup: after the first join event
     * for the tags, the candidates are collected again and load balanced.
     */
    private ConnectingRSocket connectingRSocket(Tags tags) {
        Mono<RSocket> resolved = this.routingTable.joinEvents(tags).next().flatMap(routeJoin -> {
            List<RSocket> members = members(tags);
            // members(...) never returns null, so only emptiness needs checking.
            if (members.isEmpty() && logger.isWarnEnabled()) {
                logger.warn("Unable to locate RSockets for tags {}", tags);
            }
            return loadbalance(members, tags);
        });
        return new ConnectingRSocket(resolved);
    }

    /**
     * Asks the load balancer for the tags to choose among {@code list} and reports the outcome
     * back to it through the response's completion callback.
     *
     * @param list candidate RSockets
     * @param tags tags used to obtain the load balancer instance
     * @return the chosen RSocket, or a default no-op {@link RSocket} when the response has none
     */
    private Mono<RSocket> loadbalance(List<RSocket> list, Tags tags) {
        return this.loadBalancerFactory.getInstance(tags).choose(list).map(response -> {
            if (response.hasRSocket()) {
                RSocket chosen = response.getRSocket();
                // "SUCCESSS" is the spelling of the constant as declared by LoadBalancer.
                response.onComplete(new LoadBalancer.CompletionContext(LoadBalancer.CompletionContext.Status.SUCCESSS));
                return chosen;
            }
            response.onComplete(new LoadBalancer.CompletionContext(LoadBalancer.CompletionContext.Status.DISCARD));
            // Nothing was selected: hand back an RSocket that relies on the interface defaults.
            return new RSocket() {
            };
        });
    }
}
