package ru.stm.rpc.kafkaredis.consumer;

import io.micrometer.core.instrument.MeterRegistry;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.kafka.support.Acknowledgment;
import reactor.core.publisher.Mono;
import ru.stm.platform.BusinessError;
import ru.stm.rpc.core.RpcExpiredException;
import ru.stm.rpc.kafkaredis.config.KafkaRpcConnection;
import ru.stm.rpc.types.RpcRequest;
import ru.stm.rpc.types.RpcResultType;

/* loaded from: input_file:ru/stm/rpc/kafkaredis/consumer/AbstractRpcListener.class */
abstract class AbstractRpcListener implements ApplicationListener {
    private final RpcResponseService rpcResponseService;
    private final int defaultRpcTimeout;
    private final MeterRegistry meterRegistry;
    private final String namespace;
    private final KafkaRpcConnection connection;
    private static Thread statisticsThread;
    private static final Logger log = LoggerFactory.getLogger(AbstractRpcListener.class);
    private static final AtomicLong polled = new AtomicLong(0);
    private static final Object lock = new Object();
    private static volatile boolean initializedThread = false;
    private static volatile boolean initializedMetrics = false;
    private static final Map<RpcConsumerStatus, AtomicLong> metricsOverall = new ConcurrentHashMap();
    private static final Map<RpcConsumerStatus, AtomicLong> prevStatsAll = new ConcurrentHashMap();

    @FunctionalInterface
    /* loaded from: input_file:ru/stm/rpc/kafkaredis/consumer/AbstractRpcListener$RpcExecution.class */
    public interface RpcExecution<V> {
        V call();
    }

    public AbstractRpcListener(RpcResponseService rpcResponseService, KafkaRpcConnection kafkaRpcConnection, MeterRegistry meterRegistry) {
        this.namespace = kafkaRpcConnection.getNamespace();
        this.rpcResponseService = rpcResponseService;
        this.defaultRpcTimeout = kafkaRpcConnection.getProps().getConsumer().getExecutionTimeout();
        this.meterRegistry = meterRegistry;
        this.connection = kafkaRpcConnection;
        synchronized (lock) {
            if (!initializedMetrics) {
                initMetricMap(metricsOverall);
                initMetricMap(prevStatsAll);
                initializedMetrics = true;
            }
        }
    }

    public void onApplicationEvent(ApplicationEvent applicationEvent) {
        if (applicationEvent instanceof ContextRefreshedEvent) {
            synchronized (lock) {
                if (!initializedThread) {
                    startMetricsThread(this.connection.getProps().getConsumer().isShowCurrentStats(), this.connection.getProps().getConsumer().getStatsInterval());
                    initializedThread = true;
                }
            }
        }
        if (applicationEvent instanceof ContextClosedEvent) {
            synchronized (lock) {
                if (initializedThread && statisticsThread.isAlive()) {
                    statisticsThread.interrupt();
                }
                initializedThread = false;
            }
        }
    }

    private void startMetricsThread(boolean z, long j) {
        statisticsThread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                metricsOverall.get(RpcConsumerStatus.RPC_SENT_OK).set(this.rpcResponseService.getSentResponsesOk().get());
                metricsOverall.get(RpcConsumerStatus.RPC_POLLED).set(polled.get());
                metricsOverall.get(RpcConsumerStatus.RPC_SENT_EXCEPTION).set(this.rpcResponseService.getSentException().get());
                metricsOverall.get(RpcConsumerStatus.RPC_SENT_FAILED).set(this.rpcResponseService.getSentResponsesFailed().get());
                metricsOverall.forEach((rpcConsumerStatus, atomicLong) -> {
                    AtomicLong atomicLong = prevStatsAll.get(rpcConsumerStatus);
                    if (atomicLong != null) {
                        this.meterRegistry.counter(mapRpcStatusToMetricName(rpcConsumerStatus), new String[]{"type", "RPC", "RPC", "Consumer", "namespace", this.namespace}).increment(atomicLong.get() - atomicLong.get());
                        atomicLong.set(atomicLong.get());
                    }
                });
                if (z) {
                    StringBuilder sb = new StringBuilder();
                    metricsOverall.forEach((rpcConsumerStatus2, atomicLong2) -> {
                        sb.append(rpcConsumerStatus2.name()).append(" = ").append(atomicLong2.get()).append(", ");
                    });
                    if (metricsOverall.size() > 0) {
                        log.info("Consumer {} stat {}", this.namespace, sb.toString());
                    }
                }
                try {
                    Thread.sleep(j);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.warn("Metric thread is interrupted", e);
                }
            }
        });
        statisticsThread.start();
        statisticsThread.setName("Rpc-Consumer-Stats");
    }

    private String mapRpcStatusToMetricName(RpcConsumerStatus rpcConsumerStatus) {
        return "rpc." + rpcConsumerStatus.name().toLowerCase().replace("_", ".");
    }

    private void initMetricMap(Map<RpcConsumerStatus, AtomicLong> map) {
        map.put(RpcConsumerStatus.RPC_POLLED, new AtomicLong(0L));
        map.put(RpcConsumerStatus.RPC_SENT_EXCEPTION, new AtomicLong(0L));
        map.put(RpcConsumerStatus.RPC_SENT_FAILED, new AtomicLong(0L));
        map.put(RpcConsumerStatus.RPC_SENT_OK, new AtomicLong(0L));
    }

    protected <T> Mono<T> businessError(String str, String str2) {
        return BusinessError.businessError(str, str2);
    }

    public <N extends RpcResultType, T extends RpcRequest> void executeAndSend(String str, String str2, T t, Acknowledgment acknowledgment, RpcExecution<? extends Mono<? extends N>> rpcExecution, OffsetDateTime offsetDateTime) {
        polled.incrementAndGet();
        String simpleName = t.getClass().getSimpleName();
        this.meterRegistry.counter("rpc_operation_consumer", new String[]{"type", RpcConsumerStatus.RPC_POLLED.name(), "operation_name", simpleName}).increment();
        log.trace("Execute type {} id {}", simpleName, str);
        OffsetDateTime now = OffsetDateTime.now();
        Duration of = Duration.of(this.defaultRpcTimeout, ChronoUnit.MILLIS);
        if (offsetDateTime != null) {
            of = Duration.between(now, offsetDateTime);
            if (of.isNegative()) {
                this.rpcResponseService.sendExceptionally(t, str, str2, simpleName, RpcExpiredException.create(simpleName, str, str2, offsetDateTime), this.namespace, of);
                acknowledgment.acknowledge();
                return;
            }
        }
        Duration duration = of;
        Duration duration2 = of;
        rpcExecution.call().timeout(of).doOnError(TimeoutException.class, timeoutException -> {
            log.error("Timeout during RPC request {}, req={}, key={}", new Object[]{simpleName, t, str});
        }).switchIfEmpty(businessError("entity.not.found", "Not found")).subscribe(rpcResultType -> {
            this.rpcResponseService.sendSuccess(t, str, str2, simpleName, rpcResultType, this.namespace, duration);
            acknowledgment.acknowledge();
        }, th -> {
            if (th instanceof BusinessError) {
                this.rpcResponseService.sendErrorBusiness(t, str, str2, simpleName, (BusinessError) th, this.namespace, duration2);
            } else {
                this.rpcResponseService.sendExceptionally(t, str, str2, simpleName, th, this.namespace, duration2);
            }
            acknowledgment.acknowledge();
        }, () -> {
        });
    }
}
