package com.cloudimpl.cluster4j.routers;

import com.cloudimpl.cluster4j.common.CloudMessage;
import com.cloudimpl.cluster4j.core.CloudRouter;
import com.cloudimpl.cluster4j.core.CloudService;
import com.cloudimpl.cluster4j.core.CloudServiceRegistry;
import com.cloudimpl.cluster4j.core.FluxStream;
import com.cloudimpl.cluster4j.core.Inject;
import com.cloudimpl.cluster4j.core.Named;
import com.cloudimpl.cluster4j.core.RouterException;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/cloudimpl/cluster4j/routers/RoundRobinRouter.class */
public class RoundRobinRouter implements CloudRouter {
    private Set<CloudService> services = new ConcurrentSkipListSet();
    private Iterator<CloudService> iterator;
    private String topic;

    @Inject
    public RoundRobinRouter(@Named("@topic") String str, CloudServiceRegistry cloudServiceRegistry) {
        this.topic = str;
        cloudServiceRegistry.flux().filter(event -> {
            return event.getType() == FluxStream.Event.Type.ADD;
        }).map(event2 -> {
            return (CloudService) event2.getValue();
        }).filter(cloudService -> {
            return cloudService.name().equals(str);
        }).doOnNext(cloudService2 -> {
            this.services.add(cloudService2);
        }).subscribe();
        cloudServiceRegistry.flux().filter(event3 -> {
            return event3.getType() == FluxStream.Event.Type.REMOVE;
        }).map(event4 -> {
            return (CloudService) event4.getValue();
        }).filter(cloudService3 -> {
            return cloudService3.name().equals(str);
        }).doOnNext(cloudService4 -> {
            this.services.remove(cloudService4);
        }).subscribe();
        this.iterator = this.services.iterator();
    }

    public Mono<CloudService> route(CloudMessage cloudMessage) {
        if (!this.iterator.hasNext()) {
            this.iterator = this.services.iterator();
        }
        return this.iterator.hasNext() ? Mono.just(this.iterator.next()) : Mono.error(new RouterException("service [" + this.topic + "] not found"));
    }
}
