package ir.msob.jima.cloud.rsocket.servicediscovery.server.controller;

import io.rsocket.RSocket;
import ir.msob.jima.cloud.rsocket.beans.ApplicationCacheService;
import ir.msob.jima.cloud.rsocket.commons.model.InstanceInfo;
import ir.msob.jima.cloud.rsocket.commons.model.ServiceDiscoveryPayload;
import ir.msob.jima.cloud.rsocket.commons.util.RSocketUtil;
import ir.msob.jima.cloud.rsocket.commons.util.RoundRobinList;
import ir.msob.jima.cloud.rsocket.servicediscovery.server.client.ApplicationClient;
import ir.msob.jima.cloud.rsocket.servicediscovery.server.client.ServerClient;
import java.util.Collection;
import java.util.Objects;
import lombok.Generated;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Controller;

/**
 * RSocket controller, mapped under {@code service-discovery-server}, that handles the
 * application-facing routes {@code application.setup}, {@code application.register} and
 * {@code application.unregister}.
 *
 * <p>Registered instances are stored through {@link ApplicationCacheService} and changes are
 * forwarded through {@link ApplicationClient} and, for connection-driven changes,
 * {@link ServerClient}.
 */
@MessageMapping("service-discovery-server")
@Controller
public class ApplicationController {

    @Generated
    private static final Logger log = LogManager.getLogger(ApplicationController.class);
    private final ApplicationCacheService applicationCacheService;
    private final ApplicationClient applicationClient;
    private final ServerClient serverClient;

    /**
     * Handles {@code application.setup}: registers the instances carried by the payload and ties
     * their registration to the lifetime of the requester's connection.
     *
     * <p>The instances are registered as soon as the close signal is subscribed to and are
     * unregistered exactly once when the connection terminates, whether it completes, fails or is
     * cancelled.
     *
     * @param rSocketRequester requester of the connecting application
     * @param serviceDiscoveryPayload payload carrying the instances to register
     * @throws IllegalStateException if the requester has no underlying RSocket
     */
    @MessageMapping("application.setup")
    public void setup(RSocketRequester rSocketRequester, @Payload ServiceDiscoveryPayload serviceDiscoveryPayload) {
        log.info("{} Setup. {}", "application.setup", serviceDiscoveryPayload);

        // Validate the connection before touching the payload; read the socket only once.
        RSocket rsocket = rSocketRequester.rsocket();
        if (rsocket == null) {
            throw new IllegalStateException("Requester socket is null");
        }

        rsocket.onClose()
                .doFirst(() -> {
                    log.info("Application connected. {}", serviceDiscoveryPayload);
                    // connect() stamps the requester onto every instance, so no separate pass is needed.
                    connect(rSocketRequester, serviceDiscoveryPayload);
                })
                // Log only: doFinally below also runs on error termination, so cleaning up here as
                // well would unregister the same instances twice. The trailing throwable argument
                // makes the logger record the stack trace in addition to the formatted message.
                .doOnError(th -> log.warn("Application connection has an error. payload {}, error {}",
                        serviceDiscoveryPayload, th.toString(), th))
                .doFinally(signalType -> {
                    log.info("Application disconnected. payload {}, consumer {}", serviceDiscoveryPayload, signalType);
                    disconnect(serviceDiscoveryPayload);
                })
                .subscribe();
    }

    /**
     * Handles {@code application.register}: adds the payload's instances to the local cache and
     * forwards them to {@link ApplicationClient#register}.
     *
     * <p>Unlike the connection-driven path, this route does not call {@link ServerClient}.
     *
     * @param serviceDiscoveryPayload payload carrying the instances to register
     * @return the same payload that was received
     */
    @MessageMapping("application.register")
    public ServiceDiscoveryPayload register(@Payload ServiceDiscoveryPayload serviceDiscoveryPayload) {
        log.info("Received {} request: {}", "application.register", serviceDiscoveryPayload);
        this.applicationCacheService.add(serviceDiscoveryPayload.getInstanceInfos());
        this.applicationClient.register(serviceDiscoveryPayload.getInstanceInfos());
        return serviceDiscoveryPayload;
    }

