package ch.squaredesk.nova.comm.websockets.server;

import ch.squaredesk.nova.comm.websockets.CloseReason;
import ch.squaredesk.nova.comm.websockets.StreamCreatingEndpointWrapper;
import ch.squaredesk.nova.tuples.Pair;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
import java.util.function.Function;
import org.glassfish.grizzly.websockets.ClosingFrame;
import org.glassfish.grizzly.websockets.DataFrame;
import org.glassfish.grizzly.websockets.WebSocket;
import org.glassfish.grizzly.websockets.WebSocketApplication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ch/squaredesk/nova/comm/websockets/server/StreamCreatingWebSocketApplication.class */
/**
 * Grizzly {@link WebSocketApplication} that republishes its lifecycle callbacks
 * (connect, close, error, incoming text message) as reactive streams.
 *
 * <p>Each callback pushes one element into a dedicated {@link Subject}; the
 * {@link StreamCreatingEndpointWrapper} accessors expose those subjects as
 * {@link Flowable}s using {@link BackpressureStrategy#BUFFER}.
 *
 * <p>All subjects are serialized, so callbacks arriving from different threads
 * (presumably Grizzly worker threads - confirm against the server setup) cannot
 * violate the Rx requirement that {@code onNext}/{@code onComplete} calls do not overlap.
 *
 * @param <MessageType> type produced by the message unmarshaller for each incoming text frame
 */
public class StreamCreatingWebSocketApplication<MessageType> extends WebSocketApplication implements StreamCreatingEndpointWrapper<WebSocket, MessageType> {
    private static final Logger logger = LoggerFactory.getLogger(StreamCreatingWebSocketApplication.class);

    // The explicit type witness is required: without it create() infers Subject<Object>,
    // which is not assignable to the declared field type once toSerialized() is chained.
    private final Subject<Pair<WebSocket, MessageType>> messages =
            PublishSubject.<Pair<WebSocket, MessageType>>create().toSerialized();
    private final Subject<WebSocket> connectedSockets =
            PublishSubject.<WebSocket>create().toSerialized();
    private final Subject<Pair<WebSocket, CloseReason>> closedSockets =
            PublishSubject.<Pair<WebSocket, CloseReason>>create().toSerialized();
    private final Subject<Pair<WebSocket, Throwable>> errors =
            PublishSubject.<Pair<WebSocket, Throwable>>create().toSerialized();

    /** Converts the text payload of an incoming frame into a {@code MessageType}. */
    private final Function<String, MessageType> messageUnmarshaller;

    /**
     * Creates the application.
     *
     * <p>NOTE(review): the decompiler reports this constructor was originally
     * package-private; it is kept public here so existing callers keep compiling.
     *
     * @param function unmarshaller applied to every incoming text message
     */
    public StreamCreatingWebSocketApplication(Function<String, MessageType> function) {
        this.messageUnmarshaller = function;
    }

    /**
     * Publishes a close event for the given socket on {@link #closingSockets()}.
     *
     * @param webSocket the socket that was closed
     * @param dataFrame the closing frame; may be {@code null} or not a {@link ClosingFrame},
     *                  in which case {@link CloseReason#UNEXPECTED_CONDITION} is reported
     */
    @Override
    public void onClose(WebSocket webSocket, DataFrame dataFrame) {
        this.closedSockets.onNext(new Pair<>(webSocket, closeReasonFor(dataFrame)));
    }

