package com.github.kancyframework.delay.message.data.mongo.repository;

import com.github.kancyframework.delay.message.message.MessageStatus;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.IndexOptions;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.bson.Document;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/github/kancyframework/delay/message/data/mongo/repository/DelayMessageRepository.class */
/**
 * MongoDB access layer for delay messages.
 *
 * <p>Every method takes the target collection name as its first argument ({@code str}), so one
 * repository instance can serve several collections. Message statuses are persisted as the
 * ordinal of {@link MessageStatus}.
 */
public class DelayMessageRepository {
    private static final String FIELD_ID = "_id";
    private static final String FIELD_MESSAGE_STATUS = "messageStatus";
    private static final String FIELD_EXPIRED_TIME = "expiredTime";
    private static final String FIELD_CREATED_TIME = "createdTime";
    private static final String FIELD_DELETED_TIME = "deletedTime";
    private static final String FIELD_UPDATED_TIME = "updatedTime";
    private static final String FIELD_SCAN_TIMES = "scanTimes";

    /**
     * Names of the collections whose indexes have already been checked in this JVM.
     * The set is static, hence shared by every repository instance; a concurrent set is used so
     * that callers on different threads cannot corrupt it.
     */
    private static final Set<String> createIndexCollectionNames = ConcurrentHashMap.newKeySet();

    private final MongoTemplate mongoTemplate;

    public DelayMessageRepository(MongoTemplate mongoTemplate) {
        this.mongoTemplate = mongoTemplate;
    }

    /**
     * Saves the message into the given collection.
     *
     * @param str                collection name
     * @param delayMessageEntity message to persist
     */
    public void save(String str, DelayMessageEntity delayMessageEntity) {
        this.mongoTemplate.save(delayMessageEntity, str);
    }

    /**
     * Finds messages in status WAITING or TIMEOUT whose {@code expiredTime} lies between
     * {@code j} and the current time (both inclusive), oldest first.
     *
     * <p>The collection's indexes are checked (and created when missing) before the query runs.
     *
     * @param str collection name
     * @param j   lower bound of {@code expiredTime}, as epoch milliseconds
     * @param i   value passed to the query's {@code limit}
     * @return the matching messages ordered by ascending {@code expiredTime}
     */
    public List<DelayMessageEntity> scanExpiredMessage(String str, long j, int i) {
        createIndex(str);
        Criteria expiredWindow = Criteria.where(FIELD_EXPIRED_TIME).gte(new Date(j)).lte(new Date());
        Criteria criteria = Criteria.where(FIELD_MESSAGE_STATUS)
                .in(MessageStatus.WAITING.ordinal(), MessageStatus.TIMEOUT.ordinal())
                .andOperator(expiredWindow);
        Query query = Query.query(criteria)
                .with(Sort.by(Sort.Direction.ASC, FIELD_EXPIRED_TIME))
                .limit(i);
        return this.mongoTemplate.find(query, DelayMessageEntity.class, str);
    }

    /**
     * Sets the status of a single message and refreshes its {@code updatedTime}.
     *
     * @param str  collection name
     * @param str2 message id ({@code _id})
     * @param i    new status value
     */
    public void updateStatus(String str, String str2, int i) {
        Query query = Query.query(Criteria.where(FIELD_ID).is(str2));
        this.mongoTemplate.updateFirst(query, statusUpdate(i), str);
    }

    /**
     * Moves every listed message that is currently in status {@code i} to status {@code i2}.
     * Does nothing when the id list is {@code null} or empty.
     *
     * @param str  collection name
     * @param list message ids
     * @param i    status the message must currently have
     * @param i2   status to set
     */
    public void batchUpdateStatus(String str, List<String> list, int i, int i2) {
        if (CollectionUtils.isEmpty(list)) {
            // An empty $in matches nothing, so skip the round trip.
            return;
        }
        Query query = Query.query(Criteria.where(FIELD_ID).in(list).and(FIELD_MESSAGE_STATUS).is(i));
        this.mongoTemplate.updateMulti(query, statusUpdate(i2), str);
    }

    /**
     * Marks every listed message as RUNNING, refreshes {@code updatedTime} and increments
     * {@code scanTimes} by one. Does nothing when the id list is {@code null} or empty.
     *
     * @param str  collection name
     * @param list message ids
     */
    public void batchUpdateOnProcessing(String str, List<String> list) {
        if (CollectionUtils.isEmpty(list)) {
            // An empty $in matches nothing, so skip the round trip.
            return;
        }
        Query query = Query.query(Criteria.where(FIELD_ID).in(list));
        Update update = statusUpdate(MessageStatus.RUNNING.ordinal()).inc(FIELD_SCAN_TIMES, 1);
        this.mongoTemplate.updateMulti(query, update, str);
    }

