package org.noear.water.protocol.solution;

import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.IndexOptions;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import org.bson.Document;
import org.noear.solon.Utils;
import org.noear.water.log.Logger;
import org.noear.water.protocol.IdBuilder;
import org.noear.water.protocol.MessageSource;
import org.noear.water.protocol.ProtocolHub;
import org.noear.water.protocol.model.message.DistributionModel;
import org.noear.water.protocol.model.message.MessageModel;
import org.noear.water.protocol.model.message.MessageState;
import org.noear.water.protocol.model.message.SubscriberModel;
import org.noear.water.utils.Datetime;
import org.noear.water.utils.DisttimeUtils;
import org.noear.water.utils.SnowflakeUtils;
import org.noear.water.utils.StringUtils;
import org.noear.water.utils.TextUtils;
import org.noear.weed.cache.ICacheServiceEx;
import org.noear.weed.mongo.MgContext;
import org.noear.weed.mongo.MgTableQuery;

/* loaded from: input_file:org/noear/water/protocol/solution/MessageSourceMongo.class */
public class MessageSourceMongo implements MessageSource {
    MgContext _db;
    ICacheServiceEx _cache;
    Logger _logMsg;
    static final String COLL = "water_msg_message";

    public MessageSourceMongo(MgContext mgContext, ICacheServiceEx iCacheServiceEx, Logger logger) {
        this._db = mgContext;
        this._cache = iCacheServiceEx;
        this._logMsg = logger;
    }

    public boolean hasMessage(String str) throws Exception {
        if (TextUtils.isEmpty(str)) {
            return false;
        }
        return this._db.table(COLL).whereEq("msg_key", str).caching(this._cache).selectExists();
    }

    @Override // org.noear.water.protocol.MessageSource
    public void setMessageAsCancel(String str) throws Exception {
        Datetime Now = Datetime.Now();
        this._db.table(COLL).set("state", -1).set("last_date", Integer.valueOf(Now.getDate())).set("last_fulltime", Now.getFulltime()).whereEq("msg_key", str).update();
        this._db.table("water_msg_distribution").set("msg_state", -1).whereEq("msg_key", str).update();
    }

    @Override // org.noear.water.protocol.MessageSource
    public void setMessageAsSucceed(String str) throws Exception {
        Datetime Now = Datetime.Now();
        this._db.table(COLL).set("state", 2).set("last_date", Integer.valueOf(Now.getDate())).set("last_fulltime", Now.getFulltime()).whereEq("msg_key", str).update();
        this._db.table("water_msg_distribution").set("msg_state", 2).whereEq("msg_key", str).update();
    }

    @Override // org.noear.water.protocol.MessageSource
    public void setDistributionAsCancel(String str, String str2) throws Exception {
        this._db.table("water_msg_distribution").set("state", -1).whereEq("msg_key", str).andEq("subscriber_key", str2).update();
    }

    @Override // org.noear.water.protocol.MessageSource
    public void setDistributionAsSucceed(String str, String str2) throws Exception {
        this._db.table("water_msg_distribution").set("state", 2).whereEq("msg_key", str).andEq("subscriber_key", str2).update();
    }

    @Override // org.noear.water.protocol.MessageSource
    public long addMessage(int i, String str, String str2) throws Exception {
        return addMessage(null, null, null, i, str, str2, null, false);
    }

    @Override // org.noear.water.protocol.MessageSource
    public long addMessage(String str, String str2, String str3, int i, String str4, String str5, Date date, boolean z) throws Exception {
        long genId = SnowflakeUtils.genId();
        if (Utils.isEmpty(str)) {
            str = Utils.guid();
        }
        if (Utils.isEmpty(str2)) {
            str2 = Utils.guid();
        }
        if (str3 == null) {
            str3 = "";
        }
        Datetime datetime = new Datetime();
        int date2 = datetime.getDate();
        long j = 0;
        if (date != null) {
            j = DisttimeUtils.distTime(date);
        } else if (z) {
            j = DisttimeUtils.nextTime(0);
        }
        this._db.table(COLL).set("_id", Long.valueOf(genId)).set("msg_id", Long.valueOf(genId)).set("msg_key", str).set("trace_id", str2).set("tags", str3).set("topic_id", Integer.valueOf(i)).set("topic_name", str4).set("content", str5).set("receive_url", "").set("receive_check", "").set("dist_routed", false).set("dist_count", 0).set("plan_time", date).set("state", 0).set("log_date", Integer.valueOf(date2)).set("log_fulltime", datetime.getFulltime()).set("last_date", Integer.valueOf(date2)).set("last_fulltime", datetime.getFulltime()).set("dist_nexttime", Long.valueOf(j)).insert();
        return genId;
    }

