package com.doopp.reactor.guice.publisher;

import com.doopp.reactor.guice.RequestAttribute;
import com.doopp.reactor.guice.websocket.WebSocketServerHandle;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;
import reactor.netty.http.websocket.WebsocketInbound;
import reactor.netty.http.websocket.WebsocketOutbound;

/* loaded from: input_file:com/doopp/reactor/guice/publisher/WebsocketPublisher.class */
public class WebsocketPublisher {
    private static final String CURRENT_CHANNEL = "current_channel";

    public Mono<Object> sendMessage(HttpServerRequest httpServerRequest, HttpServerResponse httpServerResponse, WebSocketServerHandle webSocketServerHandle, Object obj) {
        return Mono.just((RequestAttribute) obj).flatMap(requestAttribute -> {
            return httpServerResponse.header("content-type", "text/plain").sendWebsocket((websocketInbound, websocketOutbound) -> {
                return websocketPublisher(websocketInbound, websocketOutbound, webSocketServerHandle, requestAttribute);
            });
        });
    }

    private Publisher<Void> websocketPublisher(WebsocketInbound websocketInbound, WebsocketOutbound websocketOutbound, WebSocketServerHandle webSocketServerHandle, RequestAttribute requestAttribute) {
        return websocketOutbound.withConnection(connection -> {
            onConnected(websocketInbound, connection, webSocketServerHandle, requestAttribute);
        }).options((v0) -> {
            v0.flushOnEach();
        }).sendString(webSocketServerHandle.receiveTextMessage((Channel) requestAttribute.getAttribute(CURRENT_CHANNEL, Channel.class)));
    }

    private void onConnected(WebsocketInbound websocketInbound, Connection connection, WebSocketServerHandle webSocketServerHandle, RequestAttribute requestAttribute) {
        Channel channel = connection.channel();
        connection.onDispose().subscribe((Consumer) null, (Consumer) null, () -> {
            webSocketServerHandle.disconnect(channel);
        });
        channel.attr(RequestAttribute.REQUEST_ATTRIBUTE).set(requestAttribute);
        requestAttribute.setAttribute(CURRENT_CHANNEL, channel);
        webSocketServerHandle.connected(channel);
        websocketInbound.aggregateFrames().receiveFrames().flatMap(webSocketFrame -> {
            if (webSocketFrame instanceof TextWebSocketFrame) {
                webSocketServerHandle.onTextMessage((TextWebSocketFrame) webSocketFrame, channel);
            } else if (webSocketFrame instanceof BinaryWebSocketFrame) {
                webSocketServerHandle.onBinaryMessage((BinaryWebSocketFrame) webSocketFrame, channel);
            } else if (webSocketFrame instanceof PingWebSocketFrame) {
                webSocketServerHandle.onPingMessage((PingWebSocketFrame) webSocketFrame, channel);
            } else if (webSocketFrame instanceof PongWebSocketFrame) {
                webSocketServerHandle.onPongMessage((PongWebSocketFrame) webSocketFrame, channel);
            } else if (webSocketFrame instanceof CloseWebSocketFrame) {
                connection.dispose();
                webSocketServerHandle.disconnect(channel);
            }
            return Mono.empty();
        }).onErrorResume(th -> {
            webSocketServerHandle.disconnect(channel);
            return Mono.error(th);
        }).subscribe();
    }
}
