package ru.stm.rpc.kafkaredis.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.Acknowledgment;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import ru.stm.rpc.core.NoDefRpcCtx;
import ru.stm.rpc.core.RpcCtx;
import ru.stm.rpc.core.RpcHandler;
import ru.stm.rpc.kafkaredis.config.KafkaRpcInternalConstants;
import ru.stm.rpc.kafkaredis.consumer.RpcAbstractRpcListener;
import ru.stm.rpc.kafkaredis.serialize.RpcSerializer;
import ru.stm.rpc.kafkaredis.util.RemoteServiceLogger;
import ru.stm.rpc.kafkaredis.util.RpcDirection;
import ru.stm.rpc.types.RpcRequest;
import ru.stm.rpc.types.RpcResultType;

/* loaded from: input_file:ru/stm/rpc/kafkaredis/service/RpcTopic.class */
/**
 * Registry and dispatcher of {@link RpcHandler} methods for a single namespace/topic pair.
 *
 * <p>Listener beans are scanned by {@link #addListenerBean(Object)}; every public method annotated
 * with {@link RpcHandler} is indexed by the fully-qualified class name of its {@link RpcRequest}
 * parameter. {@link #handle(ConsumerRecord, RpcAbstractRpcListener, Acknowledgment)} then looks up
 * the handler matching the runtime class of an incoming request and invokes it reflectively.
 *
 * <p>NOTE(review): the handler registry is a plain {@link HashMap}; presumably all beans are
 * registered before records start flowing. Confirm against the caller before registering
 * concurrently with {@code handle}.
 */
public class RpcTopic {
    private static final Logger log = LoggerFactory.getLogger(RpcTopic.class);
    private static final ObjectMapper objectMapper = RpcSerializer.getObjectMapper();

    private final String namespace;
    private final String topic;
    private final boolean transactional;

    /** Registered handlers keyed by the fully-qualified class name of the request type they accept. */
    private final Map<String, MethodDef> methods = new HashMap<>();

    /** Everything needed to reflectively invoke one {@link RpcHandler} method. */
    public static class MethodDef {
        /** The annotated handler method. */
        private Method method;
        /** The bean instance the method is invoked on. */
        private Object bean;
        /** Logger that records each invocation of this handler. */
        private RemoteServiceLogger logger;
        /** The {@link RpcHandler} annotation found on the method. */
        private RpcHandler handler;
        /** Whether the method declares an {@link RpcRequest} parameter. */
        private boolean hasRequest;
        /** Whether the method declares an {@link RpcCtx} parameter. */
        private boolean hasContext;
        /** Position of the request parameter in the method's parameter list. */
        private int requestIndex;
        /** Position of the context parameter in the method's parameter list. */
        private int contextIndex;
        /** Concrete class the user-context header is deserialized into. */
        private Class<? extends RpcCtx> contextClass;

        private MethodDef() {
        }
    }

    /**
     * Creates an empty handler registry for the given namespace and topic.
     *
     * @param namespace     namespace name, used for invocation logging and error messages
     * @param topic         topic name, used for invocation logging and error messages
     * @param transactional flag reported by {@link #isTransactional()}
     */
    public RpcTopic(String namespace, String topic, boolean transactional) {
        this.namespace = namespace;
        this.topic = topic;
        this.transactional = transactional;
    }

    /**
     * Scans the public methods of {@code bean} and registers every one annotated with
     * {@link RpcHandler}.
     *
     * @param bean listener bean whose handler methods should be registered
     * @throws IllegalArgumentException if a handler does not return {@link Mono}, if a non-async
     *                                  handler returns a raw {@code Mono} or a {@code Mono} of a
     *                                  class that is not an {@link RpcResultType}, or if a handler
     *                                  has no {@link RpcRequest} parameter
     * @throws IllegalStateException    if another handler is already registered for the same
     *                                  request class
     */
    public void addListenerBean(Object bean) {
        Class<?> beanClass = bean.getClass();
        RemoteServiceLogger serviceLogger =
                new RemoteServiceLogger(beanClass, this.namespace, this.topic, RpcDirection.CONSUMER);

        for (Method method : beanClass.getMethods()) {
            RpcHandler handler = method.getAnnotation(RpcHandler.class);
            if (handler == null) {
                continue;
            }
            if (!method.getReturnType().equals(Mono.class)) {
                throw new IllegalArgumentException("RpcHandler does not return Mono<>, class=" + beanClass.getName() + ", method=" + method.getName());
            }
            // Only non-async handlers have their Mono type argument validated.
            if (!handler.async()) {
                log.trace("Processing class {} method {}", beanClass.getSimpleName(), method.getName());
                validateResultType(beanClass, method);
            }

            MethodDef methodDef = new MethodDef();
            methodDef.method = method;
            methodDef.bean = bean;
            methodDef.logger = serviceLogger;
            methodDef.handler = handler;

            Class<?>[] parameterTypes = method.getParameterTypes();
            for (int i = 0; i < parameterTypes.length; i++) {
                Class<?> parameterType = parameterTypes[i];
                if (RpcCtx.class.isAssignableFrom(parameterType)) {
                    methodDef.hasContext = true;
                    methodDef.contextIndex = i;
                    // A parameter declared as plain RpcCtx is deserialized as NoDefRpcCtx.
                    if (parameterType.equals(RpcCtx.class)) {
                        methodDef.contextClass = NoDefRpcCtx.class;
                    } else {
                        methodDef.contextClass = parameterType.asSubclass(RpcCtx.class);
                    }
                } else if (!methodDef.hasRequest && RpcRequest.class.isAssignableFrom(parameterType)) {
                    // Only the first RpcRequest parameter is treated as the request.
                    methodDef.hasRequest = true;
                    methodDef.requestIndex = i;
                }
            }
            if (!methodDef.hasRequest) {
                throw new IllegalArgumentException("RpcHandler does not have RpcRequest parameter, class=" + beanClass.getName() + ", method=" + method.getName());
            }

            // putIfAbsent keeps the first registration intact when a conflict is reported.
            MethodDef existing = this.methods.putIfAbsent(parameterTypes[methodDef.requestIndex].getName(), methodDef);
            if (existing != null) {
                throw new IllegalStateException(String.format("RPC handler %s#%s conflicts with %s#%s", methodDef.bean.getClass().getSimpleName(), methodDef.method.getName(), existing.bean.getClass().getSimpleName(), existing.method.getName()));
            }
        }
    }