    @Override // org.noear.water.protocol.MessageSource
    public List<MessageModel> getMessageListOfPending(int i, long j) throws Exception {
        return this._db.table(COLL).whereLt("dist_nexttime", Long.valueOf(j)).andEq("state", 0).orderByAsc("_id").limit(i).selectList(MessageModel.class);
    }

    @Override // org.noear.water.protocol.MessageSource
    public void setMessageRouteState(MessageModel messageModel, boolean z) {
        try {
            this._db.table(COLL).set("dist_routed", Boolean.valueOf(z)).whereEq("_id", messageModel.msg_id).update();
            messageModel.dist_routed = z;
        } catch (Exception e) {
            this._logMsg.error("", messageModel.msg_id + "", "setMessageRouteState", messageModel.msg_id + "", e);
        }
    }

    @Override // org.noear.water.protocol.MessageSource
    public boolean setMessageState(MessageModel messageModel, MessageState messageState) {
        return setMessageState(messageModel, messageState, 0L);
    }

    @Override // org.noear.water.protocol.MessageSource
    public boolean setMessageState(MessageModel messageModel, MessageState messageState, long j) {
        try {
            Datetime Now = Datetime.Now();
            this._db.table(COLL).set("state", Integer.valueOf(messageState.code)).set("last_date", Integer.valueOf(Now.getDate())).set("last_fulltime", Now.getFulltime()).build(mgTableQuery -> {
                if (messageState == MessageState.undefined) {
                    mgTableQuery.set("dist_nexttime", Long.valueOf(DisttimeUtils.nextTime(1)));
                }
                if (j > 0) {
                    mgTableQuery.set("dist_nexttime", Long.valueOf(j));
                }
            }).whereEq("_id", messageModel.msg_id).andGte("state", 0).andLte("state", 1).update();
            this._db.table("water_msg_distribution").set("msg_state", Integer.valueOf(messageState.code)).whereEq("msg_id", messageModel.msg_id).update();
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            this._logMsg.error(messageModel.topic_name, messageModel.msg_id + "", "setMessageState", "", e);
            return false;
        }
    }

    @Override // org.noear.water.protocol.MessageSource
    public boolean setMessageRepet(MessageModel messageModel, MessageState messageState) {
        try {
            messageModel.dist_count++;
            long nextTime = DisttimeUtils.nextTime(messageModel.dist_count);
            Datetime Now = Datetime.Now();
            this._db.table(COLL).set("state", Integer.valueOf(messageState.code)).set("last_date", Integer.valueOf(Now.getDate())).set("last_fulltime", Now.getFulltime()).set("dist_nexttime", Long.valueOf(nextTime)).set("dist_count", Integer.valueOf(messageModel.dist_count)).whereEq("_id", messageModel.msg_id).andIn("state", Arrays.asList(0, 1)).update();
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            this._logMsg.error(messageModel.topic_name, messageModel.msg_id + "", "setMessageRepet", messageModel.msg_id + "", e);
            return false;
        }
    }

    @Override // org.noear.water.protocol.MessageSource
    public void addDistributionNoLock(MessageModel messageModel, SubscriberModel subscriberModel) throws Exception {
        if (this._db.table("water_msg_distribution").whereEq("msg_id", messageModel.msg_id).andEq("subscriber_id", Integer.valueOf(subscriberModel.subscriber_id)).selectExists()) {
            return;
        }
        Datetime datetime = new Datetime();
        this._db.table("water_msg_distribution").set("dist_id", Long.valueOf(ProtocolHub.idBuilder.getId("water_msg_distribution"))).set("msg_id", messageModel.msg_id).set("msg_key", messageModel.msg_key).set("subscriber_id", Integer.valueOf(subscriberModel.subscriber_id)).set("subscriber_key", subscriberModel.subscriber_key).set("alarm_mobile", subscriberModel.alarm_mobile).set("alarm_sign", subscriberModel.alarm_sign).set("receive_url", subscriberModel.receive_url).set("receive_key", subscriberModel.receive_key).set("receive_way", Integer.valueOf(subscriberModel.receive_way)).set("receive_check", "").set("duration", 0).set("state", 0).set("msg_state", 0).set("log_date", Integer.valueOf(datetime.getDate())).set("log_fulltime", datetime.getFulltime()).whereEq("msg_id", messageModel.msg_id).andEq("subscriber_id", Integer.valueOf(subscriberModel.subscriber_id)).insert();
    }

    @Override // org.noear.water.protocol.MessageSource
    public List<DistributionModel> getDistributionListByMsg(long j) throws Exception {
        return this._db.table("water_msg_distribution").whereEq("msg_id", Long.valueOf(j)).andIn("state", Arrays.asList(0, 1)).caching(this._cache).usingCache(60).selectList(DistributionModel.class);
    }

