package io.es4j.infrastructure.pgbroker.core;

import io.es4j.infrastructure.pgbroker.models.BrokerConfiguration;
import io.es4j.infrastructure.pgbroker.models.MessageRecord;
import io.es4j.infrastructure.pgbroker.models.MessageRecordKey;
import io.es4j.infrastructure.pgbroker.models.MessageRecordQuery;
import io.es4j.sql.Repository;
import io.es4j.sql.exceptions.NotFound;
import io.es4j.task.TimerTask;
import io.smallrye.mutiny.Uni;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/es4j/infrastructure/pgbroker/core/QueuePollingSession.class */
/**
 * Polls the {@code message_broker} table for queue messages and buffers the claimed rows
 * in memory until a caller drains them with {@link #flush()}.
 *
 * <p>Polling is demand driven: {@link #signalMessage()} raises a flag, and the task returned by
 * {@link #provideTask()} only runs {@link #POLLING_STATEMENT} when that flag is raised, clearing
 * it in the process.
 *
 * <p>The message buffer and the flag are a {@link ConcurrentLinkedQueue} and an
 * {@link AtomicBoolean}, so signalling, polling and flushing may be invoked from different threads.
 */
public class QueuePollingSession {
    /**
     * Claim statement: marks up to {@code #{brokerBatchingSize}} rows that are {@code PUBLISHED} or
     * {@code STUCK}, belong to partition {@code 'none'} and are due (no schedule, or schedule not
     * in the future) as {@code CONSUMING} for {@code #{deploymentId}}, and returns the claimed rows.
     * Candidate rows are selected with {@code for update skip locked}.
     */
    public static final String POLLING_STATEMENT = "update message_broker set state = 'CONSUMING', verticle_id = #{deploymentId} where message_id in ( select message_id from message_broker where  state in ('PUBLISHED', 'STUCK')  and partition_id = 'none'  and (scheduled is null or scheduled <= current_timestamp) order by priority for update skip locked limit #{brokerBatchingSize}  ) returning *;";
    private static final Logger LOGGER = LoggerFactory.getLogger(QueuePollingSession.class);
    private final Repository<MessageRecordKey, MessageRecord, MessageRecordQuery> messageQueue;
    private final BrokerConfiguration configuration;
    private final String deploymentId;
    /** Claimed messages waiting to be drained by {@link #flush()}. */
    private final ConcurrentLinkedQueue<MessageRecord> messages = new ConcurrentLinkedQueue<>();
    /** Raised by {@link #signalMessage()}; cleared by the polling task when it starts a claim. */
    private final AtomicBoolean claimMessages = new AtomicBoolean(false);

    /**
     * @param repository          repository used to execute {@link #POLLING_STATEMENT}
     * @param brokerConfiguration source of the batch size ({@code brokerBatchingSize()}); a
     *                            {@code null} batch size falls back to 10
     * @param str                 identifier written to {@code verticle_id} on claimed rows; when
     *                            {@code null} a random UUID is generated for each poll
     */
    public QueuePollingSession(Repository<MessageRecordKey, MessageRecord, MessageRecordQuery> repository, BrokerConfiguration brokerConfiguration, String str) {
        this.messageQueue = repository;
        this.configuration = brokerConfiguration;
        this.deploymentId = str;
    }

    /**
     * Removes and returns every message currently buffered, in queue order.
     *
     * @return a new mutable list with the drained messages; empty when nothing is buffered
     */
    public List<MessageRecord> flush() {
        final List<MessageRecord> drained = new ArrayList<>();
        MessageRecord next;
        while ((next = this.messages.poll()) != null) {
            drained.add(next);
        }
        return drained;
    }

    /**
     * Builds the polling task. Each run does nothing unless {@link #signalMessage()} was called
     * since the previous claim; otherwise it clears the flag and executes the claim statement.
     * A failing claim is logged and then recovered, so the returned {@link Uni} never fails
     * because of the query.
     *
     * @return the task to be run periodically by the caller
     */
    public TimerTask provideTask() {
        return () -> {
            // Test-and-clear in one atomic step so overlapping runs cannot both start a claim
            // for the same signal.
            if (!this.claimMessages.compareAndSet(true, false)) {
                return Uni.createFrom().voidItem();
            }
            LOGGER.debug("Checking address");
            final Map<String, Object> parameters = Map.of(
                "deploymentId", Objects.requireNonNullElseGet(this.deploymentId, () -> UUID.randomUUID().toString()),
                "brokerBatchingSize", Objects.requireNonNullElse(this.configuration.brokerBatchingSize(), 10)
            );
            return claimMessages(parameters)
                // Polling stays best-effort, but the failure is made visible instead of silently dropped.
                .onFailure().invoke(failure -> LOGGER.error("Unable to claim messages", failure))
                .onFailure().recoverWithNull()
                .replaceWithVoid()
                .eventually(() -> {
                    if (this.messages.isEmpty()) {
                        LOGGER.debug("Messages not found");
                    } else {
                        LOGGER.info("Found {} messages", this.messages.size());
                    }
                });
        };
    }

    /**
     * Executes {@link #POLLING_STATEMENT} and appends the returned rows to the buffer.
     * A {@link NotFound} failure is treated as "nothing to claim"; other failures propagate.
     *
     * @param map statement parameters ({@code deploymentId}, {@code brokerBatchingSize})
     * @return a {@link Uni} completing once the claimed rows have been buffered
     */
    private Uni<Void> claimMessages(Map<String, Object> map) {
        return this.messageQueue.query(POLLING_STATEMENT, map)
            .invoke(claimed -> {
                LOGGER.debug("Claimed {} messages", claimed.size());
                this.messages.addAll(claimed);
            })
            .onFailure(NotFound.class).recoverWithNull()
            .replaceWithVoid();
    }

    /**
     * Requests a claim attempt on the next run of the polling task.
     */
    public void signalMessage() {
        this.claimMessages.set(true);
    }
}
