package cn.weforward.data.mongodb.util;

import com.mongodb.MongoCommandException;
import com.mongodb.MongoInterruptedException;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/weforward/data/mongodb/util/AbstractMongodbChangeSupport.class */
public abstract class AbstractMongodbChangeSupport implements Runnable {
    private Thread m_Thread;
    private int m_ErrorNum;
    private MongoDatabase m_Db;
    private MongoCursor<ChangeStreamDocument<Document>> m_Cursor;
    private static final Logger _Logger = LoggerFactory.getLogger(AbstractMongodbChangeSupport.class);
    private static final AtomicInteger INC = new AtomicInteger();
    private long m_MaxAwaitTime = 60000;
    private long m_LastActivity = System.currentTimeMillis();

    public AbstractMongodbChangeSupport(MongoDatabase mongoDatabase) {
        this.m_Db = mongoDatabase;
    }

    public void setMaxAwaitTime(long j) {
        this.m_MaxAwaitTime = j;
    }

    public void stop() {
        if (this.m_Thread != null) {
            _Logger.info("stop " + this.m_Thread.getName());
            this.m_Thread = null;
        }
        MongoCursor<ChangeStreamDocument<Document>> mongoCursor = this.m_Cursor;
        if (mongoCursor != null) {
            try {
                mongoCursor.close();
                this.m_Cursor = null;
            } catch (Throwable th) {
                _Logger.warn("忽略关闭异常", th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized void start() {
        if (this.m_Thread != null) {
            return;
        }
        this.m_Thread = new Thread(this, "mongodbwatcher-" + this.m_Db.getName() + "-" + INC.incrementAndGet());
        this.m_Thread.setDaemon(true);
        this.m_Thread.start();
        MongodbWatcherChecker.checkMe(this);
        _Logger.info("start " + this.m_Thread.getName());
    }

    public void restart() {
        if (this.m_Thread != null) {
            this.m_Thread.interrupt();
        }
    }

    public boolean isDead() {
        return (this.m_Thread == null || this.m_Cursor == null || System.currentTimeMillis() - this.m_LastActivity <= this.m_MaxAwaitTime + 10000) ? false : true;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v10 */
    /* JADX WARN: Type inference failed for: r0v11, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v12, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v3 */
    /* JADX WARN: Type inference failed for: r0v4, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    /* JADX WARN: Type inference failed for: r0v6, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v9 */
    @Override // java.lang.Runnable
    public void run() {
        while (this.m_Thread != null) {
            ?? r0 = this;
            synchronized (r0) {
                try {
                    r0 = this;
                    r0.wait(3000L);
                } catch (InterruptedException e) {
                    r0 = r0;
                }
            }
            try {
                doLoop();
            } catch (InterruptedException e2) {
            } catch (Throwable th) {
                Logger logger = _Logger;
                StringBuilder sb = new StringBuilder("监控程序异常,");
                int i = this.m_ErrorNum + 1;
                this.m_ErrorNum = i;
                logger.error(sb.append(i).append("秒后重试").toString(), th);
                ?? r02 = this;
                synchronized (r02) {
                    try {
                        r02 = this;
                        r02.wait(this.m_ErrorNum * MongodbUtil.MONGOCLIENT_DEFAULT_CONNECTTIMEOUT_MS);
                    } catch (InterruptedException e3) {
                        r02 = r02;
                    }
                }
            }
        }
        this.m_Thread = null;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v48 */
    /* JADX WARN: Type inference failed for: r0v49, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v51 */
    private void doLoop() throws InterruptedException, IOException {
        try {
            try {
                try {
                    ChangeStreamIterable watch = this.m_Db.watch();
                    long j = this.m_MaxAwaitTime;
                    this.m_Cursor = watch.maxAwaitTime(j, TimeUnit.MILLISECONDS).fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
                    if (_Logger.isTraceEnabled()) {
                        _Logger.trace("启动监听，MaxAwaitTime:" + (j / 1000.0d));
                    }
                    while (this.m_Thread != null) {
                        this.m_LastActivity = System.currentTimeMillis();
                        try {
                            ChangeStreamDocument<Document> changeStreamDocument = (ChangeStreamDocument) this.m_Cursor.tryNext();
                            if (changeStreamDocument != null) {
                                if (_Logger.isTraceEnabled()) {
                                    _Logger.trace(changeStreamDocument.getNamespaceDocument() + "," + ((Document) changeStreamDocument.getFullDocument()).getString("_id"));
                                }
                                try {
                                    onChange(changeStreamDocument);
                                } catch (Throwable th) {
                                    _Logger.error("变化通知异常", th);
                                }
                            } else {
                                if (_Logger.isTraceEnabled()) {
                                    _Logger.trace(String.valueOf(j / 1000.0d) + "s无变化");
                                }
                                ?? r0 = this;
                                synchronized (r0) {
                                    wait(10L);
                                    r0 = r0;
                                }
                            }
                        } catch (MongoInterruptedException e) {
                            _Logger.info("线程中断,重新获取", e);
                        }
                    }
                    if (this.m_Cursor != null) {
                        try {
                            this.m_Cursor.close();
                            this.m_Cursor = null;
                        } catch (Throwable th2) {
                            _Logger.warn("忽略关闭异常", th2);
                        }
                    }
                } catch (Throwable th3) {
                    if (this.m_Cursor != null) {
                        try {
                            this.m_Cursor.close();
                            this.m_Cursor = null;
                        } catch (Throwable th4) {
                            _Logger.warn("忽略关闭异常", th4);
                        }
                    }
                    throw th3;
                }
            } catch (RuntimeException e2) {
                throw e2;
            }
        } catch (MongoCommandException e3) {
            if (!e3.getMessage().contains("The $changeStream stage is only supported on replica sets")) {
                throw e3;
            }
            _Logger.error("非副本集数据库无法支持变化监听,将导致Reload功能失效");
            throw new InterruptedException("无法支持变化监听,正常中止");
        }
    }

    protected abstract void onChange(ChangeStreamDocument<Document> changeStreamDocument);
}
