package org.restheart.mongodb.handlers.changestreams;

import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonNull;
import org.bson.BsonString;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.codecs.Codec;
import org.bson.codecs.DocumentCodec;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.restheart.mongodb.db.MongoReactiveClientSingleton;
import org.restheart.utils.BsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/restheart/mongodb/handlers/changestreams/ChangeStreamSubscriber.class */
public class ChangeStreamSubscriber implements Subscriber<ChangeStreamDocument<?>> {
    private final SessionKey sessionKey;
    private List<BsonDocument> resolvedStages;
    private String dbName;
    private String collName;
    private boolean init;
    private Subscription sub;
    private static final Logger LOGGER = LoggerFactory.getLogger(ChangeStreamSubscriber.class);
    private static final CodecRegistry REGISTRY = CodecRegistries.fromCodecs(new Codec[]{new DocumentCodec()});

    public ChangeStreamSubscriber(SessionKey sessionKey, List<BsonDocument> list, String str, String str2) {
        this.sessionKey = sessionKey;
        this.resolvedStages = list;
        this.dbName = str;
        this.collName = str2;
        this.init = false;
    }

    public ChangeStreamSubscriber(SessionKey sessionKey, List<BsonDocument> list, String str, String str2, boolean z) {
        this.sessionKey = sessionKey;
        this.resolvedStages = list;
        this.dbName = str;
        this.collName = str2;
        this.init = z;
    }

    public void onSubscribe(Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
        this.sub = subscription;
    }

    public void onNext(ChangeStreamDocument<?> changeStreamDocument) {
        if (!this.init) {
            setInit(true);
        }
        if (WebSocketSessionsRegistry.getInstance().get(this.sessionKey).isEmpty()) {
            stop();
            LOGGER.debug("Closing unwatched stream, sessionKey=" + this.sessionKey);
            ChangeStreamsRegistry.getInstance().remove(this.sessionKey);
        } else {
            LOGGER.trace("[clients watching]: " + WebSocketSessionsRegistry.getInstance().get(this.sessionKey).size());
            LOGGER.trace("Change stream notification for sessionKey={}: {}", this.sessionKey, changeStreamDocument);
            ChangeStreamWebsocketCallback.NOTIFICATION_PUBLISHER.submit(new ChangeStreamNotification(this.sessionKey, BsonUtils.toJson(getDocument(changeStreamDocument), this.sessionKey.getJsonMode())));
        }
    }

    public void onError(Throwable th) {
        LOGGER.warn("Error from stream: " + th.getMessage());
        if (this.init) {
            LOGGER.warn("Restarting stream: {}/{}", this.dbName, this.collName);
            restartStream();
        } else {
            LOGGER.warn("Closing all connected ws clients: {}/{}", this.dbName, this.collName);
            closeAllOnError(this.dbName, this.collName);
        }
    }

    private void closeAllOnError(String str, String str2) {
        WebSocketSessionsRegistry webSocketSessionsRegistry = WebSocketSessionsRegistry.getInstance();
        ChangeStreamsRegistry changeStreamsRegistry = ChangeStreamsRegistry.getInstance();
        ((Set) changeStreamsRegistry.getSessionKeysOnCollection(str, str2).stream().collect(Collectors.toSet())).forEach(sessionKey -> {
            ((Set) webSocketSessionsRegistry.get(sessionKey).stream().collect(Collectors.toSet())).forEach(changeStreamWebSocketSession -> {
                try {
                    changeStreamWebSocketSession.close();
                    webSocketSessionsRegistry.remove(sessionKey, changeStreamWebSocketSession);
                } catch (IOException e) {
                }
            });
            changeStreamsRegistry.remove(sessionKey);
        });
    }

    private void setInit(boolean z) {
        this.init = z;
    }

    private void restartStream() {
        try {
            MongoReactiveClientSingleton.getInstance().getClient().getDatabase(this.dbName).getCollection(this.collName).watch(this.resolvedStages).fullDocument(FullDocument.UPDATE_LOOKUP).subscribe(new ChangeStreamSubscriber(this.sessionKey, this.resolvedStages, this.dbName, this.collName, true));
        } catch (Throwable th) {
            LOGGER.warn("Error trying to restart the stream: " + th.getMessage());
        }
    }

    public void onComplete() {
        LOGGER.debug("Stream completed, sessionKey=" + this.sessionKey);
    }

    public void stop() {
        this.sub.cancel();
    }

    private BsonDocument getDocument(ChangeStreamDocument<?> changeStreamDocument) {
        BsonDocument bsonDocument = new BsonDocument();
        if (changeStreamDocument == null) {
            return bsonDocument;
        }
        if (changeStreamDocument.getFullDocument() != null) {
            try {
                bsonDocument.put("fullDocument", toBson((Document) changeStreamDocument.getFullDocument()));
            } catch (ClassCastException e) {
                LOGGER.warn("change stream fullDocument is not json {}", changeStreamDocument.getFullDocument());
                bsonDocument.put("fullDocument", BsonNull.VALUE);
            }
        }
        if (changeStreamDocument.getDocumentKey() != null) {
            bsonDocument.put("documentKey", changeStreamDocument.getDocumentKey());
        }
        if (changeStreamDocument.getUpdateDescription() != null) {
            BsonDocument bsonDocument2 = new BsonDocument();
            BsonDocument updatedFields = changeStreamDocument.getUpdateDescription().getUpdatedFields();
            if (updatedFields != null) {
                bsonDocument2.put("updatedFields", updatedFields);
            } else {
                bsonDocument2.put("updatedFields", BsonNull.VALUE);
            }
            List removedFields = changeStreamDocument.getUpdateDescription().getRemovedFields();
            if (removedFields == null) {
                bsonDocument2.put("updatedFields", new BsonArray());
            } else {
                BsonArray bsonArray = new BsonArray();
                removedFields.forEach(str -> {
                    bsonArray.add(new BsonString(str));
                });
                bsonDocument2.put("removedFields", bsonArray);
            }
            bsonDocument.put("updateDescription", bsonDocument2);
        }
        if (changeStreamDocument.getOperationType() != null) {
            bsonDocument.put("operationType", new BsonString(changeStreamDocument.getOperationType().getValue()));
        }
        return bsonDocument;
    }

    private static BsonValue toBson(Document document) {
        return document == null ? BsonNull.VALUE : document.toBsonDocument(BsonDocument.class, REGISTRY);
    }
}
