package com.doopp.reactor.guice.publisher;

import com.doopp.reactor.guice.RequestAttribute;
import com.doopp.reactor.guice.websocket.WebSocketServerHandle;
import io.netty.channel.Channel;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.scheduler.Schedulers;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;

/* loaded from: input_file:com/doopp/reactor/guice/publisher/WebsocketPublisher.class */
public class WebsocketPublisher {
    private static final Logger log = LoggerFactory.getLogger(WebsocketPublisher.class);
    private static Flux<Object> rp = UnicastProcessor.create();

    public Mono<Object> sendMessage2(HttpServerRequest httpServerRequest, HttpServerResponse httpServerResponse, WebSocketServerHandle webSocketServerHandle, Object obj) {
        return Mono.just((RequestAttribute) obj).flatMap(requestAttribute -> {
            return httpServerResponse.header("content-type", "text/plain").sendWebsocket((websocketInbound, websocketOutbound) -> {
                return websocketOutbound.withConnection(connection -> {
                    Channel channel = connection.channel();
                    System.out.println("\n\n >>>" + websocketInbound);
                    System.out.println(channel);
                    connection.onDispose().subscribe((Consumer) null, (Consumer) null, () -> {
                        webSocketServerHandle.onClose(null, channel);
                    });
                    channel.attr(RequestAttribute.REQUEST_ATTRIBUTE).set(requestAttribute);
                    webSocketServerHandle.onConnect(channel);
                    websocketInbound.aggregateFrames().receiveFrames().subscribe(webSocketFrame -> {
                        webSocketServerHandle.handleEvent(webSocketFrame, channel);
                    });
                }).sendObject(rp);
            });
        });
    }

    public Mono<Object> sendMessage(HttpServerRequest httpServerRequest, HttpServerResponse httpServerResponse, WebSocketServerHandle webSocketServerHandle, Object obj) {
        return httpServerResponse.header("content-type", "text/plain").sendWebsocket((websocketInbound, websocketOutbound) -> {
            return websocketOutbound.withConnection(connection -> {
                Channel channel = connection.channel();
                connection.onDispose().subscribe((Consumer) null, (Consumer) null, () -> {
                    webSocketServerHandle.onClose(null, channel);
                });
                channel.attr(RequestAttribute.REQUEST_ATTRIBUTE).set((RequestAttribute) obj);
                webSocketServerHandle.onConnect(channel);
                websocketInbound.aggregateFrames().receiveFrames().subscribe(webSocketFrame -> {
                    webSocketServerHandle.handleEvent(webSocketFrame, channel);
                });
            }).sendString(UnicastProcessor.create());
        }).map(r2 -> {
            return r2;
        });
    }

    public Mono<Object> sendMessage5(HttpServerRequest httpServerRequest, HttpServerResponse httpServerResponse, WebSocketServerHandle webSocketServerHandle, Object obj) {
        return httpServerResponse.header("content-type", "text/plain").sendWebsocket((websocketInbound, websocketOutbound) -> {
            return websocketOutbound.sendString(websocketInbound.receive().asString().publishOn(Schedulers.single()).map(str -> {
                return "a";
            }));
        }).then(Mono.empty());
    }
}
