package io.rsocket.routing.broker.spring.cluster;

import io.rsocket.exceptions.ApplicationErrorException;
import io.rsocket.exceptions.RejectedSetupException;
import io.rsocket.routing.broker.RoutingTable;
import io.rsocket.routing.broker.spring.BrokerProperties;
import io.rsocket.routing.frames.BrokerInfo;
import io.rsocket.routing.frames.RouteJoin;
import io.rsocket.routing.frames.RoutingFrame;
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;

@Controller
/* loaded from: input_file:io/rsocket/routing/broker/spring/cluster/ClusterController.class */
public class ClusterController {
    private final BrokerProperties properties;
    private final BrokerConnections brokerConnections;
    private final RoutingTable routingTable;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final FluxProcessor<BrokerInfo, BrokerInfo> connectEvents = DirectProcessor.create().serialize();

    public ClusterController(BrokerProperties brokerProperties, BrokerConnections brokerConnections, RoutingTable routingTable) {
        this.properties = brokerProperties;
        this.brokerConnections = brokerConnections;
        this.routingTable = routingTable;
        this.connectEvents.delayElements(Duration.ofSeconds(1L)).flatMap(brokerInfo -> {
            return sendBrokerInfo(brokerConnections.get(brokerInfo), brokerInfo);
        }).subscribe();
    }

    @ConnectMapping
    public Mono<Void> onConnect(RoutingFrame routingFrame, RSocketRequester rSocketRequester) {
        if (!(routingFrame instanceof BrokerInfo)) {
            return Mono.empty();
        }
        BrokerInfo brokerInfo = (BrokerInfo) routingFrame;
        if (brokerInfo.getBrokerId().equals(this.properties.getBrokerId())) {
            return Mono.empty();
        }
        this.logger.info("received connection from {}", brokerInfo);
        if (this.brokerConnections.contains(brokerInfo)) {
            return Mono.error(new RejectedSetupException("Duplicate connection from " + brokerInfo));
        }
        this.brokerConnections.put(brokerInfo, rSocketRequester);
        this.connectEvents.onNext(brokerInfo);
        return Mono.empty();
    }

    @MessageMapping({"cluster.broker-info"})
    public Mono<BrokerInfo> brokerInfo(BrokerInfo brokerInfo, RSocketRequester rSocketRequester) {
        this.logger.info("received brokerInfo from {}", brokerInfo);
        if (this.brokerConnections.contains(brokerInfo)) {
            this.logger.debug("connection for broker already exists {}", brokerInfo);
            return Mono.just(BrokerInfo.from(this.properties.getBrokerId()).build());
        }
        this.brokerConnections.put(brokerInfo, rSocketRequester);
        return sendBrokerInfo(rSocketRequester, brokerInfo);
    }

    private Mono<BrokerInfo> sendBrokerInfo(RSocketRequester rSocketRequester, BrokerInfo brokerInfo) {
        BrokerInfo build = BrokerInfo.from(this.properties.getBrokerId()).build();
        return rSocketRequester.route("cluster.broker-info", new Object[0]).data(build).retrieveMono(BrokerInfo.class).map(brokerInfo2 -> {
            return build;
        });
    }

    @MessageMapping({"cluster.route-join"})
    private Mono<RouteJoin> routeJoin(RouteJoin routeJoin) {
        this.logger.info("received RouteJoin {}", routeJoin);
        BrokerInfo build = BrokerInfo.from(routeJoin.getBrokerId()).build();
        if (!this.brokerConnections.contains(build)) {
            return Mono.error(new ApplicationErrorException("No connection for broker " + build));
        }
        this.routingTable.add(routeJoin);
        return Mono.just(routeJoin);
    }

    @MessageMapping({"hello"})
    public Mono<String> hello(String str) {
        return Mono.just("Hello " + str);
    }
}