    /**
     * Checks that a handler's {@code Mono} return type is parameterized and, when its type
     * argument is a plain class, that the class is an {@link RpcResultType}. Type arguments that
     * are not plain classes (wildcards, type variables, parameterized types) are not checked.
     */
    private static void validateResultType(Class<?> beanClass, Method method) {
        Type genericReturnType = method.getGenericReturnType();
        if (!(genericReturnType instanceof ParameterizedType)) {
            // A raw Mono has no type argument to inspect; fail with a clear message instead of a ClassCastException.
            throw new IllegalArgumentException(String.format("RpcHandler shall return Mono<? extends RpcResultType> but returns raw Mono, class=%s, method=%s", beanClass.getName(), method.getName()));
        }
        Type monoArgument = ((ParameterizedType) genericReturnType).getActualTypeArguments()[0];
        if (monoArgument instanceof Class) {
            Class<?> monoClass = (Class<?>) monoArgument;
            if (!RpcResultType.class.isAssignableFrom(monoClass)) {
                throw new IllegalArgumentException(String.format("RpcHandler shall return Mono<? extends RpcResultType>, class=%s, method=%s, monoType=%s", beanClass.getName(), method.getName(), monoClass.getName()));
            }
        }
    }

    /**
     * Dispatches one consumed record to the handler registered for the request's runtime class.
     *
     * @param consumerRecord         the consumed record; its value is cast to {@link RpcRequest}
     * @param rpcAbstractRpcListener listener used to execute non-async handlers and send the result
     * @param acknowledgment         acknowledgment handle for the record
     * @throws UnsupportedOperationException if no handler is registered for the request class
     */
    public void handle(ConsumerRecord<String, String> consumerRecord, RpcAbstractRpcListener rpcAbstractRpcListener, Acknowledgment acknowledgment) {
        // The record is declared with a String value, yet this code expects an RpcRequest instance;
        // going through Object keeps the cast legal for the compiler.
        Object payload = consumerRecord.value();
        RpcRequest rpcRequest = (RpcRequest) payload;

        MethodDef methodDef = this.methods.get(rpcRequest.getClass().getName());
        if (methodDef == null) {
            throw new UnsupportedOperationException(String.format("Unsupported RPC request class %s, ns=%s, topic=%s", rpcRequest.getClass().getName(), this.namespace, this.topic));
        }

        RpcCtx userContext = null;
        if (methodDef.hasContext) {
            userContext = getUserContext(getSingleHeaderOrNull(consumerRecord, "KAFKA_USER_CONTEXT"), methodDef.contextClass);
        }

        if (methodDef.handler.async()) {
            invokeAsync(methodDef, rpcRequest, userContext, acknowledgment);
        } else {
            invokeSync(methodDef, rpcRequest, userContext, rpcAbstractRpcListener, acknowledgment, consumerRecord);
        }
    }

    /**
     * Invokes an async handler and acknowledges the record once the handler's {@code Mono}
     * terminates: always on successful completion, and on error only when the handler's
     * {@code ackOnError} flag is set.
     */
    private void invokeAsync(MethodDef methodDef, Object obj, RpcCtx rpcCtx, Acknowledgment acknowledgment) {
        invoke(methodDef, obj, rpcCtx)
                // doOnSuccess also fires (with null) when the Mono completes empty, so a single
                // hook covers both cases and the record is acknowledged exactly once.
                .doOnSuccess(result -> acknowledgment.acknowledge())
                .doOnError(error -> {
                    if (methodDef.handler.ackOnError()) {
                        acknowledgment.acknowledge();
                    }
                })
                .subscribe();
    }

