package ru.stm.rpc.kafkaredis.consumer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.instrument.MeterRegistry;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.stm.platform.BusinessError;
import ru.stm.platform.StmExecutionError;
import ru.stm.rpc.core.RpcResult;
import ru.stm.rpc.kafkaredis.config.KafkaRpcConnection;
import ru.stm.rpc.kafkaredis.serialize.RpcSerializer;
import ru.stm.rpc.types.RpcResultType;

/* loaded from: input_file:ru/stm/rpc/kafkaredis/consumer/RpcResponseService.class */
public class RpcResponseService {
    private static final Logger logger = LoggerFactory.getLogger(RpcResponseService.class);
    private final ObjectMapper objectMapper = RpcSerializer.getObjectMapper();
    private final AtomicLong sentResponsesOk = new AtomicLong(0);
    private final AtomicLong sentResponsesFailed = new AtomicLong(0);
    private final AtomicLong sendException = new AtomicLong(0);
    private final Map<String, KafkaRpcConnection> kafkaRpcConnection = new HashMap();
    private final MeterRegistry meterRegistry;

    public RpcResponseService(List<KafkaRpcConnection> list, MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        list.forEach(kafkaRpcConnection -> {
            this.kafkaRpcConnection.put(kafkaRpcConnection.getNamespace(), kafkaRpcConnection);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void sendExceptionally(Object obj, String str, String str2, String str3, Throwable th, String str4, Duration duration) {
        logger.error("Send error to operationId={} traceId={} operationName={} payload={}", new Object[]{str, str2, str3, obj, th});
        RpcResult errorInternal = RpcResult.errorInternal(str, th);
        try {
            KafkaRpcConnection kafkaRpcConnection = this.kafkaRpcConnection.get(str4);
            kafkaRpcConnection.getRedisTemplate().opsForValue().set(str, getRpcRedisValue(errorInternal, str, str2, str3, kafkaRpcConnection), kafkaRpcConnection.getRedisttl()).timeout(duration).doOnError(th2 -> {
                logger.error("Error send response to redis key={} operation={}", new Object[]{str, str3, th2});
                this.meterRegistry.counter("rpc_send_exceptionally", new String[]{"RPC", "Consumer", "outcome", "error", "namespace", str4, "operation_name", str3, "component", "redis"}).increment();
                this.sentResponsesFailed.incrementAndGet();
            }).subscribe(bool -> {
                this.sendException.incrementAndGet();
                this.meterRegistry.counter("rpc_send_exceptionally", new String[]{"RPC", "Consumer", "outcome", "success", "namespace", str4, "operation_name", str3, "component", "response"}).increment();
            });
        } catch (JsonProcessingException e) {
            this.meterRegistry.counter("rpc_send_exceptionally", new String[]{"RPC", "Consumer", "outcome", "error", "namespace", str4, "operation_name", str3, "component", "json"}).increment();
            logger.error("failed to send error response to redis {}", str, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <T extends RpcResultType> void sendSuccess(Object obj, String str, String str2, String str3, T t, String str4, Duration duration) {
        try {
            RpcResult success = RpcResult.success(str, t);
            KafkaRpcConnection kafkaRpcConnection = this.kafkaRpcConnection.get(str4);
            kafkaRpcConnection.getRedisTemplate().opsForValue().set(str, getRpcRedisValue(success, str, str2, str3, kafkaRpcConnection), kafkaRpcConnection.getRedisttl()).retryBackoff(kafkaRpcConnection.getProps().getConsumer().getRetryTimes(), kafkaRpcConnection.getConsumerBackoff()).timeout(duration).doOnError(th -> {
                this.meterRegistry.counter("rpc_send_success", new String[]{"RPC", "Consumer", "outcome", "error", "namespace", str4, "operation_name", str3, "component", "redis"}).increment();
                logger.error("Error send response to redis key={} operation={}", new Object[]{str, str3, th});
                this.sentResponsesFailed.incrementAndGet();
            }).subscribe(bool -> {
                if (!(bool instanceof Boolean) || !bool.booleanValue()) {
                    this.meterRegistry.counter("rpc_send_success", new String[]{"RPC", "Consumer", "outcome", "error", "namespace", str4, "operation_name", str3, "component", "redis"}).increment();
                    throw new RuntimeException("Can not send to redis key " + str);
                }
                this.meterRegistry.counter("rpc_send_success", new String[]{"RPC", "Consumer", "outcome", "success", "namespace", str4, "operation_name", str3, "component", "response"}).increment();
                this.sentResponsesOk.incrementAndGet();
                logger.trace("sent to redis key={} operation={}", str, str3);
            });
        } catch (Exception e) {
            sendExceptionally(obj, str, str2, str3, e, str4, duration);
            logger.error("Error serialize to redis request {}", str, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void sendErrorBusiness(Object obj, String str, String str2, String str3, BusinessError businessError, String str4, Duration duration) {
        try {
            RpcResult errorBusinessString = RpcResult.errorBusinessString(str, businessError.getCode(), businessError.getMessage(), businessError.getArgs());
            KafkaRpcConnection kafkaRpcConnection = this.kafkaRpcConnection.get(str4);
            kafkaRpcConnection.getRedisTemplate().opsForValue().set(str, getRpcRedisValue(errorBusinessString, str, str2, str3, kafkaRpcConnection), kafkaRpcConnection.getRedisttl()).retryBackoff(kafkaRpcConnection.getProps().getConsumer().getRetryTimes(), kafkaRpcConnection.getConsumerBackoff()).timeout(duration).doOnError(th -> {
                this.meterRegistry.counter("rpc_send_error_business", new String[]{"RPC", "Consumer", "outcome", "error", "namespace", str4, "operation_name", str3, "component", "redis"}).increment();
                logger.error("Error send response to redis key={} operation={}", new Object[]{str, str3, th});
                this.sentResponsesFailed.incrementAndGet();
            }).subscribe(bool -> {
                this.meterRegistry.counter("rpc_send_error_business", new String[]{"RPC", "Consumer", "outcome", "success", "namespace", str4, "operation_name", str3, "component", "response"}).increment();
                this.sentResponsesOk.incrementAndGet();
                logger.trace("sent to redis {} code {}", str, bool);
            });
        } catch (JsonProcessingException e) {
            logger.error("Error serialize to redis request key={}", str, e);
            sendExceptionally(obj, str, str2, str3, e, str4, duration);
        }
    }

    private String getRpcRedisValue(RpcResult rpcResult, String str, String str2, String str3, KafkaRpcConnection kafkaRpcConnection) throws JsonProcessingException {
        String writeValueAsString = this.objectMapper.writeValueAsString(rpcResult);
        if (writeValueAsString.length() > kafkaRpcConnection.getProps().getResponseWarnThreshold()) {
            int printLength = kafkaRpcConnection.getProps().getPrintLength();
            if (writeValueAsString.length() < printLength) {
                printLength = writeValueAsString.length() - 1;
            }
            if (writeValueAsString.length() > kafkaRpcConnection.getProps().getResponseRefuseThreshold()) {
                logger.error("Too much value in Redis: key={} traceId={} operationName={} length={} response={}", new Object[]{str, str2, str3, Integer.valueOf(writeValueAsString.length()), writeValueAsString.subSequence(0, printLength)});
                throw new StmExecutionError(String.format("RPC Response is too large key=%s traceId=%s operationName==%s result length=%s", str, str2, str3, Integer.valueOf(writeValueAsString.length())));
            }
            logger.warn("RPC large enough value in Redis: key={} traceId={} operationName={} response={}", new Object[]{str, str2, str3, writeValueAsString.subSequence(0, printLength)});
        }
        return writeValueAsString;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AtomicLong getSentResponsesOk() {
        return this.sentResponsesOk;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AtomicLong getSentResponsesFailed() {
        return this.sentResponsesFailed;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AtomicLong getSentException() {
        return this.sendException;
    }
}
