package ru.stm.rpc.kafkaredis.producer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.instrument.MeterRegistry;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFutureCallback;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.scheduler.Schedulers;
import ru.stm.platform.StmExecutionError;
import ru.stm.rpc.core.RpcCtx;
import ru.stm.rpc.core.RpcForwardingException;
import ru.stm.rpc.core.RpcResult;
import ru.stm.rpc.core.RpcTimeoutException;
import ru.stm.rpc.kafkaredis.config.KafkaRedisRpcProperties;
import ru.stm.rpc.kafkaredis.config.KafkaRpcConnection;
import ru.stm.rpc.kafkaredis.config.KafkaRpcInternalConstants;
import ru.stm.rpc.kafkaredis.serialize.RpcSerializer;
import ru.stm.rpc.kafkaredis.topic.InternalKafkaRpcConstants;
import ru.stm.rpc.services.RpcServiceRoute;
import ru.stm.rpc.types.RpcRequest;
import ru.stm.rpc.types.RpcResultType;

/* loaded from: input_file:ru/stm/rpc/kafkaredis/producer/KafkaRedisRPCProducer.class */
public class KafkaRedisRPCProducer extends AbstractHealthIndicator implements RpcServiceRoute<RpcCtx>, ApplicationListener<ContextRefreshedEvent> {
    private static final Logger log = LoggerFactory.getLogger(KafkaRedisRPCProducer.class);
    /** Name prefix of the Redis polling thread; the namespace is appended to it when the thread is named. */
    private static final String RPC_REDIS_POLLING = "RPC-Redis-Polling-";
    /** Converts outgoing requests into producer records and sends them to Kafka. */
    private final KafkaTemplate kafkaTemplate;
    /** Redis client polled (multi-get) for RPC responses and used to delete finished keys. */
    private final ReactiveRedisTemplate<String, ?> reactiveRedisTemplate;
    /** Configuration item of this producer, taken from the connection passed to the constructor. */
    private final KafkaRedisRpcProperties.KafkaRedisRpcItem props;
    /** Sleep between polling rounds while RPCs are in flight; passed to {@code Thread.sleep}, i.e. milliseconds. */
    private final Long redisPollingInterval;
    /** Sleep used by the polling thread when no RPC is registered; passed to {@code Thread.sleep}, i.e. milliseconds. */
    private final Long redisPollingIdleInterval;
    /** Default RPC timeout in milliseconds, used when the subscriber context carries no "TIMEOUT" entry. */
    private final long rpcTimeout;
    /** Registry the stats thread publishes per-status counters to. */
    private final MeterRegistry meterRegistry;
    /** Current Redis polling thread; replaced by {@code startRedisPollingThread()} when absent or not alive. */
    private volatile Thread redisThread;
    /** Namespace of this producer; used in thread names, log messages and metric tags. */
    private final String namespace;
    /** Maximum time the finalizer blocks on a Redis delete. */
    private final Duration deleteOperationsTimeout;
    /** Maximum time the polling thread blocks on a Redis multi-get. */
    private final Duration pollingOperationsTimeout;
    /** Growth of the FAILED_TIMEOUT counter between two stats rounds above which the health check reports out of service. */
    private final int timeoutFailedThreshold;
    /** Mapper used to serialize the user context and to deserialize RPC results. */
    private final ObjectMapper objectMapper = RpcSerializer.getObjectMapper();
    /** Pool on which responses fetched from Redis are processed. */
    private final ForkJoinPool rpcResultExecutorPool = new ForkJoinPool();
    /** In-flight RPC state keyed by operation id (the key that is also looked up in Redis). */
    private final Map<String, RpcState> currentRpc = new ConcurrentHashMap();
    /** Per-status counts of {@code currentRpc} from the latest finalizer pass; the stats thread clears it after logging. */
    private volatile Map<RpcStatus, AtomicLong> metricsCurrent = new ConcurrentHashMap();
    /** Cumulative per-status counters; the finalizer increments the entry of each finished RPC's terminal status. */
    private volatile Map<RpcStatus, AtomicLong> metricsOverall = new ConcurrentHashMap();
    /** Set by the stats thread when timeouts grew by more than {@code timeoutFailedThreshold} since the previous round. */
    private volatile boolean inFailedByTimeoutState = false;
    /** FAILED_TIMEOUT counter value observed by the stats thread in its previous round. */
    private volatile long prevFailedStats = 0;
    /** Keys removed from {@code currentRpc} by the finalizer, collected until the Redis purge threshold is exceeded. */
    private final Set<String> finalizerRedisDelete = new CopyOnWriteArraySet();

