package ch.squaredesk.nova.comm.websockets.server;

import ch.squaredesk.nova.comm.MessageTranscriber;
import ch.squaredesk.nova.comm.websockets.EndpointStreamSource;
import ch.squaredesk.nova.comm.websockets.EndpointStreamSourceFactory;
import ch.squaredesk.nova.comm.websockets.MetricsCollector;
import ch.squaredesk.nova.comm.websockets.SendAction;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.ConcurrentHashMap;
import org.glassfish.grizzly.websockets.WebSocket;
import org.glassfish.grizzly.websockets.WebSocketEngine;

/* loaded from: input_file:ch/squaredesk/nova/comm/websockets/server/ServerEndpointFactory.class */
public class ServerEndpointFactory {
    private static final Scheduler lifecycleEventScheduler = Schedulers.io();
    private final ConcurrentHashMap<WebSocket, ch.squaredesk.nova.comm.websockets.WebSocket> webSockets = new ConcurrentHashMap<>();
    private final MessageTranscriber<String> messageTranscriber;

    public ServerEndpointFactory(MessageTranscriber<String> messageTranscriber) {
        this.messageTranscriber = messageTranscriber;
    }

    private ch.squaredesk.nova.comm.websockets.WebSocket instantiateNewWebSocket(final WebSocket webSocket) {
        SendAction sendAction = new SendAction() { // from class: ch.squaredesk.nova.comm.websockets.server.ServerEndpointFactory.1
            @Override // ch.squaredesk.nova.comm.websockets.SendAction
            public <T> void accept(T t) throws Exception {
                webSocket.send((String) ServerEndpointFactory.this.messageTranscriber.getOutgoingMessageTranscriber(t).apply(t));
            }
        };
        webSocket.getClass();
        return new ch.squaredesk.nova.comm.websockets.WebSocket(sendAction, webSocket::close);
    }

    private ch.squaredesk.nova.comm.websockets.WebSocket createWebSocket(WebSocket webSocket) {
        return this.webSockets.computeIfAbsent(webSocket, this::instantiateNewWebSocket);
    }

    public ServerEndpoint createFor(String str, MetricsCollector metricsCollector) {
        String str2 = str.startsWith("/") ? str : "/" + str;
        String substring = str.startsWith("/") ? str.substring(1) : str;
        StreamCreatingWebSocketApplication streamCreatingWebSocketApplication = new StreamCreatingWebSocketApplication();
        WebSocketEngine.getEngine().register("", str2, streamCreatingWebSocketApplication);
        Function function = this::createWebSocket;
        EndpointStreamSource createStreamSourceFor = EndpointStreamSourceFactory.createStreamSourceFor(substring, function, streamCreatingWebSocketApplication, metricsCollector);
        Flowable subscribeOn = streamCreatingWebSocketApplication.connectingSockets().subscribeOn(lifecycleEventScheduler);
        function.getClass();
        Disposable subscribe = subscribeOn.subscribe((v1) -> {
            r1.apply(v1);
        });
        Disposable subscribe2 = streamCreatingWebSocketApplication.closingSockets().subscribeOn(lifecycleEventScheduler).subscribe(pair -> {
            this.webSockets.remove(pair._1);
        });
        return new ServerEndpoint(str, createStreamSourceFor, new SendAction() { // from class: ch.squaredesk.nova.comm.websockets.server.ServerEndpointFactory.2
            @Override // ch.squaredesk.nova.comm.websockets.SendAction
            public <T> void accept(T t) throws Exception {
                String str3 = (String) ServerEndpointFactory.this.messageTranscriber.getOutgoingMessageTranscriber(t).apply(t);
                ConcurrentHashMap.KeySetView keySet = ServerEndpointFactory.this.webSockets.keySet();
                keySet.stream().filter(webSocket -> {
                    try {
                        webSocket.broadcast(keySet, str3);
                        return true;
                    } catch (Exception e) {
                        return false;
                    }
                }).findAny();
            }
        }, closeReason -> {
            subscribe.dispose();
            subscribe2.dispose();
            this.webSockets.keySet().forEach(webSocket -> {
                webSocket.close(closeReason.code, closeReason.text);
            });
            this.webSockets.clear();
            streamCreatingWebSocketApplication.close();
        }, this.messageTranscriber, metricsCollector);
    }
}
