package com.cloudimpl.cluster4j.lb;

import com.cloudimpl.cluster4j.common.CloudMessage;
import com.cloudimpl.cluster4j.core.CloudService;
import com.cloudimpl.cluster4j.core.lb.LBRequest;
import com.cloudimpl.cluster4j.core.lb.LBResponse;
import com.cloudimpl.cluster4j.coreImpl.CloudServiceRegistry;
import com.cloudimpl.cluster4j.coreImpl.FluxStream;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.function.Function;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/cloudimpl/cluster4j/lb/TopicLoadBalancer.class */
/**
 * Load balancer that pins keys of a topic to service instances.
 *
 * <p>For every request the balancer looks up (or lazily creates) the
 * {@link TopicHandler} of the requested topic and asks it to assign the
 * request key. A key that is already held by a registered instance keeps
 * resolving to that instance; a new key goes to the instance currently
 * holding the fewest keys.
 */
public class TopicLoadBalancer implements Function<CloudMessage, Mono<LBResponse>> {
    /** One handler per topic, created on the first request for that topic. */
    private final Map<String, TopicHandler> handlers = new ConcurrentHashMap<>();
    private final CloudServiceRegistry reg;

    /**
     * The set of keys assigned to a single service instance.
     */
    public static final class Bucket {
        // Key order is never observed, so a hash-based concurrent set is used:
        // its size() does not have to walk every element the way
        // ConcurrentSkipListSet.size() does, and TopicHandler.assign() calls
        // size() on every bucket whenever a new key arrives.
        private final Set<String> keySet = ConcurrentHashMap.newKeySet();
        private final String id;

        /**
         * @param str identifier of the service instance this bucket belongs to
         */
        public Bucket(String str) {
            this.id = str;
        }

        /**
         * Records {@code str} as assigned to this bucket.
         *
         * @param str the key to add
         */
        public void add(String str) {
            this.keySet.add(str);
        }

        /**
         * @param str the key to look for
         * @return {@code true} if the key has been added to this bucket
         */
        public boolean hasKey(String str) {
            return this.keySet.contains(str);
        }

        /**
         * @return the number of keys currently held by this bucket
         */
        public int size() {
            return this.keySet.size();
        }

        /**
         * @return the identifier passed to the constructor
         */
        public String getId() {
            return this.id;
        }
    }

    /**
     * Tracks the buckets of one topic and assigns keys to them.
     *
     * <p>The handler follows the registry's event stream and keeps one
     * {@link Bucket} per registered service whose {@code name()} equals the
     * topic name.
     */
    public static final class TopicHandler {
        // Stays a concurrent map: registry events mutate it outside the monitor
        // that guards assign().
        private final Map<String, Bucket> mapBuckets = new ConcurrentHashMap<>();
        private final String name;

        /**
         * Creates the handler and subscribes it to the registry's event stream.
         * The subscription is not retained, so it is never disposed by this class.
         *
         * @param str                  topic name; only services with this name are tracked
         * @param cloudServiceRegistry registry whose {@code flux()} supplies service events
         */
        public TopicHandler(String str, CloudServiceRegistry cloudServiceRegistry) {
            this.name = str;
            // A single subscription handles every event type so that ADD/UPDATE
            // and REMOVE events for the same service are applied in the order
            // they are emitted. Two independent subscriptions give no such
            // guarantee and could leave a re-added service without a bucket.
            cloudServiceRegistry.flux()
                    .filter(event -> ((CloudService) event.getValue()).name().equals(str))
                    .doOnNext(event -> {
                        String serviceId = ((CloudService) event.getValue()).id();
                        if (event.getType() == FluxStream.Event.Type.REMOVE) {
                            this.mapBuckets.remove(serviceId);
                        } else if (event.getType() == FluxStream.Event.Type.ADD
                                || event.getType() == FluxStream.Event.Type.UPDATE) {
                            // An UPDATE for a known service keeps its bucket and keys.
                            this.mapBuckets.computeIfAbsent(serviceId, Bucket::new);
                        }
                    })
                    .subscribe();
        }

        /**
         * Finds the bucket that already holds {@code str}, if any.
         */
        private Optional<Bucket> search(String str) {
            return this.mapBuckets.values().stream()
                    .filter(bucket -> bucket.hasKey(str))
                    .findFirst();
        }

        /**
         * Adds {@code str} to the bucket holding the fewest keys.
         *
         * @throws TopicLoadBalancerException if no bucket is registered
         */
        private Bucket assignToSmallest(String str) {
            Bucket smallest = this.mapBuckets.values().stream()
                    .min(Comparator.comparingInt(Bucket::size))
                    .orElseThrow(() -> new TopicLoadBalancerException(
                            "no service found to loadbalance for topic " + this.name));
            smallest.add(str);
            return smallest;
        }

        /**
         * Resolves the service instance responsible for a key.
         *
         * <p>Returns the bucket that already holds the key; otherwise the key is
         * added to the bucket with the fewest keys. The method is synchronized
         * because lookup and insertion form one check-then-act sequence: without
         * mutual exclusion two concurrent calls for the same new key could place
         * it in different buckets and hand back different ids.
         *
         * @param str the key to assign; must not be {@code null}
         * @return the id of the bucket holding the key
         * @throws TopicLoadBalancerException if {@code str} is {@code null} or no
         *                                    service is registered for the topic
         */
        public synchronized String assign(String str) {
            if (str == null) {
                throw new TopicLoadBalancerException("key is null");
            }
            return search(str)
                    .orElseGet(() -> assignToSmallest(str))
                    .getId();
        }
    }

    /**
     * @param cloudServiceRegistry registry handed to every {@link TopicHandler}
     *                             this balancer creates
     */
    public TopicLoadBalancer(CloudServiceRegistry cloudServiceRegistry) {
        this.reg = cloudServiceRegistry;
    }

    /**
     * Handles one load-balancing request.
     *
     * @param cloudMessage message whose {@code data()} is cast to {@link LBRequest}
     * @return a {@link Mono} emitting a response built from the request topic, the
     *         chosen bucket id and the request key; a failure raised by
     *         {@link TopicHandler#assign(String)} occurs inside the {@code map}
     *         operator rather than in this method's own call frame
     */
    @Override // java.util.function.Function
    public Mono<LBResponse> apply(CloudMessage cloudMessage) {
        LBRequest request = (LBRequest) cloudMessage.data();
        return hnd(request.getTopic())
                .map(handler -> handler.assign(request.getKey()))
                .map(serviceId -> new LBResponse(request.getTopic(), serviceId, request.getKey()));
    }

    /**
     * Returns the handler for a topic, creating it on first use. The lookup runs
     * when this method is called, not when the returned {@link Mono} is subscribed.
     */
    private Mono<TopicHandler> hnd(String str) {
        return Mono.just(this.handlers.computeIfAbsent(str, topic -> new TopicHandler(topic, this.reg)));
    }
}
