package org.kinotic.continuum.internal.core.api.service.rpc;

import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.kinotic.continuum.api.exceptions.RpcMissingServiceException;
import org.kinotic.continuum.core.api.RpcServiceProxy;
import org.kinotic.continuum.core.api.RpcServiceProxyHandle;
import org.kinotic.continuum.core.api.event.CRI;
import org.kinotic.continuum.core.api.event.Event;
import org.kinotic.continuum.core.api.event.EventBusService;
import org.kinotic.continuum.core.api.event.Metadata;
import org.kinotic.continuum.core.api.service.ServiceIdentifier;
import org.kinotic.continuum.internal.utils.ContinuumUtil;
import org.kinotic.continuum.internal.utils.MetaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/kinotic/continuum/internal/core/api/service/rpc/DefaultRpcServiceProxyHandle.class */
public class DefaultRpcServiceProxyHandle<T> implements RpcServiceProxyHandle<T>, InvocationHandler {
    private static final Logger log = LoggerFactory.getLogger(DefaultRpcServiceProxyHandle.class);
    private final ServiceIdentifier serviceIdentifier;
    private final String nodeName;
    private final String encodedNodeName;
    private final Class<T> serviceClass;
    private final CRI handlerCRI;
    private final RpcArgumentConverter rpcArgumentConverter;
    private final RpcReturnValueHandlerFactory rpcReturnValueHandlerFactory;
    private final EventBusService eventBusService;
    private final Disposable replyEventListenerDisposable;
    private final T serviceProxy;
    private final Map<Method, Integer> methodsWithScopeAnnotation = new HashMap();
    private final AtomicBoolean released = new AtomicBoolean(false);
    private final ConcurrentHashMap<String, RpcReturnValueHandler> responseMap = new ConcurrentHashMap<>();

    public DefaultRpcServiceProxyHandle(ServiceIdentifier serviceIdentifier, String str, Class<T> cls, RpcArgumentConverter rpcArgumentConverter, RpcReturnValueHandlerFactory rpcReturnValueHandlerFactory, EventBusService eventBusService, ClassLoader classLoader) {
        Validate.notNull(serviceIdentifier, "serviceIdentifier must not be null", new Object[0]);
        Validate.notBlank(str, "nodeName must not be blank", new Object[0]);
        Validate.notNull(cls, "serviceClass must not be null", new Object[0]);
        Validate.notNull(rpcArgumentConverter, "argumentConverter must not be null", new Object[0]);
        Validate.notNull(rpcReturnValueHandlerFactory, "returnValueHandlerFactory must not be null", new Object[0]);
        Validate.notNull(eventBusService, "eventBusService must not be null", new Object[0]);
        Validate.notNull(classLoader, "classLoader must not be null", new Object[0]);
        this.serviceIdentifier = serviceIdentifier;
        this.nodeName = str;
        this.encodedNodeName = ContinuumUtil.safeEncodeURI(str);
        this.serviceClass = cls;
        this.rpcArgumentConverter = rpcArgumentConverter;
        this.rpcReturnValueHandlerFactory = rpcReturnValueHandlerFactory;
        this.eventBusService = eventBusService;
        this.handlerCRI = CRI.create("srv", this.encodedNodeName + ":" + String.valueOf(UUID.randomUUID()), ContinuumUtil.safeEncodeURI(cls.getName()) + "RpcProxyResponseHandler");
        ReflectionUtils.doWithMethods(cls, method -> {
            if (!rpcReturnValueHandlerFactory.supports(method)) {
                throw new IllegalArgumentException("The method: " + String.valueOf(method) + " does not have a supported RpcReturnValueHandlerFactory");
            }
            Integer findParameterIndexWithScopeAnnotation = MetaUtil.findParameterIndexWithScopeAnnotation(method);
            if (findParameterIndexWithScopeAnnotation != null) {
                this.methodsWithScopeAnnotation.put(method, findParameterIndexWithScopeAnnotation);
            }
        }, ReflectionUtils.USER_DECLARED_METHODS);
        this.serviceProxy = cls.cast(Proxy.newProxyInstance(classLoader, new Class[]{cls, RpcServiceProxy.class}, this));
        this.replyEventListenerDisposable = eventBusService.listen(this.handlerCRI.raw()).subscribe(event -> {
            String str2 = event.metadata().get("__correlation-id");
            if (str2 == null) {
                log.error("Received Message with no __correlation-id header");
                return;
            }
            if (!this.responseMap.containsKey(str2)) {
                log.error("Received Message for correlationId: " + str2 + " but no response handler is set");
                return;
            }
            try {
                if (this.responseMap.get(str2).processResponse(event)) {
                    this.responseMap.remove(str2);
                }
            } catch (Exception e) {
                log.error("URGENT: Unhandled exception in RpcReturnValueHandler.processResponse, Proxy Will be Released!!", e);
                release();
            }
        }, th -> {
            log.error("Reply Event listener error", th);
        }, () -> {
            log.error("Should not happen! Reply Event listener stopped for some reason!!");
        });
    }

