package io.es4j.infrastructure.pgbroker.vertx;

import io.es4j.infrastructure.pgbroker.exceptions.ProducerExeception;
import io.es4j.infrastructure.pgbroker.mappers.MessageQueueMapper;
import io.es4j.infrastructure.pgbroker.messagebroker.PartitionHashRing;
import io.es4j.infrastructure.pgbroker.models.ConsumerTransaction;
import io.es4j.infrastructure.pgbroker.models.Message;
import io.es4j.infrastructure.pgbroker.models.MessageID;
import io.es4j.infrastructure.pgbroker.models.MessageRecord;
import io.es4j.infrastructure.pgbroker.models.MessageRecordID;
import io.es4j.infrastructure.pgbroker.models.MessageRecordQuery;
import io.es4j.infrastructure.pgbroker.models.MessageState;
import io.es4j.sql.Repository;
import io.es4j.sql.RepositoryHandler;
import io.es4j.sql.models.BaseRecord;
import io.smallrye.mutiny.Uni;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.sqlclient.SqlConnection;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/es4j/infrastructure/pgbroker/vertx/VertxMessageProducer.class */
/**
 * Producer side of the message queue: converts {@link Message} instances into
 * {@link MessageRecord} rows and writes them through a {@link Repository} built on
 * {@link MessageQueueMapper#INSTANCE}. Also exposes cancel / lookup / query operations
 * over the same repository.
 */