    /**
     * Creates a producer on top of the given Kafka/Redis connection and immediately starts the
     * finalizer thread and the Redis polling thread.
     *
     * @param kafkaRpcConnection supplies the Kafka template, Redis template, properties and namespace;
     *                           its namespace must not be {@code null}
     * @param meterRegistry      registry that receives the per-status counters
     * @throws IllegalArgumentException if the connection namespace is {@code null}
     */
    public KafkaRedisRPCProducer(KafkaRpcConnection kafkaRpcConnection, MeterRegistry meterRegistry) {
        Assert.notNull(kafkaRpcConnection.getNamespace(), "Null namespace");

        // Collaborators and identity.
        this.meterRegistry = meterRegistry;
        this.namespace = kafkaRpcConnection.getNamespace();
        this.props = kafkaRpcConnection.getProps();
        this.kafkaTemplate = kafkaRpcConnection.getKafkaTemplate();
        this.reactiveRedisTemplate = kafkaRpcConnection.getRedisTemplate();

        // Timing configuration.
        this.rpcTimeout = kafkaRpcConnection.getProps().getProducer().getTimeout();
        this.redisPollingInterval = Long.valueOf(kafkaRpcConnection.getProps().getProducer().getRedisPolling());
        // NOTE(review): the idle interval is read from the same property as the regular polling
        // interval - confirm whether a dedicated idle-interval property was intended.
        this.redisPollingIdleInterval = Long.valueOf(kafkaRpcConnection.getProps().getProducer().getRedisPolling());
        this.deleteOperationsTimeout = Duration.ofSeconds(kafkaRpcConnection.getProps().getProducer().getInternalDeleteOperationTimeout());
        this.pollingOperationsTimeout = Duration.ofSeconds(kafkaRpcConnection.getProps().getProducer().getPollingOperationTimeout());

        // The cumulative counters must exist before any background thread reads them.
        initMetricMap(this.metricsOverall);
        this.timeoutFailedThreshold = this.props.getProducer().getTimeoutFailedThreshold();

        startFinalizerThread();
        startRedisPollingThread();
    }

    /**
     * Puts a fresh zero counter into {@code map} for every status this producer tracks,
     * replacing any counter already stored under the same key.
     *
     * @param map map to populate
     */
    private void initMetricMap(Map<RpcStatus, AtomicLong> map) {
        RpcStatus[] trackedStatuses = {
            RpcStatus.CREATED,
            RpcStatus.DONE_FAILED,
            RpcStatus.DONE_OK,
            RpcStatus.FAILED_TIMEOUT,
            RpcStatus.GET_FROM_REDIS,
            RpcStatus.FAILED_TO_KAFKA,
            RpcStatus.PROCESSING,
            RpcStatus.SENT_TO_KAFKA,
            RpcStatus.TO_PROCESS_QUEUE
        };
        for (RpcStatus trackedStatus : trackedStatuses) {
            map.put(trackedStatus, new AtomicLong(0L));
        }
    }

    /**
     * Tells whether a status is final, i.e. one of DONE_OK, DONE_FAILED, FAILED_TIMEOUT or FAILED_TO_KAFKA.
     *
     * @param rpcStatus status to test; {@code null} yields {@code false}
     * @return {@code true} for the four final statuses, {@code false} otherwise
     */
    private boolean isTerminateStatus(RpcStatus rpcStatus) {
        // Identity comparison is equivalent to equals() for enum constants.
        if (rpcStatus == RpcStatus.DONE_OK || rpcStatus == RpcStatus.FAILED_TIMEOUT) {
            return true;
        }
        return rpcStatus == RpcStatus.FAILED_TO_KAFKA || rpcStatus == RpcStatus.DONE_FAILED;
    }

    /**
     * Derives the meter name for a status: {@code "rpc."} followed by the lower-cased constant name
     * with underscores replaced by dots, e.g. {@code FAILED_TIMEOUT -> "rpc.failed.timeout"}.
     *
     * @param rpcStatus status to name; must not be {@code null}
     * @return the metric name
     */
    private String mapRpcStatusToMetricName(RpcStatus rpcStatus) {
        // Locale.ROOT keeps the name stable regardless of the JVM default locale
        // (a Turkish default locale would otherwise lower-case 'I' to a dotless 'ı').
        return "rpc." + rpcStatus.name().toLowerCase(Locale.ROOT).replace('_', '.');
    }

