package io.scalecube.gateway.websocket;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.scalecube.gateway.core.GatewayMessage;
import io.scalecube.gateway.core.GatewayMessageCodec;
import io.scalecube.gateway.core.Signal;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.exceptions.BadRequestException;
import io.scalecube.services.exceptions.ExceptionProcessor;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/scalecube/gateway/websocket/WebSocketAcceptor.class */
public final class WebSocketAcceptor {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketAcceptor.class);
    private final ServiceCall serviceCall;
    private final GatewayMessageCodec gatewayMessageCodec = new GatewayMessageCodec();

    public WebSocketAcceptor(ServiceCall serviceCall) {
        this.serviceCall = serviceCall;
    }

    public Mono<Void> onConnect(WebSocketSession webSocketSession) {
        LOGGER.info("Session connected: " + webSocketSession);
        Mono<Void> send = webSocketSession.send(webSocketSession.receive().flatMap(webSocketFrame -> {
            try {
                GatewayMessage message = toMessage(webSocketFrame);
                Long streamId = message.streamId();
                if (message.qualifier() == null) {
                    throw new BadRequestException("q is missing");
                }
                if (streamId == null) {
                    throw new BadRequestException("sid is missing");
                }
                ServiceMessage serviceMessage = GatewayMessage.toServiceMessage(message);
                AtomicBoolean atomicBoolean = new AtomicBoolean(false);
                Flux requestMany = this.serviceCall.requestMany(serviceMessage);
                if (message.inactivity() != null) {
                    requestMany = requestMany.timeout(Duration.ofSeconds(message.inactivity().intValue()));
                }
                return requestMany.map(serviceMessage2 -> {
                    GatewayMessage.Builder streamId2 = GatewayMessage.from(serviceMessage2).streamId(streamId);
                    if (ExceptionProcessor.isError(serviceMessage2)) {
                        atomicBoolean.set(true);
                        streamId2.signal(Signal.ERROR);
                    }
                    return streamId2.build();
                }).concatWith(Flux.defer(() -> {
                    return atomicBoolean.get() ? Flux.empty() : Flux.just(GatewayMessage.builder().streamId(streamId).signal(Signal.COMPLETE).build());
                })).onErrorResume(th -> {
                    return Flux.just(GatewayMessage.builder().streamId(streamId).signal(Signal.ERROR).build());
                });
            } catch (Throwable th2) {
                return Flux.just(GatewayMessage.from(ExceptionProcessor.toMessage(th2)).streamId((Long) null).signal(Signal.ERROR).build());
            }
        }).map(this::toByteBuf).doOnError(th -> {
            webSocketSession.close();
        }));
        webSocketSession.onClose(() -> {
            LOGGER.info("Session disconnected: bye bye");
        });
        return send.then();
    }

    public Mono<Void> onDisconnect(WebSocketSession webSocketSession) {
        LOGGER.info("Session disconnected: " + webSocketSession);
        return Mono.empty();
    }

    private ByteBuf toByteBuf(GatewayMessage gatewayMessage) {
        try {
            return this.gatewayMessageCodec.encode(gatewayMessage);
        } catch (Throwable th) {
            LOGGER.error("Failed to encode message: {}, cause: {}", gatewayMessage, th);
            throw new BadRequestException("Failed to encode message q=" + gatewayMessage.qualifier());
        }
    }

    private GatewayMessage toMessage(WebSocketFrame webSocketFrame) {
        ByteBuf slice = webSocketFrame.content().slice();
        try {
            return this.gatewayMessageCodec.decode(slice);
        } catch (Throwable th) {
            LOGGER.error("Failed to decode message headers: {}, cause: {}", slice.toString(Charset.defaultCharset()), th);
            throw new BadRequestException("Failed to decode message headers {headers=" + slice.readableBytes() + ", data=" + slice.readableBytes() + "}");
        }
    }
}
