package io.es4j.infrastructure.pgbroker.vertx;

import io.es4j.infrastructure.pgbroker.core.PartitionHashRing;
import io.es4j.infrastructure.pgbroker.exceptions.ProducerExeception;
import io.es4j.infrastructure.pgbroker.mappers.MessageMapper;
import io.es4j.infrastructure.pgbroker.models.ConsumerTransaction;
import io.es4j.infrastructure.pgbroker.models.MessageID;
import io.es4j.infrastructure.pgbroker.models.MessageRecord;
import io.es4j.infrastructure.pgbroker.models.MessageRecordKey;
import io.es4j.infrastructure.pgbroker.models.MessageRecordQuery;
import io.es4j.infrastructure.pgbroker.models.MessageState;
import io.es4j.infrastructure.pgbroker.models.TopicMessage;
import io.es4j.sql.Repository;
import io.es4j.sql.RepositoryHandler;
import io.es4j.sql.models.BaseRecord;
import io.smallrye.mutiny.Uni;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.sqlclient.SqlConnection;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/es4j/infrastructure/pgbroker/vertx/PgClientTopicProducer.class */
public class PgClientTopicProducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(PgClientTopicProducer.class);
    private final Repository<MessageRecordKey, MessageRecord, MessageRecordQuery> queue;

    public PgClientTopicProducer(RepositoryHandler repositoryHandler) {
        this.queue = new Repository<>(MessageMapper.INSTANCE, repositoryHandler);
    }

    public <T> Uni<Void> publish(TopicMessage<T> topicMessage, ConsumerTransaction consumerTransaction) {
        log(topicMessage);
        return this.queue.insert(parse(topicMessage), (SqlConnection) consumerTransaction.connection()).replaceWithVoid().onFailure().transform(ProducerExeception::new);
    }

    public <T> Uni<Void> publish(TopicMessage<T> topicMessage) {
        MessageRecord parse = parse(topicMessage);
        log(topicMessage);
        return this.queue.insert(parse).replaceWithVoid().onFailure().transform(ProducerExeception::new);
    }

    public <T> Uni<Void> publish(List<TopicMessage<T>> list, ConsumerTransaction consumerTransaction) {
        return this.queue.insertBatch(parse(list), (SqlConnection) consumerTransaction.getDelegate(SqlConnection.class)).replaceWithVoid().onFailure().transform(ProducerExeception::new);
    }

    public <T> Uni<Void> publish(List<TopicMessage<T>> list) {
        return this.queue.insertBatch(parse(list)).replaceWithVoid().onFailure().transform(ProducerExeception::new);
    }

    public Uni<Void> cancel(MessageID messageID) {
        LOGGER.warn("Cancelling message -> {}", messageID.id());
        return this.queue.deleteByKey(new MessageRecordKey(messageID.id()));
    }

    private static <T> List<MessageRecord> parse(List<TopicMessage<T>> list) {
        return list.stream().map(PgClientTopicProducer::parse).toList();
    }

    private static <T> MessageRecord parse(TopicMessage<T> topicMessage) {
        String messageId = topicMessage.messageId();
        MessageState messageState = MessageState.PUBLISHED;
        String address = topicMessage.address();
        T payload = topicMessage.payload();
        return new MessageRecord(messageId, null, null, null, messageState, address, payload instanceof JsonObject ? (JsonObject) payload : JsonObject.mapFrom(topicMessage.payload()), null, null, PartitionHashRing.resolve(topicMessage.partitionKey()), topicMessage.partitionKey(), topicMessage.schemaVersion(), BaseRecord.newRecord());
    }

    private static <T> void log(TopicMessage<T> topicMessage) {
        LOGGER.debug("Publishing message -> {}", topicMessage);
    }
}
