package io.es4j.infrastructure.pgbroker.core;

import io.es4j.infrastructure.pgbroker.exceptions.PartitionTakenException;
import io.es4j.infrastructure.pgbroker.models.BrokerConfiguration;
import io.es4j.infrastructure.pgbroker.models.BrokerPartitionRecord;
import io.es4j.infrastructure.pgbroker.models.MessageRecord;
import io.es4j.infrastructure.pgbroker.models.MessageRecordKey;
import io.es4j.infrastructure.pgbroker.models.MessageRecordQuery;
import io.es4j.infrastructure.pgbroker.models.PartitionKey;
import io.es4j.infrastructure.pgbroker.models.PartitionQuery;
import io.es4j.sql.Repository;
import io.es4j.sql.exceptions.NotFound;
import io.es4j.task.LockLevel;
import io.es4j.task.TimerTask;
import io.es4j.task.TimerTaskConfiguration;
import io.es4j.task.TimerTaskDeployer;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.unchecked.Unchecked;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/es4j/infrastructure/pgbroker/core/TopicPartitionPollingSession.class */
/**
 * Polling session that binds one verticle to one broker partition.
 *
 * <p>{@link #start(TimerTaskDeployer)} claims the partition row and deploys two timers:
 * a heartbeat that refreshes the claim and a poller that moves pending messages of the
 * partition to {@code CONSUMING} and buffers them until {@link #flush()} drains them.
 * Polling only hits the database when it has been requested through {@link #poll()}
 * (or right after a successful start).
 */
public class TopicPartitionPollingSession {
    private static final Logger LOGGER = LoggerFactory.getLogger(TopicPartitionPollingSession.class);

    /**
     * Marks up to {@code brokerBatchingSize} {@code PUBLISHED}/{@code STUCK} messages of the
     * partition as {@code CONSUMING} for the given deployment (in {@code message_sequence}
     * order, skipping rows locked by other transactions) and returns the updated rows.
     */
    public static final String POLLING_STATEMENT = "update message_broker set state = 'CONSUMING', verticle_id = #{deploymentId} where message_id in ( select message_id from message_broker where  state in ('PUBLISHED', 'STUCK')  and partition_id = #{partitionId}  order by message_sequence for update skip locked limit #{brokerBatchingSize} ) returning *;";

    /**
     * Locks the partition row for the verticle when it is unlocked or when its last update
     * is at least one minute old, and returns the updated row.
     */
    public static final String CLAIM_PARTITION_STATEMENT = " update message_broker_partition set updated = now(), locked = true, verticle_id = #{verticleId} where partition_id = #{partitionId} and (locked = false or updated + interval '1 minute' <= now() )  returning *";

    /** Refreshes {@code updated} on the partition row while it is still locked by the verticle. */
    public static final String HEART_BEAT_STATEMENT = "update message_broker_partition set updated = now() where partition_id = #{partitionId} and verticle_id = #{verticleId} and locked = true returning *";

    // The same duration is passed for all three duration arguments of TimerTaskConfiguration;
    // their individual meaning is defined by that type and is not visible from this class.
    private static final Duration POLLING_INTERVAL = Duration.ofMillis(25L);
    private static final Duration HEART_BEAT_INTERVAL = Duration.ofSeconds(30L);

    private final Repository<PartitionKey, BrokerPartitionRecord, PartitionQuery> partitionRepository;
    private final Repository<MessageRecordKey, MessageRecord, MessageRecordQuery> messageQueue;
    private final BrokerConfiguration configuration;
    private final String verticleId;
    private final String partitionId;

    /** Partition row returned by the claim; stays {@code null} until {@link #start} succeeds. */
    public BrokerPartitionRecord brokerPartitionRecord;

    /** Messages claimed by the poller and not yet handed out through {@link #flush()}. */
    private final ConcurrentLinkedQueue<MessageRecord> messages = new ConcurrentLinkedQueue<>();
    /** Set when a polling round has been requested; cleared by the poller when it picks it up. */
    private final AtomicBoolean claimMessages = new AtomicBoolean(false);
    /** Cleared when the session is closed or the heartbeat fails. */
    private final AtomicBoolean partitionActive = new AtomicBoolean(true);

