package io.scalecube.gateway.clientsdk.websocket;

import io.netty.buffer.ByteBuf;
import io.scalecube.gateway.clientsdk.ClientCodec;
import io.scalecube.gateway.clientsdk.ClientMessage;
import io.scalecube.gateway.clientsdk.ClientSettings;
import io.scalecube.gateway.clientsdk.ClientTransport;
import io.scalecube.gateway.clientsdk.ErrorData;
import io.scalecube.gateway.clientsdk.exceptions.ExceptionProcessor;
import io.scalecube.services.exceptions.ServiceException;
import java.net.InetSocketAddress;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.logging.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Schedulers;
import reactor.ipc.netty.http.client.HttpClient;
import reactor.ipc.netty.resources.LoopResources;

/* loaded from: input_file:io/scalecube/gateway/clientsdk/websocket/WebsocketClientTransport.class */
public final class WebsocketClientTransport implements ClientTransport {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketClientTransport.class);
    private static final AtomicReferenceFieldUpdater<WebsocketClientTransport, Mono> websocketMonoUpdater = AtomicReferenceFieldUpdater.newUpdater(WebsocketClientTransport.class, Mono.class, "websocketMono");
    private final ClientCodec<ByteBuf> codec;
    private final InetSocketAddress address;
    private final HttpClient httpClient;
    private final AtomicLong sidCounter = new AtomicLong();
    private volatile Mono<?> websocketMono;

    public WebsocketClientTransport(ClientSettings clientSettings, ClientCodec<ByteBuf> clientCodec, LoopResources loopResources) {
        this.codec = clientCodec;
        this.address = InetSocketAddress.createUnresolved(clientSettings.host(), clientSettings.port());
        this.httpClient = HttpClient.create(builder -> {
            builder.disablePool().connectAddress(() -> {
                return this.address;
            }).loopResources(loopResources);
        });
    }

    @Override // io.scalecube.gateway.clientsdk.ClientTransport
    public Mono<ClientMessage> requestResponse(ClientMessage clientMessage) {
        return Mono.defer(() -> {
            String valueOf = String.valueOf(this.sidCounter.incrementAndGet());
            ByteBuf request = toRequest(clientMessage, valueOf);
            return getOrConnect().flatMap(websocketSession -> {
                return websocketSession.send(request).then(receiveResponse(websocketSession.receive(), valueOf)).doOnCancel(() -> {
                    handleCancel(valueOf, websocketSession);
                });
            });
        });
    }

    @Override // io.scalecube.gateway.clientsdk.ClientTransport
    public Flux<ClientMessage> requestStream(ClientMessage clientMessage) {
        return Flux.defer(() -> {
            String valueOf = String.valueOf(this.sidCounter.incrementAndGet());
            ByteBuf request = toRequest(clientMessage, valueOf);
            return getOrConnect().flatMapMany(websocketSession -> {
                return websocketSession.send(request).thenMany(receiveStream(websocketSession.receive(), valueOf)).doOnCancel(() -> {
                    handleCancel(valueOf, websocketSession);
                });
            });
        });
    }

    @Override // io.scalecube.gateway.clientsdk.ClientTransport
    public Mono<Void> close() {
        return Mono.defer(() -> {
            Mono mono = websocketMonoUpdater.get(this);
            return (mono == null ? Mono.empty() : mono.flatMap((v0) -> {
                return v0.close();
            })).doOnTerminate(() -> {
                LOGGER.info("Closed websocket client sdk transport");
            });
        });
    }

    private Mono<WebsocketSession> getOrConnect() {
        return Mono.defer(() -> {
            return websocketMonoUpdater.updateAndGet(this, this::getOrConnect0);
        });
    }

    private Mono<WebsocketSession> getOrConnect0(Mono<WebsocketSession> mono) {
        return mono != null ? mono : this.httpClient.ws("/").flatMap(httpClientResponse -> {
            return Mono.create(monoSink -> {
                Mono receiveWebsocket = httpClientResponse.receiveWebsocket((websocketInbound, websocketOutbound) -> {
                    LOGGER.info("Connected successfully to {}", this.address);
                    WebsocketSession websocketSession = new WebsocketSession(websocketInbound, websocketOutbound);
                    monoSink.success(websocketSession);
                    return websocketSession.onClose(() -> {
                        LOGGER.info("Connection to {} has been closed successfully", this.address);
                    });
                });
                monoSink.getClass();
                receiveWebsocket.doOnError(monoSink::error).subscribe();
            });
        }).doOnError(th -> {
            LOGGER.warn("Connection to {} is failed, cause: {}", this.address, th);
            websocketMonoUpdater.getAndSet(this, null);
        }).cache();
    }

    private Disposable handleCancel(String str, WebsocketSession websocketSession) {
        return websocketSession.send(this.codec.encode(ClientMessage.builder().header("sid", str).header("sig", Signal.CANCEL.codeAsString()).build())).subscribe();
    }

    private ClientMessage enrichResponse(ClientMessage clientMessage) {
        return ClientMessage.from(clientMessage).header("client-recv-time", String.valueOf(System.currentTimeMillis())).build();
    }

    private ByteBuf toRequest(ClientMessage clientMessage, String str) {
        return this.codec.encode(ClientMessage.from(clientMessage).header("client-send-time", String.valueOf(System.currentTimeMillis())).header("sid", str).build());
    }

    private Flux<ClientMessage> receiveStream(Flux<ByteBuf> flux, String str) {
        return Flux.create(fluxSink -> {
            receiveBySid(flux, str).subscribe(clientMessage -> {
                fluxSink.getClass();
                Consumer<ClientMessage> consumer = (v1) -> {
                    r2.next(v1);
                };
                fluxSink.getClass();
                Runnable runnable = fluxSink::complete;
                fluxSink.getClass();
                handleResponse(clientMessage, consumer, runnable, fluxSink::error);
            });
        });
    }

    private Mono<ClientMessage> receiveResponse(Flux<ByteBuf> flux, String str) {
        return Mono.create(monoSink -> {
            receiveBySid(flux, str).subscribe(clientMessage -> {
                monoSink.getClass();
                Consumer<ClientMessage> consumer = (v1) -> {
                    r2.success(v1);
                };
                monoSink.getClass();
                Runnable runnable = monoSink::success;
                monoSink.getClass();
                handleResponse(clientMessage, consumer, runnable, monoSink::error);
            });
        });
    }

    private Flux<ClientMessage> receiveBySid(Flux<ByteBuf> flux, String str) {
        Flux publishOn = flux.publishOn(Schedulers.single(), Integer.MAX_VALUE);
        ClientCodec<ByteBuf> clientCodec = this.codec;
        clientCodec.getClass();
        return publishOn.map((v1) -> {
            return r1.decode(v1);
        }).filter(clientMessage -> {
            return str.equals(clientMessage.header("sid"));
        }).log(">>> SID_RECEIVE", Level.FINE, new SignalType[0]).map(this::enrichResponse);
    }

    private void handleResponse(ClientMessage clientMessage, Consumer<ClientMessage> consumer, Runnable runnable, Consumer<Throwable> consumer2) {
        try {
            Optional map = Optional.ofNullable(clientMessage.header("sig")).map(Signal::from);
            if (map.isPresent()) {
                Signal signal = (Signal) map.get();
                if (signal == Signal.COMPLETE) {
                    runnable.run();
                }
                if (signal == Signal.ERROR) {
                    ErrorData errorData = (ErrorData) this.codec.decodeData(clientMessage, ErrorData.class).data();
                    ServiceException exception = ExceptionProcessor.toException(clientMessage.qualifier(), errorData.getErrorCode(), errorData.getErrorMessage());
                    LOGGER.error("Received error response: sid={}, error={}", clientMessage.header("sid"), exception);
                    consumer2.accept(exception);
                }
            } else {
                consumer.accept(clientMessage);
            }
        } catch (Exception e) {
            consumer2.accept(e);
        }
    }
}
