package ru.stm.rpc.kafkaredis.service;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.core.KafkaTemplate;
import reactor.core.publisher.Mono;
import ru.stm.rpc.core.RpcCtx;
import ru.stm.rpc.services.RpcService;
import ru.stm.rpc.types.RpcRequest;
import ru.stm.rpc.types.RpcResultType;

/* loaded from: input_file:ru/stm/rpc/kafkaredis/service/RpcProvider.class */
public class RpcProvider {
    private static final Logger log = LoggerFactory.getLogger(RpcProvider.class);
    private final ApplicationContext ctx;
    private final String topic;
    private final String namespace;
    private final KafkaTemplate kafka;
    private RpcService rpc;

    public RpcProvider(ApplicationContext applicationContext, String str, String str2, KafkaTemplate kafkaTemplate) {
        this.ctx = applicationContext;
        this.topic = str;
        this.namespace = str2;
        this.kafka = kafkaTemplate;
    }

    public <T extends RpcRequest, R extends RpcResultType> Mono<R> call(RpcCtx rpcCtx, T t, Class<R> cls) {
        return rpc().call(rpcCtx, t, this.topic, this.namespace, cls).flatMap(rpcResult -> {
            return rpcResult.isOk() ? Mono.just(rpcResult.getData()) : Mono.error(rpcResult.getError().toThrowable());
        });
    }

    public <T extends RpcRequest, R extends RpcResultType> Mono<R> call(T t, Class<R> cls) {
        return rpc().callWithoutContext(t, this.topic, this.namespace, cls).flatMap(rpcResult -> {
            return rpcResult.isOk() ? Mono.just(rpcResult.getData()) : Mono.error(rpcResult.getError().toThrowable());
        });
    }

    public <T extends RpcRequest, R extends RpcResultType> Mono<R> call(T t, String str, Class<R> cls) {
        return rpc().callWithoutContext(t, str, this.namespace, cls).flatMap(rpcResult -> {
            return rpcResult.isOk() ? Mono.just(rpcResult.getData()) : Mono.error(rpcResult.getError().toThrowable());
        });
    }

    public <T, R> Mono<R> send(T t, R r) {
        return Mono.create(monoSink -> {
            log.trace("KAFKA SEND: topic={}, req={}", this.topic, t);
            this.kafka.send(new ProducerRecord(this.topic, t)).addCallback(obj -> {
                monoSink.success(r);
            }, th -> {
                monoSink.error(th);
            });
        });
    }

    public <T> Mono<?> send(T t) {
        return send(t, true);
    }

    private RpcService<RpcCtx> rpc() {
        if (this.rpc == null) {
            this.rpc = (RpcService) this.ctx.getBean(RpcService.class);
        }
        return this.rpc;
    }
}