    /**
     * @param partitionRepository repository used to claim, heartbeat and release the partition row
     * @param messageQueue        repository used to claim messages
     * @param configuration       broker configuration; supplies the polling batch size
     * @param verticleId          identifier written to the rows this session claims
     * @param partitionId         identifier of the partition this session polls
     */
    public TopicPartitionPollingSession(Repository<PartitionKey, BrokerPartitionRecord, PartitionQuery> partitionRepository, Repository<MessageRecordKey, MessageRecord, MessageRecordQuery> messageQueue, BrokerConfiguration configuration, String verticleId, String partitionId) {
        this.partitionRepository = partitionRepository;
        this.messageQueue = messageQueue;
        this.configuration = configuration;
        this.verticleId = verticleId;
        this.partitionId = partitionId;
    }

    /**
     * Claims the partition and, once claimed, deploys the heartbeat and polling timers and
     * requests a first polling round.
     *
     * <p>A {@link PartitionTakenException} from the claim is retried at most once with a
     * 30 second back-off.
     *
     * @param timerTaskDeployer deployer the two timers are handed to
     * @return a {@code Uni} completing when the timers have been deployed
     */
    public Uni<Void> start(TimerTaskDeployer timerTaskDeployer) {
        return takePartition(this.partitionId, this.verticleId)
            .onFailure(PartitionTakenException.class).retry().withBackOff(Duration.ofSeconds(30L)).atMost(1L)
            .flatMap(claimedPartition -> {
                this.brokerPartitionRecord = claimedPartition;
                timerTaskDeployer.deploy(heartBeatTimer());
                timerTaskDeployer.deploy(pollingTimer());
                this.claimMessages.set(true);
                return Uni.createFrom().voidItem();
            });
    }

    /**
     * Builds the timer task that claims pending messages whenever a polling round was requested.
     *
     * @return the polling timer task
     */
    public TimerTask pollingTimer() {
        return new TimerTask() {
            @Override
            public Uni<Void> performTask() {
                return TopicPartitionPollingSession.this.pollOnce();
            }

            @Override
            public TimerTaskConfiguration configuration() {
                return new TimerTaskConfiguration(LockLevel.NONE, POLLING_INTERVAL, POLLING_INTERVAL, POLLING_INTERVAL, Optional.of(PartitionTakenException.class));
            }
        };
    }

    /**
     * Builds the timer task that refreshes the partition claim.
     *
     * @return the heartbeat timer task
     */
    public TimerTask heartBeatTimer() {
        return new TimerTask() {
            @Override
            public Uni<Void> performTask() {
                return TopicPartitionPollingSession.this.sendHeartBeat();
            }

            @Override
            public TimerTaskConfiguration configuration() {
                return new TimerTaskConfiguration(LockLevel.NONE, HEART_BEAT_INTERVAL, HEART_BEAT_INTERVAL, HEART_BEAT_INTERVAL, Optional.of(PartitionTakenException.class));
            }
        };
    }

    /**
     * Deactivates the session and writes the released partition record back.
     *
     * <p>When the partition was never claimed there is nothing to release and the returned
     * {@code Uni} completes immediately.
     *
     * @return a {@code Uni} completing once the release has been written
     */
    public Uni<Void> close() {
        LOGGER.info("Closing polling session for {}", this.brokerPartitionRecord);
        this.partitionActive.set(false);
        final BrokerPartitionRecord claimedPartition = this.brokerPartitionRecord;
        if (claimedPartition == null) {
            // start() never succeeded: no record to release, and dereferencing it would NPE.
            return Uni.createFrom().voidItem();
        }
        return this.partitionRepository.updateByKey(claimedPartition.release()).replaceWithVoid();
    }