    public T getService() {
        return this.serviceProxy;
    }

    public void release() {
        if (this.released.compareAndSet(false, true)) {
            this.replyEventListenerDisposable.dispose();
            this.responseMap.forEach((str, rpcReturnValueHandler) -> {
                rpcReturnValueHandler.cancel(this.serviceClass.getSimpleName() + " released. No further responses will be processed");
            });
            this.responseMap.clear();
        }
    }

    @Override // java.lang.reflect.InvocationHandler
    public Object invoke(Object obj, Method method, Object[] objArr) throws Throwable {
        Object invoke;
        if (this.released.get()) {
            throw new IllegalStateException("RpcServiceProxyHandle has already been released. No service method can be called after release.");
        }
        if (log.isTraceEnabled()) {
            log.trace("Proxy for " + this.serviceClass.getSimpleName() + " Method Invoked " + method.toString());
        }
        if (shouldInvokeLocally(method)) {
            Class[] clsArr = new Class[objArr.length];
            for (int i = 0; i < objArr.length; i++) {
                clsArr[i] = objArr[i].getClass();
            }
            Method findMethod = ReflectionUtils.findMethod(getClass(), method.getName(), clsArr);
            Assert.notNull(findMethod, "Could not find appropriate method on proxy handle");
            invoke = findMethod.invoke(this, objArr);
        } else {
            Integer num = this.methodsWithScopeAnnotation.get(method);
            String obj2 = num != null ? objArr[num.intValue()].toString() : null;
            if (num != null) {
                objArr = ArrayUtils.remove(objArr, num.intValue());
            }
            byte[] convert = this.rpcArgumentConverter.convert(method, objArr);
            final String uuid = UUID.randomUUID().toString();
            final RpcReturnValueHandler createReturnValueHandler = this.rpcReturnValueHandlerFactory.createReturnValueHandler(method, objArr);
            this.responseMap.put(uuid, createReturnValueHandler);
            Metadata create = Metadata.create();
            create.put("sender", "{\"tenantId\":\"continuum\",\"id\":\"" + this.nodeName + "\",\"metadata\":{\"type\":\"node\"},\"roles\":[\"NODE\"]}");
            create.put("reply-to", this.handlerCRI.raw());
            create.put("__correlation-id", uuid);
            create.put("content-type", this.rpcArgumentConverter.producesContentType());
            final CRI create2 = CRI.create("srv", obj2, this.serviceIdentifier.qualifiedName(), "/" + method.getName(), this.serviceIdentifier.version());
            final Event create3 = Event.create(create2, create, convert);
            invoke = createReturnValueHandler.getReturnValue(new RpcRequest() { // from class: org.kinotic.continuum.internal.core.api.service.rpc.DefaultRpcServiceProxyHandle.1
                @Override // org.kinotic.continuum.internal.core.api.service.rpc.RpcRequest
                public void send() {
                    Mono sendWithAck = DefaultRpcServiceProxyHandle.this.eventBusService.sendWithAck(create3);
                    Consumer consumer = r1 -> {
                    };
                    String str = uuid;
                    RpcReturnValueHandler rpcReturnValueHandler = createReturnValueHandler;
                    sendWithAck.subscribe(consumer, th -> {
                        try {
                            DefaultRpcServiceProxyHandle.this.responseMap.remove(str);
                            if ((th instanceof ReplyException) && ((ReplyException) th).failureType() == ReplyFailure.NO_HANDLERS) {
                                th = new RpcMissingServiceException(th);
                            }
                            rpcReturnValueHandler.processError(th);
                        } catch (Exception e) {
                            DefaultRpcServiceProxyHandle.log.error("URGENT: Unhandled exception in RpcReturnValueHandler.processError, Proxy Will be Released!!", e);
                            DefaultRpcServiceProxyHandle.this.release();
                        }
                    });
                }

                @Override // org.kinotic.continuum.internal.core.api.service.rpc.RpcRequest
                public void cancelRequest() {
                    if (!createReturnValueHandler.isMultiValue()) {
                        throw new IllegalStateException("Cancel is not supported if RpcReturnValueHandler.isMultiValue returns false");
                    }
                    Metadata create4 = Metadata.create();
                    create4.put("control", "cancel");
                    create4.put("__correlation-id", uuid);
                    Mono sendWithAck = DefaultRpcServiceProxyHandle.this.eventBusService.sendWithAck(Event.create(create2, create4, (Object) null));
                    String str = uuid;
                    sendWithAck.doFinally(signalType -> {
                        DefaultRpcServiceProxyHandle.this.responseMap.remove(str);
                    }).subscribe();
                }
            });
        }
        return invoke;
    }

    private boolean shouldInvokeLocally(Method method) {
        boolean z = false;
        if (method.getName().equals("toString")) {
            z = true;
        }
        return z;
    }

    public String toString() {
        return new ToStringBuilder(this).append("serviceIdentifier", this.serviceIdentifier).append("handlerCRI", this.handlerCRI).toString();
    }
}
