package com.azure.messaging.webpubsub.client;

import com.azure.core.http.policy.RetryStrategy;
import com.azure.core.util.BinaryData;
import com.azure.core.util.UrlBuilder;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.webpubsub.client.implementation.LoggingUtils;
import com.azure.messaging.webpubsub.client.implementation.WebPubSubClientState;
import com.azure.messaging.webpubsub.client.implementation.WebPubSubConnection;
import com.azure.messaging.webpubsub.client.implementation.WebPubSubGroup;
import com.azure.messaging.webpubsub.client.implementation.models.AckMessage;
import com.azure.messaging.webpubsub.client.implementation.models.ConnectedMessage;
import com.azure.messaging.webpubsub.client.implementation.models.DisconnectedMessage;
import com.azure.messaging.webpubsub.client.implementation.models.GroupDataMessage;
import com.azure.messaging.webpubsub.client.implementation.models.JoinGroupMessage;
import com.azure.messaging.webpubsub.client.implementation.models.LeaveGroupMessage;
import com.azure.messaging.webpubsub.client.implementation.models.SendEventMessage;
import com.azure.messaging.webpubsub.client.implementation.models.SendToGroupMessage;
import com.azure.messaging.webpubsub.client.implementation.models.SequenceAckMessage;
import com.azure.messaging.webpubsub.client.implementation.models.ServerDataMessage;
import com.azure.messaging.webpubsub.client.implementation.models.WebPubSubMessage;
import com.azure.messaging.webpubsub.client.implementation.models.WebPubSubMessageAck;
import com.azure.messaging.webpubsub.client.implementation.websocket.ClientEndpointConfiguration;
import com.azure.messaging.webpubsub.client.implementation.websocket.CloseReason;
import com.azure.messaging.webpubsub.client.implementation.websocket.WebSocketClient;
import com.azure.messaging.webpubsub.client.implementation.websocket.WebSocketClientNettyImpl;
import com.azure.messaging.webpubsub.client.implementation.websocket.WebSocketSession;
import com.azure.messaging.webpubsub.client.models.AckResponseError;
import com.azure.messaging.webpubsub.client.models.ConnectedEvent;
import com.azure.messaging.webpubsub.client.models.DisconnectedEvent;
import com.azure.messaging.webpubsub.client.models.GroupMessageEvent;
import com.azure.messaging.webpubsub.client.models.RejoinGroupFailedEvent;
import com.azure.messaging.webpubsub.client.models.SendEventOptions;
import com.azure.messaging.webpubsub.client.models.SendMessageFailedException;
import com.azure.messaging.webpubsub.client.models.SendToGroupOptions;
import com.azure.messaging.webpubsub.client.models.ServerMessageEvent;
import com.azure.messaging.webpubsub.client.models.StoppedEvent;
import com.azure.messaging.webpubsub.client.models.WebPubSubDataFormat;
import com.azure.messaging.webpubsub.client.models.WebPubSubProtocolType;
import com.azure.messaging.webpubsub.client.models.WebPubSubResult;
import java.io.Closeable;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.util.concurrent.Queues;
import reactor.util.retry.Retry;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/azure/messaging/webpubsub/client/WebPubSubAsyncClient.class */
public final class WebPubSubAsyncClient implements Closeable {
    // Logger used throughout the client. Not final; presumably (re)assigned by updateLogger(...) — confirm.
    private ClientLogger logger;

    // --- Immutable configuration, assigned once in the constructor ---
    private final Mono<String> clientAccessUrlProvider;
    private final WebPubSubProtocolType webPubSubProtocol;
    private final boolean autoReconnect;
    private final boolean autoRestoreGroup;
    private final String applicationId;
    private final ClientEndpointConfiguration clientEndpointConfiguration;
    private final WebSocketClient webSocketClient;

    // --- Current session/connection; both are reset to null in handleClientStop ---
    private WebSocketSession webSocketSession;
    private WebPubSubConnection webPubSubConnection;

    // Retry policy applied to join/leave/send operations (built in the constructor).
    private final Retry sendMessageRetrySpec;