    /** Requests a polling round; the polling timer picks it up on its next run. */
    public void poll() {
        this.claimMessages.set(true);
    }

    /**
     * Drains every buffered message.
     *
     * @return the buffered messages in queue order; empty when nothing is buffered
     */
    public List<MessageRecord> flush() {
        final List<MessageRecord> drained = new ArrayList<>();
        MessageRecord next;
        while ((next = this.messages.poll()) != null) {
            drained.add(next);
        }
        return drained;
    }

    /**
     * One run of the polling timer.
     *
     * @throws PartitionTakenException when the session is no longer active
     */
    private Uni<Void> pollOnce() {
        // Check liveness first so an inactive session fails on its next run instead of idling
        // until somebody calls poll(). NOTE(review): presumably the deployer stops the timer on
        // the exception type passed to TimerTaskConfiguration - confirm.
        if (!this.partitionActive.get()) {
            throw new PartitionTakenException();
        }
        // Atomically consume the request so a concurrent poll() is never lost in between.
        if (!this.claimMessages.compareAndSet(true, false)) {
            return Uni.createFrom().voidItem();
        }
        LOGGER.info("Something to claim on partition {}", this.brokerPartitionRecord);
        return claimMessages(this.partitionId, this.verticleId, this.configuration)
            // Best effort: a failed round must not fail the timer, but it should be visible.
            .onFailure().invoke(th -> LOGGER.error("Unable to claim messages from partition {}", this.partitionId, th))
            .onFailure().recoverWithNull()
            .replaceWithVoid();
    }

    /** One run of the heartbeat timer; any failure deactivates the session. */
    private Uni<Void> sendHeartBeat() {
        return this.partitionRepository
            .query(HEART_BEAT_STATEMENT, Map.of("partitionId", this.partitionId, "verticleId", this.verticleId))
            .onFailure().transform(th -> {
                LOGGER.error("Lost partition, heartbeat interrupted", th);
                this.partitionActive.set(false);
                // Returned (not thrown) so downstream receives exactly this failure type.
                return new PartitionTakenException();
            })
            .replaceWithVoid();
    }

    /** Claims a batch of messages for the partition and appends it to the buffer. */
    private Uni<Void> claimMessages(String partitionId, String deploymentId, BrokerConfiguration brokerConfiguration) {
        return this.messageQueue
            .query(POLLING_STATEMENT, Map.of("partitionId", partitionId, "deploymentId", deploymentId, "brokerBatchingSize", brokerConfiguration.brokerBatchingSize()))
            // NotFound is treated as an empty batch rather than an error.
            .onFailure(NotFound.class).recoverWithItem(Collections::emptyList)
            .map(claimed -> {
                LOGGER.info("Claimed {} messages from partitionId {}", claimed.size(), partitionId);
                this.messages.addAll(claimed);
                return claimed;
            })
            .replaceWithVoid();
    }

    /**
     * Tries to lock the partition row for the given verticle.
     *
     * @param partitionId partition to claim
     * @param verticleId  verticle the claim is made for
     * @return a {@code Uni} emitting the claimed partition record, or failing with
     *         {@link PartitionTakenException} when the claim query fails or returns no row
     */
    public Uni<BrokerPartitionRecord> takePartition(String partitionId, String verticleId) {
        return this.partitionRepository
            .query(CLAIM_PARTITION_STATEMENT, Map.of("verticleId", verticleId, "partitionId", partitionId))
            .onFailure().transform(th -> {
                if (th instanceof NotFound) {
                    LOGGER.info("Partition not available {}", partitionId);
                } else {
                    LOGGER.info("Partition claiming dropped {} ", partitionId, th);
                }
                // Returned (not thrown) so the retry in start() sees exactly this failure type.
                return new PartitionTakenException();
            })
            .map(claimed -> {
                if (claimed.isEmpty()) {
                    // No row updated means the partition is held by someone else.
                    LOGGER.info("Partition not available {}", partitionId);
                    throw new PartitionTakenException();
                }
                return claimed.get(0);
            });
    }
}