public class VertxMessageProducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(VertxMessageProducer.class);

    /** Repository over the queue records, keyed by {@link MessageRecordID}. */
    private final Repository<MessageRecordID, MessageRecord, MessageRecordQuery> queue;

    /**
     * Creates a producer whose repository uses the given handler.
     *
     * @param repositoryHandler handler passed to the underlying {@link Repository}
     */
    public VertxMessageProducer(RepositoryHandler repositoryHandler) {
        this.queue = new Repository<>(MessageQueueMapper.INSTANCE, repositoryHandler);
    }

    /**
     * Enqueues a single message using the connection carried by the given transaction.
     *
     * @param message             message to enqueue; its payload must be non-null because the
     *                            payload's class name is stored in the record
     * @param consumerTransaction transaction whose connection is used for the insert
     * @param <T>                 payload type
     * @return a {@code Uni} that completes when the insert finishes; any failure is
     *         transformed into a {@link ProducerExeception}
     */
    public <T> Uni<Void> enqueue(Message<T> message, ConsumerTransaction consumerTransaction) {
        // NOTE(review): this overload reads connection() while the batch overload below uses
        // getDelegate(SqlConnection.class) — presumably equivalent; confirm against ConsumerTransaction.
        return this.queue.insert(toRecord(message), (SqlConnection) consumerTransaction.connection())
            .replaceWithVoid()
            .onFailure().transform(ProducerExeception::new);
    }

    /**
     * Enqueues a single message without an explicit connection.
     *
     * @param message message to enqueue; its payload must be non-null
     * @param <T>     payload type
     * @return a {@code Uni} that completes when the insert finishes; any failure is
     *         transformed into a {@link ProducerExeception}
     */
    public <T> Uni<Void> enqueue(Message<T> message) {
        return this.queue.insert(toRecord(message))
            .replaceWithVoid()
            .onFailure().transform(ProducerExeception::new);
    }

    /**
     * Enqueues a batch of messages using the connection carried by the given transaction.
     *
     * @param list                messages to enqueue; each payload must be non-null
     * @param consumerTransaction transaction whose {@link SqlConnection} delegate is used
     * @param <T>                 payload type
     * @return a {@code Uni} that completes when the batch insert finishes; any failure is
     *         transformed into a {@link ProducerExeception}
     */
    public <T> Uni<Void> enqueue(List<Message<T>> list, ConsumerTransaction consumerTransaction) {
        List<MessageRecord> records = list.stream().map(VertxMessageProducer::toRecord).toList();
        return this.queue.insertBatch(records, (SqlConnection) consumerTransaction.getDelegate(SqlConnection.class))
            .replaceWithVoid()
            .onFailure().transform(ProducerExeception::new);
    }

    /**
     * Enqueues a batch of messages without an explicit connection.
     *
     * @param list messages to enqueue; each payload must be non-null
     * @param <T>  payload type
     * @return a {@code Uni} that completes when the batch insert finishes; any failure is
     *         transformed into a {@link ProducerExeception}
     */
    public <T> Uni<Void> enqueue(List<Message<T>> list) {
        List<MessageRecord> records = list.stream().map(VertxMessageProducer::toRecord).toList();
        return this.queue.insertBatch(records)
            .replaceWithVoid()
            .onFailure().transform(ProducerExeception::new);
    }

    /**
     * Cancels a message by deleting its record by key.
     *
     * <p>Unlike the {@code enqueue} overloads, failures are propagated as-is and are not
     * wrapped in {@link ProducerExeception}.
     *
     * @param messageID id and tenant identifying the record to delete
     * @return the {@code Uni} returned by the repository delete
     */
    public Uni<Void> cancel(MessageID messageID) {
        LOGGER.info("Cancelling message {}", messageID.id());
        return this.queue.deleteByKey(new MessageRecordID(messageID.id(), messageID.tenant()));
    }

    /**
     * Loads a single message by key and maps its stored payload to {@code cls}.
     *
     * @param messageID id and tenant identifying the record
     * @param cls       target class for the payload mapping
     * @param <T>       payload type
     * @return a {@code Uni} emitting the reconstructed message
     */
    public <T> Uni<Message<T>> get(MessageID messageID, Class<T> cls) {
        return this.queue.selectByKey(new MessageRecordID(messageID.id(), messageID.tenant()))
            .map(messageRecord -> toMessage(messageRecord, cls));
    }

    /**
     * Runs a query against the queue and maps every resulting record's payload to {@code cls}.
     *
     * @param messageRecordQuery query passed to the repository
     * @param cls                target class for the payload mapping
     * @param <T>                payload type
     * @return a {@code Uni} emitting the reconstructed messages in the order the repository
     *         returned them
     */
    public <T> Uni<List<Message<T>>> query(MessageRecordQuery messageRecordQuery, Class<T> cls) {
        return this.queue.query(messageRecordQuery)
            .map(records -> records.stream()
                .map(messageRecord -> toMessage(messageRecord, cls))
                .toList());
    }

    /** Emits the debug trace written for every message about to be enqueued. */
    private static <T> void log(Message<T> message) {
        LOGGER.debug("Enqueuing {}", message);
    }

    /** Logs the message and builds the {@code CREATED} queue record that represents it. */
    private static <T> MessageRecord toRecord(Message<T> message) {
        log(message);
        return new MessageRecord(
            message.messageId(),
            message.scheduled(),
            message.expiration(),
            message.priority(),
            0, // presumably the initial retry counter — confirm against MessageRecord
            MessageState.CREATED,
            // The concrete payload class name is stored alongside the JSON form of the payload.
            message.payload().getClass().getName(),
            JsonObject.mapFrom(message.payload()),
            // Three fields are left unset at creation time.
            null,
            null,
            null,
            PartitionHashRing.resolve(message.partitionKey()),
            message.partitionKey(),
            BaseRecord.newRecord(message.tenant())
        );
    }

    /** Rebuilds a typed {@link Message} from a stored record, mapping the payload to {@code cls}. */
    private static <T> Message<T> toMessage(MessageRecord messageRecord, Class<T> cls) {
        // NOTE(review): the third argument is the record's partitionId(), whereas enqueue reads
        // partitionKey() from the same Message slot — verify this is intentional.
        return new Message<>(
            messageRecord.id(),
            messageRecord.baseRecord().tenant(),
            messageRecord.partitionId(),
            messageRecord.scheduled(),
            messageRecord.expiration(),
            messageRecord.priority(),
            messageRecord.payload().mapTo(cls)
        );
    }
}