    /**
     * Maps a closing frame to a {@link CloseReason}, never throwing: anything that
     * cannot be mapped is logged and reported as {@link CloseReason#UNEXPECTED_CONDITION}.
     */
    private static CloseReason closeReasonFor(DataFrame dataFrame) {
        // Guard first: a null or foreign frame must still result in a close event
        // instead of an exception escaping from the callback.
        if (!(dataFrame instanceof ClosingFrame)) {
            logger.warn("Socket closed without a proper closing frame: {}", dataFrame);
            return CloseReason.UNEXPECTED_CONDITION;
        }
        ClosingFrame closingFrame = (ClosingFrame) dataFrame;
        try {
            return CloseReason.forCloseCode(closingFrame.getCode());
        } catch (Exception e) {
            // Trailing throwable argument is logged with its stack trace by SLF4J.
            logger.error("Unexpected close code {} in closing dataFrame {}",
                    closingFrame.getCode(), dataFrame, e);
            return CloseReason.UNEXPECTED_CONDITION;
        }
    }

    /**
     * Publishes the newly connected socket on {@link #connectingSockets()}.
     *
     * @param webSocket the socket that connected
     */
    @Override
    public void onConnect(WebSocket webSocket) {
        this.connectedSockets.onNext(webSocket);
    }

    /**
     * Publishes the error together with its socket on {@link #errors()}.
     *
     * @param webSocket the socket on which the error occurred
     * @param th        the reported error
     * @return always {@code true}. NOTE(review): in Grizzly's contract this presumably
     *         requests that the socket be closed - confirm against the Grizzly docs
     */
    @Override
    protected boolean onError(WebSocket webSocket, Throwable th) {
        this.errors.onNext(new Pair<>(webSocket, th));
        return true;
    }

    /**
     * Unmarshals an incoming text message and publishes it on {@link #messages()}.
     *
     * <p>Failures while unmarshalling or publishing are logged and the message is
     * dropped; no exception escapes from this callback.
     *
     * @param webSocket the socket the message was received on
     * @param str       the raw text payload
     */
    @Override
    public void onMessage(WebSocket webSocket, String str) {
        try {
            this.messages.onNext(new Pair<>(webSocket, this.messageUnmarshaller.apply(str)));
        } catch (Exception e) {
            // Deliberate best-effort: one bad message must not break the stream.
            // The payload itself is not logged since it may contain sensitive data.
            logger.warn("Unable to unmarshal or dispatch incoming message, dropping it", e);
        }
    }

    /**
     * @return stream of (socket, unmarshalled message) pairs, buffered for slow consumers
     */
    @Override // ch.squaredesk.nova.comm.websockets.StreamCreatingEndpointWrapper
    public Flowable<Pair<WebSocket, MessageType>> messages() {
        return this.messages.toFlowable(BackpressureStrategy.BUFFER);
    }

    /**
     * @return stream of sockets reported via {@link #onConnect(WebSocket)}, buffered for slow consumers
     */
    @Override // ch.squaredesk.nova.comm.websockets.StreamCreatingEndpointWrapper
    public Flowable<WebSocket> connectingSockets() {
        return this.connectedSockets.toFlowable(BackpressureStrategy.BUFFER);
    }

    /**
     * @return stream of (socket, close reason) pairs reported via
     *         {@link #onClose(WebSocket, DataFrame)}, buffered for slow consumers
     */
    @Override // ch.squaredesk.nova.comm.websockets.StreamCreatingEndpointWrapper
    public Flowable<Pair<WebSocket, CloseReason>> closingSockets() {
        return this.closedSockets.toFlowable(BackpressureStrategy.BUFFER);
    }

    /**
     * @return stream of (socket, error) pairs reported via
     *         {@link #onError(WebSocket, Throwable)}, buffered for slow consumers
     */
    @Override // ch.squaredesk.nova.comm.websockets.StreamCreatingEndpointWrapper
    public Flowable<Pair<WebSocket, Throwable>> errors() {
        return this.errors.toFlowable(BackpressureStrategy.BUFFER);
    }

    /**
     * Completes all four streams; subscribers receive {@code onComplete} and
     * events published afterwards are no longer delivered.
     *
     * <p>NOTE(review): the decompiler reports this method was originally package-private.
     */
    public void close() {
        this.messages.onComplete();
        this.connectedSockets.onComplete();
        this.closedSockets.onComplete();
        this.errors.onComplete();
    }
}
