package io.es4j.infrastructure.pgbroker.messagebroker;

import io.es4j.infrastructure.pgbroker.exceptions.PartitionTakenException;
import io.es4j.infrastructure.pgbroker.models.MessagePartition;
import io.es4j.infrastructure.pgbroker.models.MessageRecord;
import io.es4j.infrastructure.pgbroker.models.MessageRecordID;
import io.es4j.infrastructure.pgbroker.models.MessageRecordQuery;
import io.es4j.infrastructure.pgbroker.models.MessageState;
import io.es4j.infrastructure.pgbroker.models.PartitionKey;
import io.es4j.infrastructure.pgbroker.models.PartitionQuery;
import io.es4j.infrastructure.pgbroker.models.PgBrokerConfiguration;
import io.es4j.sql.Repository;
import io.es4j.sql.exceptions.NotFound;
import io.es4j.task.LockLevel;
import io.es4j.task.TimerTask;
import io.es4j.task.TimerTaskConfiguration;
import io.es4j.task.TimerTaskDeployer;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.unchecked.Unchecked;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/es4j/infrastructure/pgbroker/messagebroker/PartitionPollingSession.class */
/**
 * Polling session bound to a single queue partition.
 *
 * <p>{@link #start(TimerTaskDeployer)} claims the partition row for this verticle and deploys two
 * timer tasks: a heartbeat that refreshes the partition's {@code updated} column and a poller that,
 * once signalled through {@link #signalMessage()}, claims queued messages for the partition and
 * buffers them in memory. Buffered messages are handed out by {@link #pollMessages()}.
 */
public class PartitionPollingSession {
    private static final Logger LOGGER = LoggerFactory.getLogger(PartitionPollingSession.class);

    /** Retry back-off applied when the configuration does not provide one. */
    private static final Duration DEFAULT_RETRY_BACKOFF = Duration.ofMinutes(5L);
    /** Delay before the single retry of a failed partition claim in {@link #start}. */
    private static final Duration CLAIM_RETRY_BACKOFF = Duration.ofSeconds(30L);
    /** Duration passed (three times) to the polling task's {@link TimerTaskConfiguration}. */
    private static final Duration POLLING_INTERVAL = Duration.ofMillis(25L);
    /** Duration passed (three times) to the heartbeat task's {@link TimerTaskConfiguration}. */
    private static final Duration HEARTBEAT_INTERVAL = Duration.ofSeconds(30L);

    private final Repository<PartitionKey, MessagePartition, PartitionQuery> partitionRepository;
    private final Repository<MessageRecordID, MessageRecord, MessageRecordQuery> messageQueue;
    private final PgBrokerConfiguration configuration;
    private final String verticleId;
    private final String partitionId;

    /** The claimed partition; {@code null} until {@link #start} has succeeded. */
    public MessagePartition messagePartition;

    /** Messages claimed from the database and not yet handed out by {@link #pollMessages()}. */
    private final ConcurrentLinkedQueue<MessageRecord> messages = new ConcurrentLinkedQueue<>();
    /** Set when there may be something to claim; consumed by the polling task. */
    private final AtomicBoolean claimMessages = new AtomicBoolean(false);
    /** Cleared when the heartbeat fails or the session is closed. */
    private final AtomicBoolean partitionActive = new AtomicBoolean(true);

    /**
     * @param partitionRepository repository used to claim, heartbeat and release the partition row
     * @param messageQueue        repository used to claim and recover messages
     * @param configuration       broker configuration (batch size, retry back-off)
     * @param verticleId          identifier written to {@code verticle_id} on claimed rows
     * @param partitionId         identifier of the partition this session polls
     */
    public PartitionPollingSession(
            Repository<PartitionKey, MessagePartition, PartitionQuery> partitionRepository,
            Repository<MessageRecordID, MessageRecord, MessageRecordQuery> messageQueue,
            PgBrokerConfiguration configuration,
            String verticleId,
            String partitionId) {
        this.partitionRepository = partitionRepository;
        this.messageQueue = messageQueue;
        this.configuration = configuration;
        this.verticleId = verticleId;
        this.partitionId = partitionId;
    }