    // How long waitForAckMessage waits for an acknowledgement before failing.
    private static final Duration ACK_TIMEOUT = Duration.ofSeconds(30);
    // How long handleSessionClose lets a recovery attempt run before falling back to a fresh reconnect.
    private static final Duration RECOVER_TIMEOUT = Duration.ofSeconds(30);
    // Reconnect retry with backoff; a StopReconnectException is never retried.
    private static final Retry RECONNECT_RETRY_SPEC = Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1)).filter(th -> {
        return !(th instanceof StopReconnectException);
    });
    // Delay used after a session opens before closing it again or restoring groups.
    private static final Duration CLOSE_AFTER_SESSION_OPEN_DELAY = Duration.ofMillis(100);
    // Interval of the periodic sequence-ack task started in handleSessionOpen.
    private static final Duration SEQUENCE_ACK_DELAY = Duration.ofSeconds(5);

    // Logger holder handed to the WebSocket client on connect.
    private final AtomicReference<ClientLogger> loggerReference = new AtomicReference<>();

    // --- Event sinks (multicast, buffered, auto-cancel disabled). Non-final: handleClientStop swaps in fresh sinks. ---
    private Sinks.Many<GroupMessageEvent> groupMessageEventSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
    private Sinks.Many<ServerMessageEvent> serverMessageEventSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
    private Sinks.Many<AckMessage> ackMessageSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
    private Sinks.Many<ConnectedEvent> connectedEventSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
    private Sinks.Many<DisconnectedEvent> disconnectedEventSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
    private Sinks.Many<StoppedEvent> stoppedEventSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
    private Sinks.Many<RejoinGroupFailedEvent> rejoinGroupFailedEventSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);

    // --- Runtime bookkeeping ---
    // Source of ack ids handed out by nextAckId().
    private final AtomicLong ackId = new AtomicLong(0);
    // Handle of the running periodic sequence-ack subscription, if any.
    private final AtomicReference<Disposable> sequenceAckTask = new AtomicReference<>();
    private final ClientState clientState = new ClientState();
    // Set by the first close() call; later calls wait on isClosedMono instead of closing again.
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final Sinks.Empty<Void> isClosedMono = Sinks.empty();
    // Records that the user requested a stop so that open/close/reconnect handlers can honour it.
    private final AtomicBoolean isStoppedByUser = new AtomicBoolean();
    // Completed once a user-requested stop has finished (see tryCompleteOnStoppedByUserSink).
    private final AtomicReference<Sinks.Empty<Void>> isStoppedByUserSink = new AtomicReference<>();
    // Groups by name, with their joined flag; used to re-join groups after a reconnect.
    private final ConcurrentMap<String, WebPubSubGroup> groups = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/messaging/webpubsub/client/WebPubSubAsyncClient$ClientState.class */
    /**
     * Atomic holder of the client's lifecycle state. The initial state is STOPPED and every
     * successful transition is logged at info level with the previous and current state.
     */
    public final class ClientState {
        private final AtomicReference<WebPubSubClientState> clientState =
            new AtomicReference<>(WebPubSubClientState.STOPPED);

        private ClientState() {
        }

        /** Returns the current state. */
        WebPubSubClientState get() {
            return clientState.get();
        }

        /**
         * Unconditionally switches to the given state.
         *
         * @return the state that was replaced.
         */
        WebPubSubClientState changeState(WebPubSubClientState webPubSubClientState) {
            final WebPubSubClientState previous = clientState.getAndSet(webPubSubClientState);
            logTransition(previous, webPubSubClientState);
            return previous;
        }

        /**
         * Switches to {@code webPubSubClientState2} only when the current state is
         * {@code webPubSubClientState}.
         *
         * @return {@code true} when the transition happened; nothing is logged otherwise.
         */
        boolean changeStateOn(WebPubSubClientState webPubSubClientState, WebPubSubClientState webPubSubClientState2) {
            if (!clientState.compareAndSet(webPubSubClientState, webPubSubClientState2)) {
                return false;
            }
            logTransition(webPubSubClientState, webPubSubClientState2);
            return true;
        }

        // Emits the common "state changed" log entry.
        private void logTransition(WebPubSubClientState previous, WebPubSubClientState current) {
            WebPubSubAsyncClient.this.logger.atInfo()
                .addKeyValue("currentClientState", current)
                .addKeyValue("previousClientState", previous)
                .log("Client state changed.");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/messaging/webpubsub/client/WebPubSubAsyncClient$StopReconnectException.class */
    /**
     * Signals that a reconnect/recover attempt must be abandoned. The filter on
     * RECONNECT_RETRY_SPEC excludes this type, so it is never retried.
     */
    public static final class StopReconnectException extends RuntimeException {
        private StopReconnectException(String message) {
            super(message);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the client.
     *
     * @param webSocketClient the WebSocket transport; when {@code null} a {@link WebSocketClientNettyImpl} is used.
     * @param supplier supplies the client access URL; must not be {@code null}. It is invoked on the bounded-elastic
     *     scheduler each time the URL is needed.
     * @param webPubSubProtocolType the protocol; must not be {@code null}.
     * @param str the application id (also passed to the logger setup).
     * @param str2 second argument of {@link ClientEndpointConfiguration} (presumably the user agent — confirm).
     * @param retryStrategy governs retry count and delay for transient send failures.
     * @param z whether to reconnect automatically after the connection is lost.
     * @param z2 whether to re-join previously joined groups after a session opens.
     */
    public WebPubSubAsyncClient(WebSocketClient webSocketClient, Supplier<String> supplier, WebPubSubProtocolType webPubSubProtocolType, String str, String str2, RetryStrategy retryStrategy, boolean z, boolean z2) {
        updateLogger(str, null);
        this.applicationId = str;
        Objects.requireNonNull(supplier);
        this.clientAccessUrlProvider = Mono.fromSupplier(supplier).subscribeOn(Schedulers.boundedElastic());
        this.webPubSubProtocol = Objects.requireNonNull(webPubSubProtocolType);
        this.autoReconnect = z;
        this.autoRestoreGroup = z2;
        this.clientEndpointConfiguration = new ClientEndpointConfiguration(webPubSubProtocolType.toString(), str2);
        this.webSocketClient = webSocketClient == null ? new WebSocketClientNettyImpl() : webSocketClient;
        this.sendMessageRetrySpec = Retry.from(signals -> {
            // One attempt counter per retryWhen subscription.
            final AtomicInteger attempts = new AtomicInteger(0);
            return signals.concatMap(signal -> {
                final Throwable failure = signal.failure();
                // Default: propagate the failure, i.e. do not retry.
                Mono<Retry.RetrySignal> outcome = Mono.error(failure);
                if (failure instanceof SendMessageFailedException && ((SendMessageFailedException) failure).isTransient()) {
                    // Only transient send failures consume a retry attempt.
                    final int attempt = attempts.incrementAndGet();
                    if (attempt <= retryStrategy.getMaxRetries()) {
                        outcome = Mono.delay(retryStrategy.calculateRetryDelay(attempt)).then(Mono.just(signal));
                    }
                }
                return outcome;
            });
        });
    }

    /**
     * Returns the id of the current connection.
     *
     * @return the connection id, or {@code null} when there is no connection object.
     */
    public String getConnectionId() {
        // Read the field once: handleClientStop may null it concurrently, and a second read
        // after the null check could then throw a NullPointerException.
        final WebPubSubConnection connection = this.webPubSubConnection;
        return connection == null ? null : connection.getConnectionId();
    }

    /**
     * Starts the client without a post-start task.
     *
     * @return a Mono that completes once the connect call has been made, or errors if start fails.
     */
    public Mono<Void> start() {
        final Runnable noPostStartTask = null;
        return start(noPostStartTask);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Starts the client: moves STOPPED -> CONNECTING, runs the optional task, fetches the access URL
     * and connects the WebSocket on the bounded-elastic scheduler.
     *
     * @param runnable optional task run right after the state changed to CONNECTING; may be {@code null}.
     * @return a Mono that errors with {@link IllegalStateException} when the client is CLOSED or not STOPPED,
     *     or with whatever the URL provider / connect call raised.
     */
    public Mono<Void> start(Runnable runnable) {
        // Checked eagerly, at call time rather than at subscription time.
        if (this.clientState.get() == WebPubSubClientState.CLOSED) {
            return Mono.error(this.logger.logExceptionAsError(new IllegalStateException("Failed to start. Client is CLOSED.")));
        }
        return Mono.defer(() -> {
            this.logger.atInfo().addKeyValue("currentClientState", this.clientState.get()).log("Start client called.");
            // A fresh start discards any pending user-stop request.
            this.isStoppedByUser.set(false);
            this.isStoppedByUserSink.set(null);
            if (!this.clientState.changeStateOn(WebPubSubClientState.STOPPED, WebPubSubClientState.CONNECTING)) {
                return Mono.error(this.logger.logExceptionAsError(new IllegalStateException("Failed to start. Client is not STOPPED.")));
            }
            if (runnable != null) {
                runnable.run();
            }
            return Mono.empty();
        }).then(this.clientAccessUrlProvider.flatMap(url -> {
            // The explicit <Void> witness is required: without it the chain is inferred as
            // Mono<Object> and the method no longer type-checks against Mono<Void>.
            return Mono.<Void>fromRunnable(() -> {
                this.webSocketSession = this.webSocketClient.connectToServer(this.clientEndpointConfiguration, url, this.loggerReference, this::handleMessage, this::handleSessionOpen, this::handleSessionClose);
            }).subscribeOn(Schedulers.boundedElastic());
        })).doOnError(error -> {
            // Failure is reported through the returned Mono, so no StoppedEvent is emitted.
            // NOTE(review): this also runs when the client was not STOPPED, resetting a live client — confirm intended.
            handleClientStop(false);
        });
    }

    /**
     * Stops the client.
     *
     * <p>No-op when already STOPPED. When a stop is already in progress, or the state cannot be moved
     * from DISCONNECTED to STOPPED, the returned Mono is the one from {@code getStoppedByUserMono()}.
     * Otherwise the user-stop flag is set, the group map is cleared and an open session is closed on
     * the bounded-elastic scheduler.</p>
     *
     * @return a Mono for the stop operation; errors with {@link IllegalStateException} when the client is CLOSED.
     */
    public Mono<Void> stop() {
        if (this.clientState.get() == WebPubSubClientState.CLOSED) {
            return Mono.error(this.logger.logExceptionAsError(new IllegalStateException("Failed to stop. Client is CLOSED.")));
        }
        return Mono.defer(() -> {
            this.logger.atInfo().addKeyValue("currentClientState", this.clientState.get()).log("Stop client called.");
            if (this.clientState.get() == WebPubSubClientState.STOPPED) {
                return Mono.empty();
            }
            if (this.clientState.get() == WebPubSubClientState.STOPPING) {
                return getStoppedByUserMono();
            }

            this.isStoppedByUser.compareAndSet(false, true);
            this.groups.clear();

            final WebSocketSession session = this.webSocketSession;
            final boolean sessionOpen = session != null && session.isOpen();
            if (!sessionOpen) {
                // Nothing to close: stop directly if we are merely DISCONNECTED, otherwise wait.
                if (this.clientState.changeStateOn(WebPubSubClientState.DISCONNECTED, WebPubSubClientState.STOPPED)) {
                    handleClientStop();
                    return Mono.empty();
                }
                return getStoppedByUserMono();
            }

            this.clientState.changeState(WebPubSubClientState.STOPPING);
            return Mono.<Void>fromCallable(() -> {
                session.close();
                return null;
            }).subscribeOn(Schedulers.boundedElastic());
        });
    }

    /**
     * Stops the client and moves it to CLOSED, blocking until done. If close was already requested,
     * this call blocks until that earlier close has signalled completion.
     */
    @Override
    public void close() {
        final boolean closeAlreadyRequested = this.isDisposed.getAndSet(true);
        if (closeAlreadyRequested) {
            this.isClosedMono.asMono().block();
            return;
        }
        final Mono<Void> stopping = stop();
        stopping.then(Mono.fromRunnable(() -> {
            this.clientState.changeState(WebPubSubClientState.CLOSED);
            this.isClosedMono.emitEmpty(emitFailureHandler("Unable to emit Close"));
        })).block();
    }

    /**
     * Joins a group using a freshly generated ack id.
     *
     * @param str the group name.
     * @return the result of the join operation.
     */
    public Mono<WebPubSubResult> joinGroup(String str) {
        final Long generatedAckId = nextAckId();
        return joinGroup(str, generatedAckId);
    }

    /**
     * Joins a group: sends the join message, waits for its acknowledgement (with the send retry policy),
     * then records the group as joined.
     *
     * @param str the group name; must not be {@code null}.
     * @param l the ack id to use, or {@code null} to generate one.
     * @return the result of the join operation.
     */
    public Mono<WebPubSubResult> joinGroup(String str, Long l) {
        Objects.requireNonNull(str);
        final Long effectiveAckId = (l != null) ? l : Long.valueOf(nextAckId());
        return sendMessage(new JoinGroupMessage().setGroup(str).setAckId(effectiveAckId))
            .then(waitForAckMessage(effectiveAckId))
            .retryWhen(this.sendMessageRetrySpec)
            .map(result -> {
                // Remember membership so the group can be restored after a reconnect.
                this.groups.compute(str, (name, existing) ->
                    (existing == null ? new WebPubSubGroup(str) : existing).setJoined(true));
                return result;
            });
    }

    /**
     * Leaves a group using a freshly generated ack id.
     *
     * @param str the group name.
     * @return the result of the leave operation.
     */
    public Mono<WebPubSubResult> leaveGroup(String str) {
        final Long generatedAckId = nextAckId();
        return leaveGroup(str, generatedAckId);
    }

    /**
     * Leaves a group: sends the leave message, waits for its acknowledgement (with the send retry policy),
     * then records the group as not joined.
     *
     * @param str the group name; must not be {@code null}.
     * @param l the ack id to use, or {@code null} to generate one.
     * @return the result of the leave operation.
     */
    public Mono<WebPubSubResult> leaveGroup(String str, Long l) {
        Objects.requireNonNull(str);
        final Long effectiveAckId = (l != null) ? l : Long.valueOf(nextAckId());
        return sendMessage(new LeaveGroupMessage().setGroup(str).setAckId(effectiveAckId))
            .then(waitForAckMessage(effectiveAckId))
            .retryWhen(this.sendMessageRetrySpec)
            .map(result -> {
                // Keep the entry but flag it as left, so it is not restored after a reconnect.
                this.groups.compute(str, (name, existing) ->
                    (existing == null ? new WebPubSubGroup(str) : existing).setJoined(false));
                return result;
            });
    }

    /**
     * Sends a text message to a group.
     *
     * @param str the group name.
     * @param str2 the text content.
     * @return the result of the send operation.
     */
    public Mono<WebPubSubResult> sendToGroup(String str, String str2) {
        final BinaryData textPayload = BinaryData.fromString(str2);
        return sendToGroup(str, textPayload, WebPubSubDataFormat.TEXT);
    }

    /**
     * Sends a text message to a group with explicit options.
     *
     * @param str the group name.
     * @param str2 the text content.
     * @param sendToGroupOptions the send options.
     * @return the result of the send operation.
     */
    public Mono<WebPubSubResult> sendToGroup(String str, String str2, SendToGroupOptions sendToGroupOptions) {
        final BinaryData textPayload = BinaryData.fromString(str2);
        return sendToGroup(str, textPayload, WebPubSubDataFormat.TEXT, sendToGroupOptions);
    }

    /**
     * Sends a message to a group using default options carrying a freshly generated ack id.
     *
     * @param str the group name.
     * @param binaryData the content.
     * @param webPubSubDataFormat the format of the content.
     * @return the result of the send operation.
     */
    public Mono<WebPubSubResult> sendToGroup(String str, BinaryData binaryData, WebPubSubDataFormat webPubSubDataFormat) {
        final SendToGroupOptions defaultOptions = new SendToGroupOptions().setAckId(Long.valueOf(nextAckId()));
        return sendToGroup(str, binaryData, webPubSubDataFormat, defaultOptions);
    }

    /**
     * Sends a message to a group.
     *
     * <p>With fire-and-forget options no ack id is attached. Otherwise the ack id from the options is
     * used, or a new one is generated, and the acknowledgement is awaited under the send retry policy.</p>
     *
     * @param str the group name; must not be {@code null}.
     * @param binaryData the content; must not be {@code null}.
     * @param webPubSubDataFormat the format of the content; must not be {@code null}.
     * @param sendToGroupOptions the send options; must not be {@code null}.
     * @return the result of the send operation.
     */
    public Mono<WebPubSubResult> sendToGroup(String str, BinaryData binaryData, WebPubSubDataFormat webPubSubDataFormat, SendToGroupOptions sendToGroupOptions) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(binaryData);
        Objects.requireNonNull(webPubSubDataFormat);
        Objects.requireNonNull(sendToGroupOptions);

        final Long effectiveAckId;
        if (sendToGroupOptions.isFireAndForget()) {
            effectiveAckId = null;
        } else if (sendToGroupOptions.getAckId() != null) {
            effectiveAckId = Long.valueOf(sendToGroupOptions.getAckId().longValue());
        } else {
            effectiveAckId = Long.valueOf(nextAckId());
        }

        return sendMessage(new SendToGroupMessage()
                .setGroup(str)
                .setData(binaryData)
                .setDataType(webPubSubDataFormat.toString())
                .setAckId(effectiveAckId)
                .setNoEcho(Boolean.valueOf(sendToGroupOptions.isEchoDisabled())))
            .then(waitForAckMessage(effectiveAckId))
            .retryWhen(this.sendMessageRetrySpec);
    }

    /**
     * Sends an event using default options carrying a freshly generated ack id.
     *
     * @param str the event name.
     * @param binaryData the content.
     * @param webPubSubDataFormat the format of the content.
     * @return the result of the send operation.
     */
    public Mono<WebPubSubResult> sendEvent(String str, BinaryData binaryData, WebPubSubDataFormat webPubSubDataFormat) {
        final SendEventOptions defaultOptions = new SendEventOptions().setAckId(Long.valueOf(nextAckId()));
        return sendEvent(str, binaryData, webPubSubDataFormat, defaultOptions);
    }

    /**
     * Sends an event.
     *
     * <p>With fire-and-forget options no ack id is attached. Otherwise the ack id from the options is
     * used, or a new one is generated, and the acknowledgement is awaited under the send retry policy.</p>
     *
     * @param str the event name; must not be {@code null}.
     * @param binaryData the content; must not be {@code null}.
     * @param webPubSubDataFormat the format of the content; must not be {@code null}.
     * @param sendEventOptions the send options; must not be {@code null}.
     * @return the result of the send operation.
     */
    public Mono<WebPubSubResult> sendEvent(String str, BinaryData binaryData, WebPubSubDataFormat webPubSubDataFormat, SendEventOptions sendEventOptions) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(binaryData);
        Objects.requireNonNull(webPubSubDataFormat);
        Objects.requireNonNull(sendEventOptions);

        final Long effectiveAckId;
        if (sendEventOptions.isFireAndForget()) {
            effectiveAckId = null;
        } else if (sendEventOptions.getAckId() != null) {
            effectiveAckId = Long.valueOf(sendEventOptions.getAckId().longValue());
        } else {
            effectiveAckId = Long.valueOf(nextAckId());
        }

        return sendMessage(new SendEventMessage()
                .setEvent(str)
                .setData(binaryData)
                .setDataType(webPubSubDataFormat.toString())
                .setAckId(effectiveAckId))
            .then(waitForAckMessage(effectiveAckId))
            .retryWhen(this.sendMessageRetrySpec);
    }

    /**
     * Exposes the group message events as a Flux backed by the current sink.
     *
     * @return the Flux of {@link GroupMessageEvent}.
     */
    public Flux<GroupMessageEvent> receiveGroupMessageEvents() {
        final Sinks.Many<GroupMessageEvent> sink = this.groupMessageEventSink;
        return sink.asFlux();
    }

    /**
     * Exposes the server message events as a Flux backed by the current sink.
     *
     * @return the Flux of {@link ServerMessageEvent}.
     */
    public Flux<ServerMessageEvent> receiveServerMessageEvents() {
        final Sinks.Many<ServerMessageEvent> sink = this.serverMessageEventSink;
        return sink.asFlux();
    }

    /**
     * Exposes the connected events as a Flux backed by the current sink.
     *
     * @return the Flux of {@link ConnectedEvent}.
     */
    public Flux<ConnectedEvent> receiveConnectedEvents() {
        final Sinks.Many<ConnectedEvent> sink = this.connectedEventSink;
        return sink.asFlux();
    }

    /**
     * Exposes the disconnected events as a Flux backed by the current sink.
     *
     * @return the Flux of {@link DisconnectedEvent}.
     */
    public Flux<DisconnectedEvent> receiveDisconnectedEvents() {
        final Sinks.Many<DisconnectedEvent> sink = this.disconnectedEventSink;
        return sink.asFlux();
    }

    /**
     * Exposes the stopped events as a Flux backed by the current sink.
     *
     * @return the Flux of {@link StoppedEvent}.
     */
    public Flux<StoppedEvent> receiveStoppedEvents() {
        final Sinks.Many<StoppedEvent> sink = this.stoppedEventSink;
        return sink.asFlux();
    }

    /**
     * Exposes the rejoin-group-failed events as a Flux backed by the current sink.
     *
     * @return the Flux of {@link RejoinGroupFailedEvent}.
     */
    public Flux<RejoinGroupFailedEvent> receiveRejoinGroupFailedEvents() {
        final Sinks.Many<RejoinGroupFailedEvent> sink = this.rejoinGroupFailedEventSink;
        return sink.asFlux();
    }

    /**
     * Hands out the next ack id.
     *
     * @return the counter value before the update; the counter is then advanced by one and wraps
     *     to 0 when the increment overflows into a negative value.
     */
    private long nextAckId() {
        // The lambda parameter and the local must have distinct names; reusing one name does not compile.
        return this.ackId.getAndUpdate(current -> {
            final long next = current + 1;
            return next < 0 ? 0 : next;
        });
    }

    /** Exposes incoming ack messages as a Flux backed by the current ack sink. */
    private Flux<AckMessage> receiveAckMessages() {
        final Sinks.Many<AckMessage> sink = this.ackMessageSink;
        return sink.asFlux();
    }

    /**
     * Sends a message over the current session after validating the client state.
     *
     * @param webPubSubMessage the message to send.
     * @return a Mono that completes when the send callback reports success, or errors with the
     *     (transient) send failure built by {@code logSendMessageFailedException}.
     */
    private Mono<Void> sendMessage(WebPubSubMessage webPubSubMessage) {
        return checkStateBeforeSend().then(Mono.create(sink ->
            this.webSocketSession.sendObjectAsync(webPubSubMessage, outcome -> {
                if (!outcome.isOK()) {
                    sink.error(logSendMessageFailedException("Failed to send message.", outcome.getException(), true, webPubSubMessage));
                    return;
                }
                sink.success();
            })));
    }

    /**
     * Verifies, at subscription time, that a message can be sent right now.
     *
     * @return an empty Mono when the client is CONNECTED with an open session; otherwise an error:
     *     {@link IllegalStateException} when CLOSED, else a send failure that is flagged transient for
     *     the RECOVERING, CONNECTING, RECONNECTING and DISCONNECTED states.
     */
    private Mono<Void> checkStateBeforeSend() {
        return Mono.defer(() -> {
            final WebPubSubClientState state = this.clientState.get();
            if (state == WebPubSubClientState.CLOSED) {
                return Mono.error(this.logger.logExceptionAsError(new IllegalStateException("Failed to send message. WebPubSubClient is CLOSED.")));
            }
            if (state != WebPubSubClientState.CONNECTED) {
                final String reason = "Failed to send message. Client is " + state.name() + ".";
                final boolean retryable = state == WebPubSubClientState.RECOVERING
                    || state == WebPubSubClientState.CONNECTING
                    || state == WebPubSubClientState.RECONNECTING
                    || state == WebPubSubClientState.DISCONNECTED;
                return Mono.error(logSendMessageFailedException(reason, (Throwable) null, retryable, (Long) null));
            }
            if (this.webSocketSession != null && this.webSocketSession.isOpen()) {
                return Mono.empty();
            }
            return Mono.error(logSendMessageFailedException("Failed to send message. Websocket session is not opened.", (Throwable) null, false, (Long) null));
        });
    }

    /**
     * Returns a Mono tied to the shared "stopped by user" sink, installing a new sink when none is set.
     * If the sink was cleared between the failed install and the re-read, an empty Mono is returned.
     */
    private Mono<Void> getStoppedByUserMono() {
        final Sinks.Empty<Void> candidate = Sinks.empty();
        final Sinks.Empty<Void> active = this.isStoppedByUserSink.compareAndSet(null, candidate)
            ? candidate
            : this.isStoppedByUserSink.get();
        if (active == null) {
            return Mono.empty();
        }
        return active.asMono();
    }

    /** Detaches the "stopped by user" sink, if one is installed, and completes it. */
    private void tryCompleteOnStoppedByUserSink() {
        final Sinks.Empty<Void> pending = this.isStoppedByUserSink.getAndSet(null);
        if (pending == null) {
            return;
        }
        pending.emitEmpty(emitFailureHandler("Unable to emit Stopped"));
    }

    /**
     * Logs the event type at verbose level and emits the event into the sink.
     *
     * @param many the sink to emit into.
     * @param eventt the event to emit.
     */
    private <EventT> void tryEmitNext(Sinks.Many<EventT> many, EventT eventt) {
        final String eventType = eventt.getClass().getSimpleName();
        this.logger.atVerbose().addKeyValue("type", eventType).log("Send event");
        many.emitNext(eventt, emitFailureHandler("Unable to emit " + eventType));
    }

    /**
     * Waits for the acknowledgement that carries the given ack id.
     *
     * @param l the ack id to wait for; {@code null} means no acknowledgement is expected.
     * @return for a {@code null} id, an immediate result without ack id. Otherwise the result built from
     *     the first matching ack: success, or success-as-duplicate when the error name is "Duplicate";
     *     any other ack error is a non-transient send failure. If no ack arrives within ACK_TIMEOUT
     *     (or the ack stream ends first), a transient send failure is raised.
     */
    private Mono<WebPubSubResult> waitForAckMessage(Long l) {
        if (l == null) {
            return Mono.just(new WebPubSubResult(null, false));
        }
        return receiveAckMessages()
            .filter(candidate -> l.longValue() == candidate.getAckId())
            .next()
            .onErrorMap(cause -> logSendMessageFailedException("Acknowledge from the service not received.", cause, true, l))
            .flatMap(ack -> {
                if (ack.isSuccess()) {
                    return Mono.just(new WebPubSubResult(Long.valueOf(ack.getAckId()), false));
                }
                if (ack.getError() != null && "Duplicate".equals(ack.getError().getName())) {
                    // The service already saw this ack id; report it as a duplicate rather than a failure.
                    return Mono.just(new WebPubSubResult(Long.valueOf(ack.getAckId()), true));
                }
                return Mono.error(logSendMessageFailedException("Received non-success acknowledge from the service.", null, false, l, ack.getError()));
            })
            // A timeout is turned into "empty" so that it shares the error path below.
            .timeout(ACK_TIMEOUT, Mono.empty())
            .switchIfEmpty(Mono.defer(() ->
                Mono.error(logSendMessageFailedException("Acknowledge from the service not received.", (Throwable) null, true, l))));
    }

    /**
     * Callback invoked when a WebSocket session has opened.
     *
     * <p>Moves the client to CONNECTED. If a user stop was requested meanwhile, the session is closed
     * again after a short delay and nothing else happens. Otherwise, for reliable protocols the periodic
     * sequence-ack task is (re)started, and when auto-restore is enabled the previously joined groups
     * are re-joined.</p>
     *
     * @param webSocketSession the session that was opened.
     */
    private void handleSessionOpen(WebSocketSession webSocketSession) {
        this.logger.atVerbose().log("Session opened");
        this.clientState.changeState(WebPubSubClientState.CONNECTED);

        if (this.isStoppedByUser.compareAndSet(true, false)) {
            // stop() was requested while connecting: close the just-opened session shortly afterwards.
            Mono.delay(CLOSE_AFTER_SESSION_OPEN_DELAY).then(Mono.<Void>fromCallable(() -> {
                this.clientState.changeState(WebPubSubClientState.STOPPING);
                if (webSocketSession != null && webSocketSession.isOpen()) {
                    webSocketSession.close();
                } else {
                    this.logger.atError().log("Failed to close session after session open");
                    handleClientStop();
                }
                return null;
            }).subscribeOn(Schedulers.boundedElastic())).subscribe(null, error -> {
                this.logger.atError().log("Failed to close session after session open: " + error.getMessage());
                handleClientStop();
            });
            return;
        }

        if (isReliableProtocol(this.webPubSubProtocol)) {
            final Disposable ackTask = Flux.interval(SEQUENCE_ACK_DELAY).concatMap(tick -> {
                if (this.clientState.get() != WebPubSubClientState.CONNECTED || webSocketSession == null || !webSocketSession.isOpen()) {
                    return Mono.empty();
                }
                final WebPubSubConnection connection = this.webPubSubConnection;
                if (connection == null) {
                    return Mono.empty();
                }
                final Long sequenceId = connection.getSequenceAckId().getUpdated();
                if (sequenceId == null) {
                    return Mono.empty();
                }
                return sendMessage(new SequenceAckMessage().setSequenceId(sequenceId.longValue())).onErrorResume(error -> {
                    // Sending failed: flag the sequence id as updated again (presumably so a later tick resends it — confirm).
                    connection.getSequenceAckId().setUpdated();
                    return Mono.empty();
                });
            }).subscribe();
            // Replace any task left over from a previous session.
            final Disposable previousTask = this.sequenceAckTask.getAndSet(ackTask);
            if (previousTask != null) {
                previousTask.dispose();
            }
        }

        if (this.autoRestoreGroup) {
            // One re-join per group still flagged as joined; a failed re-join is reported as an event
            // instead of failing the whole restore.
            final List<Mono<WebPubSubResult>> rejoinAttempts = this.groups.values().stream()
                .filter(WebPubSubGroup::isJoined)
                .map(group -> joinGroup(group.getName()).onErrorResume(error -> {
                    if (error instanceof SendMessageFailedException) {
                        tryEmitNext(this.rejoinGroupFailedEventSink, new RejoinGroupFailedEvent(group.getName(), (SendMessageFailedException) error));
                    }
                    return Mono.empty();
                }))
                .collect(Collectors.toList());
            Mono.delay(CLOSE_AFTER_SESSION_OPEN_DELAY)
                .thenMany(Flux.mergeSequentialDelayError(rejoinAttempts, Schedulers.DEFAULT_POOL_SIZE, Schedulers.DEFAULT_POOL_SIZE))
                .subscribe(null, error -> {
                    this.logger.atWarning().log("Failed to auto restore group: " + error.getMessage());
                });
        }
    }

    /**
     * Callback invoked when the WebSocket session has closed.
     *
     * <p>Ignored when the client is already STOPPED. A close caused by a user stop finishes the stop.
     * Otherwise the client tries to recover the connection when the close code is not 1008, the
     * protocol is reliable and both a reconnection token and a connection id are known; if recovery
     * does not finish within RECOVER_TIMEOUT, or recovery is not possible, it goes through the
     * disconnect-then-reconnect path.</p>
     *
     * @param closeReason the reason reported for the close.
     */
    private void handleSessionClose(CloseReason closeReason) {
        this.logger.atVerbose().addKeyValue("code", closeReason.getCloseCode()).log("Session closed");
        if (this.clientState.get() == WebPubSubClientState.STOPPED) {
            return;
        }

        // Capture the id before any cleanup below touches the connection.
        final String connectionId = getConnectionId();
        if (this.isStoppedByUser.compareAndSet(true, false) || this.clientState.get() == WebPubSubClientState.STOPPING) {
            handleConnectionClose();
            handleClientStop();
            return;
        }

        // 1008 is the WebSocket "policy violation" close code; such a connection is never recovered.
        String reconnectionToken = null;
        if (closeReason.getCloseCode() != 1008) {
            final WebPubSubConnection connection = this.webPubSubConnection;
            reconnectionToken = connection == null ? null : connection.getReconnectionToken();
        }

        if (reconnectionToken != null && connectionId != null && isReliableProtocol(this.webPubSubProtocol)) {
            handleRecovery(connectionId, reconnectionToken).timeout(RECOVER_TIMEOUT, Mono.defer(() -> {
                // Recovery took too long: give up on this connection and reconnect from scratch.
                this.clientState.changeState(WebPubSubClientState.DISCONNECTED);
                handleConnectionClose();
                return handleNoRecovery();
            })).subscribe(null, error -> {
                this.logger.atWarning().log("Failed to recover or reconnect session: " + error.getMessage());
            });
            return;
        }

        this.clientState.changeState(WebPubSubClientState.DISCONNECTED);
        handleConnectionClose();
        handleNoRecovery().subscribe(null, error -> {
            this.logger.atWarning().log("Failed to auto reconnect session: " + error.getMessage());
        });
    }

    /**
     * Dispatches a message received from the service by its concrete type.
     *
     * <p>Group and server data messages are emitted as events unless their sequence id is rejected by
     * {@code updateSequenceAckId}; ack messages go to the ack sink; a connected message updates the
     * logger and connection and emits a connected event; a disconnected message triggers the
     * connection-close handling. Any other type is ignored.</p>
     *
     * @param obj the received message.
     */
    private void handleMessage(Object obj) {
        if (obj instanceof GroupDataMessage) {
            final GroupDataMessage groupData = (GroupDataMessage) obj;
            // Messages without a sequence id are always delivered.
            final boolean deliver = groupData.getSequenceId() == null
                || updateSequenceAckId(groupData.getSequenceId().longValue());
            if (deliver) {
                tryEmitNext(this.groupMessageEventSink, new GroupMessageEvent(groupData.getGroup(), groupData.getData(), groupData.getDataType(), groupData.getFromUserId(), groupData.getSequenceId()));
            }
        } else if (obj instanceof ServerDataMessage) {
            final ServerDataMessage serverData = (ServerDataMessage) obj;
            final boolean deliver = serverData.getSequenceId() == null
                || updateSequenceAckId(serverData.getSequenceId().longValue());
            if (deliver) {
                tryEmitNext(this.serverMessageEventSink, new ServerMessageEvent(serverData.getData(), serverData.getDataType(), serverData.getSequenceId()));
            }
        } else if (obj instanceof AckMessage) {
            tryEmitNext(this.ackMessageSink, (AckMessage) obj);
        } else if (obj instanceof ConnectedMessage) {
            final ConnectedMessage connected = (ConnectedMessage) obj;
            final String connectionId = connected.getConnectionId();
            updateLogger(this.applicationId, connectionId);
            if (this.webPubSubConnection == null) {
                this.webPubSubConnection = new WebPubSubConnection();
            }
            this.webPubSubConnection.updateForConnected(connected.getConnectionId(), connected.getReconnectionToken(), () -> {
                tryEmitNext(this.connectedEventSink, new ConnectedEvent(connectionId, connected.getUserId()));
            });
        } else if (obj instanceof DisconnectedMessage) {
            final DisconnectedMessage disconnected = (DisconnectedMessage) obj;
            handleConnectionClose(new DisconnectedEvent(getConnectionId(), disconnected.getReason()));
        }
    }

    /**
     * Feeds a received sequence id into the current connection's sequence-ack tracker.
     *
     * @param j the sequence id of the received message.
     * @return the tracker's update result, or {@code false} when there is no connection.
     */
    private boolean updateSequenceAckId(long j) {
        final WebPubSubConnection connection = this.webPubSubConnection;
        if (connection == null) {
            return false;
        }
        return connection.getSequenceAckId().update(j);
    }

    /**
     * Handles a lost connection that cannot be recovered: either stops the client or reconnects
     * with a brand-new connection.
     *
     * @return a Mono that, on subscription, stops the client when a user stop is pending or
     *     auto-reconnect is off; otherwise moves DISCONNECTED -> RECONNECTING and connects again,
     *     retrying per RECONNECT_RETRY_SPEC. It errors with {@link StopReconnectException} when the
     *     state is not DISCONNECTED or the user stops the client meanwhile.
     */
    private Mono<Void> handleNoRecovery() {
        return Mono.defer(() -> {
            // The user-stop flag is always consumed first, exactly as in the reconnect path below.
            if (this.isStoppedByUser.compareAndSet(true, false) || !this.autoReconnect) {
                handleClientStop();
                return Mono.empty();
            }
            if (!this.clientState.changeStateOn(WebPubSubClientState.DISCONNECTED, WebPubSubClientState.RECONNECTING)) {
                return Mono.error(this.logger.logExceptionAsError(new StopReconnectException("Failed to start. Client is not DISCONNECTED.")));
            }
            return Mono.defer(() -> {
                // Re-checked on every retry so that a stop() during reconnect ends the loop.
                if (this.isStoppedByUser.compareAndSet(true, false)) {
                    return Mono.error(this.logger.logExceptionAsWarning(new StopReconnectException("Client is stopped by user.")));
                }
                return Mono.empty();
            }).then(this.clientAccessUrlProvider.flatMap(url -> {
                // The explicit <Void> witness keeps the chain typed as Mono<Void>; without it the
                // chain is inferred as Mono<Object> and the method does not type-check.
                return Mono.<Void>fromRunnable(() -> {
                    this.webSocketSession = this.webSocketClient.connectToServer(this.clientEndpointConfiguration, url, this.loggerReference, this::handleMessage, this::handleSessionOpen, this::handleSessionClose);
                }).subscribeOn(Schedulers.boundedElastic());
            })).retryWhen(RECONNECT_RETRY_SPEC).doOnError(error -> {
                handleClientStop();
            });
        });
    }

    /**
     * Tries to recover the previous connection by reconnecting with its connection id and
     * reconnection token appended to the access URL.
     *
     * @param str the connection id to recover (sent as {@code awps_connection_id}).
     * @param str2 the reconnection token (sent as {@code awps_reconnection_token}).
     * @return a Mono that, on subscription, stops the client when a user stop is pending; otherwise
     *     moves CONNECTED -> RECOVERING and connects again, retrying per RECONNECT_RETRY_SPEC. It errors
     *     with {@link StopReconnectException} when the state is not CONNECTED or the user stops the
     *     client meanwhile.
     */
    private Mono<Void> handleRecovery(String str, String str2) {
        return Mono.defer(() -> {
            if (this.isStoppedByUser.compareAndSet(true, false)) {
                handleClientStop();
                return Mono.empty();
            }
            if (!this.clientState.changeStateOn(WebPubSubClientState.CONNECTED, WebPubSubClientState.RECOVERING)) {
                return Mono.error(this.logger.logExceptionAsError(new StopReconnectException("Failed to recover. Client is not CONNECTED.")));
            }
            return Mono.defer(() -> {
                // Re-checked on every retry so that a stop() during recovery ends the loop.
                if (this.isStoppedByUser.compareAndSet(true, false)) {
                    return Mono.error(this.logger.logExceptionAsWarning(new StopReconnectException("Client is stopped by user.")));
                }
                return Mono.empty();
            }).then(this.clientAccessUrlProvider.flatMap(url -> {
                // The explicit <Void> witness keeps the chain typed as Mono<Void>; without it the
                // chain is inferred as Mono<Object> and the method does not type-check.
                return Mono.<Void>fromRunnable(() -> {
                    final String recoveryUrl = UrlBuilder.parse(url).addQueryParameter("awps_connection_id", str).addQueryParameter("awps_reconnection_token", str2).toString();
                    this.webSocketSession = this.webSocketClient.connectToServer(this.clientEndpointConfiguration, recoveryUrl, this.loggerReference, this::handleMessage, this::handleSessionOpen, this::handleSessionClose);
                }).subscribeOn(Schedulers.boundedElastic());
            })).retryWhen(RECONNECT_RETRY_SPEC).doOnError(error -> {
                handleClientStop();
            });
        });
    }

    /**
     * Stops the client and emits a {@link StoppedEvent}.
     *
     * <p>Shorthand for {@code handleClientStop(true)}.
     */
    private void handleClientStop() {
        final boolean emitStoppedEvent = true;
        handleClientStop(emitStoppedEvent);
    }

    /**
     * Moves the client to {@code STOPPED} and resets its per-run state.
     *
     * <p>In order: the state is set to {@code STOPPED}; the session and connection references are
     * cleared; the stopped-by-user sink is completed; the task held in {@code sequenceAckTask} (if any)
     * is disposed; optionally a {@link StoppedEvent} is emitted; every event sink is completed and
     * replaced with a fresh multicast sink; finally the logger is rebuilt via
     * {@code updateLogger(applicationId, null)}.
     *
     * @param z when {@code true}, a {@link StoppedEvent} is emitted before the sinks are completed.
     */
    private void handleClientStop(boolean z) {
        this.clientState.changeState(WebPubSubClientState.STOPPED);

        // Forget the current session and connection.
        this.webSocketSession = null;
        this.webPubSubConnection = null;

        tryCompleteOnStoppedByUserSink();

        // Atomically take the registered task (if any) out of the holder and dispose it.
        Disposable pendingAckTask = this.sequenceAckTask.getAndSet(null);
        if (pendingAckTask != null) {
            pendingAckTask.dispose();
        }

        // Must happen before stoppedEventSink is completed below.
        if (z) {
            tryEmitNext(this.stoppedEventSink, new StoppedEvent());
        }

        // Complete every sink and swap in a fresh one. The sink name is passed once per call so the
        // failure message always names the sink that actually failed.
        this.groupMessageEventSink = completeAndRenewSink(this.groupMessageEventSink, "groupMessageEventSink");
        this.serverMessageEventSink = completeAndRenewSink(this.serverMessageEventSink, "serverMessageEventSink");
        this.connectedEventSink = completeAndRenewSink(this.connectedEventSink, "connectedEventSink");
        this.disconnectedEventSink = completeAndRenewSink(this.disconnectedEventSink, "disconnectedEventSink");
        this.stoppedEventSink = completeAndRenewSink(this.stoppedEventSink, "stoppedEventSink");
        this.rejoinGroupFailedEventSink = completeAndRenewSink(this.rejoinGroupFailedEventSink, "rejoinGroupFailedEventSink");
        this.ackMessageSink = completeAndRenewSink(this.ackMessageSink, "ackMessageSink");

        // NOTE(review): the null second argument presumably clears the connection id from the
        // logging context - confirm against LoggingUtils.createContextWithConnectionId.
        updateLogger(this.applicationId, null);
    }

    /**
     * Completes {@code sink} and returns a new multicast sink to take its place.
     *
     * @param sink the sink to complete.
     * @param sinkName field name of the sink, used in the warning logged when completion fails.
     * @param <T> element type of the sink.
     * @return a new multicast sink created with {@code onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false)}.
     */
    private <T> Sinks.Many<T> completeAndRenewSink(Sinks.Many<T> sink, String sinkName) {
        sink.emitComplete(emitFailureHandler("Unable to emit Complete to " + sinkName));
        return Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
    }

    /**
     * Handles a connection close for which no {@link DisconnectedEvent} is available.
     *
     * <p>Delegates to {@link #handleConnectionClose(DisconnectedEvent)} with {@code null}.
     */
    private void handleConnectionClose() {
        final DisconnectedEvent noEvent = null;
        handleConnectionClose(noEvent);
    }

    /**
     * Marks the current connection as disconnected and publishes a {@link DisconnectedEvent}.
     *
     * <p>The event is emitted through the callback handed to
     * {@code WebPubSubConnection.updateForDisconnected}; nothing is emitted when there is no current
     * connection.
     *
     * @param disconnectedEvent the event to publish, or {@code null} to publish an event built from the
     *     current connection id with a {@code null} second constructor argument. When {@code null}, the
     *     current connection reference is also cleared.
     */
    private void handleConnectionClose(DisconnectedEvent disconnectedEvent) {
        final boolean closedWithoutEvent = disconnectedEvent == null;

        final DisconnectedEvent eventToPublish;
        if (closedWithoutEvent) {
            eventToPublish = new DisconnectedEvent(getConnectionId(), null);
        } else {
            eventToPublish = disconnectedEvent;
        }

        // Work on a snapshot so the null check and the call see the same instance.
        final WebPubSubConnection currentConnection = this.webPubSubConnection;
        if (currentConnection != null) {
            currentConnection.updateForDisconnected(() -> {
                tryEmitNext(this.disconnectedEventSink, eventToPublish);
            });
        }

        if (closedWithoutEvent) {
            this.webPubSubConnection = null;
        }
    }

    /**
     * Replaces the client's logger with one whose context is built by
     * {@code LoggingUtils.createContextWithConnectionId(str, str2)}, and publishes the same instance
     * through {@code loggerReference}.
     *
     * @param str first argument for the logging-context factory.
     * @param str2 second argument for the logging-context factory; {@code null} is passed by the stop path.
     */
    private void updateLogger(String str, String str2) {
        final ClientLogger refreshedLogger = new ClientLogger(
            WebPubSubAsyncClient.class,
            LoggingUtils.createContextWithConnectionId(str, str2));
        this.logger = refreshedLogger;
        this.loggerReference.set(refreshedLogger);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reads the current client state.
     *
     * @return the value currently held by {@code clientState}.
     */
    public WebPubSubClientState getClientState() {
        final WebPubSubClientState currentState = this.clientState.get();
        return currentState;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reads the current WebSocket session reference.
     *
     * @return the session currently stored on this client; {@code null} after the client has been stopped.
     */
    public WebSocketSession getWebsocketSession() {
        final WebSocketSession currentSession = this.webSocketSession;
        return currentSession;
    }

    /**
     * Creates a sink emit-failure handler that logs each failure at warning level and asks for a
     * retry only when the failure is {@code FAIL_NON_SERIALIZED}.
     *
     * @param str the message to log, together with the signal type and emit result, on each failure.
     * @return the handler; it returns {@code true} only for {@link Sinks.EmitResult#FAIL_NON_SERIALIZED}.
     */
    private Sinks.EmitFailureHandler emitFailureHandler(String str) {
        final Sinks.EmitFailureHandler handler = (signal, result) -> {
            // The logger is read at failure time, so the most recently installed logger is used.
            LoggingUtils.addSignalTypeAndResult(this.logger.atWarning(), signal, result).log(str);
            final boolean retryEmission = result.equals(Sinks.EmitResult.FAIL_NON_SERIALIZED);
            return retryEmission;
        };
        return handler;
    }

    /**
     * Logs and returns a send-failure exception for the given message.
     *
     * <p>The ack id is taken from the message when it is a {@link WebPubSubMessageAck}; otherwise it is
     * {@code null}.
     *
     * @param str the exception message.
     * @param th the cause.
     * @param z boolean flag forwarded unchanged to {@link SendMessageFailedException}.
     * @param webPubSubMessage the message whose send failed.
     * @return the logged exception, for the caller to throw or propagate.
     */
    private RuntimeException logSendMessageFailedException(String str, Throwable th, boolean z, WebPubSubMessage webPubSubMessage) {
        Long ackId = null;
        if (webPubSubMessage instanceof WebPubSubMessageAck) {
            ackId = ((WebPubSubMessageAck) webPubSubMessage).getAckId();
        }
        return logSendMessageFailedException(str, th, z, ackId);
    }

    /**
     * Logs and returns a send-failure exception that carries no {@link AckResponseError}.
     *
     * @param str the exception message.
     * @param th the cause.
     * @param z boolean flag forwarded unchanged to {@link SendMessageFailedException}.
     * @param l the ack id of the failed message, or {@code null}.
     * @return the logged exception, for the caller to throw or propagate.
     */
    private RuntimeException logSendMessageFailedException(String str, Throwable th, boolean z, Long l) {
        final AckResponseError noAckError = null;
        return logSendMessageFailedException(str, th, z, l, noAckError);
    }

    /**
     * Creates a {@link SendMessageFailedException} from the arguments, logs it at warning level and
     * returns it.
     *
     * @param str the exception message.
     * @param th the cause.
     * @param z boolean flag forwarded unchanged to {@link SendMessageFailedException}.
     * @param l the ack id of the failed message, or {@code null}.
     * @param ackResponseError the error carried by the ack response, or {@code null}.
     * @return the logged exception, for the caller to throw or propagate.
     */
    private RuntimeException logSendMessageFailedException(String str, Throwable th, boolean z, Long l, AckResponseError ackResponseError) {
        final SendMessageFailedException failure = new SendMessageFailedException(str, th, z, l, ackResponseError);
        return this.logger.logExceptionAsWarning(failure);
    }

    /**
     * Tells whether the given protocol type is the JSON reliable protocol.
     *
     * @param webPubSubProtocolType the protocol type to test; may be {@code null}.
     * @return {@code true} only when the argument is the
     *     {@link WebPubSubProtocolType#JSON_RELIABLE_PROTOCOL} instance (reference comparison).
     */
    private static boolean isReliableProtocol(WebPubSubProtocolType webPubSubProtocolType) {
        final boolean reliable = WebPubSubProtocolType.JSON_RELIABLE_PROTOCOL == webPubSubProtocolType;
        return reliable;
    }
}
