package org.restheart.mongodb.handlers.changestreams;

import io.undertow.websockets.core.WebSocketCallback;
import io.undertow.websockets.core.WebSocketChannel;
import io.undertow.websockets.core.WebSockets;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Flow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/restheart/mongodb/handlers/changestreams/WebSocketNotificationSubscriber.class */
public class WebSocketNotificationSubscriber implements Flow.Subscriber<ChangeStreamNotification> {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketNotificationSubscriber.class);
    private Flow.Subscription sub;

    @Override // java.util.concurrent.Flow.Subscriber
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
        this.sub = subscription;
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onNext(ChangeStreamNotification changeStreamNotification) {
        Set<ChangeStreamWebSocketSession> set = GuavaHashMultimapSingleton.get(changeStreamNotification.getSessionKey());
        Set newSetFromMap = Collections.newSetFromMap(new ConcurrentHashMap());
        set.stream().forEach(changeStreamWebSocketSession -> {
            sendNotification(changeStreamWebSocketSession, changeStreamNotification.getNotificationMessage(), newSetFromMap);
        });
        newSetFromMap.parallelStream().forEach(changeStreamWebSocketSession2 -> {
            GuavaHashMultimapSingleton.remove(changeStreamNotification.getSessionKey(), changeStreamWebSocketSession2);
        });
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onError(Throwable th) {
        LOGGER.warn("Error sending stream notification: " + th.getMessage());
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onComplete() {
        LOGGER.trace("Notification subscription completed");
    }

    public void stop() {
        this.sub.cancel();
    }

    private synchronized void sendNotification(final ChangeStreamWebSocketSession changeStreamWebSocketSession, String str, final Set<ChangeStreamWebSocketSession> set) {
        WebSockets.sendText(str, changeStreamWebSocketSession.getChannel(), new WebSocketCallback<Void>() { // from class: org.restheart.mongodb.handlers.changestreams.WebSocketNotificationSubscriber.1
            public void complete(WebSocketChannel webSocketChannel, Void r3) {
            }

            public void onError(WebSocketChannel webSocketChannel, Void r5, Throwable th) {
                set.add(changeStreamWebSocketSession);
            }
        });
    }
}
