package kz.greetgo.spring.websocket.mongo;

import com.mongodb.MongoException;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import kz.greetgo.spring.websocket.interfaces.NeedClose;
import kz.greetgo.spring.websocket.util.ConsoleColors;
import kz.greetgo.spring.websocket.util.LoggingUtil;
import org.bson.BsonDocument;
import org.slf4j.Logger;

/* loaded from: input_file:kz/greetgo/spring/websocket/mongo/AbstractMongoWatcher.class */
/**
 * Base class for a watcher that follows a MongoDB change stream on a dedicated thread and hands
 * every received change document to {@link #acceptDoc(ChangeStreamDocument)}.
 *
 * <p>The watching thread keeps (re)opening a cursor via {@link #createCursor(BsonDocument)} until
 * {@link #close()} is called. The resume token of the last received document is remembered and
 * passed to the next {@code createCursor} call, so a re-opened cursor can continue from where the
 * previous one stopped.
 *
 * @param <DataObject> type of the documents carried by the change stream
 */
public abstract class AbstractMongoWatcher<DataObject> implements NeedClose {
    /** Identifier of the session this watcher belongs to; used in log messages. */
    public final String sessionId;
    private final Logger callingLog = LoggingUtil.callingLog;
    /** {@code true} until {@link #close()} is called; the watch loop runs while it is set. */
    private final AtomicBoolean opened = new AtomicBoolean(true);
    /** Resume token of the last received change document, or {@code null} if none was received. */
    private final AtomicReference<BsonDocument> resumeToken = new AtomicReference<>(null);
    private final Thread watchingThread = new Thread(this::watch);

    /**
     * Creates a watcher; the watching thread is not started until {@link #start()} is called.
     *
     * @param str identifier of the session, stored in {@link #sessionId}
     */
    public AbstractMongoWatcher(String str) {
        this.sessionId = str;
    }

    /** Starts the watching thread and logs the fact at INFO level. */
    protected void start() {
        this.watchingThread.start();
        if (this.callingLog.isInfoEnabled()) {
            this.callingLog.info(ConsoleColors.CYAN_BOLD() + "WATCH     " + ConsoleColors.RESET() + this.sessionId + ConsoleColors.CYAN_BOLD() + " " + inWatchPlace() + ConsoleColors.RESET() + " Thread=" + Thread.currentThread().getName());
        }
    }

    /**
     * Asks the watching thread to stop: clears the "opened" flag and interrupts the thread.
     * Does not wait for the thread to finish; use {@link #join()} for that.
     */
    @Override // kz.greetgo.spring.websocket.interfaces.NeedClose
    public void close() {
        this.opened.set(false);
        this.watchingThread.interrupt();
        if (this.callingLog.isInfoEnabled()) {
            this.callingLog.info(ConsoleColors.CYAN_BOLD() + "UNWATCH   " + ConsoleColors.RESET() + this.sessionId + ConsoleColors.CYAN_BOLD() + " " + inWatchPlace() + ConsoleColors.RESET() + " Thread=" + Thread.currentThread().getName());
        }
    }

    /**
     * Returns a description of what is being watched; it is only used in log messages.
     *
     * @return text appended to the watcher's lifecycle log lines
     */
    protected abstract String inWatchPlace();

    /**
     * Opens a new change-stream cursor.
     *
     * @param bsonDocument resume token of the last received document, or {@code null} when no
     *                     document has been received yet
     * @return the cursor to read change documents from
     */
    protected abstract MongoChangeStreamCursor<ChangeStreamDocument<DataObject>> createCursor(BsonDocument bsonDocument);

    /**
     * Handles one change document. Called on the watching thread.
     *
     * @param changeStreamDocument the received change document
     */
    protected abstract void acceptDoc(ChangeStreamDocument<DataObject> changeStreamDocument);

    /**
     * Body of the watching thread: opens a cursor, drains it, and repeats until the watcher is
     * closed. Exceptions do not terminate the loop; they are logged and a new cursor is opened.
     */
    private void watch() {
        if (this.callingLog.isInfoEnabled()) {
            this.callingLog.info("IN WATCH  " + this.sessionId + ConsoleColors.CYAN_BOLD() + " " + inWatchPlace() + ConsoleColors.RESET() + " Thread=" + Thread.currentThread().getName());
        }
        while (this.opened.get()) {
            try {
                // Remembered outside the try-with-resources scope so that "finally" can log it.
                MongoChangeStreamCursor<ChangeStreamDocument<DataObject>> openedCursor = null;
                try (MongoChangeStreamCursor<ChangeStreamDocument<DataObject>> cursor = createCursor(this.resumeToken.get())) {
                    openedCursor = cursor;
                    if (this.callingLog.isInfoEnabled()) {
                        this.callingLog.info("NEW CUR   " + this.sessionId + ConsoleColors.GREEN_BOLD() + " cursor.addr=" + System.identityHashCode(cursor) + ConsoleColors.RESET() + " Thread=" + Thread.currentThread().getName());
                    }
                    cursor.forEachRemaining(doc -> {
                        // Store the token before handling, so a re-opened cursor resumes after this document.
                        this.resumeToken.set(doc.getResumeToken());
                        acceptDoc(doc);
                    });
                } catch (MongoException e) {
                    // NOTE(review): check() presumably rethrows anything that is not one of the
                    // ignored "closed/interrupted" conditions — confirm in MongoErrorFilters.
                    MongoErrorFilters.mongo(e).ignoreStateShouldBeOpened(true).ignoreCursorHasBeenClosed(true).ignoreMongoInterrupted(true).check();
                } finally {
                    logCursorClosed(openedCursor);
                }
            } catch (Exception e) {
                this.callingLog.error(ConsoleColors.RED_BOLD() + "ERROR    " + ConsoleColors.RESET() + " " + this.sessionId + " " + ConsoleColors.RED() + " Ошибка в смотрителе: " + e.getClass().getName() + " : " + e.getMessage() + ConsoleColors.RESET(), e);
            }
        }
        if (this.callingLog.isInfoEnabled()) {
            this.callingLog.info("NO WATCH  " + this.sessionId + ConsoleColors.CYAN_BOLD() + " " + inWatchPlace() + ConsoleColors.RESET() + " Thread=" + Thread.currentThread().getName());
        }
    }

    /** Logs at INFO level that the given cursor is done with; does nothing when no cursor was opened. */
    private void logCursorClosed(MongoChangeStreamCursor<ChangeStreamDocument<DataObject>> cursor) {
        if (cursor != null && this.callingLog.isInfoEnabled()) {
            this.callingLog.info("CLO CUR   " + this.sessionId + ConsoleColors.RED_BOLD() + " cursor.addr=" + System.identityHashCode(cursor) + ConsoleColors.RESET() + " Thread=" + Thread.currentThread().getName());
        }
    }

    /**
     * Waits for the watching thread to finish. If the calling thread is interrupted while
     * waiting, the method returns early and the caller's interrupt status is restored.
     */
    public void join() {
        try {
            this.watchingThread.join();
        } catch (InterruptedException e) {
            // Do not lose the interruption: re-assert the flag for the caller to observe.
            Thread.currentThread().interrupt();
        }
    }
}