    /**
     * Handles {@code application.unregister}: removes the payload's instances from the local cache
     * and forwards them to {@link ApplicationClient#unregister}.
     *
     * <p>Unlike the connection-driven path, this route does not call {@link ServerClient}.
     *
     * @param serviceDiscoveryPayload payload carrying the instances to unregister
     * @return the same payload that was received
     */
    @MessageMapping("application.unregister")
    public ServiceDiscoveryPayload unregister(@Payload ServiceDiscoveryPayload serviceDiscoveryPayload) {
        log.info("Received {} request: {}", "application.unregister", serviceDiscoveryPayload);
        this.applicationCacheService.remove(serviceDiscoveryPayload.getInstanceInfos());
        this.applicationClient.unregister(serviceDiscoveryPayload.getInstanceInfos());
        return serviceDiscoveryPayload;
    }

    /**
     * Registers a newly connected application: prepares its instances, adds them to the cache,
     * forwards them to both clients and then sends the cached applications back to the requester.
     */
    private void connect(RSocketRequester rSocketRequester, ServiceDiscoveryPayload serviceDiscoveryPayload) {
        Collection<InstanceInfo> instanceInfos = prepareInstanceInfo(serviceDiscoveryPayload, rSocketRequester);
        this.applicationCacheService.add(instanceInfos);
        this.applicationClient.register(instanceInfos);
        this.serverClient.registerApplication(instanceInfos);
        returnApplications(rSocketRequester);
    }

    /**
     * Sends a copy of every cached instance to the requester on the {@code application.register}
     * route. Nothing is sent when the cache is empty.
     */
    private void returnApplications(RSocketRequester rSocketRequester) {
        if (this.applicationCacheService.getApplications().isEmpty()) {
            return;
        }
        ServiceDiscoveryPayload serviceDiscoveryPayload = new ServiceDiscoveryPayload();
        // Copies are sent rather than the cached objects themselves.
        serviceDiscoveryPayload.setInstanceInfos(this.applicationCacheService.getApplications()
                .values()
                .stream()
                .flatMap(RoundRobinList::stream)
                .map(InstanceInfo::copy)
                .toList());
        rSocketRequester.route("application.register")
                .data(serviceDiscoveryPayload)
                .send()
                .subscribe();
    }

    /**
     * Reverses {@link #connect}: removes the payload's instances from the cache and forwards the
     * removal to both clients.
     */
    private void disconnect(ServiceDiscoveryPayload serviceDiscoveryPayload) {
        Collection<InstanceInfo> instanceInfos = serviceDiscoveryPayload.getInstanceInfos();
        this.applicationCacheService.remove(instanceInfos);
        this.applicationClient.unregister(instanceInfos);
        this.serverClient.unregisterApplication(instanceInfos);
    }

    /**
     * Stamps the requester, and the remote host resolved from it, onto every instance in the
     * payload. The instances are modified in place.
     *
     * @return the payload's own instance collection, after modification
     */
    private Collection<InstanceInfo> prepareInstanceInfo(ServiceDiscoveryPayload serviceDiscoveryPayload, RSocketRequester rSocketRequester) {
        Collection<InstanceInfo> instanceInfos = serviceDiscoveryPayload.getInstanceInfos();
        for (InstanceInfo instanceInfo : instanceInfos) {
            instanceInfo.setRequester(rSocketRequester);
            instanceInfo.getConnectionInfo().setHost(RSocketUtil.getRemoteHostFromRequester(rSocketRequester));
        }
        return instanceInfos;
    }

    /**
     * Creates the controller with its collaborators.
     *
     * @param applicationCacheService local cache of registered application instances
     * @param applicationClient client notified of every register and unregister
     * @param serverClient client notified of connection-driven register and unregister
     */
    @Generated
    public ApplicationController(ApplicationCacheService applicationCacheService, ApplicationClient applicationClient, ServerClient serverClient) {
        this.applicationCacheService = applicationCacheService;
        this.applicationClient = applicationClient;
        this.serverClient = serverClient;
    }
}
