package io.quarkiverse.reactive.messaging.nats.jetstream.client.vertx;

import io.nats.client.ConsumerContext;
import io.nats.client.FetchConsumeOptions;
import io.nats.client.FetchConsumer;
import io.nats.client.JetStreamApiException;
import io.nats.client.PublishOptions;
import io.nats.client.api.MessageInfo;
import io.nats.client.api.PublishAck;
import io.nats.client.impl.Headers;
import io.quarkiverse.reactive.messaging.nats.jetstream.ExponentialBackoff;
import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamOutgoingMessageMetadata;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.AbstractConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConsumerNotFoundException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.FetchException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.KeyValueException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageNotFoundException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.PublishException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.administration.JetStreamSetupException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConsumerConfigurtationFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.FetchConsumerConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.PublishConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.message.MessageFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrumenter;
import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamTrace;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.unchecked.Unchecked;
import io.smallrye.reactive.messaging.tracing.TracingUtils;
import io.vertx.mutiny.core.Context;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;

/* loaded from: input_file:io/quarkiverse/reactive/messaging/nats/jetstream/client/vertx/MessageConnection.class */
public class MessageConnection extends AbstractConnection implements io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageConnection {
    private static final Logger logger = Logger.getLogger(MessageConnection.class);
    protected final MessageFactory messageFactory;
    protected final Context context;
    protected final JetStreamInstrumenter instrumenter;