    /**
     * Claims the partition (retrying once after a back-off if it is taken), then deploys the
     * heartbeat and polling tasks and arms the first poll.
     *
     * @param timerTaskDeployer deployer that schedules the heartbeat and polling tasks
     * @return a {@code Uni} completing once the tasks are deployed, or failing if the claim fails
     */
    public Uni<Void> start(TimerTaskDeployer timerTaskDeployer) {
        return takePartition(this.partitionId, this.verticleId)
                .onFailure(PartitionTakenException.class).retry().withBackOff(CLAIM_RETRY_BACKOFF).atMost(1L)
                .invoke(claimed -> {
                    this.messagePartition = claimed;
                    timerTaskDeployer.deploy(partitionHeartBeat());
                    timerTaskDeployer.deploy(partitionPolling());
                    // Force an initial poll so messages queued before start-up are picked up.
                    this.claimMessages.set(true);
                })
                .replaceWithVoid();
    }

    /**
     * Builds the timer task that polls the partition whenever a claim has been signalled.
     *
     * @return the polling task
     */
    public TimerTask partitionPolling() {
        return new TimerTask() {
            @Override
            public Uni<Void> performTask() {
                return pollIfSignalled();
            }

            @Override
            public TimerTaskConfiguration configuration() {
                return new TimerTaskConfiguration(LockLevel.NONE, POLLING_INTERVAL, POLLING_INTERVAL, POLLING_INTERVAL, Optional.of(PartitionTakenException.class));
            }
        };
    }

    /**
     * Builds the timer task that refreshes this verticle's hold on the partition.
     *
     * @return the heartbeat task
     */
    public TimerTask partitionHeartBeat() {
        return new TimerTask() {
            @Override
            public Uni<Void> performTask() {
                return beat();
            }

            @Override
            public TimerTaskConfiguration configuration() {
                return new TimerTaskConfiguration(LockLevel.NONE, HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, Optional.of(PartitionTakenException.class));
            }
        };
    }

    /**
     * Marks the session inactive and writes the released partition back to the repository.
     *
     * @return a {@code Uni} completing when the release has been written; completes immediately
     *         when no partition was ever claimed
     */
    public Uni<Void> close() {
        LOGGER.info("Closing polling session for {}", this.messagePartition);
        this.partitionActive.set(false);
        if (this.messagePartition == null) {
            // start() never succeeded, so there is no partition row to release.
            return Uni.createFrom().voidItem();
        }
        return this.partitionRepository.updateByKey(this.messagePartition.release()).replaceWithVoid();
    }

    /**
     * Drains every message currently buffered by this session.
     *
     * @return the buffered messages in queue order; empty when nothing is buffered
     */
    public List<MessageRecord> pollMessages() {
        List<MessageRecord> drained = new ArrayList<>();
        MessageRecord next;
        while ((next = this.messages.poll()) != null) {
            drained.add(next);
        }
        return drained;
    }

    /**
     * Claims the partition row for the given verticle.
     *
     * @param partitionId partition to claim
     * @param verticleId  verticle claiming it
     * @return the claimed partition, or a failure of type {@link PartitionTakenException} when the
     *         claim query fails or yields no row
     */
    public Uni<MessagePartition> takePartition(String partitionId, String verticleId) {
        return this.partitionRepository.query(claimPartitionStatement(), Map.of("verticleId", verticleId, "partitionId", partitionId))
                // Return (rather than throw) the replacement failure so it reaches downstream as a
                // PartitionTakenException and can be matched by onFailure(PartitionTakenException.class).
                .onFailure().transform(th -> {
                    if (th instanceof NotFound) {
                        LOGGER.info("Partition not available {}", partitionId);
                    } else {
                        LOGGER.info("Partition claiming dropped {} ", partitionId, th);
                    }
                    return new PartitionTakenException();
                })
                .map(rows -> rows.stream().findFirst().orElseThrow(PartitionTakenException::new));
    }

    /** Signals that there may be new messages to claim; picked up by the next polling tick. */
    public void signalMessage() {
        this.claimMessages.set(true);
    }

    /**
     * One tick of the polling task: polls the partition only if a claim was signalled.
     *
     * @throws PartitionTakenException if a claim was signalled but the partition is no longer active
     */
    private Uni<Void> pollIfSignalled() {
        // Atomically consume the signal so a concurrent signalMessage() is never overwritten.
        if (!this.claimMessages.compareAndSet(true, false)) {
            LOGGER.debug("Nothing to claim on partition {}", this.messagePartition);
            return Uni.createFrom().voidItem();
        }
        LOGGER.info("Something to claim on partition {}", this.messagePartition);
        if (!this.partitionActive.get()) {
            throw new PartitionTakenException();
        }
        return pollPartition(this.partitionId, this.verticleId, this.configuration);
    }

