package ch.squaredesk.nova.comm.websockets.server;

import ch.squaredesk.nova.comm.retrieving.MessageUnmarshaller;
import ch.squaredesk.nova.comm.sending.MessageMarshaller;
import ch.squaredesk.nova.comm.websockets.EndpointStreamSource;
import ch.squaredesk.nova.comm.websockets.EndpointStreamSourceFactory;
import ch.squaredesk.nova.comm.websockets.MetricsCollector;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import org.glassfish.grizzly.websockets.WebSocket;
import org.glassfish.grizzly.websockets.WebSocketEngine;

/* loaded from: input_file:ch/squaredesk/nova/comm/websockets/server/ServerEndpointFactory.class */
public class ServerEndpointFactory {
    private static final Scheduler lifecycleEventScheduler = Schedulers.io();
    private final ConcurrentHashMap<WebSocket, ch.squaredesk.nova.comm.websockets.WebSocket<?>> webSockets = new ConcurrentHashMap<>();

    private <MessageType> ch.squaredesk.nova.comm.websockets.WebSocket<MessageType> instantiateNewWebSocket(WebSocket webSocket, MessageMarshaller<MessageType, String> messageMarshaller) {
        Consumer consumer = obj -> {
            webSocket.send(marshal(obj, messageMarshaller));
        };
        Objects.requireNonNull(webSocket);
        return new ch.squaredesk.nova.comm.websockets.WebSocket<>(consumer, webSocket::close);
    }

    private <MessageType> ch.squaredesk.nova.comm.websockets.WebSocket<MessageType> createWebSocket(WebSocket webSocket, MessageMarshaller<MessageType, String> messageMarshaller) {
        return (ch.squaredesk.nova.comm.websockets.WebSocket) this.webSockets.computeIfAbsent(webSocket, webSocket2 -> {
            return instantiateNewWebSocket(webSocket2, messageMarshaller);
        });
    }

    private <MessageType> String marshal(MessageType messagetype, MessageMarshaller<MessageType, String> messageMarshaller) {
        try {
            return (String) messageMarshaller.marshal(messagetype);
        } catch (Exception e) {
            throw new RuntimeException("Unable to marshal message " + messagetype, e);
        }
    }

    private <MessageType> MessageType unmarshal(String str, String str2, MessageUnmarshaller<String, MessageType> messageUnmarshaller, MetricsCollector metricsCollector) {
        try {
            return (MessageType) messageUnmarshaller.unmarshal(str2);
        } catch (Exception e) {
            if (metricsCollector != null) {
                metricsCollector.unparsableMessageReceived(str);
            }
            throw new RuntimeException("Unable to unmarshal incoming message " + str2 + " on destination " + str, e);
        }
    }

    public <MessageType> ServerEndpoint<MessageType> createFor(String str, MessageMarshaller<MessageType, String> messageMarshaller, MessageUnmarshaller<String, MessageType> messageUnmarshaller, MetricsCollector metricsCollector) {
        String str2 = str.startsWith("/") ? str : "/" + str;
        String substring = str.startsWith("/") ? str.substring(1) : str;
        StreamCreatingWebSocketApplication streamCreatingWebSocketApplication = new StreamCreatingWebSocketApplication(str3 -> {
            return unmarshal(substring, str3, messageUnmarshaller, metricsCollector);
        });
        WebSocketEngine.getEngine().register("", str2, streamCreatingWebSocketApplication);
        Function function = webSocket -> {
            return createWebSocket(webSocket, messageMarshaller);
        };
        EndpointStreamSource createStreamSourceFor = EndpointStreamSourceFactory.createStreamSourceFor(substring, function, streamCreatingWebSocketApplication, metricsCollector);
        Flowable subscribeOn = streamCreatingWebSocketApplication.connectingSockets().subscribeOn(lifecycleEventScheduler);
        Objects.requireNonNull(function);
        Disposable subscribe = subscribeOn.subscribe((v1) -> {
            r1.apply(v1);
        });
        Disposable subscribe2 = streamCreatingWebSocketApplication.closingSockets().subscribeOn(lifecycleEventScheduler).subscribe(pair -> {
            this.webSockets.remove(pair._1);
        });
        return new ServerEndpoint<>(createStreamSourceFor, obj -> {
            String marshal = marshal(obj, messageMarshaller);
            ConcurrentHashMap.KeySetView keySet = this.webSockets.keySet();
            keySet.stream().filter(webSocket2 -> {
                try {
                    webSocket2.broadcast(keySet, marshal);
                    return true;
                } catch (Exception e) {
                    return false;
                }
            }).findAny();
        }, closeReason -> {
            subscribe.dispose();
            subscribe2.dispose();
            this.webSockets.keySet().forEach(webSocket2 -> {
                webSocket2.close(closeReason.code, closeReason.text);
            });
            this.webSockets.clear();
            streamCreatingWebSocketApplication.close();
        });
    }
}
