package ru.stm.rpc.kafkaredis.service;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.util.StringUtils;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
import org.springframework.kafka.support.Acknowledgment;
import ru.stm.rpc.kafkaredis.config.KafkaRedisRpcProperties;
import ru.stm.rpc.kafkaredis.config.KafkaRpcConnection;
import ru.stm.rpc.kafkaredis.config.KafkaRpcConnectionFactory;
import ru.stm.rpc.kafkaredis.consumer.RpcAbstractRpcListener;
import ru.stm.rpc.kafkaredis.consumer.RpcResponseService;
import ru.stm.rpc.kafkaredis.producer.KafkaRedisRPCProducer;
import ru.stm.rpc.kafkaredis.util.RpcDirection;

/* loaded from: input_file:ru/stm/rpc/kafkaredis/service/RpcNamespace.class */
/**
 * Wiring for a single named RPC namespace.
 *
 * <p>On construction the namespace validates the supplied properties and then builds, via
 * {@link KafkaRpcConnectionFactory}, whichever of the following the properties enable: a Kafka
 * listener container factory (consumer section present), a {@link KafkaTemplate} (producer section
 * present), and a Redis template plus {@link KafkaRpcConnection} (redis section present). It also
 * keeps the registry of {@link RpcTopic}s known to the namespace and starts the Kafka consumer
 * container on demand in {@link #createListener(RpcResponseService, ApplicationEvent)}.
 *
 * <p>NOTE(review): {@code topics} and {@code consumerTopics} are plain, unsynchronized collections
 * that are read by the Kafka listener once the container is started. This presumably relies on all
 * {@link #createTopic} calls happening before {@link #createListener} — confirm against callers.
 */
public class RpcNamespace {
    private static final Logger log = LoggerFactory.getLogger(RpcNamespace.class);

    /** Port written into the Redis properties when the configured port is {@code 0}. */
    private static final int REDIS_DEFAULT_PORT = 6379;

    private final String name;
    private final KafkaRedisRpcProperties.KafkaRedisRpcItem props;
    private final ApplicationContext ctx;
    private final MeterRegistry meterRegistry;

    /** Every topic registered through {@link #createTopic}, keyed by topic name. */
    private final Map<String, RpcTopic> topics = new HashMap<>();

    /** Names of the topics registered with {@link RpcDirection#CONSUMER}. */
    private final Set<String> consumerTopics = new HashSet<>();

    private ConcurrentKafkaListenerContainerFactory kafkaConsumer;
    private KafkaTemplate kafkaProducer;
    private ReactiveRedisTemplate<String, Object> redis;
    private KafkaRpcConnection rpcConn;
    private KafkaRedisRPCProducer rpcProducer;
    private RpcAbstractRpcListener rpcListener;

    /**
     * Creates the namespace, validates its properties and builds the configured connections.
     *
     * @param str namespace name
     * @param kafkaRedisRpcItem properties of this namespace; its consumer / producer / redis
     *     sections decide which connections are created
     * @param applicationContext context handed to the Kafka listener factory builder
     * @param meterRegistry registry passed to the RPC producer and RPC listener
     * @throws IllegalArgumentException if a consumer or producer section is present without Kafka
     *     bootstrap servers
     */
    public RpcNamespace(String str, KafkaRedisRpcProperties.KafkaRedisRpcItem kafkaRedisRpcItem, ApplicationContext applicationContext, MeterRegistry meterRegistry) {
        this.name = str;
        this.props = kafkaRedisRpcItem;
        this.ctx = applicationContext;
        this.meterRegistry = meterRegistry;
        log.debug("Found RPC Namespace '{}', hasConsumer={}, hasProducer={}, hasRedis={}", str, hasConsumer(), hasProducer(), hasRedis());
        verify();
        configure();
    }

    /**
     * Checks that every configured Kafka side (consumer, producer) has non-blank bootstrap
     * servers and logs them.
     *
     * @throws IllegalArgumentException if a configured side has no bootstrap servers
     */
    private void verify() {
        if (hasConsumer()) {
            KafkaRedisRpcProperties.KafkaConsumerConfiguration consumerKafka = this.props.getConsumer().getKafka();
            if (consumerKafka == null || StringUtils.isBlank(consumerKafka.getBootstrapServers())) {
                throw new IllegalArgumentException(String.format("You set consumer for namespace %s but you have not setup Kafka Bootstrap Servers", this.name));
            }
            log.info("RPC Namespace '{}': consumer servers: {}", this.name, consumerKafka.getBootstrapServers());
        }
        if (hasProducer()) {
            KafkaRedisRpcProperties.KafkaProducerConfiguration producerKafka = this.props.getProducer().getKafka();
            if (producerKafka == null || StringUtils.isBlank(producerKafka.getBootstrapServers())) {
                throw new IllegalArgumentException(String.format("You set producer for namespace %s but you have not setup Kafka Bootstrap Servers", this.name));
            }
            log.info("RPC Namespace '{}': producer servers: {}", this.name, producerKafka.getBootstrapServers());
        }
    }