    /**
     * Invokes a non-async handler through the listener's {@code executeAndSend}, passing the
     * record key as operation id together with the trace id and the optional timeout taken from
     * the record headers.
     */
    private void invokeSync(MethodDef methodDef, RpcRequest rpcRequest, RpcCtx rpcCtx, RpcAbstractRpcListener rpcAbstractRpcListener, Acknowledgment acknowledgment, ConsumerRecord<String, String> consumerRecord) {
        byte[] timeoutHeader = getSingleHeaderOrNull(consumerRecord, KafkaRpcInternalConstants.KAFKA_REDIS_RPC_TIMEOUT);
        OffsetDateTime timeout = timeoutHeader == null
                ? null
                : OffsetDateTime.parse(new String(timeoutHeader, StandardCharsets.UTF_8));

        String operationId = (String) consumerRecord.key();

        byte[] traceHeader = getSingleHeaderOrNull(consumerRecord, "TRACE_ID");
        String traceId = traceHeader == null ? "NO_TRACE_ID" : new String(traceHeader, StandardCharsets.UTF_8);

        // Context is immutable, so it can be built once and shared by the supplier below.
        Context invocationContext = buildInvocationContext(operationId, timeout, traceId);

        rpcAbstractRpcListener.executeAndSend(operationId, traceId, rpcRequest, acknowledgment, () -> {
            return invoke(methodDef, rpcRequest, rpcCtx).subscriberContext(invocationContext);
        }, timeout);
    }

    /**
     * Builds the Reactor context handed to a non-async handler. Reactor contexts reject null
     * values, so {@code OPERATION_ID} and {@code TIMEOUT} are only added when present.
     */
    private static Context buildInvocationContext(String operationId, OffsetDateTime timeout, String traceId) {
        Context context = Context.of("TRACE_ID", traceId);
        if (operationId != null) {
            context = context.put("OPERATION_ID", operationId);
        }
        if (timeout != null) {
            context = context.put("TIMEOUT", timeout);
        }
        return context;
    }

    /**
     * Reflectively calls the handler method and attaches invocation logging to the returned
     * {@code Mono}. Failures are never thrown; they are returned as an error {@code Mono}.
     *
     * @param methodDef handler description
     * @param obj       request object placed at the handler's request parameter position
     * @param rpcCtx    user context placed at the handler's context parameter position, if any
     * @return the handler's {@code Mono}, or an error {@code Mono} if the call failed or returned null
     */
    private <R> Mono<R> invoke(MethodDef methodDef, Object obj, RpcCtx rpcCtx) {
        try {
            Object[] arguments = new Object[methodDef.method.getParameterCount()];
            arguments[methodDef.requestIndex] = obj;
            if (methodDef.hasContext) {
                arguments[methodDef.contextIndex] = rpcCtx;
            }
            long startedAt = System.currentTimeMillis();
            try {
                @SuppressWarnings("unchecked") // registration guarantees the declared return type is Mono
                Mono<R> result = (Mono<R>) methodDef.method.invoke(methodDef.bean, arguments);
                if (result == null) {
                    return Mono.error(new IllegalStateException("Method returned no mono: " + methodDef.method.getName()));
                }
                // doOnSuccessOrError is also called with (null, null) on empty completion, so this
                // single hook logs value, empty and error terminations exactly once each.
                return result.doOnSuccessOrError((value, error) ->
                        methodDef.logger.log(methodDef.handler.value(), methodDef.handler.type(), methodDef.method.getName(), rpcCtx, obj, value, error, Long.valueOf(System.currentTimeMillis() - startedAt)));
            } catch (InvocationTargetException e) {
                // Unwrap the reflection wrapper so the logged and emitted error is the one the
                // handler actually threw; fall back to the wrapper if it carries no cause.
                Throwable failure = e.getCause() != null ? e.getCause() : e;
                methodDef.logger.log(methodDef.handler.value(), methodDef.handler.type(), methodDef.method.getName(), rpcCtx, obj, null, failure, Long.valueOf(System.currentTimeMillis() - startedAt));
                return Mono.error(failure);
            }
        } catch (Throwable unexpected) {
            return Mono.error(unexpected);
        }
    }

    /**
     * Deserializes the user-context header into the given context class.
     *
     * @param bArr raw header bytes, or {@code null} if the header is absent
     * @param cls  class to deserialize into
     * @return the deserialized context, or {@code null} when {@code bArr} is {@code null}
     * @throws RuntimeException wrapping the {@link IOException} if deserialization fails
     */
    private RpcCtx getUserContext(byte[] bArr, Class<? extends RpcCtx> cls) {
        if (bArr == null) {
            return null;
        }
        try {
            return objectMapper.readValue(bArr, cls);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the value of the first header with the given name, or {@code null} if the record has
     * no such header.
     */
    private byte[] getSingleHeaderOrNull(ConsumerRecord<?, ?> consumerRecord, String str) {
        Iterator<Header> matching = consumerRecord.headers().headers(str).iterator();
        return matching.hasNext() ? matching.next().value() : null;
    }

    /** Returns the topic name this registry was created for. */
    public String getTopic() {
        return this.topic;
    }

    /** Returns the transactional flag passed to the constructor. */
    public boolean isTransactional() {
        return this.transactional;
    }
}
