package io.memoria.jutils.messaging.adapter.nats;

import io.memoria.jutils.core.utils.functional.ReactorVavrUtils;
import io.memoria.jutils.core.utils.yaml.YamlConfigMap;
import io.memoria.jutils.messaging.domain.entity.Msg;
import io.memoria.jutils.messaging.domain.port.MsgProducer;
import io.nats.client.Connection;
import io.vavr.control.Try;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Objects;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:io/memoria/jutils/messaging/adapter/nats/NatsMsgProducer.class */
public class NatsMsgProducer implements MsgProducer {
    private final Connection nc;
    private final Scheduler scheduler;
    private final Duration timeout;

    public NatsMsgProducer(YamlConfigMap yamlConfigMap, Scheduler scheduler) throws IOException, InterruptedException {
        this.scheduler = scheduler;
        this.timeout = Duration.ofMillis(yamlConfigMap.asYamlConfigMap("reactorNats").asLong("producer.request.timeout").longValue());
        this.nc = NatsConnection.create(yamlConfigMap);
    }

    public Flux<Try<Void>> produce(String str, String str2, Flux<Msg> flux) {
        return flux.publishOn(this.scheduler).timeout(this.timeout).map(msg -> {
            return Try.run(() -> {
                this.nc.publish(str + "." + str2, ((String) msg.value).getBytes(StandardCharsets.UTF_8));
            });
        });
    }

    public Mono<Try<Void>> close() {
        return ReactorVavrUtils.blockingToMono(() -> {
            Connection connection = this.nc;
            Objects.requireNonNull(connection);
            return Try.run(connection::close);
        }, this.scheduler);
    }
}