    /**
     * Builds the Kafka consumer factory, Kafka template, Redis template, RPC connection and RPC
     * producer, each only when the matching properties section is present.
     */
    private void configure() {
        if (hasConsumer()) {
            this.kafkaConsumer = KafkaRpcConnectionFactory.createRpcListener(this.props, this.ctx);
        }
        if (hasProducer()) {
            this.kafkaProducer = KafkaRpcConnectionFactory.createKafkaTemplate(
                    this.props.getProducer().getKafka().getBootstrapServers(),
                    this.props.getProducer().getMaxRequestSize());
        }
        if (hasRedis()) {
            // A port of 0 is treated as "not set": the shared properties object is updated in
            // place so that the Redis template below is built with the default port.
            if (this.props.getRedis().getPort() == 0) {
                this.props.getRedis().setPort(REDIS_DEFAULT_PORT);
                log.info("RPC Namespace '{}': switching Redis port to default: {}", this.name, REDIS_DEFAULT_PORT);
            }
            this.redis = KafkaRpcConnectionFactory.rpcRedis(this.props);
            // Must run after kafkaProducer and redis are assigned: both are copied into the connection.
            this.rpcConn = createRpcConn();
        }
        if (hasRpcProducer()) {
            this.rpcProducer = new KafkaRedisRPCProducer(this.rpcConn, this.meterRegistry);
        }
    }

    /**
     * Assembles the {@link KafkaRpcConnection} from the already created Redis template and Kafka
     * template. Redis TTL and consumer backoff are only set when a consumer section exists; the
     * Kafka template is {@code null} when no producer section exists.
     */
    private KafkaRpcConnection createRpcConn() {
        KafkaRpcConnection conn = new KafkaRpcConnection();
        conn.setNamespace(this.name);
        conn.setRedisTemplate(this.redis);
        conn.setProps(this.props);
        if (hasConsumer()) {
            conn.setRedisttl(Duration.of(this.props.getConsumer().getRedisTtl(), ChronoUnit.MILLIS));
            conn.setConsumerBackoff(Duration.of(this.props.getConsumer().getRetryBackoff(), ChronoUnit.MILLIS));
        }
        conn.setKafkaTemplate(this.kafkaProducer);
        return conn;
    }

    /** Returns a live view of all registered topics (the values of the internal topic map). */
    public Collection<RpcTopic> topics() {
        return this.topics.values();
    }

    /** Returns {@code true} when the properties contain a consumer section. */
    public boolean hasConsumer() {
        return this.props.getConsumer() != null;
    }

    /** Returns {@code true} when the properties contain a producer section. */
    public boolean hasProducer() {
        return this.props.getProducer() != null;
    }

    /** Returns {@code true} when the properties contain a redis section. */
    public boolean hasRedis() {
        return this.props.getRedis() != null;
    }

    /** Returns {@code true} when both redis and producer sections are present. */
    public boolean hasRpcProducer() {
        return hasRedis() && hasProducer();
    }

    /**
     * Returns the Kafka bootstrap servers of the producer section.
     *
     * @throws IllegalArgumentException if the namespace has no producer section
     */
    public String getProducerServers() {
        if (hasProducer()) {
            return this.props.getProducer().getKafka().getBootstrapServers();
        }
        throw new IllegalArgumentException("No producer configuration for " + this.name);
    }

    /**
     * Returns the Kafka bootstrap servers of the consumer section.
     *
     * @throws IllegalArgumentException if the namespace has no consumer section
     */
    public String getConsumerServers() {
        if (hasConsumer()) {
            return this.props.getConsumer().getKafka().getBootstrapServers();
        }
        throw new IllegalArgumentException("No consumer configuration for " + this.name);
    }

    /**
     * Registers a topic if it is not registered yet and, for {@link RpcDirection#CONSUMER},
     * marks it as a topic the consumer container must subscribe to.
     *
     * @param rpcDirection direction the topic is used in
     * @param str topic name
     * @param z flag forwarded as the third {@link RpcTopic} constructor argument; it is ignored
     *     when the topic already exists
     */
    public void createTopic(RpcDirection rpcDirection, String str, boolean z) {
        this.topics.computeIfAbsent(str, topicName -> new RpcTopic(this.name, topicName, z));
        if (rpcDirection == RpcDirection.CONSUMER) {
            this.consumerTopics.add(str);
        }
    }

    /**
     * Attaches a listener bean to an already registered topic.
     *
     * @param str topic name
     * @param obj listener bean handed to {@link RpcTopic#addListenerBean}
     * @throws IllegalArgumentException if the topic has not been registered
     */
    public void addListenerBean(String str, Object obj) {
        ensureTopic(str).addListenerBean(obj);
    }