    /** Flipped to {@code true} by the first context-refreshed event so the stats thread is started only once. */
    private final AtomicBoolean statsThreadStarted = new AtomicBoolean(false);

    /**
     * Starts the stats thread on the first context-refreshed event.
     *
     * <p>The event can be delivered more than once (context refresh, events from child contexts).
     * Every extra stats thread would publish the same counter deltas again, so later events are ignored.
     *
     * @param contextRefreshedEvent the received event; not inspected
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        if (this.statsThreadStarted.compareAndSet(false, true)) {
            startStatsThread();
        }
    }

    /**
     * Starts the {@code RPC-Finalizer-<namespace>} thread, which runs one finalizer pass and then
     * sleeps for the configured cleaner interval, until it is interrupted.
     *
     * <p>A failing pass is logged and the loop continues; the sleep is performed after every pass,
     * successful or not, so a persistent failure cannot turn the loop into a busy spin.
     * An interrupt received while sleeping restores the interrupt flag and ends the thread.
     */
    private void startFinalizerThread() {
        long cleanerFinalizerInterval = this.props.getProducer().getCleanerFinalizerInterval();
        Thread thread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    runFinalizerPass();
                } catch (Exception e) {
                    log.error("Error in finalizer RPC Redis Thread {}", this.namespace, e);
                }
                try {
                    Thread.sleep(cleanerFinalizerInterval);
                } catch (InterruptedException e) {
                    // Preserve the interrupt so the loop condition (and any caller) can observe it.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        thread.setName("RPC-Finalizer-" + this.namespace);
        thread.start();
    }

    /**
     * Runs a single finalizer pass: counts in-flight RPCs per status, evicts finished and stale
     * entries, purges their Redis keys once enough have accumulated, and restarts the polling
     * thread if it is not alive.
     */
    private void runFinalizerPass() {
        Map<RpcStatus, AtomicLong> passCounters = new ConcurrentHashMap<>();
        initMetricMap(passCounters);
        List<String> evictedKeys = Collections.synchronizedList(new ArrayList<>(this.currentRpc.size() / 2));
        OffsetDateTime staleBefore = OffsetDateTime.now().minusSeconds(this.props.getProducer().getFinalizerStaleSec());

        this.currentRpc.entrySet().parallelStream().forEach(entry -> {
            String operationId = entry.getKey();
            RpcState state = entry.getValue();
            RpcStatus status = state.getStatus();
            passCounters.get(status).incrementAndGet();
            if (isTerminateStatus(status)) {
                evictedKeys.add(operationId);
                this.metricsOverall.get(status).incrementAndGet();
            } else if (staleBefore.isAfter(state.getFinalTimeout())) {
                // Not finished, but its deadline passed longer ago than the configured stale period.
                log.error("Stale RPC operationId={} type={} status={}", operationId, state.getReturnClass().getSimpleName(), state.getStatus());
                evictedKeys.add(operationId);
            }
        });

        this.metricsCurrent = passCounters;
        evictedKeys.forEach(this.currentRpc::remove);

        int inFlight = this.currentRpc.size();
        if (inFlight > this.props.getLoggingThreshold()) {
            log.warn("CurrentRpc {} is larger than STATS_CURRENT_RPC_THRESHOLD", inFlight);
        }

        this.finalizerRedisDelete.addAll(evictedKeys);
        if (this.finalizerRedisDelete.size() > this.props.getProducer().getPurgeThreshold()) {
            // Delete the whole accumulated backlog, not only the keys evicted in this pass;
            // otherwise keys collected in earlier passes would be cleared below without ever being deleted.
            List<String> keysToDelete = new ArrayList<>(this.finalizerRedisDelete);
            try {
                this.reactiveRedisTemplate.delete(Flux.fromIterable(keysToDelete))
                        .subscribeOn(Schedulers.immediate())
                        .publishOn(Schedulers.immediate())
                        .block(this.deleteOperationsTimeout);
            } catch (Exception e) {
                log.error("Error delete redis keys {}", keysToDelete.size(), e);
            } finally {
                // The backlog is dropped whether or not the delete succeeded (best effort).
                this.finalizerRedisDelete.clear();
            }
        }

        if (!isRedisPollingThreadHealthy()) {
            startRedisPollingThread();
        }
    }

