package org.eclipse.ditto.client.messaging.internal;

import com.neovisionaries.ws.client.OpeningHandshakeException;
import com.neovisionaries.ws.client.StatusLine;
import com.neovisionaries.ws.client.WebSocket;
import com.neovisionaries.ws.client.WebSocketAdapter;
import com.neovisionaries.ws.client.WebSocketError;
import com.neovisionaries.ws.client.WebSocketException;
import com.neovisionaries.ws.client.WebSocketFactory;
import com.neovisionaries.ws.client.WebSocketFrame;
import java.io.IOException;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.eclipse.ditto.client.configuration.AuthenticationConfiguration;
import org.eclipse.ditto.client.configuration.MessagingConfiguration;
import org.eclipse.ditto.client.internal.DefaultThreadFactory;
import org.eclipse.ditto.client.internal.VersionReader;
import org.eclipse.ditto.client.internal.bus.AdaptableBus;
import org.eclipse.ditto.client.internal.bus.BusFactory;
import org.eclipse.ditto.client.messaging.AuthenticationException;
import org.eclipse.ditto.client.messaging.AuthenticationProvider;
import org.eclipse.ditto.client.messaging.MessagingException;
import org.eclipse.ditto.client.messaging.MessagingProvider;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.common.ConditionChecker;
import org.eclipse.ditto.model.base.common.HttpStatusCode;
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/ditto/client/messaging/internal/WebSocketMessagingProvider.class */
public final class WebSocketMessagingProvider extends WebSocketAdapter implements MessagingProvider {
    private static final Duration ZOMBIE_LIFETIME = Duration.ofSeconds(3);
    private static final String DITTO_CLIENT_USER_AGENT = "DittoClient/" + VersionReader.determineClientVersion();
    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketMessagingProvider.class);
    private static final int CONNECTION_TIMEOUT_MS = 5000;
    private static final int RECONNECTION_TIMEOUT_SECONDS = 5;
    private final AdaptableBus adaptableBus;
    private final MessagingConfiguration messagingConfiguration;
    private final AuthenticationProvider<WebSocket> authenticationProvider;
    private final ExecutorService callbackExecutor;
    private final String sessionId;
    private final AtomicBoolean reconnecting = new AtomicBoolean(false);
    private final AtomicBoolean initializing = new AtomicBoolean(false);
    private final CompletableFuture<WebSocket> initializationFuture = new CompletableFuture<>();
    private final ScheduledExecutorService connectExecutor = createConnectExecutor();
    private final Map<Object, String> subscriptionMessages = new ConcurrentHashMap();
    private final AtomicReference<WebSocket> webSocket = new AtomicReference<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.eclipse.ditto.client.messaging.internal.WebSocketMessagingProvider$1, reason: invalid class name */
    /* loaded from: input_file:org/eclipse/ditto/client/messaging/internal/WebSocketMessagingProvider$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$eclipse$ditto$model$base$common$HttpStatusCode = new int[HttpStatusCode.values().length];

        static {
            try {
                $SwitchMap$org$eclipse$ditto$model$base$common$HttpStatusCode[HttpStatusCode.UNAUTHORIZED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$eclipse$ditto$model$base$common$HttpStatusCode[HttpStatusCode.FORBIDDEN.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    private WebSocketMessagingProvider(AdaptableBus adaptableBus, MessagingConfiguration messagingConfiguration, AuthenticationProvider<WebSocket> authenticationProvider, ExecutorService executorService) {
        this.adaptableBus = adaptableBus;
        this.messagingConfiguration = messagingConfiguration;
        this.authenticationProvider = authenticationProvider;
        this.callbackExecutor = executorService;
        this.sessionId = authenticationProvider.getConfiguration().getSessionId();
    }

    private static ScheduledExecutorService createConnectExecutor() {
        return Executors.newScheduledThreadPool(1, new DefaultThreadFactory("ditto-client-reconnect"));
    }

    public static WebSocketMessagingProvider newInstance(MessagingConfiguration messagingConfiguration, AuthenticationProvider<WebSocket> authenticationProvider, ExecutorService executorService) {
        ConditionChecker.checkNotNull(messagingConfiguration, "messagingConfiguration");
        ConditionChecker.checkNotNull(authenticationProvider, "authenticationProvider");
        ConditionChecker.checkNotNull(executorService, "callbackExecutor");
        return new WebSocketMessagingProvider(BusFactory.createAdaptableBus(), messagingConfiguration, authenticationProvider, executorService);
    }

    @Override // org.eclipse.ditto.client.messaging.MessagingProvider
    public AuthenticationConfiguration getAuthenticationConfiguration() {
        return this.authenticationProvider.getConfiguration();
    }

    @Override // org.eclipse.ditto.client.messaging.MessagingProvider
    public MessagingConfiguration getMessagingConfiguration() {
        return this.messagingConfiguration;
    }

    @Override // org.eclipse.ditto.client.messaging.MessagingProvider
    public ExecutorService getExecutorService() {
        return this.callbackExecutor;
    }

    public ScheduledExecutorService getConnectExecutor() {
        return this.connectExecutor;
    }

    @Override // org.eclipse.ditto.client.messaging.MessagingProvider
    public AdaptableBus getAdaptableBus() {
        return this.adaptableBus;
    }

    @Override // org.eclipse.ditto.client.messaging.MessagingProvider
    public MessagingProvider registerSubscriptionMessage(Object obj, String str) {
        this.subscriptionMessages.put(obj, str);
        return this;
    }

    @Override // org.eclipse.ditto.client.messaging.MessagingProvider
    public MessagingProvider unregisterSubscriptionMessage(Object obj) {
        this.subscriptionMessages.remove(obj);
        return this;
    }

    @Override // org.eclipse.ditto.client.messaging.MessagingProvider
    public CompletionStage<?> initializeAsync() {
        return (this.initializing.getAndSet(true) || this.webSocket.get() != null) ? this.initializationFuture.thenApply(webSocket -> {
            return this;
        }) : connectWithPotentialRetries(this::createWebsocket, this.initializationFuture).thenApply(webSocket2 -> {
            setWebSocket(webSocket2);
            return this;
        });
    }

    private WebSocket createWebsocket() {
        WebSocketFactory newWebSocketFactory = WebSocketFactoryFactory.newWebSocketFactory(this.messagingConfiguration);
        try {
            return newWebSocketFactory.createSocket(this.messagingConfiguration.getEndpointUri()).addHeader(DittoHeaderDefinition.DECLARED_ACKS.getKey(), ((JsonArray) this.messagingConfiguration.getDeclaredAcknowledgements().stream().map((v0) -> {
                return v0.toString();
            }).map(JsonValue::of).collect(JsonCollectors.valuesToArray())).toString());
        } catch (IOException e) {
            throw MessagingException.connectFailed(this.sessionId, e);
        }
    }

    private CompletionStage<WebSocket> initiateConnection(WebSocket webSocket) {
        ConditionChecker.checkNotNull(webSocket, "ws");
        webSocket.addHeader("User-Agent", DITTO_CLIENT_USER_AGENT);
        webSocket.setMaxPayloadSize(262144);
        webSocket.setMissingCloseFrameAllowed(true);
        webSocket.setFrameQueueSize(0);
        webSocket.setPingInterval(5000L);
        this.authenticationProvider.prepareAuthentication(webSocket);
        webSocket.addListener(this);
        LOGGER.info("Connecting WebSocket on endpoint <{}>.", webSocket.getURI());
        Callable connectable = webSocket.connectable();
        return CompletableFuture.supplyAsync(() -> {
            try {
                return (WebSocket) connectable.call();
            } catch (Throwable th) {
                throw mapConnectError(th);
            }
        }, this.connectExecutor);
    }

    @Override // org.eclipse.ditto.client.messaging.MessagingProvider
    public void emit(String str) {
        sendToWebsocket(str);
    }

    private void sendToWebsocket(String str) {
        WebSocket webSocket = this.webSocket.get();
        if (webSocket == null || !webSocket.isOpen()) {
            LOGGER.error("Client <{}>: WebSocket is not connected - going to discard message '{}'", this.sessionId, str);
        } else {
            LOGGER.debug("Client <{}>: Sending: {}", this.sessionId, str);
            webSocket.sendText(str);
        }
    }

    @Override // org.eclipse.ditto.client.messaging.MessagingProvider
    public void close() {
        try {
            LOGGER.debug("Client <{}>: Closing WebSocket client of endpoint <{}>.", this.sessionId, this.messagingConfiguration.getEndpointUri());
            this.connectExecutor.shutdownNow();
            this.authenticationProvider.destroy();
            this.adaptableBus.shutdownExecutor();
            WebSocket webSocket = this.webSocket.get();
            if (webSocket != null) {
                webSocket.disconnect();
            }
            LOGGER.info("Client <{}>: WebSocket destroyed.", this.sessionId);
            this.initializationFuture.completeExceptionally(MessagingException.connectFailed(this.sessionId, new IllegalStateException("The client was destroyed.")));
        } catch (Exception e) {
            LOGGER.info("Client <{}>: Exception occurred while trying to shutdown http client.", this.sessionId, e);
            this.initializationFuture.completeExceptionally(MessagingException.connectFailed(this.sessionId, e));
        }
    }

    public void onConnected(WebSocket webSocket, Map<String, List<String>> map) {
        this.callbackExecutor.execute(() -> {
            LOGGER.info("Client <{}>: WebSocket connection is established", this.sessionId);
            if (this.subscriptionMessages.isEmpty()) {
                return;
            }
            LOGGER.info("Client <{}>: Subscribing again for messages from backend after reconnection", this.sessionId);
            this.subscriptionMessages.values().forEach(this::emit);
        });
    }

    public void onDisconnected(WebSocket webSocket, WebSocketFrame webSocketFrame, WebSocketFrame webSocketFrame2, boolean z) {
        this.callbackExecutor.execute(() -> {
            if (!z) {
                LOGGER.info("Client <{}>: WebSocket connection to endpoint <{}> was closed by client", this.sessionId, this.messagingConfiguration.getEndpointUri());
            } else {
                LOGGER.info("Client <{}>: WebSocket connection to endpoint <{}> was closed by Server with code <{}> and reason <{}>.", new Object[]{this.sessionId, this.messagingConfiguration.getEndpointUri(), Integer.valueOf(webSocketFrame.getCloseCode()), webSocketFrame.getCloseReason()});
                handleReconnectionIfEnabled();
            }
        });
    }

    public void onError(WebSocket webSocket, WebSocketException webSocketException) {
        this.callbackExecutor.execute(() -> {
            LOGGER.error("Client <{}>: Error in WebSocket: {}", this.sessionId, null != webSocketException ? webSocketException.getClass().getSimpleName() + ": " + webSocketException.getMessage() : "-");
            handleReconnectionIfEnabled();
        });
    }

    private CompletableFuture<WebSocket> connectWithPotentialRetries(Supplier<WebSocket> supplier, CompletableFuture<WebSocket> completableFuture) {
        try {
            return Retry.retryTo("initialize WebSocket connection", () -> {
                return initiateConnection((WebSocket) supplier.get());
            }).inClientSession(this.sessionId).withExecutors(this.connectExecutor, this.callbackExecutor).notifyOnError(this.messagingConfiguration.getConnectionErrorHandler().orElse(null)).isRecoverable(this.messagingConfiguration.isInitialConnectRetryEnabled() ? WebSocketMessagingProvider::isRecoverable : th -> {
                return false;
            }).completeFutureEventually(completableFuture);
        } catch (Exception e) {
            completableFuture.completeExceptionally(e);
            return completableFuture;
        }
    }

    private void handleReconnectionIfEnabled() {
        if (!this.messagingConfiguration.isReconnectEnabled()) {
            LOGGER.info("Client <{}>: Reconnection is NOT enabled. Closing client ...", this.sessionId);
            this.adaptableBus.getScheduledExecutor().schedule(this::close, ZOMBIE_LIFETIME.toMillis(), TimeUnit.MILLISECONDS);
        } else if (this.reconnecting.compareAndSet(false, true)) {
            LOGGER.info("Client <{}>: Reconnection is enabled. Reconnecting in <{}> seconds ...", this.sessionId, Integer.valueOf(RECONNECTION_TIMEOUT_SECONDS));
            this.connectExecutor.schedule(this::reconnectWithRetries, 5L, TimeUnit.SECONDS);
        }
    }

    private void reconnectWithRetries() {
        connectWithPotentialRetries(this::recreateWebSocket, new CompletableFuture<>()).thenAccept(webSocket -> {
            setWebSocket(webSocket);
            this.reconnecting.set(false);
        });
    }

    private void setWebSocket(WebSocket webSocket) {
        synchronized (this.webSocket) {
            WebSocket webSocket2 = this.webSocket.get();
            this.webSocket.set(webSocket);
            if (webSocket2 != null && webSocket2 != webSocket) {
                try {
                    webSocket2.disconnect();
                } catch (Exception e) {
                    LOGGER.error("Client <{}>: Error disconnecting a previous websocket", this.sessionId, e);
                }
            }
        }
    }

    private WebSocket recreateWebSocket() {
        LOGGER.info("Recreating Websocket..");
        WebSocket webSocket = this.webSocket.get();
        if (webSocket == null) {
            LOGGER.error("Client <{}>: attempt to recreate a null websocket", this.sessionId);
            throw new IllegalStateException("Cannot recreate a null websocket. This method should not have been called without having created a WebSocket before.");
        }
        webSocket.clearHeaders();
        webSocket.clearListeners();
        try {
            return webSocket.recreate(CONNECTION_TIMEOUT_MS).addHeader(DittoHeaderDefinition.DECLARED_ACKS.getKey(), ((JsonArray) this.messagingConfiguration.getDeclaredAcknowledgements().stream().map((v0) -> {
                return v0.toString();
            }).map(JsonValue::of).collect(JsonCollectors.valuesToArray())).toString());
        } catch (IOException e) {
            throw MessagingException.recreateFailed(this.sessionId, e);
        }
    }

    public void onBinaryMessage(WebSocket webSocket, byte[] bArr) {
        LOGGER.debug("Client <{}>: Received WebSocket byte array message <{}>, as string <{}> - don't know what to do with it!.", new Object[]{this.sessionId, bArr, new String(bArr, StandardCharsets.UTF_8)});
    }

    public void onTextMessage(WebSocket webSocket, String str) {
        LOGGER.debug("Client <{}>: Received WebSocket string message <{}>", this.sessionId, str);
        handleIncomingMessage(str);
    }

    private void handleIncomingMessage(String str) {
        this.adaptableBus.publish(str);
    }

    private RuntimeException mapConnectError(Throwable th) {
        OpeningHandshakeException rootCause = getRootCause(th);
        if (rootCause instanceof WebSocketException) {
            LOGGER.error("Got exception: {}", rootCause.getMessage());
            if (rootCause instanceof OpeningHandshakeException) {
                StatusLine statusLine = rootCause.getStatusLine();
                HttpStatusCode httpStatusCode = (HttpStatusCode) HttpStatusCode.forInt(statusLine.getStatusCode()).orElse(HttpStatusCode.INTERNAL_SERVER_ERROR);
                if (httpStatusCode.isClientError()) {
                    switch (AnonymousClass1.$SwitchMap$org$eclipse$ditto$model$base$common$HttpStatusCode[httpStatusCode.ordinal()]) {
                        case 1:
                            return AuthenticationException.unauthorized(this.sessionId, rootCause);
                        case 2:
                            return AuthenticationException.forbidden(this.sessionId, rootCause);
                        default:
                            return AuthenticationException.withStatus(this.sessionId, rootCause, statusLine.getStatusCode(), statusLine.getReasonPhrase() + ": " + new String(rootCause.getBody()));
                    }
                }
            } else if (((WebSocketException) rootCause).getError() == WebSocketError.SOCKET_CONNECT_ERROR && (rootCause.getCause() instanceof UnknownHostException)) {
                return MessagingException.connectFailed(this.sessionId, rootCause.getCause());
            }
        }
        return MessagingException.connectFailed(this.sessionId, rootCause);
    }

    private static boolean isRecoverable(Throwable th) {
        return true;
    }

    private static Throwable getRootCause(Throwable th) {
        return th.getCause() == null ? th : ((th instanceof CompletionException) || (th instanceof ExecutionException)) ? getRootCause(th.getCause()) : th;
    }
}
