package org.restheart.mongodb.handlers.changestreams;

import com.mongodb.MongoException;
import com.mongodb.MongoInterruptedException;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import io.undertow.websockets.core.WebSocketCallback;
import io.undertow.websockets.core.WebSocketChannel;
import io.undertow.websockets.core.WebSockets;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonNull;
import org.bson.BsonString;
import org.bson.Document;
import org.restheart.mongodb.RHMongoClients;
import org.restheart.utils.BsonUtils;
import org.restheart.utils.LambdaUtils;
import org.restheart.utils.ThreadsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/restheart/mongodb/handlers/changestreams/ChangeStreamWorker.class */
public class ChangeStreamWorker implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChangeStreamWorker.class);
    private final ChangeStreamWorkerKey key;
    private final List<BsonDocument> resolvedStages;
    private final String dbName;
    private final String collName;
    private final Set<WebSocketSession> websocketSessions = Collections.synchronizedSet(new HashSet());
    private Thread handlingVirtualThread = null;

    /* loaded from: input_file:org/restheart/mongodb/handlers/changestreams/ChangeStreamWorker$NoMoreWebSocketException.class */
    private static class NoMoreWebSocketException extends Exception {
        private NoMoreWebSocketException() {
        }
    }

    public ChangeStreamWorker(ChangeStreamWorkerKey changeStreamWorkerKey, List<BsonDocument> list, String str, String str2) {
        this.key = changeStreamWorkerKey;
        this.resolvedStages = list;
        this.dbName = str;
        this.collName = str2;
    }

    public ChangeStreamWorkerKey getKey() {
        return this.key;
    }

    public String getDbName() {
        return this.dbName;
    }

    public String getCollName() {
        return this.collName;
    }

    public Thread handlingVirtualThread() {
        return this.handlingVirtualThread;
    }

    @Override // java.lang.Runnable
    public void run() {
        if (Thread.currentThread().isVirtual()) {
            this.handlingVirtualThread = Thread.currentThread();
        }
        LOGGER.debug("Change stream worker started {}", Thread.currentThread().getName());
        try {
            try {
                changeStreamEventsLoop();
                ChangeStreamWorkers.getInstance().remove(this.key);
                LOGGER.debug("Change stream worker ended");
            } catch (Throwable th) {
                if (th instanceof NoMoreWebSocketException) {
                    LOGGER.debug("Closing Change Stream Worker {} since it has no active WebSocket sessions", this.key);
                } else {
                    LOGGER.error("Change Stream Worker {} died due to exception", this.key, th);
                }
                closeAllWebSocketSessions();
                ChangeStreamWorkers.getInstance().remove(this.key);
                LOGGER.debug("Change stream worker ended");
            }
        } catch (Throwable th2) {
            ChangeStreamWorkers.getInstance().remove(this.key);
            LOGGER.debug("Change stream worker ended");
            throw th2;
        }
    }

    private void changeStreamEventsLoop() {
        try {
            _changeStreamEventsLoop();
        } catch (MongoException e) {
            LOGGER.error("MongoDb error on ChangeStreamWorker {}, restarting a new worker", this.key, e);
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e2) {
            } finally {
                changeStreamEventsLoop();
            }
        } catch (MongoInterruptedException e3) {
            close();
        }
    }

    private void _changeStreamEventsLoop() {
        LOGGER.debug("Change Stream Worker {} started listening for change events", this.key);
        startChangeStream().forEach(changeStreamDocument -> {
            if (this.websocketSessions.isEmpty()) {
                LambdaUtils.throwsSneakyException(new NoMoreWebSocketException());
            }
            String json = BsonUtils.toJson(getDocument(changeStreamDocument), this.key.getJsonMode());
            this.websocketSessions.stream().forEach(webSocketSession -> {
                ThreadsUtils.virtualThreadsExecutor().execute(() -> {
                    try {
                        send(webSocketSession, json);
                        LOGGER.trace("Change event sent to WebSocket session {}", webSocketSession.getId());
                    } catch (Throwable th) {
                        LOGGER.error("Error sending change event to WebSocket session ", webSocketSession.getId(), th);
                    }
                });
            });
        });
    }

    public Set<WebSocketSession> websocketSessions() {
        return this.websocketSessions;
    }

    private void send(final WebSocketSession webSocketSession, String str) {
        WebSockets.sendText(str, webSocketSession.getChannel(), new WebSocketCallback<Void>() { // from class: org.restheart.mongodb.handlers.changestreams.ChangeStreamWorker.1
            public void complete(WebSocketChannel webSocketChannel, Void r3) {
            }

            public void onError(WebSocketChannel webSocketChannel, Void r7, Throwable th) {
                try {
                    webSocketSession.close();
                    String id = webSocketSession.getId();
                    ChangeStreamWorker.this.websocketSessions().removeIf(webSocketSession2 -> {
                        return webSocketSession2.getId().equals(id);
                    });
                } catch (IOException e) {
                    ChangeStreamWorker.LOGGER.warn("Error closing WebSocket session {}", webSocketSession.getId(), e);
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void close() {
        ChangeStreamWorkers.getInstance().remove(this.key);
        closeAllWebSocketSessions();
        if (this.handlingVirtualThread == null || this.handlingVirtualThread.isInterrupted()) {
            return;
        }
        this.handlingVirtualThread.interrupt();
    }

    void closeAllWebSocketSessions() {
        ((Set) this.websocketSessions.stream().collect(Collectors.toSet())).forEach(webSocketSession -> {
            try {
                webSocketSession.close();
                this.websocketSessions.remove(webSocketSession);
            } catch (IOException e) {
                LOGGER.warn("Error closing WebSocket session {}", webSocketSession, e);
            }
        });
    }

    private ChangeStreamIterable<Document> startChangeStream() {
        try {
            return RHMongoClients.mclient().getDatabase(this.dbName).getCollection(this.collName).watch(this.resolvedStages).fullDocument(FullDocument.UPDATE_LOOKUP);
        } catch (Throwable th) {
            LOGGER.warn("Error trying to start the stream: " + th.getMessage());
            throw th;
        }
    }

    private BsonDocument getDocument(ChangeStreamDocument<?> changeStreamDocument) {
        BsonDocument bsonDocument = new BsonDocument();
        if (changeStreamDocument == null) {
            return bsonDocument;
        }
        if (changeStreamDocument.getFullDocument() != null) {
            try {
                bsonDocument.put("fullDocument", BsonUtils.documentToBson((Document) changeStreamDocument.getFullDocument()));
            } catch (ClassCastException e) {
                LOGGER.warn("change event fullDocument is not json {}", changeStreamDocument.getFullDocument());
                bsonDocument.put("fullDocument", BsonNull.VALUE);
            }
        }
        if (changeStreamDocument.getDocumentKey() != null) {
            bsonDocument.put("documentKey", changeStreamDocument.getDocumentKey());
        }
        if (changeStreamDocument.getUpdateDescription() != null) {
            BsonDocument bsonDocument2 = new BsonDocument();
            BsonDocument updatedFields = changeStreamDocument.getUpdateDescription().getUpdatedFields();
            if (updatedFields != null) {
                bsonDocument2.put("updatedFields", updatedFields);
            } else {
                bsonDocument2.put("updatedFields", BsonNull.VALUE);
            }
            List removedFields = changeStreamDocument.getUpdateDescription().getRemovedFields();
            if (removedFields == null) {
                bsonDocument2.put("updatedFields", new BsonArray());
            } else {
                BsonArray bsonArray = new BsonArray();
                removedFields.forEach(str -> {
                    bsonArray.add(new BsonString(str));
                });
                bsonDocument2.put("removedFields", bsonArray);
            }
            bsonDocument.put("updateDescription", bsonDocument2);
        }
        if (changeStreamDocument.getOperationType() != null) {
            bsonDocument.put("operationType", new BsonString(changeStreamDocument.getOperationType().getValue()));
        }
        return bsonDocument;
    }
}
