/*
 * Decompiled with CFR 0.152.
 */
package org.noear.solon.boot.rsocket;

import io.netty.buffer.ByteBuf;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import java.nio.ByteBuffer;
import org.noear.solon.boot.rsocket._SocketSession;
import org.noear.solon.core.event.EventBus;
import org.noear.solon.core.message.Message;
import org.noear.solon.core.message.Session;
import org.noear.solon.extend.socketd.ListenerProxy;
import org.noear.solon.extend.socketd.ProtocolManager;
import reactor.core.publisher.Mono;

public class RsAcceptor
implements SocketAcceptor,
RSocket {
    public static final RsAcceptor instance = new RsAcceptor();

    public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
        Session session = _SocketSession.get(rSocket);
        ListenerProxy.getGlobal().onOpen(session);
        return Mono.just((Object)this);
    }

    public Mono<Void> fireAndForget(Payload payload) {
        ByteBuf byteBuf = payload.data();
        int len = byteBuf.readInt();
        if (len > 0) {
            byte[] bytes = new byte[len - 4];
            byteBuf.readBytes(bytes);
            ByteBuffer byteBuffer = ByteBuffer.allocate(len);
            byteBuffer.putInt(len);
            byteBuffer.put(bytes);
            byteBuffer.flip();
            Message message = ProtocolManager.decode((ByteBuffer)byteBuffer);
            Session session = _SocketSession.get(this);
            try {
                ListenerProxy.getGlobal().onMessage(session, message);
            }
            catch (Throwable ex) {
                EventBus.push((Object)ex);
            }
        }
        return Mono.empty();
    }

    public Mono<Void> onClose() {
        _SocketSession.remove(this);
        return Mono.empty();
    }
}