    public MessageConnection(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener, MessageFactory messageFactory, Context context, JetStreamInstrumenter jetStreamInstrumenter) {
        super(connectionConfiguration, connectionListener);
        this.messageFactory = messageFactory;
        this.context = context;
        this.instrumenter = jetStreamInstrumenter;
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageConnection
    public <T> Uni<Message<T>> publish(Message<T> message, PublishConfiguration publishConfiguration) {
        Uni item = Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                Optional metadata = message.getMetadata(JetStreamOutgoingMessageMetadata.class);
                String str = (String) metadata.map((v0) -> {
                    return v0.messageId();
                }).orElseGet(() -> {
                    return UUID.randomUUID().toString();
                });
                byte[] byteArray = this.messageFactory.toByteArray(message.getPayload());
                Optional map = metadata.flatMap((v0) -> {
                    return v0.subtopic();
                }).map(str2 -> {
                    return publishConfiguration.subject() + "." + str2;
                });
                Objects.requireNonNull(publishConfiguration);
                String str3 = (String) map.orElseGet(publishConfiguration::subject);
                HashMap hashMap = new HashMap();
                metadata.ifPresent(jetStreamOutgoingMessageMetadata -> {
                    hashMap.putAll(jetStreamOutgoingMessageMetadata.headers());
                });
                if (message.getPayload() != null) {
                    hashMap.putIfAbsent(MessageFactory.MESSAGE_TYPE_HEADER, List.of(message.getPayload().getClass().getTypeName()));
                }
                if (publishConfiguration.traceEnabled()) {
                    TracingUtils.traceOutgoing(this.instrumenter.publisher(), message, new JetStreamTrace(publishConfiguration.stream(), str3, str, hashMap, new String(byteArray)));
                }
                PublishAck publish = this.connection.jetStream().publish(str3, toJetStreamHeaders(hashMap), byteArray, createPublishOptions(str, publishConfiguration.stream()));
                if (logger.isDebugEnabled()) {
                    logger.debugf("Published message: %s", publish);
                }
                this.connection.flush(Duration.ZERO);
                return message;
            } catch (IOException | JetStreamApiException | JetStreamSetupException e) {
                throw new PublishException(String.format("Failed to publish message: %s", e.getMessage()), e);
            }
        }));
        Context context = this.context;
        Objects.requireNonNull(context);
        return item.emitOn(context::runOnContext).onItem().transformToUni(this::acknowledge).onFailure().recoverWithUni(th -> {
            return notAcknowledge(message, th);
        });
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageConnection
    public <T> Uni<Message<T>> publish(Message<T> message, PublishConfiguration publishConfiguration, FetchConsumerConfiguration<T> fetchConsumerConfiguration) {
        return addOrUpdateConsumer(fetchConsumerConfiguration).onItem().transformToUni(consumerContext -> {
            return publish(message, publishConfiguration);
        });
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageConnection
    public <T> Uni<Message<T>> nextMessage(FetchConsumerConfiguration<T> fetchConsumerConfiguration) {
        return getConsumerContext(fetchConsumerConfiguration).onItem().transformToUni(consumerContext -> {
            return nextMessage(consumerContext, fetchConsumerConfiguration);
        });
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageConnection
    public <T> Multi<Message<T>> nextMessages(FetchConsumerConfiguration<T> fetchConsumerConfiguration) {
        Multi transformToMulti = getConsumerContext(fetchConsumerConfiguration).onItem().transformToMulti(consumerContext -> {
            return nextMessages(consumerContext, fetchConsumerConfiguration);
        });
        Context context = this.context;
        Objects.requireNonNull(context);
        return transformToMulti.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageConnection
    public <T> Uni<T> getKeyValue(String str, String str2, Class<T> cls) {
        Uni item = Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                return Optional.ofNullable(this.connection.keyValue(str).get(str2)).map(keyValueEntry -> {
                    return this.messageFactory.decode(keyValueEntry.getValue(), cls);
                }).orElse(null);
            } catch (IOException | JetStreamApiException e) {
                throw new KeyValueException(e);
            }
        }));
        Context context = this.context;
        Objects.requireNonNull(context);
        return item.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageConnection
    public <T> Uni<Void> putKeyValue(String str, String str2, T t) {
        Uni item = Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                this.connection.keyValue(str).put(str2, this.messageFactory.toByteArray(t));
                return null;
            } catch (IOException | JetStreamApiException e) {
                throw new KeyValueException(e);
            }
        }));
        Context context = this.context;
        Objects.requireNonNull(context);
        return item.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageConnection
    public Uni<Void> deleteKeyValue(String str, String str2) {
        Uni item = Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                this.connection.keyValue(str).delete(str2);
                return null;
            } catch (IOException | JetStreamApiException e) {
                throw new KeyValueException(e);
            }
        }));
        Context context = this.context;
        Objects.requireNonNull(context);
        return item.emitOn(context::runOnContext);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.MessageConnection
    public <T> Uni<Message<T>> resolve(String str, long j) {
        Uni emitter = Uni.createFrom().emitter(uniEmitter -> {
            try {
                MessageInfo message = this.connection.jetStream().getStreamContext(str).getMessage(j);
                uniEmitter.complete(new JetStreamMessage(message, this.messageFactory.toPayload(message).orElse(null)));
            } catch (IOException | JetStreamApiException e) {
                uniEmitter.fail(e);
            }
        });
        Context context = this.context;
        Objects.requireNonNull(context);
        return emitter.emitOn(context::runOnContext);
    }

    private PublishOptions createPublishOptions(String str, String str2) {
        return PublishOptions.builder().messageId(str).stream(str2).build();
    }

    private FetchConsumer fetchConsumer(ConsumerContext consumerContext, Duration duration) throws IOException, JetStreamApiException {
        return duration == null ? consumerContext.fetch(FetchConsumeOptions.builder().maxMessages(1).noWait().build()) : consumerContext.fetch(((FetchConsumeOptions.Builder) FetchConsumeOptions.builder().maxMessages(1).expiresIn(duration.toMillis())).build());
    }

    private <T> Uni<Message<T>> acknowledge(Message<T> message) {
        return Uni.createFrom().completionStage(message.ack()).onItem().transform(r3 -> {
            return message;
        });
    }

    private <T> Uni<Message<T>> notAcknowledge(Message<T> message, Throwable th) {
        return Uni.createFrom().completionStage(message.nack(th)).onItem().invoke(() -> {
            logger.warnf(th, "Message not published: %s", th.getMessage());
        }).onItem().transformToUni(r4 -> {
            return Uni.createFrom().item(message);
        });
    }

    private <T> Uni<ConsumerContext> getConsumerContext(FetchConsumerConfiguration<T> fetchConsumerConfiguration) {
        Uni recoverWithUni = Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                return this.connection.getStreamContext(fetchConsumerConfiguration.stream()).getConsumerContext(fetchConsumerConfiguration.name().orElseThrow(() -> {
                    return new IllegalArgumentException("Consumer name is not configured");
                }));
            } catch (IOException e) {
                throw new FetchException(e);
            } catch (JetStreamApiException e2) {
                if (e2.getApiErrorCode() == 10014) {
                    throw new ConsumerNotFoundException(fetchConsumerConfiguration.stream(), fetchConsumerConfiguration.name().orElse(null));
                }
                throw new FetchException(e2);
            }
        })).onFailure().recoverWithUni(th -> {
            return handleConsumerContextFailure(fetchConsumerConfiguration, th);
        });
        Context context = this.context;
        Objects.requireNonNull(context);
        return recoverWithUni.emitOn(context::runOnContext);
    }

    private <T> Uni<ConsumerContext> handleConsumerContextFailure(FetchConsumerConfiguration<T> fetchConsumerConfiguration, Throwable th) {
        return th instanceof ConsumerNotFoundException ? addOrUpdateConsumer(fetchConsumerConfiguration) : Uni.createFrom().failure(th);
    }

    private Uni<io.nats.client.Message> nextMessage(ConsumerContext consumerContext, Duration duration) {
        Uni item = Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                FetchConsumer fetchConsumer = fetchConsumer(consumerContext, duration);
                try {
                    io.nats.client.Message nextMessage = fetchConsumer.nextMessage();
                    if (nextMessage == null) {
                        throw new MessageNotFoundException();
                    }
                    if (fetchConsumer != null) {
                        fetchConsumer.close();
                    }
                    return nextMessage;
                } finally {
                }
            } catch (Throwable th) {
                logger.errorf(th, "Failed to fetch message: %s", th.getMessage());
                throw new FetchException(th);
            }
        }));
        Context context = this.context;
        Objects.requireNonNull(context);
        return item.emitOn(context::runOnContext);
    }

    private <T> Uni<Message<T>> nextMessage(ConsumerContext consumerContext, FetchConsumerConfiguration<T> fetchConsumerConfiguration) {
        return nextMessage(consumerContext, fetchConsumerConfiguration.fetchTimeout().orElse(null)).map(message -> {
            return this.messageFactory.create(message, fetchConsumerConfiguration.traceEnabled(), (Class) fetchConsumerConfiguration.payloadType().orElse(null), this.context, new ExponentialBackoff(false, Duration.ZERO), fetchConsumerConfiguration.ackTimeout());
        });
    }

    private Headers toJetStreamHeaders(Map<String, List<String>> map) {
        Headers headers = new Headers();
        Objects.requireNonNull(headers);
        map.forEach((v1, v2) -> {
            r1.add(v1, v2);
        });
        return headers;
    }

    private <T> Uni<ConsumerContext> addOrUpdateConsumer(FetchConsumerConfiguration<T> fetchConsumerConfiguration) {
        Uni item = Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                ConsumerContext createOrUpdateConsumer = this.connection.getStreamContext(fetchConsumerConfiguration.stream()).createOrUpdateConsumer(new ConsumerConfigurtationFactory().create(fetchConsumerConfiguration));
                this.connection.flush(Duration.ZERO);
                return createOrUpdateConsumer;
            } catch (IOException | JetStreamApiException e) {
                throw new FetchException(e);
            }
        }));
        Context context = this.context;
        Objects.requireNonNull(context);
        return item.emitOn(context::runOnContext);
    }

    private <T> Multi<Message<T>> nextMessages(ConsumerContext consumerContext, FetchConsumerConfiguration<T> fetchConsumerConfiguration) {
        Multi emitter = Multi.createFrom().emitter(multiEmitter -> {
            try {
                FetchConsumer fetchConsumer = fetchConsumer(consumerContext, fetchConsumerConfiguration.fetchTimeout().orElse(null));
                try {
                    for (io.nats.client.Message nextMessage = fetchConsumer.nextMessage(); nextMessage != null; nextMessage = fetchConsumer.nextMessage()) {
                        multiEmitter.emit(this.messageFactory.create(nextMessage, fetchConsumerConfiguration.traceEnabled(), (Class) fetchConsumerConfiguration.payloadType().orElse(null), this.context, new ExponentialBackoff(false, Duration.ZERO), fetchConsumerConfiguration.ackTimeout()));
                    }
                    multiEmitter.complete();
                    if (fetchConsumer != null) {
                        fetchConsumer.close();
                    }
                } finally {
                }
            } catch (Throwable th) {
                multiEmitter.fail(new FetchException(th));
            }
        });
        Context context = this.context;
        Objects.requireNonNull(context);
        return emitter.emitOn(context::runOnContext);
    }
}
