package io.scalecube.gateway.websocket;

import io.netty.buffer.ByteBuf;
import io.scalecube.gateway.GatewayMetrics;
import io.scalecube.gateway.ReferenceCountUtil;
import io.scalecube.gateway.websocket.message.GatewayMessage;
import io.scalecube.gateway.websocket.message.GatewayMessageCodec;
import io.scalecube.gateway.websocket.message.Signal;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.exceptions.BadRequestException;
import io.scalecube.services.exceptions.ExceptionProcessor;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.http.server.HttpServerRequest;
import reactor.ipc.netty.http.server.HttpServerResponse;

/* loaded from: input_file:io/scalecube/gateway/websocket/GatewayWebsocketAcceptor.class */
/**
 * Websocket connection handler for the gateway.
 *
 * <p>Each accepted connection is wrapped into a {@link WebsocketSession}. Every inbound frame is
 * decoded into a {@link GatewayMessage}, validated, and either treated as a CANCEL of a registered
 * stream or forwarded to {@link ServiceCall#requestMany}. Responses are encoded back to
 * {@link ByteBuf} frames and written to the same session.
 */
public class GatewayWebsocketAcceptor implements BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(GatewayWebsocketAcceptor.class);
    private final ServiceCall serviceCall;
    private final GatewayMetrics metrics;
    private final GatewayMessageCodec messageCodec = new GatewayMessageCodec();

    /**
     * Creates an acceptor.
     *
     * @param serviceCall service call used to dispatch decoded requests
     * @param gatewayMetrics metrics sink on which requests and responses are marked
     */
    public GatewayWebsocketAcceptor(ServiceCall serviceCall, GatewayMetrics gatewayMetrics) {
        this.serviceCall = serviceCall;
        this.metrics = gatewayMetrics;
    }

    /**
     * Upgrades the HTTP exchange to a websocket and hands the resulting session to
     * {@link #onConnect(WebsocketSession)}.
     *
     * @param httpServerRequest incoming HTTP request
     * @param httpServerResponse HTTP response used to perform the websocket upgrade
     * @return publisher returned by {@code sendWebsocket}
     */
    @Override // java.util.function.BiFunction
    public Publisher<Void> apply(HttpServerRequest httpServerRequest, HttpServerResponse httpServerResponse) {
        return httpServerResponse.sendWebsocket(
            (websocketInbound, websocketOutbound) ->
                onConnect(new WebsocketSession(httpServerRequest, websocketInbound, websocketOutbound)));
    }

    /**
     * Wires the inbound frames of a session to {@link #handleMessage} and sends the produced
     * frames back through the same session.
     *
     * @param websocketSession freshly connected session
     * @return completion signal of the send pipeline
     */
    private Mono<Void> onConnect(WebsocketSession websocketSession) {
        LOGGER.info("Session connected: {}", websocketSession);

        Flux<ByteBuf> responses =
            websocketSession
                .receive()
                .doOnNext(frame -> this.metrics.markRequest())
                .flatMap(frame -> handleMessage(websocketSession, frame))
                .doOnError(
                    error ->
                        LOGGER.error(
                            "Unhandled exception occurred: {}, session: {} will be closed",
                            error,
                            websocketSession,
                            error));

        Mono<Void> send = websocketSession.send(responses);
        websocketSession.onClose(() -> LOGGER.info("Session disconnected: {}", websocketSession));
        return send.then();
    }

    /**
     * Processes a single inbound frame and produces the frames to be sent in reply.
     *
     * <p>Any failure raised while decoding, validating or subscribing is converted into an ERROR
     * message by {@link #handleError}; the payload of the request decoded so far (if any) is
     * released first.
     *
     * @param websocketSession session the frame arrived on
     * @param byteBuf raw inbound frame
     * @return flux of encoded response frames for this request
     */
    private Flux<ByteBuf> handleMessage(WebsocketSession websocketSession, ByteBuf byteBuf) {
        return Flux.create(fluxSink -> {
            // Declared outside the try block so the catch block can release the request payload
            // and address the error reply to the right stream id.
            GatewayMessage request = null;
            Long sid = null;
            try {
                request = toMessage(byteBuf);
                final GatewayMessage message = request;
                final Long streamId = message.streamId();
                sid = streamId;

                checkSidNotNull(streamId, websocketSession, message);
                if (message.hasSignal(Signal.CANCEL)) {
                    handleCancelRequest(fluxSink, streamId, websocketSession, message);
                    return;
                }
                checkSidNotRegisteredYet(streamId, websocketSession, message);
                checkQualifierNotNull(websocketSession, message);

                // Set by prepareResponse once an error response has been seen; in that case
                // prepareCompletion emits no COMPLETE signal.
                AtomicBoolean receivedError = new AtomicBoolean(false);

                Flux<ServiceMessage> serviceResponses =
                    this.serviceCall.requestMany(GatewayMessage.toServiceMessage(message));
                if (message.inactivity() != null) {
                    serviceResponses =
                        serviceResponses.timeout(Duration.ofMillis(message.inactivity().intValue()));
                }

                Flux<GatewayMessage> gatewayResponses =
                    serviceResponses
                        .map(serviceMessage -> prepareResponse(streamId, serviceMessage, receivedError))
                        .concatWith(Mono.defer(() -> prepareCompletion(streamId, receivedError)))
                        .onErrorResume(error -> Mono.just(toErrorMessage(error, streamId)))
                        .doFinally(signalType -> websocketSession.dispose(streamId));

                Consumer<GatewayMessage> emitResponse = response -> {
                    try {
                        fluxSink.next(toByteBuf(response));
                        // Only messages without a signal header are counted as responses.
                        if (!response.hasHeader(GatewayMessage.SIGNAL_FIELD)) {
                            this.metrics.markResponse();
                        }
                    } catch (Throwable encodeError) {
                        LOGGER.error("Failed to encode response message: {}", response, encodeError);
                    }
                };

                websocketSession.register(
                    streamId, gatewayResponses.subscribe(emitResponse, fluxSink::error, fluxSink::complete));
            } catch (Throwable error) {
                Optional.ofNullable(request)
                    .map(GatewayMessage::data)
                    .ifPresent(ReferenceCountUtil::safestRelease);
                handleError(fluxSink, websocketSession, sid, error);
            }
        });
    }

    /**
     * Builds the terminal COMPLETE signal for a stream.
     *
     * @param l stream id
     * @param atomicBoolean flag telling whether an error response was already produced
     * @return empty when the flag is set, otherwise a single COMPLETE message for the stream
     */
    private Mono<GatewayMessage> prepareCompletion(Long l, AtomicBoolean atomicBoolean) {
        if (atomicBoolean.get()) {
            return Mono.empty();
        }
        return Mono.just(GatewayMessage.builder().streamId(l).signal(Signal.COMPLETE).build());
    }

    /**
     * Converts a service response into a gateway message bound to the given stream id.
     *
     * <p>When {@link ExceptionProcessor#isError} reports the response as an error, the message is
     * tagged with the ERROR signal and the flag is set.
     *
     * @param l stream id
     * @param serviceMessage response received from the service call
     * @param atomicBoolean flag set to {@code true} when the response is an error
     * @return gateway message to be sent to the client
     */
    private GatewayMessage prepareResponse(Long l, ServiceMessage serviceMessage, AtomicBoolean atomicBoolean) {
        GatewayMessage.Builder response = GatewayMessage.from(serviceMessage).streamId(l);
        if (ExceptionProcessor.isError(serviceMessage)) {
            atomicBoolean.set(true);
            response.signal(Signal.ERROR);
        }
        return response.build();
    }

    /**
     * Rejects requests that carry no qualifier.
     *
     * @throws BadRequestException if the qualifier is {@code null}
     */
    private void checkQualifierNotNull(WebsocketSession websocketSession, GatewayMessage gatewayMessage) {
        if (gatewayMessage.qualifier() == null) {
            LOGGER.error("Bad gateway request {} on session {}: qualifier is missing", gatewayMessage, websocketSession);
            throw new BadRequestException("qualifier is missing");
        }
    }

    /**
     * Rejects requests whose stream id is already known to the session.
     *
     * @throws BadRequestException if {@code websocketSession.containsSid(l)} is {@code true}
     */
    private void checkSidNotRegisteredYet(Long l, WebsocketSession websocketSession, GatewayMessage gatewayMessage) {
        if (websocketSession.containsSid(l)) {
            LOGGER.error(
                "Bad gateway request {} on session {}: sid={} is already registered",
                gatewayMessage,
                websocketSession,
                l);
            throw new BadRequestException("sid=" + l + " is already registered");
        }
    }

    /**
     * Handles a CANCEL request: disposes the stream in the session, releases the request payload
     * and replies with a CANCEL message before completing the sink.
     *
     * @throws BadRequestException if {@code websocketSession.dispose(l)} returns {@code false}
     */
    private void handleCancelRequest(FluxSink<ByteBuf> fluxSink, Long l, WebsocketSession websocketSession, GatewayMessage gatewayMessage) {
        if (!websocketSession.dispose(l)) {
            LOGGER.error("CANCEL gateway request {} failed in session {}", gatewayMessage, websocketSession);
            throw new BadRequestException("Failed CANCEL request");
        }
        Optional.ofNullable(gatewayMessage.data()).ifPresent(ReferenceCountUtil::safestRelease);
        fluxSink.next(toByteBuf(cancelResponse(l)));
        fluxSink.complete();
    }

    /**
     * Rejects requests that carry no stream id.
     *
     * @throws BadRequestException if the stream id is {@code null}
     */
    private void checkSidNotNull(Long l, WebsocketSession websocketSession, GatewayMessage gatewayMessage) {
        if (l == null) {
            LOGGER.error("Bad gateway request {} on session {}: sid is missing", gatewayMessage, websocketSession);
            throw new BadRequestException("sid is missing");
        }
    }

    /** Encodes a gateway message with the acceptor's codec. */
    private ByteBuf toByteBuf(GatewayMessage gatewayMessage) {
        return this.messageCodec.encode(gatewayMessage);
    }

    /** Decodes an inbound frame with the acceptor's codec. */
    private GatewayMessage toMessage(ByteBuf byteBuf) {
        return this.messageCodec.decode(byteBuf);
    }

    /** Builds an ERROR message for the given stream id from a throwable. */
    private GatewayMessage toErrorMessage(Throwable th, Long l) {
        return GatewayMessage.from(ExceptionProcessor.toMessage(th)).streamId(l).signal(Signal.ERROR).build();
    }

    /** Builds the CANCEL reply for the given stream id. */
    private GatewayMessage cancelResponse(Long l) {
        return GatewayMessage.builder().streamId(l).signal(Signal.CANCEL).build();
    }

    /**
     * Sends an ERROR message for the given failure and completes the sink. A failure while doing
     * so is logged and not propagated.
     *
     * @param fluxSink sink of the current request
     * @param websocketSession session the request arrived on (used for logging)
     * @param l stream id of the failed request; may be {@code null} if it was never decoded
     * @param th original failure to report to the client
     */
    private void handleError(FluxSink<ByteBuf> fluxSink, WebsocketSession websocketSession, Long l, Throwable th) {
        try {
            fluxSink.next(toByteBuf(toErrorMessage(th, l)));
            fluxSink.complete();
        } catch (Throwable sendError) {
            LOGGER.error(
                "Failed to send error message on session {}: on sid={}, muted cause={}",
                websocketSession,
                l,
                th,
                sendError);
        }
    }
}
