package com.doopp.reactor.guice.websocket;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.ReplayProcessor;

/* loaded from: input_file:com/doopp/reactor/guice/websocket/AbstractWebSocketServerHandle.class */
/**
 * Base {@link WebSocketServerHandle} that keeps a registry of connected channels, indexed by
 * a unique string key, together with one text-message queue per key.
 *
 * <p>The key under which a channel is registered is also stored on the channel itself as a
 * Netty attribute, so the channel-based overloads can resolve the key from the channel.
 *
 * <p>Thread-safety: every change to the registry is made while holding this object's monitor,
 * and both maps are {@link ConcurrentHashMap}s, so lookups (sending, receiving, reading
 * {@link #getChannelMap()}) do not need the lock.
 */
public abstract class AbstractWebSocketServerHandle implements WebSocketServerHandle {

    /** Channel attribute that holds the key the channel is registered under. */
    private static final AttributeKey<String> CHANNEL_UNIQUE_KEY =
            AttributeKey.newInstance("channel_unique_key");

    /** Registered channels, by unique key. */
    private final Map<String, Channel> channelMap = new ConcurrentHashMap<>();

    /** Text-message queue of each registered key. */
    private final Map<String, FluxProcessor<String, String>> queueMessageMap = new ConcurrentHashMap<>();

    /**
     * Registers {@code channel} using the short text form of its channel id as the key.
     *
     * @param channel the channel that has just connected
     */
    @Override
    public void connected(Channel channel) {
        connected(channel, channel.id().asShortText());
    }

    /**
     * Registers {@code channel} under the key {@code str} and creates a fresh message queue
     * for that key.
     *
     * <p>If a different channel is already registered under the same key, it is disconnected
     * first. If {@code channel} is already registered under {@code str}, the call does nothing.
     * If {@code channel} was registered under another key, that older registration is removed.
     *
     * @param channel the channel to register
     * @param str     the unique key to register the channel under
     * @throws NullPointerException if {@code channel} or {@code str} is {@code null}
     */
    @Override
    public synchronized void connected(Channel channel, String str) {
        Objects.requireNonNull(channel, "channel");
        Objects.requireNonNull(str, "str");

        // Re-registering the same channel under the same key must not close it or reset its queue.
        if (str.equals(channel.attr(CHANNEL_UNIQUE_KEY).get()) && this.channelMap.get(str) == channel) {
            return;
        }

        // Drop whatever this channel was registered as before, so no stale entry is left behind.
        unregister(channel);

        // A key identifies a single connection: the newcomer replaces the previous holder.
        Channel previous = this.channelMap.get(str);
        if (previous != null && previous != channel) {
            disconnect(previous);
        }

        channel.attr(CHANNEL_UNIQUE_KEY).set(str);
        // Publish the queue before the channel so a key visible in channelMap always has a queue.
        this.queueMessageMap.put(str, ReplayProcessor.create());
        this.channelMap.put(str, channel);
    }

    /**
     * Queues a text message for the key that {@code channel} is registered under.
     *
     * @param str     the message text
     * @param channel the channel whose key identifies the target queue
     */
    @Override
    public void sendTextMessage(String str, Channel channel) {
        sendTextMessage(str, channel.attr(CHANNEL_UNIQUE_KEY).get());
    }

    /**
     * Queues a text message for the key {@code str2}.
     *
     * <p>If no queue exists for the key (it was never registered, or has been disconnected),
     * the message is dropped.
     *
     * @param str  the message text
     * @param str2 the unique key of the target queue
     * @throws NullPointerException if {@code str} is {@code null}
     */
    @Override
    public void sendTextMessage(String str, String str2) {
        Objects.requireNonNull(str, "str");
        if (str2 == null) {
            return;
        }
        FluxProcessor<String, String> queue = this.queueMessageMap.get(str2);
        if (queue != null) {
            queue.onNext(str);
        }
    }

