package com.doopp.reactor.guice.publisher;

import com.doopp.reactor.guice.RequestAttribute;
import com.doopp.reactor.guice.websocket.WebSocketServerHandle;
import io.netty.channel.Channel;
import java.util.function.Consumer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;

/* loaded from: input_file:com/doopp/reactor/guice/publisher/WebsocketPublisher.class */
public class WebsocketPublisher {
    private static Flux<Object> rp = UnicastProcessor.create();

    public Mono<Object> sendMessage(HttpServerRequest httpServerRequest, HttpServerResponse httpServerResponse, WebSocketServerHandle webSocketServerHandle, Object obj) {
        return Mono.just((RequestAttribute) obj).flatMap(requestAttribute -> {
            return httpServerResponse.header("content-type", "text/plain").sendWebsocket((websocketInbound, websocketOutbound) -> {
                return websocketOutbound.withConnection(connection -> {
                    Channel channel = connection.channel();
                    connection.onDispose().subscribe((Consumer) null, (Consumer) null, () -> {
                        webSocketServerHandle.onClose(null, channel);
                    });
                    channel.attr(RequestAttribute.REQUEST_ATTRIBUTE).set(requestAttribute);
                    webSocketServerHandle.onConnect(channel);
                    websocketInbound.aggregateFrames().receiveFrames().subscribe(webSocketFrame -> {
                        webSocketServerHandle.handleEvent(webSocketFrame, channel);
                    });
                }).options((v0) -> {
                    v0.flushOnEach();
                }).sendObject(rp);
            });
        });
    }
}