    /**
     * Creates the RPC listener on first use and forwards the application event to it.
     *
     * <p>The listener is created only when a consumer section exists, at least one consumer topic
     * is registered and no listener has been created before. Unless the consumer is flagged as
     * disabled, a Kafka listener container subscribed to all consumer topics is created and
     * started at the same time. The event is forwarded whenever a listener exists, including on
     * later calls.
     *
     * <p>NOTE(review): {@code rpcConn} is only built when a redis section is present, so a
     * namespace with a consumer but no redis passes {@code null} to the listener — confirm that
     * this combination is either impossible or tolerated.
     *
     * @param rpcResponseService service passed to the new {@link RpcAbstractRpcListener}
     * @param applicationEvent event forwarded to the listener's {@code onApplicationEvent}
     */
    public void createListener(RpcResponseService rpcResponseService, ApplicationEvent applicationEvent) {
        if (hasConsumer() && !this.consumerTopics.isEmpty() && this.rpcListener == null) {
            // Assigned before the container starts: the inner Listener reads this field for every record.
            this.rpcListener = new RpcAbstractRpcListener(rpcResponseService, this.rpcConn, this.meterRegistry);
            log.info("Creating Kafka consumer container, namespace={}, topics={}", this.name, this.consumerTopics);
            if (this.props.getConsumer().isDisable()) {
                log.warn("Consumer for namespace={} and topics={} is disabled!", this.name, this.consumerTopics);
            } else {
                AbstractMessageListenerContainer<?, ?> container =
                        this.kafkaConsumer.createContainer(this.consumerTopics.toArray(new String[0]));
                container.setupMessageListener(new Listener());
                container.start();
            }
        }
        if (this.rpcListener != null) {
            this.rpcListener.onApplicationEvent(applicationEvent);
        }
    }

    /**
     * Looks up a registered topic.
     *
     * @throws IllegalArgumentException if no topic with that name is registered
     */
    private RpcTopic ensureTopic(String str) {
        RpcTopic topic = this.topics.get(str);
        if (topic == null) {
            throw new IllegalArgumentException("Incorrect topic name: " + str);
        }
        return topic;
    }

    /** Returns the namespace name. */
    public String getName() {
        return this.name;
    }

    /** Returns the properties this namespace was built from. */
    public KafkaRedisRpcProperties.KafkaRedisRpcItem getProps() {
        return this.props;
    }

    /** Returns the application context supplied at construction. */
    public ApplicationContext getCtx() {
        return this.ctx;
    }

    /** Returns the meter registry supplied at construction. */
    public MeterRegistry getMeterRegistry() {
        return this.meterRegistry;
    }

    /** Returns the internal (mutable, not copied) topic map keyed by topic name. */
    public Map<String, RpcTopic> getTopics() {
        return this.topics;
    }

    /** Returns the internal (mutable, not copied) set of consumer topic names. */
    public Set<String> getConsumerTopics() {
        return this.consumerTopics;
    }

    /** Returns the Kafka listener container factory, or {@code null} without a consumer section. */
    public ConcurrentKafkaListenerContainerFactory getKafkaConsumer() {
        return this.kafkaConsumer;
    }

    /** Returns the Kafka template, or {@code null} without a producer section. */
    public KafkaTemplate getKafkaProducer() {
        return this.kafkaProducer;
    }

    /** Returns the Redis template, or {@code null} without a redis section. */
    public ReactiveRedisTemplate<String, Object> getRedis() {
        return this.redis;
    }

    /** Returns the RPC connection, or {@code null} without a redis section. */
    public KafkaRpcConnection getRpcConn() {
        return this.rpcConn;
    }

    /** Returns the RPC producer, or {@code null} unless both redis and producer sections exist. */
    public KafkaRedisRPCProducer getRpcProducer() {
        return this.rpcProducer;
    }

    /** Returns the RPC listener, or {@code null} until {@link #createListener} has created it. */
    public RpcAbstractRpcListener getRpcListener() {
        return this.rpcListener;
    }

    /**
     * Kafka message listener that routes each record to the {@link RpcTopic} registered under the
     * record's topic name.
     *
     * <p>No explicit {@code onMessage(Object, Acknowledgment, Consumer)} overload is declared: that
     * signature is the compiler-generated bridge method, and spelling it out in source clashes
     * with the inherited generic method.
     */
    public class Listener implements AcknowledgingConsumerAwareMessageListener<String, String> {
        private Listener() {
        }

        /**
         * Dispatches one record to its topic handler.
         *
         * <p>Any exception (including an unknown topic) is logged and the record is acknowledged
         * here, so the exception is not propagated to the container. On the success path this
         * method does not acknowledge; the acknowledgment is handed to {@link RpcTopic#handle}.
         *
         * @param consumerRecord record received from Kafka
         * @param acknowledgment acknowledgment handle for the record
         * @param consumer Kafka consumer that polled the record (unused)
         */
        @Override
        public void onMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
            try {
                RpcTopic rpcTopic = RpcNamespace.this.topics.get(consumerRecord.topic());
                if (rpcTopic == null) {
                    throw new UnsupportedOperationException("Unsupported topic " + consumerRecord.topic() + ", req=" + consumerRecord.value());
                }
                rpcTopic.handle(consumerRecord, RpcNamespace.this.rpcListener, acknowledgment);
            } catch (Exception e) {
                RpcNamespace.log.error("Error processing RPC", e);
                acknowledgment.acknowledge();
            }
        }
    }
}