    /**
     * Returns the stream of text messages queued for the key that {@code channel} is
     * registered under.
     *
     * @param channel the channel whose queue is requested
     * @return the queue of the channel's key, or an empty {@link Flux} if the channel has no
     *         registered queue; never {@code null}
     */
    @Override
    public Flux<String> receiveTextMessage(Channel channel) {
        String key = channel.attr(CHANNEL_UNIQUE_KEY).get();
        FluxProcessor<String, String> queue = (key == null) ? null : this.queueMessageMap.get(key);
        return (queue != null) ? queue : Flux.empty();
    }

    /**
     * Default text handling: puts the received text on the queue of the channel it came from.
     *
     * @param textWebSocketFrame the received text frame
     * @param channel            the channel the frame arrived on
     */
    @Override
    public void onTextMessage(TextWebSocketFrame textWebSocketFrame, Channel channel) {
        sendTextMessage(textWebSocketFrame.text(), channel);
    }

    /**
     * Default binary handling: ignores the frame content and writes an empty buffer to the
     * channel.
     *
     * @param binaryWebSocketFrame the received binary frame (not read here)
     * @param channel              the channel the frame arrived on
     */
    @Override
    public void onBinaryMessage(BinaryWebSocketFrame binaryWebSocketFrame, Channel channel) {
        // NOTE(review): this looks like a placeholder for subclasses to override - confirm.
        channel.writeAndFlush(Unpooled.buffer(0));
    }

    /**
     * Answers a Ping with a Pong that carries the same application data, as RFC 6455
     * section 5.5.3 requires.
     *
     * @param pingWebSocketFrame the received ping frame; may be {@code null}, in which case an
     *                           empty Pong is sent
     * @param channel            the channel the frame arrived on
     */
    @Override
    public void onPingMessage(PingWebSocketFrame pingWebSocketFrame, Channel channel) {
        if (pingWebSocketFrame == null) {
            channel.writeAndFlush(new PongWebSocketFrame());
            return;
        }
        // Copy the payload so the Pong owns its buffer and the reference count of the
        // incoming frame is left untouched.
        channel.writeAndFlush(new PongWebSocketFrame(pingWebSocketFrame.content().copy()));
    }

    /**
     * Answers a Pong with a new Ping.
     *
     * @param pongWebSocketFrame the received pong frame (not read here)
     * @param channel            the channel the frame arrived on
     */
    @Override
    public void onPongMessage(PongWebSocketFrame pongWebSocketFrame, Channel channel) {
        // NOTE(review): a peer that answers every Ping with a Pong will keep this exchange
        // going without pause - confirm this is the intended keep-alive behaviour.
        channel.writeAndFlush(new PingWebSocketFrame());
    }

    /**
     * Removes the registration of {@code channel} and closes it if it is still active.
     *
     * <p>The registry entry for the channel's key is only removed when it belongs to this
     * channel (or no channel is registered for the key), so a late disconnect of a replaced
     * connection does not remove its successor.
     *
     * @param channel the channel to disconnect; {@code null} is ignored
     */
    @Override
    public void disconnect(Channel channel) {
        if (channel == null) {
            return;
        }
        unregister(channel);
        // The close calls are made outside the registry lock.
        if (channel.isActive()) {
            channel.disconnect();
            channel.close();
        }
    }

    /**
     * Returns the registry of connected channels.
     *
     * @return the live map from unique key to channel (not a copy)
     */
    public Map<String, Channel> getChannelMap() {
        return this.channelMap;
    }

    /** Removes the channel's registry entry and queue unless the key now belongs to another channel. */
    private synchronized void unregister(Channel channel) {
        String key = channel.attr(CHANNEL_UNIQUE_KEY).get();
        if (key == null) {
            return;
        }
        Channel registered = this.channelMap.get(key);
        if (registered == null || registered == channel) {
            this.channelMap.remove(key);
            this.queueMessageMap.remove(key);
        }
    }
}
