package ir.msob.jima.cloud.rsocket.servicediscovery.server.controller;

import io.rsocket.RSocket;
import ir.msob.jima.cloud.rsocket.beans.ApplicationCacheService;
import ir.msob.jima.cloud.rsocket.commons.model.ServiceDiscoveryPayload;
import ir.msob.jima.cloud.rsocket.commons.util.RoundRobinList;
import ir.msob.jima.cloud.rsocket.servicediscovery.server.ServerCacheService;
import java.util.Objects;
import lombok.Generated;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Controller;

/**
 * RSocket controller mapped under the {@code service-discovery-server} route prefix.
 *
 * <p>Handles the {@code server.setup} route: for every instance info carried by the incoming
 * payload it hands the caller's {@link RSocketRequester} to
 * {@link ServerCacheService#updateConnection}, sends the applications currently held by
 * {@link ApplicationCacheService} back to the caller, and calls {@code updateConnection} with a
 * {@code null} requester once the underlying socket terminates.
 */
@MessageMapping({"service-discovery-server"})
@Controller
/* loaded from: input_file:ir/msob/jima/cloud/rsocket/servicediscovery/server/controller/ServerController.class */
public class ServerController {

    @Generated
    private static final Logger log = LogManager.getLogger(ServerController.class);
    private final ApplicationCacheService applicationCacheService;
    private final ServerCacheService serverCacheService;

    /**
     * Handles a {@code server.setup} request from a connecting peer.
     *
     * <p>Attaches the requester to the first instance info of the payload (if any), records the
     * connection for every instance info, and subscribes to the socket's {@code onClose()} signal
     * so the connection is reset exactly once when the socket terminates.
     *
     * @param rSocketRequester        requester representing the connecting peer
     * @param serviceDiscoveryPayload payload carrying the peer's instance infos
     * @throws IllegalStateException if the requester has no underlying RSocket
     */
    @MessageMapping({"server.setup"})
    public void setup(RSocketRequester rSocketRequester, @Payload ServiceDiscoveryPayload serviceDiscoveryPayload) {
        log.info("{} Setup. {}", "server.setup", serviceDiscoveryPayload);

        // Validate before touching the payload so a rejected setup leaves no partial state behind.
        RSocket rSocket = rSocketRequester.rsocket();
        if (rSocket == null) {
            throw new IllegalStateException("Requester socket is null");
        }

        serviceDiscoveryPayload.getInstanceInfos().stream()
                .findFirst()
                .ifPresent(instanceInfo -> instanceInfo.setRequester(rSocketRequester));

        rSocket.onClose()
                .doFirst(() -> {
                    log.info("Service discovery server connected. {}", serviceDiscoveryPayload);
                    connect(rSocketRequester, serviceDiscoveryPayload);
                })
                // Log only: doFinally below also fires on error, so disconnecting here as well
                // would reset every connection twice. The throwable is passed as the trailing
                // argument so the logger records it as an exception instead of formatting it
                // into the message.
                .doOnError(th -> log.error(
                        "Service discovery server connection has an error. payload {}", serviceDiscoveryPayload, th))
                .doFinally(signalType -> {
                    log.info("Service discovery server disconnected. payload {}, consumer {}", serviceDiscoveryPayload, signalType);
                    disconnect(serviceDiscoveryPayload);
                })
                .subscribe();
    }

    /**
     * Calls {@code updateConnection} with a {@code null} requester for every instance info in the payload.
     *
     * @param serviceDiscoveryPayload payload whose instance infos should be updated
     */
    private void disconnect(ServiceDiscoveryPayload serviceDiscoveryPayload) {
        serviceDiscoveryPayload.getInstanceInfos()
                .forEach(instanceInfo -> this.serverCacheService.updateConnection(null, instanceInfo));
    }

    /**
     * Calls {@code updateConnection} with the given requester for every instance info in the
     * payload, then sends the currently cached applications back through that requester.
     *
     * @param rSocketRequester        requester of the connected peer
     * @param serviceDiscoveryPayload payload whose instance infos should be updated
     */
    private void connect(RSocketRequester rSocketRequester, ServiceDiscoveryPayload serviceDiscoveryPayload) {
        serviceDiscoveryPayload.getInstanceInfos()
                .forEach(instanceInfo -> this.serverCacheService.updateConnection(rSocketRequester, instanceInfo));
        returnApplications(rSocketRequester);
    }

    /**
     * Sends copies of all instance infos held by the application cache to the peer on the
     * {@code service-discovery-server.application.register} route. Does nothing when the cache
     * holds no applications.
     *
     * @param rSocketRequester requester used to send the payload
     */
    private void returnApplications(RSocketRequester rSocketRequester) {
        // Read the cache once so the emptiness check and the streamed content use the same map.
        var applications = this.applicationCacheService.getApplications();
        if (applications.isEmpty()) {
            return;
        }
        ServiceDiscoveryPayload applicationsPayload = new ServiceDiscoveryPayload();
        applicationsPayload.setInstanceInfos(applications.values().stream()
                .flatMap(instances -> instances.stream())
                // Send copies rather than the cached objects themselves.
                .map(instanceInfo -> instanceInfo.copy())
                .toList());
        rSocketRequester.route("service-discovery-server.application.register")
                .data(applicationsPayload)
                .send()
                .subscribe();
    }

    /**
     * Creates the controller with its two cache collaborators.
     *
     * @param applicationCacheService source of the applications returned to connecting peers
     * @param serverCacheService      cache updated on connect and disconnect
     */
    @Generated
    public ServerController(ApplicationCacheService applicationCacheService, ServerCacheService serverCacheService) {
        this.applicationCacheService = applicationCacheService;
        this.serverCacheService = serverCacheService;
    }
}