    @Override // org.noear.water.protocol.MessageSource
    public boolean setDistributionState(MessageModel messageModel, DistributionModel distributionModel, MessageState messageState) {
        try {
            this._db.table("water_msg_distribution").set("state", Integer.valueOf(messageState.code)).set("duration", Long.valueOf(distributionModel._duration)).whereEq("msg_id", messageModel.msg_id).andEq("subscriber_id", Integer.valueOf(distributionModel.subscriber_id)).andNeq("state", 2).update();
            return true;
        } catch (Throwable th) {
            this._logMsg.error(messageModel.topic_name, messageModel.msg_id + "", "setDistributionState", "", th);
            return false;
        }
    }

    @Override // org.noear.water.protocol.MessageSource
    public MessageModel getMessageByKey(String str) throws Exception {
        return TextUtils.isEmpty(str) ? new MessageModel() : (MessageModel) this._db.table(COLL).build(mgTableQuery -> {
            if (IdBuilder.isNumeric(str)) {
                mgTableQuery.whereEq("_id", Long.valueOf(Long.parseLong(str)));
            } else {
                mgTableQuery.whereEq("msg_key", str);
            }
        }).selectItem(MessageModel.class);
    }

    @Override // org.noear.water.protocol.MessageSource
    public MessageModel getMessageById(long j) throws Exception {
        return (MessageModel) this._db.table(COLL).whereEq("_id", Long.valueOf(j)).limit(1).selectItem(MessageModel.class);
    }

    @Override // org.noear.water.protocol.MessageSource
    public List<MessageModel> getMessageList(int i, int i2) throws Exception {
        return (i == 0 && i2 == 0) ? new ArrayList() : this._db.table(COLL).build(mgTableQuery -> {
            if (i > 0) {
                mgTableQuery.whereGte("dist_count", Integer.valueOf(i));
            } else {
                mgTableQuery.whereEq("topic_id", Integer.valueOf(i2));
            }
            mgTableQuery.andEq("state", 0);
        }).orderByAsc("_id").limit(50).selectList(MessageModel.class);
    }

    @Override // org.noear.water.protocol.MessageSource
    public List<MessageModel> getMessageList(int i, String str) throws Exception {
        MgTableQuery table = this._db.table(COLL);
        if (i == 0) {
            table.whereEq("state", Integer.valueOf(MessageState.undefined.code)).andGte("dist_count", 2);
        } else if (i == 1) {
            table.whereEq("state", Integer.valueOf(MessageState.undefined.code));
        } else if (i == 2) {
            table.whereEq("state", Integer.valueOf(MessageState.processed.code));
        } else if (i == 3) {
            table.whereIn("state", Arrays.asList(Integer.valueOf(MessageState.completed.code), Integer.valueOf(MessageState.excessive.code)));
        } else {
            table.whereLt("state", 0);
        }
        if (str != null) {
            String trim = str.trim();
            if (trim.startsWith("*")) {
                table.andEq("trace_id", trim.substring(1).trim());
            } else if (trim.startsWith("@")) {
                table.andEq("tags", trim.substring(1).trim());
            } else if (StringUtils.isNumeric(trim)) {
                table.andEq("_id", Long.valueOf(Long.parseLong(trim)));
            } else {
                table.andEq("topic_name", trim);
            }
        }
        return table.orderByDesc("_id").limit(50).selectList(MessageModel.class);
    }

    @Override // org.noear.water.protocol.MessageSource
    public boolean setMessageAsPending(List<Object> list) throws Exception {
        Datetime Now = Datetime.Now();
        return this._db.table(COLL).whereIn("_id", list).andNeq("state", 2).set("state", 0).set("last_date", Integer.valueOf(Now.getDate())).set("last_fulltime", Now.getFulltime()).set("dist_nexttime", 0).update() > 0;
    }

    @Override // org.noear.water.protocol.MessageSource
    public List<DistributionModel> getDistributionListByMsgIds(List<Object> list) throws Exception {
        return this._db.table("water_msg_distribution").whereIn("msg_id", list).selectList(DistributionModel.class);
    }

    @Override // org.noear.water.protocol.MessageSource
    public boolean setDistributionReceiveUrl(long j, String str) throws Exception {
        return this._db.table("water_msg_distribution").whereEq("dist_id", Long.valueOf(j)).set("receive_url", str).update() > 0;
    }

    @Override // org.noear.water.protocol.MessageSource
    public boolean setMessageAsCancel(List<Object> list) throws Exception {
        Datetime Now = Datetime.Now();
        return this._db.table(COLL).whereIn("_id", list).set("state", -1).set("last_date", Integer.valueOf(Now.getDate())).set("last_fulltime", Now.getFulltime()).update() > 0;
    }