    /**
     * Collects the ids of RUNNING messages that expired within the last {@code j} seconds and
     * whose {@code updatedTime} is older than {@code duration}.
     *
     * @param str      collection name
     * @param duration minimum age of {@code updatedTime} for a message to be returned
     * @param j        look-back window for {@code expiredTime}, in seconds
     * @return ids of the matching messages, possibly empty
     */
    public List<String> findAllProcessingMessageIdsWithTimeoutAndTimeRange(String str, Duration duration, long j) {
        long now = System.currentTimeMillis();
        Date windowStart = new Date(now - (j * 1000));
        Date staleBefore = new Date(now - duration.toMillis());
        Criteria criteria = Criteria.where(FIELD_EXPIRED_TIME).gte(windowStart).lte(new Date(now))
                .and(FIELD_MESSAGE_STATUS).is(MessageStatus.RUNNING.ordinal())
                .and(FIELD_UPDATED_TIME).lt(staleBefore);
        List<String> ids = new ArrayList<>();
        this.mongoTemplate.executeQuery(Query.query(criteria), str, document -> ids.add(document.getString(FIELD_ID)));
        return ids;
    }

    /**
     * Returns the earliest {@code expiredTime} among messages that expired within the last
     * {@code j} seconds and are neither SUCCESS nor FAIL.
     *
     * @param str collection name
     * @param j   look-back window for {@code expiredTime}, in seconds
     * @return the smallest matching {@code expiredTime}, or {@code null} when nothing matches
     */
    public Date findMinExpireTime(String str, long j) {
        long now = System.currentTimeMillis();
        Criteria criteria = Criteria.where(FIELD_EXPIRED_TIME).gte(new Date(now - (j * 1000))).lte(new Date(now))
                .and(FIELD_MESSAGE_STATUS).nin(MessageStatus.SUCCESS.ordinal(), MessageStatus.FAIL.ordinal());
        Query query = Query.query(criteria)
                .with(Sort.by(Sort.Direction.ASC, FIELD_EXPIRED_TIME))
                .limit(1);
        List<DelayMessageEntity> found = this.mongoTemplate.find(query, DelayMessageEntity.class, str);
        if (CollectionUtils.isEmpty(found)) {
            return null;
        }
        return found.get(0).getExpiredTime();
    }

    /** Builds the update that sets {@code messageStatus} and stamps {@code updatedTime} with now. */
    private static Update statusUpdate(int status) {
        return Update.update(FIELD_MESSAGE_STATUS, status).set(FIELD_UPDATED_TIME, new Date());
    }

    /**
     * Ensures the collection has its indexes, checking each collection at most once per JVM.
     *
     * <p>If any existing index already covers {@code expiredTime}, {@code createdTime} or
     * {@code deletedTime}, the collection is treated as already indexed and nothing is created.
     * Otherwise three background indexes are created; the one on {@code deletedTime} is a TTL
     * index with a zero expiry.
     */
    private void createIndex(String str) {
        if (createIndexCollectionNames.contains(str)) {
            return;
        }
        if (!hasManagedIndex(str)) {
            this.mongoTemplate.getCollection(str).createIndex(new Document(FIELD_EXPIRED_TIME, 1),
                    new IndexOptions().background(true).name("idx_expiredTime"));
            this.mongoTemplate.getCollection(str).createIndex(new Document(FIELD_CREATED_TIME, 1),
                    new IndexOptions().background(true).name("idx_createdTime"));
            this.mongoTemplate.getCollection(str).createIndex(new Document(FIELD_DELETED_TIME, 1),
                    new IndexOptions().background(true).expireAfter(0L, TimeUnit.MILLISECONDS).name("idx_deletedTime"));
        }
        // Remember the collection in both cases so later scans skip the listIndexes round trip.
        createIndexCollectionNames.add(str);
    }

    /** Returns whether any index of the collection has one of the three managed fields in its key. */
    private boolean hasManagedIndex(String collectionName) {
        // try-with-resources: the cursor must be closed even when we stop iterating early.
        try (MongoCursor<Document> indexes = this.mongoTemplate.getCollection(collectionName).listIndexes().iterator()) {
            while (indexes.hasNext()) {
                Object key = indexes.next().get("key");
                if (!(key instanceof Document)) {
                    continue;
                }
                Document keyFields = (Document) key;
                if (keyFields.containsKey(FIELD_EXPIRED_TIME)
                        || keyFields.containsKey(FIELD_CREATED_TIME)
                        || keyFields.containsKey(FIELD_DELETED_TIME)) {
                    return true;
                }
            }
        }
        return false;
    }
}