    /** One tick of the heartbeat task: refreshes the partition row, deactivating the session on failure. */
    private Uni<Void> beat() {
        return this.partitionRepository.query(partitionHeartBeatStatement(), Map.of("partitionId", this.partitionId, "verticleId", this.verticleId))
                .onFailure().transform(th -> {
                    LOGGER.error("Lost partition, heartbeat interrupted", th);
                    this.partitionActive.set(false);
                    return new PartitionTakenException();
                })
                .replaceWithVoid();
    }

    /** Runs recovery and regular claiming together; failures are logged and never propagated. */
    private Uni<Void> pollPartition(String partitionId, String verticleId, PgBrokerConfiguration configuration) {
        return Uni.join()
                .all(recoverMessages(partitionId, verticleId, configuration), claimMessages(partitionId, verticleId, configuration))
                .andCollectFailures()
                .onFailure().invoke(th -> LOGGER.warn("Polling partition {} failed", partitionId, th))
                .onFailure().recoverWithNull()
                .replaceWithVoid();
    }

    /** Re-claims messages in state RECOVERY and buffers them flagged as {@link MessageState#RECOVERY}. */
    private Uni<Void> recoverMessages(String partitionId, String verticleId, PgBrokerConfiguration configuration) {
        return recoverPartitionMessages(partitionId, verticleId, configuration)
                .onFailure(NotFound.class).recoverWithItem(Collections::emptyList)
                .invoke(recovered -> {
                    LOGGER.info("Recovered {} messages from partitionId {}", recovered.size(), partitionId);
                    recovered.stream()
                            .map(record -> record.withState(MessageState.RECOVERY))
                            .forEach(this.messages::add);
                })
                .replaceWithVoid();
    }

    /** Claims the next batch of due messages for the partition and buffers them. */
    private Uni<Void> claimMessages(String partitionId, String verticleId, PgBrokerConfiguration configuration) {
        long retrySeconds = Objects.requireNonNullElse(configuration.retryBackOffInterval(), DEFAULT_RETRY_BACKOFF).getSeconds();
        return this.messageQueue.query(pollingStatement(), Map.of("partitionId", partitionId, "deploymentId", verticleId, "batchSize", configuration.batchSize(), "retryInterval", retrySeconds))
                .onFailure(NotFound.class).recoverWithItem(Collections::emptyList)
                .invoke(claimed -> {
                    LOGGER.info("Claimed {} messages from partitionId {}", claimed.size(), partitionId);
                    this.messages.addAll(claimed);
                })
                .replaceWithVoid();
    }

    /** Executes the recovery statement for the partition. */
    private Uni<List<MessageRecord>> recoverPartitionMessages(String partitionId, String verticleId, PgBrokerConfiguration configuration) {
        return this.messageQueue.query(recoveryStatement(), Map.of("partitionId", partitionId, "deploymentId", verticleId, "batchSize", configuration.batchSize()));
    }

    /**
     * SQL that moves due CREATED/SCHEDULED/RETRY messages of the partition to PROCESSING.
     */
    private String pollingStatement() {
        // NOTE(review): '#{retryInterval} seconds' places a template parameter inside a quoted SQL
        // literal; confirm the template engine behind Repository.query substitutes it as intended.
        return "update queue set state = 'PROCESSING', verticle_id = #{deploymentId} where message_id in ( select message_id from queue where  state in ('CREATED','SCHEDULED','RETRY') and partition_id = #{partitionId}  and (scheduled is null or scheduled <= current_timestamp ) and (expiration is null or expiration >= current_timestamp ) and (retry_counter = 0 or updated + interval '#{retryInterval} seconds' <= now() )  order by message_sequence for update skip locked limit #{batchSize} ) returning *;";
    }

    /** SQL that moves RECOVERY messages of the partition to PROCESSING. */
    private String recoveryStatement() {
        return "update queue set state = 'PROCESSING', verticle_id = #{deploymentId} where message_id in ( select message_id from queue where  state = 'RECOVERY' and partition_id = #{partitionId}  order by message_sequence for update skip locked limit #{batchSize} ) returning *;";
    }

    /** SQL that locks the partition row if it is unlocked or its last update is over a minute old. */
    private String claimPartitionStatement() {
        return " update queue_partition set updated = now(), locked = true, verticle_id = #{verticleId} where partition_id = #{partitionId} and (locked = false or updated + interval '1 minute' <= now() )  returning *";
    }

    /** SQL that refreshes {@code updated} on the partition row while this verticle still holds the lock. */
    private String partitionHeartBeatStatement() {
        return "update queue_partition set updated = now() where partition_id = #{partitionId} and verticle_id = #{verticleId} and locked = true returning *";
    }
}