    private void initIndex() {
        IndexOptions indexOptions = new IndexOptions();
        indexOptions.background(true);
        indexOptions.unique(true);
        this._db.table(COLL).orderByDesc("msg_key").createIndex(indexOptions);
        this._db.table(COLL).orderByDesc("msg_id").createIndex(indexOptions);
        this._db.table(COLL).orderByDesc("topic_id").createIndex(true);
        this._db.table(COLL).orderByDesc("state").createIndex(true);
        this._db.table(COLL).orderByDesc("dist_count").createIndex(true);
        this._db.table(COLL).orderByDesc("log_date").createIndex(true);
        this._db.table(COLL).orderByDesc("dist_nexttime").createIndex(true);
        this._db.table(COLL).orderByDesc("topic_name").createIndex(true);
        this._db.table(COLL).orderByDesc("tags").createIndex(true);
        this._db.table(COLL).orderByDesc("trace_id").createIndex(true);
        this._db.table(COLL).orderByDesc("last_date").createIndex(true);
        this._db.table("water_msg_message_all").orderByDesc("trace_id").createIndex(true);
        this._db.table("water_msg_message_all").orderByDesc("log_date").createIndex(true);
        this._db.table("water_msg_message_all").orderByDesc("last_date").createIndex(true);
        this._db.table("water_msg_message_all").orderByDesc("topic_name").createIndex(true);
        this._db.table("water_msg_distribution").orderByDesc("dist_id").createIndex(indexOptions);
        this._db.table("water_msg_distribution").orderByDesc("log_date").createIndex(true);
        this._db.table("water_msg_distribution").orderByDesc("state").createIndex(true);
        this._db.table("water_msg_distribution").orderByDesc("msg_id").createIndex(true);
        this._db.table("water_msg_distribution").orderByDesc("msg_key").createIndex(true);
        this._db.table("water_msg_distribution").orderByDesc("msg_state").createIndex(true);
    }

    @Override // org.noear.water.protocol.MessageSource
    public void clear(int i) throws Exception {
        this._db.table(COLL).whereLte("last_date", Integer.valueOf(i)).andEq("state", 2).delete();
        this._db.table(COLL).whereLte("last_date", Integer.valueOf(i)).andEq("state", 3).delete();
        this._db.table(COLL).whereLte("last_date", Integer.valueOf(i)).andEq("state", -1).delete();
        this._db.table(COLL).whereLte("last_date", Integer.valueOf(i)).andEq("state", -2).delete();
        this._db.table("water_msg_distribution").whereLte("log_date", Integer.valueOf(i)).andEq("msg_state", 2).delete();
        this._db.table("water_msg_distribution").whereLte("log_date", Integer.valueOf(i)).andEq("msg_state", 3).delete();
        this._db.table("water_msg_distribution").whereLte("log_date", Integer.valueOf(i)).andEq("msg_state", -1).delete();
        this._db.table("water_msg_distribution").whereLte("log_date", Integer.valueOf(i)).andEq("msg_state", -2).delete();
    }

    @Override // org.noear.water.protocol.MessageSource
    public long reset(int i) throws SQLException {
        if (i < 10) {
            i = 30;
        }
        initIndex();
        long currentTimeMillis = System.currentTimeMillis() - (1000 * i);
        Datetime Now = Datetime.Now();
        if (this._db.table(COLL).whereEq("state", 1).andLt("dist_nexttime", Long.valueOf(currentTimeMillis)).selectExists()) {
            return this._db.table(COLL).set("state", 0).set("last_date", Integer.valueOf(Now.getDate())).set("last_fulltime", Now.getFulltime()).whereEq("state", 1).andLt("dist_nexttime", Long.valueOf(currentTimeMillis)).update();
        }
        return 0L;
    }

    @Override // org.noear.water.protocol.MessageSource
    public void persistence(int i, int i2) throws Exception {
        if (!this._db.table("water_msg_message_all").whereEq("last_date", Integer.valueOf(i)).selectExists()) {
            LinkedHashMap linkedHashMap = new LinkedHashMap();
            linkedHashMap.put("last_date", Integer.valueOf(i));
            FindIterable find = this._db.mongo().find(COLL, linkedHashMap);
            ArrayList arrayList = new ArrayList(2000);
            MongoCursor it = find.iterator();
            while (it.hasNext()) {
                arrayList.add((Document) it.next());
                if (arrayList.size() == 2000) {
                    this._db.mongo().insertMany("water_msg_message_all", arrayList);
                    arrayList.clear();
                }
            }
            if (arrayList.size() > 0) {
                this._db.mongo().insertMany("water_msg_message_all", arrayList);
                arrayList.clear();
            }
        }
        this._db.table("water_msg_message_all").whereLte("last_date", Integer.valueOf(i2));
        this._db.table("water_msg_message_ex_stat").whereLte("log_date", Integer.valueOf(i2));
    }
}
