package io.scalecube.gateway.clientsdk.websocket;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.util.function.Consumer;
import java.util.logging.Level;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.ipc.netty.http.websocket.WebsocketInbound;
import reactor.ipc.netty.http.websocket.WebsocketOutbound;

/* loaded from: input_file:io/scalecube/gateway/clientsdk/websocket/WebsocketSession.class */
final class WebsocketSession {
    private final WebsocketInbound inbound;
    private final WebsocketOutbound outbound;
    private final DirectProcessor<ByteBuf> inboundProcessor = DirectProcessor.create();
    private final FluxSink<ByteBuf> inboundSink = this.inboundProcessor.serialize().sink();

    /* JADX INFO: Access modifiers changed from: package-private */
    public WebsocketSession(WebsocketInbound websocketInbound, WebsocketOutbound websocketOutbound) {
        this.inbound = websocketInbound;
        this.outbound = websocketOutbound;
        Flux log = this.inbound.aggregateFrames().receive().log(">>> RECEIVE", Level.FINE, new SignalType[0]);
        FluxSink<ByteBuf> fluxSink = this.inboundSink;
        fluxSink.getClass();
        Consumer consumer = (v1) -> {
            r1.next(v1);
        };
        FluxSink<ByteBuf> fluxSink2 = this.inboundSink;
        fluxSink2.getClass();
        Consumer consumer2 = fluxSink2::error;
        FluxSink<ByteBuf> fluxSink3 = this.inboundSink;
        fluxSink3.getClass();
        log.subscribe(consumer, consumer2, fluxSink3::complete);
    }

    public Mono<Void> send(ByteBuf byteBuf) {
        return this.outbound.options((v0) -> {
            v0.flushOnEach();
        }).sendObject(Mono.just(new TextWebSocketFrame(byteBuf)).log("<<< SEND", Level.FINE, new SignalType[0])).then();
    }

    public Flux<ByteBuf> receive() {
        return this.inboundProcessor.map((v0) -> {
            return v0.retain();
        });
    }

    public Mono<Void> close() {
        return Mono.defer(() -> {
            return this.outbound.sendObject(new CloseWebSocketFrame(1000, "close")).then().log("<<< CLOSE", Level.FINE, new SignalType[0]);
        });
    }

    public Mono<Void> onClose(Runnable runnable) {
        return this.inbound.context().onClose(runnable).onClose();
    }
}