    /**
     * Starts the {@code RPC-Redis-Polling-Stats-<namespace>} thread. Each round it:
     * <ol>
     *   <li>updates {@code inFailedByTimeoutState} from the growth of the FAILED_TIMEOUT counter
     *       since the previous round;</li>
     *   <li>optionally logs, then clears, the per-status snapshot of in-flight RPCs;</li>
     *   <li>optionally logs the cumulative terminal counters and the DONE_OK rate per second;</li>
     *   <li>adds to each meter the amount its cumulative counter grew since the last round;</li>
     *   <li>sleeps for the configured stats interval.</li>
     * </ol>
     * An interrupt received while sleeping restores the interrupt flag and ends the thread.
     */
    private void startStatsThread() {
        long statsInterval = this.props.getProducer().getStatsInterval();
        // Whole seconds per round, clamped to 1 so an interval below one second cannot divide by zero.
        long roundSeconds = Math.max(1L, statsInterval / 1000);
        Thread thread = new Thread(() -> {
            // Counter values already published to the meter registry, per status.
            Map<RpcStatus, AtomicLong> published = new ConcurrentHashMap<>();
            initMetricMap(published);
            boolean showCurrentStats = this.props.getProducer().isShowCurrentStats();
            boolean showAllStats = this.props.getProducer().isShowAllStats();
            while (!Thread.currentThread().isInterrupted()) {
                long failedByTimeout = this.metricsOverall.get(RpcStatus.FAILED_TIMEOUT).get();
                this.inFailedByTimeoutState = failedByTimeout - this.prevFailedStats > (long) this.timeoutFailedThreshold;
                this.prevFailedStats = failedByTimeout;

                if (showCurrentStats) {
                    // Read the volatile field once so that logging and clearing act on the same map.
                    Map<RpcStatus, AtomicLong> current = this.metricsCurrent;
                    if (!current.isEmpty()) {
                        StringBuilder currentLine = new StringBuilder();
                        current.forEach((status, counter) ->
                                currentLine.append(status.name()).append(" = ").append(counter.get()).append(", "));
                        log.info("Stats current {}", currentLine.toString());
                        current.clear();
                    }
                }

                if (!this.metricsOverall.isEmpty()) {
                    if (showAllStats) {
                        StringBuilder overallLine = new StringBuilder();
                        this.metricsOverall.forEach((status, counter) -> {
                            if (isTerminateStatus(status)) {
                                overallLine.append(status.name()).append(" = ").append(counter.get()).append(", ");
                            }
                        });
                        long newlySucceeded = this.metricsOverall.get(RpcStatus.DONE_OK).get() - published.get(RpcStatus.DONE_OK).get();
                        log.debug("Producer {}, Stats {} SUCCESS_RPC {}", this.namespace, overallLine.toString(), newlySucceeded / roundSeconds);
                    }
                    this.metricsOverall.forEach((status, counter) -> {
                        AtomicLong lastPublished = published.get(status);
                        if (lastPublished != null) {
                            // Read the live counter once so the increment and the stored snapshot agree
                            // even if the finalizer bumps the counter concurrently.
                            long currentValue = counter.get();
                            this.meterRegistry
                                    .counter(mapRpcStatusToMetricName(status), "type", "RPC", "RPC", "Producer", "namespace", this.namespace)
                                    .increment(currentValue - lastPublished.get());
                            lastPublished.set(currentValue);
                        }
                    });
                }

                try {
                    Thread.sleep(statsInterval);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        thread.setName("RPC-Redis-Polling-Stats-" + this.namespace);
        thread.start();
    }

    /**
     * Starts the daemon thread {@code RPC-Redis-Polling-<namespace>} unless a polling thread is
     * already alive. The method is synchronized, so concurrent callers cannot start two pollers.
     */
    private synchronized void startRedisPollingThread() {
        Thread existing = this.redisThread;
        if (existing != null && existing.isAlive()) {
            return;
        }
        Thread poller = new Thread(this::pollRedisLoop);
        poller.setName(RPC_REDIS_POLLING + this.namespace);
        poller.setUncaughtExceptionHandler((thread, th) ->
                log.error("Failed in thread {}", RPC_REDIS_POLLING + this.namespace, th));
        poller.setDaemon(true);
        this.redisThread = poller;
        poller.start();
    }

    /**
     * Body of the polling thread: repeatedly fetches the Redis values of all RPCs that are still
     * waiting for a response and hands every value found to the result executor pool.
     *
     * <p>The loop ends on interrupt or on any exception; the finalizer thread starts a new poller
     * when it finds this one dead.
     */
    private void pollRedisLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                if (this.currentRpc.isEmpty()) {
                    Thread.sleep(this.redisPollingIdleInterval);
                    continue;
                }

                // Only RPCs that are neither finished nor already handed to the executor are polled.
                List<String> pendingKeys = new ArrayList<>();
                for (Map.Entry<String, RpcState> entry : this.currentRpc.entrySet()) {
                    RpcStatus status = entry.getValue().getStatus();
                    if (!RpcStatus.PROCESSING.equals(status) && !RpcStatus.TO_PROCESS_QUEUE.equals(status) && !isTerminateStatus(status)) {
                        pendingKeys.add(entry.getKey());
                    }
                }

                if (!pendingKeys.isEmpty()) {
                    List<?> values = this.reactiveRedisTemplate.opsForValue().multiGet(pendingKeys)
                            .subscribeOn(Schedulers.immediate())
                            .publishOn(Schedulers.immediate())
                            .block(this.pollingOperationsTimeout);
                    if (values != null) {
                        dispatchRedisResponses(pendingKeys, values);
                    }
                }

                // Sleep after every round, including one where the multi-get produced no list,
                // so an empty result cannot turn the loop into a tight spin against Redis.
                Thread.sleep(this.redisPollingInterval);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                log.error("Error in polling loop", e);
                return;
            }
        }
    }

    /**
     * Schedules processing for every non-null value; {@code values.get(i)} is treated as the
     * response stored under {@code keys.get(i)}.
     */
    private void dispatchRedisResponses(List<String> keys, List<?> values) {
        for (int i = 0; i < values.size(); i++) {
            String payload = (String) values.get(i);
            if (payload == null) {
                continue; // no response stored under this key yet
            }
            String key = keys.get(i);
            RpcState state = this.currentRpc.get(key);
            if (state == null) {
                log.warn("Lost RPC key={}. Is it cancelled by timeout??", key);
                continue;
            }
            // Marked before scheduling so the next polling round skips this key.
            state.setStatus(RpcStatus.TO_PROCESS_QUEUE);
            this.rpcResultExecutorPool.execute(() -> processResponse(key, state, payload));
        }
    }

    /**
     * Sends a request without a user context, using an explicit timeout.
     *
     * @param n   request payload
     * @param str Kafka topic the request is sent to
     * @param j   timeout in milliseconds
     * @param cls class of the expected result payload
     * @return a Mono that completes with the RPC result or fails on send error or timeout
     */
    public <T extends RpcResultType, N extends RpcRequest> Mono<RpcResult<T>> callWithoutContext(N n, String str, long j, Class<T> cls) {
        // The request is passed as-is: casting it to RpcCtx (as the previous code did) is not valid for a request type.
        return call(null, n, str, Long.valueOf(j), cls);
    }

    /**
     * Sends a request without a user context; the timeout is resolved by
     * {@code call(RpcCtx, N, String, Class)}.
     *
     * @param n   request payload
     * @param str Kafka topic the request is sent to
     * @param cls class of the expected result payload
     * @return a Mono that completes with the RPC result
     */
    public <T extends RpcResultType, N extends RpcRequest> Mono<RpcResult<T>> callWithoutContext(N n, String str, Class<T> cls) {
        final RpcCtx noUserContext = null;
        return call(noUserContext, n, str, cls);
    }

    /**
     * Sends a request with a timeout taken from the subscriber context.
     *
     * <p>If the context holds an {@link OffsetDateTime} under the key {@code "TIMEOUT"}, the time
     * remaining until that instant is used as the timeout; if that instant has already passed, the
     * request is not sent and the Mono fails with {@link StmExecutionError}. Without such an entry
     * the producer's default timeout is used.
     *
     * @param rpcCtx user context forwarded with the request, may be {@code null}
     * @param n      request payload
     * @param str    Kafka topic the request is sent to
     * @param cls    class of the expected result payload
     * @return a Mono that completes with the RPC result or fails on send error or timeout
     */
    public <T extends RpcResultType, N extends RpcRequest> Mono<RpcResult<T>> call(RpcCtx rpcCtx, N n, String str, Class<T> cls) {
        return Mono.subscriberContext().flatMap(context -> {
            String requestType = n.getClass().getSimpleName();
            Optional<OffsetDateTime> deadline = context.getOrEmpty("TIMEOUT");
            if (!deadline.isPresent()) {
                log.trace("Default timeout operation is used {} {}", requestType, this.rpcTimeout);
                return call(rpcCtx, n, str, Long.valueOf(this.rpcTimeout), cls);
            }
            OffsetDateTime expiresAt = deadline.get();
            Duration remaining = Duration.between(OffsetDateTime.now(), expiresAt);
            if (remaining.isNegative()) {
                return Mono.<RpcResult<T>>error(new StmExecutionError(String.format("RPC call %s of operation %s will not be called because timeout expired %s in MS %s", requestType, context.getOrDefault("OPERATION_ID", ""), expiresAt, this.namespace)));
            }
            return call(rpcCtx, n, str, Long.valueOf(remaining.toMillis()), cls);
        });
    }

    /**
     * Sends a request to Kafka and returns a Mono that is completed once the response has been
     * found in Redis by the polling thread.
     *
     * <p>On subscription a new operation id is generated and the call is registered in
     * {@code currentRpc}. The request is converted to a producer record carrying the operation id,
     * the absolute deadline, the optional user context (JSON) and a trace id (taken from the
     * subscriber context key {@code "TRACE_ID"}, defaulting to the operation id), and is then sent.
     * If anything fails before or during the send, the call is marked {@code FAILED_TO_KAFKA}, so
     * the poller stops looking for its response, and the Mono fails with the cause. If no response
     * arrives within the timeout, the call is marked {@code FAILED_TIMEOUT} and the Mono fails with
     * {@link RpcTimeoutException}.
     *
     * @param rpcCtx user context forwarded with the request, may be {@code null}
     * @param n      request payload
     * @param str    Kafka topic the request is sent to
     * @param l      timeout in milliseconds; {@code null} selects the producer's default timeout
     * @param cls    class of the expected result payload
     * @return a Mono that completes with the RPC result or fails on send error or timeout
     */
    public <T extends RpcResultType, N extends RpcRequest> Mono<RpcResult<T>> call(RpcCtx rpcCtx, N n, String str, Long l, Class<T> cls) {
        long timeoutMillis = l != null ? l.longValue() : this.rpcTimeout;
        return Mono.subscriberContext().flatMap(context -> {
            String operationId = UUID.randomUUID().toString();
            String traceId = context.getOrDefault("TRACE_ID", operationId);
            Duration timeout = Duration.ofMillis(timeoutMillis);
            return Mono.<RpcResult<T>>create(sink -> {
                OffsetDateTime deadline = OffsetDateTime.now().plus(timeout);
                RpcState state = new RpcState(sink, cls, RpcStatus.CREATED, deadline);
                this.currentRpc.put(operationId, state);
                try {
                    Map<String, Object> messageHeaders = new HashMap<>();
                    messageHeaders.put(InternalKafkaRpcConstants.KAFKA_GLOBAL_OPERATION_ID_SENT, operationId);
                    GenericMessage<N> message = new GenericMessage<>(n, messageHeaders);
                    ProducerRecord<?, ?> record = this.kafkaTemplate.getMessageConverter().fromMessage(message, str);
                    if (!record.headers().iterator().hasNext()) {
                        // The converter produced no headers at all: carry over the correlation id, if present.
                        byte[] correlationId = message.getHeaders().get("kafka_correlationId", byte[].class);
                        if (correlationId != null) {
                            record.headers().add("kafka_correlationId", correlationId);
                        }
                    }
                    record.headers().add(KafkaRpcInternalConstants.KAFKA_REDIS_RPC_TIMEOUT, deadline.toString().getBytes(StandardCharsets.UTF_8));
                    if (rpcCtx != null) {
                        record.headers().add("KAFKA_USER_CONTEXT", this.objectMapper.writeValueAsBytes(rpcCtx));
                    }
                    record.headers().add("TRACE_ID", traceId.getBytes(StandardCharsets.UTF_8));
                    if (log.isTraceEnabled()) {
                        log.trace("Send RPC request to namespace={} topic={} traceId={} request={} ctx={}", this.namespace, str, traceId, n, rpcCtx);
                    }
                    this.kafkaTemplate.send(record).addCallback(new ListenableFutureCallback<Object>() {
                        @Override
                        public void onFailure(Throwable th) {
                            sink.error(th);
                            state.setStatus(RpcStatus.FAILED_TO_KAFKA);
                        }

                        @Override
                        public void onSuccess(Object obj) {
                            state.setStatus(RpcStatus.SENT_TO_KAFKA);
                        }
                    });
                } catch (JsonProcessingException e) {
                    log.error("Error write user context {}", rpcCtx, e);
                    // Nothing was sent: finish the state so the poller does not keep querying Redis for it.
                    state.setStatus(RpcStatus.FAILED_TO_KAFKA);
                    sink.error(e);
                } catch (RuntimeException e) {
                    // Record conversion or the send call itself failed synchronously.
                    state.setStatus(RpcStatus.FAILED_TO_KAFKA);
                    sink.error(e);
                }
            }).timeout(timeout).onErrorMap(TimeoutException.class, timeoutException -> {
                String requestType = n.getClass().getSimpleName();
                log.error("Operation id={} type={} ns={} failed by timeout with {} ms traceId={}", operationId, requestType, str, timeoutMillis, traceId);
                RpcState state = this.currentRpc.get(operationId);
                if (state != null) {
                    state.setStatus(RpcStatus.FAILED_TIMEOUT);
                }
                return new RpcTimeoutException(operationId, traceId, requestType, timeoutException.getMessage());
            });
        });
    }

    /**
     * Variant of {@code call(RpcCtx, N, String, Long, Class)} with an extra string argument that
     * this implementation does not use.
     *
     * @param rpcCtx user context forwarded with the request, may be {@code null}
     * @param n      request payload
     * @param str    Kafka topic the request is sent to
     * @param str2   ignored by this implementation
     * @param l      timeout in milliseconds
     * @param cls    class of the expected result payload
     * @return a Mono that completes with the RPC result or fails on send error or timeout
     */
    public <T extends RpcResultType, N extends RpcRequest> Mono<RpcResult<T>> call(RpcCtx rpcCtx, N n, String str, String str2, Long l, Class<T> cls) {
        // The request is passed as-is: casting it to RpcCtx (as the previous code did) is not valid for a request type.
        return call(rpcCtx, n, str, l, cls);
    }

    /**
     * Variant of {@code call(RpcCtx, N, String, Class)} with an extra string argument that this
     * implementation does not use.
     *
     * @param rpcCtx user context forwarded with the request, may be {@code null}
     * @param n      request payload
     * @param str    Kafka topic the request is sent to
     * @param str2   ignored by this implementation
     * @param cls    class of the expected result payload
     * @return a Mono that completes with the RPC result
     */
    public <T extends RpcResultType, N extends RpcRequest> Mono<RpcResult<T>> call(RpcCtx rpcCtx, N n, String str, String str2, Class<T> cls) {
        Mono<RpcResult<T>> delegated = call(rpcCtx, n, str, cls);
        return delegated;
    }

    /**
     * Sends a request without a user context; the second string argument is not used by this
     * implementation.
     *
     * @param n    request payload
     * @param str  Kafka topic the request is sent to
     * @param str2 ignored by this implementation
     * @param cls  class of the expected result payload
     * @return a Mono that completes with the RPC result
     */
    public <T extends RpcResultType, N extends RpcRequest> Mono<RpcResult<T>> callWithoutContext(N n, String str, String str2, Class<T> cls) {
        final RpcCtx noUserContext = null;
        return call(noUserContext, n, str, cls);
    }

    /**
     * Sends a request without a user context, using an explicit timeout; the second string
     * argument is not used by this implementation.
     *
     * @param n    request payload
     * @param str  Kafka topic the request is sent to
     * @param str2 ignored by this implementation
     * @param j    timeout in milliseconds
     * @param cls  class of the expected result payload
     * @return a Mono that completes with the RPC result or fails on send error or timeout
     */
    public <T extends RpcResultType, N extends RpcRequest> Mono<RpcResult<T>> callWithoutContext(N n, String str, String str2, long j, Class<T> cls) {
        // Forward the caller's timeout; it used to be dropped in favour of the context/default timeout,
        // unlike callWithoutContext(N, String, long, Class).
        return call(null, n, str, Long.valueOf(j), cls);
    }

    /**
     * Returns the fixed name of this route.
     *
     * @return always {@code "KAFKA_REDIS"}
     */
    public String getName() {
        final String routeName = "KAFKA_REDIS";
        return routeName;
    }

    /**
     * Returns the namespace this producer was created for.
     *
     * @return the namespace taken from the connection passed to the constructor
     */
    public String getNamespace() {
        return namespace;
    }

    /**
     * Deserializes a response fetched from Redis and completes the waiting call.
     *
     * <ul>
     *   <li>OK result, or a result carrying a BUSINESS error: the sink succeeds with the result, status DONE_OK;</li>
     *   <li>result carrying an INTERNAL error: the sink fails with {@link RpcForwardingException}, status DONE_FAILED;</li>
     *   <li>any other non-OK result (no error, or an unrecognised error type): the sink fails with
     *       {@link RpcForwardingException}, status DONE_FAILED, so the caller is not left waiting
     *       for its timeout and the state does not stay in PROCESSING;</li>
     *   <li>payload that cannot be deserialized: the sink fails with the exception, status DONE_FAILED.</li>
     * </ul>
     *
     * @param str      operation id (the Redis key of the response)
     * @param rpcState state of the waiting call
     * @param str2     raw JSON payload read from Redis
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // the sink and return class are handled as raw types here
    private void processResponse(String str, RpcState rpcState, String str2) {
        rpcState.setStatus(RpcStatus.PROCESSING);
        Class returnClass = rpcState.getReturnClass();
        MonoSink handler = rpcState.getHandler();
        try {
            RpcResult<?> rpcResult = this.objectMapper.readValue(str2, this.objectMapper.getTypeFactory().constructParametricType(RpcResult.class, returnClass));
            if (rpcResult.isOk()) {
                handler.success(rpcResult);
                rpcState.setStatus(RpcStatus.DONE_OK);
                log.trace("RPC propagate successful RPC result by key={} payload={}", str, rpcResult);
            } else if (rpcResult.getError() != null && RpcResult.InternalRpcResultErrorType.INTERNAL.equals(rpcResult.getError().getType())) {
                handler.error(new RpcForwardingException(rpcResult.getError().getMessage(), str));
                rpcState.setStatus(RpcStatus.DONE_FAILED);
                log.trace("RPC propagate INTERNAL ERROR RPC result by key={} payload={}", str, rpcResult);
            } else if (rpcResult.getError() != null && RpcResult.InternalRpcResultErrorType.BUSINESS.equals(rpcResult.getError().getType())) {
                // Business errors are delivered to the caller as a regular result.
                handler.success(rpcResult);
                rpcState.setStatus(RpcStatus.DONE_OK);
                log.trace("RPC propagate business ERROR RPC result by key={} payload={}", str, rpcResult);
            } else {
                handler.error(new RpcForwardingException("RPC result is not OK and carries no recognizable error", str));
                rpcState.setStatus(RpcStatus.DONE_FAILED);
                log.error("RPC result without recognizable error by key={} payload={}", str, rpcResult);
            }
        } catch (Exception e) {
            handler.error(e);
            log.error("Invalid response {} {}", str, str2, e);
            rpcState.setStatus(RpcStatus.DONE_FAILED);
        }
    }

    /**
     * Reports UP unless the Redis polling thread is not alive or the stats thread has flagged an
     * excessive number of timeouts, in which case OUT_OF_SERVICE is reported.
     *
     * @param builder health builder to fill in
     */
    protected void doHealthCheck(Health.Builder builder) {
        builder.up();
        // The thread check is always evaluated first; it logs an error when the poller is missing.
        boolean pollerDown = !isRedisPollingThreadHealthy();
        if (pollerDown || this.inFailedByTimeoutState) {
            builder.outOfService();
        }
    }

    /**
     * Checks that a Redis polling thread exists and is alive, logging an error when it is not.
     *
     * @return {@code true} if the polling thread is alive, {@code false} otherwise
     */
    private boolean isRedisPollingThreadHealthy() {
        Thread poller = this.redisThread;
        boolean alive = poller != null && poller.isAlive();
        if (!alive) {
            log.error("RPC-Redis-Polling- thread is not found!");
        }
        return alive;
    }
}
