package io.quarkiverse.reactive.messaging.nats.jetstream.administration;

import io.nats.client.JetStreamApiException;
import io.nats.client.api.MessageInfo;
import io.quarkiverse.reactive.messaging.nats.NatsConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.JetStreamClient;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.PayloadMapper;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.io.IOException;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

/**
 * Looks up a single message in a JetStream stream by stream name and sequence number and
 * exposes it as a reactive messaging {@link Message}.
 */
@ApplicationScoped
public class MessageResolver {
    private static final Logger logger = Logger.getLogger(MessageResolver.class);
    private final NatsConfiguration configuration;
    private final PayloadMapper payloadMapper;
    private final ExecutionHolder executionHolder;

    /**
     * Creates the resolver from its injected collaborators.
     *
     * @param natsConfiguration configuration used to build the connection configuration of each
     *        short-lived client
     * @param payloadMapper mapper used to turn a fetched {@link MessageInfo} into a payload
     * @param executionHolder supplier of the Vert.x instance handed to each client
     */
    @Inject
    public MessageResolver(NatsConfiguration natsConfiguration, PayloadMapper payloadMapper, ExecutionHolder executionHolder) {
        this.configuration = natsConfiguration;
        this.payloadMapper = payloadMapper;
        this.executionHolder = executionHolder;
    }

    /**
     * Resolves a message using a dedicated client that is created for this call and closed
     * once the returned {@link Uni} terminates.
     *
     * @param str name of the stream to read from
     * @param j sequence number of the message within the stream
     * @param <T> payload type of the resolved message
     * @return a {@link Uni} emitting the resolved message, or failing if the connection cannot
     *         be established or the lookup fails
     */
    public <T> Uni<Message<T>> resolve(String str, long j) {
        JetStreamClient jetStreamClient = new JetStreamClient(ConnectionConfiguration.of(this.configuration), this.executionHolder.vertx());
        return jetStreamClient.getOrEstablishConnection()
                .onItem().<Message<T>>transformToUni(connection -> resolve(connection, str, j))
                .onFailure().invoke(th -> logger.errorf(th, "Failed to resolve message: %s", th.getMessage()))
                // Release the client exactly once on every outcome: item, failure and
                // cancellation. Closing only in the item/failure callbacks leaked the client
                // when the subscriber cancelled, and closed it twice when close() itself
                // failed in the item callback.
                .onTermination().invoke(() -> jetStreamClient.close());
    }

    /**
     * Resolves a message over an already established connection. The connection is not closed
     * by this method.
     *
     * @param connection connection used to reach JetStream
     * @param str name of the stream to read from
     * @param j sequence number of the message within the stream
     * @param <T> payload type of the resolved message
     * @return a {@link Uni} emitting the resolved message; fails with the {@link IOException}
     *         or {@link JetStreamApiException} raised by the lookup
     */
    public <T> Uni<Message<T>> resolve(Connection connection, String str, long j) {
        return Uni.createFrom().emitter(uniEmitter -> {
            try {
                // NOTE(review): this lookup appears to be a synchronous request executed on
                // the subscribing thread - confirm callers do not subscribe on an event loop.
                MessageInfo message = connection.jetStream().getStreamContext(str).getMessage(j);
                // The payload is null when the mapper yields no value for this message.
                uniEmitter.complete(new JetStreamMessage(message, this.payloadMapper.toPayload(message).orElse(null)));
            } catch (IOException | JetStreamApiException e) {
                uniEmitter.fail(e);
            }
        });
    }
}
