package org.eclipse.ditto.client.messaging.internal;

import com.neovisionaries.ws.client.OpeningHandshakeException;
import com.neovisionaries.ws.client.StatusLine;
import com.neovisionaries.ws.client.WebSocket;
import com.neovisionaries.ws.client.WebSocketAdapter;
import com.neovisionaries.ws.client.WebSocketException;
import com.neovisionaries.ws.client.WebSocketFrame;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.eclipse.ditto.client.configuration.AuthenticationConfiguration;
import org.eclipse.ditto.client.configuration.MessagingConfiguration;
import org.eclipse.ditto.client.internal.DefaultThreadFactory;
import org.eclipse.ditto.client.internal.VersionReader;
import org.eclipse.ditto.client.live.internal.LiveImpl;
import org.eclipse.ditto.client.messaging.AuthenticationException;
import org.eclipse.ditto.client.messaging.AuthenticationProvider;
import org.eclipse.ditto.client.messaging.MessagingException;
import org.eclipse.ditto.client.messaging.MessagingProvider;
import org.eclipse.ditto.client.twin.internal.TwinImpl;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonMissingFieldException;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonParseException;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonRuntimeException;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.common.ConditionChecker;
import org.eclipse.ditto.model.base.common.HttpStatusCode;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.headers.DittoHeadersBuilder;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;
import org.eclipse.ditto.model.base.json.Jsonifiable;
import org.eclipse.ditto.model.messages.Message;
import org.eclipse.ditto.model.messages.MessageDirection;
import org.eclipse.ditto.model.messages.MessageHeaders;
import org.eclipse.ditto.model.messages.MessageResponseConsumer;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.protocoladapter.Adaptable;
import org.eclipse.ditto.protocoladapter.DittoProtocolAdapter;
import org.eclipse.ditto.protocoladapter.HeaderTranslator;
import org.eclipse.ditto.protocoladapter.JsonifiableAdaptable;
import org.eclipse.ditto.protocoladapter.ProtocolFactory;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.protocoladapter.UnknownCommandException;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.base.ErrorResponse;
import org.eclipse.ditto.signals.commands.live.LiveCommandFactory;
import org.eclipse.ditto.signals.commands.live.base.LiveCommand;
import org.eclipse.ditto.signals.commands.messages.MessageCommand;
import org.eclipse.ditto.signals.commands.messages.MessageCommandResponse;
import org.eclipse.ditto.signals.commands.messages.SendFeatureMessage;
import org.eclipse.ditto.signals.commands.messages.SendFeatureMessageResponse;
import org.eclipse.ditto.signals.commands.messages.SendThingMessage;
import org.eclipse.ditto.signals.commands.messages.SendThingMessageResponse;
import org.eclipse.ditto.signals.commands.things.ThingCommand;
import org.eclipse.ditto.signals.commands.things.ThingCommandResponse;
import org.eclipse.ditto.signals.commands.things.ThingErrorResponse;
import org.eclipse.ditto.signals.events.base.Event;
import org.eclipse.ditto.signals.events.things.ThingEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/ditto/client/messaging/internal/WebSocketMessagingProvider.class */
public final class WebSocketMessagingProvider extends WebSocketAdapter implements MessagingProvider {
    private static final int RECONNECTION_TIMEOUT_SECONDS = 5;
    private static final String PROTOCOL_CMD_START_SEND_EVENTS = "START-SEND-EVENTS";
    private static final String PROTOCOL_CMD_STOP_SEND_EVENTS = "STOP-SEND-EVENTS";
    private static final String PROTOCOL_CMD_START_SEND_MESSAGES = "START-SEND-MESSAGES";
    private static final String PROTOCOL_CMD_STOP_SEND_MESSAGES = "STOP-SEND-MESSAGES";
    private static final String PROTOCOL_CMD_START_SEND_LIVE_COMMANDS = "START-SEND-LIVE-COMMANDS";
    private static final String PROTOCOL_CMD_STOP_SEND_LIVE_COMMANDS = "STOP-SEND-LIVE-COMMANDS";
    private static final String PROTOCOL_CMD_START_SEND_LIVE_EVENTS = "START-SEND-LIVE-EVENTS";
    private static final String PROTOCOL_CMD_STOP_SEND_LIVE_EVENTS = "STOP-SEND-LIVE-EVENTS";
    private static final String PROTOCOL_CMD_JWT_TOKEN = "JWT-TOKEN";
    private static final String PROTOCOL_CMD_ACK_SUFFIX = ":ACK";
    private static final int MAX_OUTSTANDING_MESSAGE_RESPONSES = 250;
    private static final int CONNECTION_TIMEOUT_MS = 5000;
    private final MessagingConfiguration messagingConfiguration;
    private final AuthenticationProvider<WebSocket> authenticationProvider;
    private final ExecutorService callbackExecutor;
    private final String sessionId;
    private final ScheduledExecutorService reconnectExecutor;
    private final DittoProtocolAdapter protocolAdapter;
    private final Map<String, MessageResponseConsumer<?>> messageCommandResponseConsumers;
    private final Map<String, Map<String, String>> registrationConfigs;
    private final Map<String, CompletableFuture<Adaptable>> customAdaptableResponseFutures;
    private Consumer<ThingCommandResponse> commandResponseConsumer;
    private WebSocket webSocket;
    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketMessagingProvider.class);
    private static final String DITTO_CLIENT_USER_AGENT = "DittoClient/" + VersionReader.determineClientVersion();
    private final AtomicBoolean reconnecting = new AtomicBoolean(false);
    private final AtomicBoolean initiallyConnected = new AtomicBoolean(false);
    private boolean sendMeTwinEvents = false;
    private boolean sendMeLiveMessages = false;
    private boolean sendMeLiveCommands = false;
    private boolean sendMeLiveEvents = false;
    private final Map<String, CompletableFuture<Void>> subscriptionsAcks = new ConcurrentHashMap();
    private final Map<String, Consumer<Message<?>>> subscriptions = new ConcurrentHashMap();

    /* loaded from: input_file:org/eclipse/ditto/client/messaging/internal/WebSocketMessagingProvider$LimitedHashMap.class */
    private static class LimitedHashMap<K, V> extends LinkedHashMap<K, V> {
        private static final long serialVersionUID = -2771080576933386538L;
        private final int maxSize;

        private LimitedHashMap(int i) {
            this.maxSize = i;
        }

        @Override // java.util.LinkedHashMap
        protected boolean removeEldestEntry(Map.Entry<K, V> entry) {
            return size() > this.maxSize;
        }

        @Override // java.util.AbstractMap, java.util.Map
        public boolean equals(@Nullable Object obj) {
            if (this == obj) {
                return true;
            }
            return obj != null && getClass() == obj.getClass() && super.equals(obj) && this.maxSize == ((LimitedHashMap) obj).maxSize;
        }

        @Override // java.util.AbstractMap, java.util.Map
        public int hashCode() {
            return Objects.hash(Integer.valueOf(super.hashCode()), Integer.valueOf(this.maxSize));
        }

        @Override // java.util.AbstractMap
        public String toString() {
            return getClass().getSimpleName() + " [" + super.toString() + ", maxSize=" + this.maxSize + "]";
        }
    }

    private WebSocketMessagingProvider(MessagingConfiguration messagingConfiguration, AuthenticationProvider<WebSocket> authenticationProvider, ExecutorService executorService) {
        this.messagingConfiguration = messagingConfiguration;
        this.authenticationProvider = authenticationProvider;
        this.callbackExecutor = executorService;
        this.sessionId = authenticationProvider.getConfiguration().getSessionId();
        this.reconnectExecutor = messagingConfiguration.isReconnectEnabled() ? createScheduledThreadPoolExecutor() : null;
        this.protocolAdapter = DittoProtocolAdapter.of(HeaderTranslator.empty());
        this.messageCommandResponseConsumers = new LimitedHashMap(MAX_OUTSTANDING_MESSAGE_RESPONSES);
        this.registrationConfigs = new HashMap();
        this.customAdaptableResponseFutures = new HashMap();
    }

    private static ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor() {
        return new ScheduledThreadPoolExecutor(1, new DefaultThreadFactory("ditto-client-reconnect"));
    }

    public static WebSocketMessagingProvider newInstance(MessagingConfiguration messagingConfiguration, AuthenticationProvider<WebSocket> authenticationProvider, ExecutorService executorService) {
        ConditionChecker.checkNotNull(messagingConfiguration, "messagingConfiguration");
        ConditionChecker.checkNotNull(authenticationProvider, "authenticationProvider");
        ConditionChecker.checkNotNull(executorService, "callbackExecutor");
        return new WebSocketMessagingProvider(messagingConfiguration, authenticationProvider, executorService);
    }

    @Override // org.eclipse.ditto.client.messaging.MessagingProvider
    public AuthenticationConfiguration getAuthenticationConfiguration() {
        return this.authenticationProvider.getConfiguration();
    }

    @Override // org.eclipse.ditto.client.messaging.MessagingProvider
    public MessagingConfiguration getMessagingConfiguration() {
        return this.messagingConfiguration;
    }

    @Override // org.eclipse.ditto.client.messaging.MessagingProvider
    public ExecutorService getExecutorService() {
        return this.callbackExecutor;
    }

    @Override // org.eclipse.ditto.client.messaging.MessagingProvider
    public void initialize() {
        if (this.webSocket == null || !this.webSocket.isOpen()) {
            safeGet(initiateConnection(createWebsocket()));
        }
    }

    private WebSocket createWebsocket() {
        try {
            WebSocket createSocket = WebSocketFactoryFactory.newWebSocketFactory(this.messagingConfiguration).createSocket(this.messagingConfiguration.getEndpointUri());
            createSocket.addHeader("User-Agent", DITTO_CLIENT_USER_AGENT);
            createSocket.setMaxPayloadSize(262144);
            createSocket.setMissingCloseFrameAllowed(true);
            createSocket.setFrameQueueSize(1);
            return createSocket;
        } catch (IOException e) {
            throw MessagingException.connectFailed(this.sessionId, e);
        }
    }

    private <T> T safeGet(Future<T> future) {
        try {
            return future.get(5000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            return (T) handleInterruptedException(e);
        } catch (ExecutionException e2) {
            return (T) handleExecutionException(e2);
        } catch (TimeoutException e3) {
            return (T) handleTimeoutException(e3);
        }
    }

    private <T> T handleInterruptedException(InterruptedException interruptedException) {
        Thread.currentThread().interrupt();
        throw MessagingException.connectInterrupted(this.sessionId, interruptedException);
    }

    private <T> T handleTimeoutException(TimeoutException timeoutException) {
        throw MessagingException.connectTimeout(this.sessionId, timeoutException);
    }

    private <T> T handleExecutionException(ExecutionException executionException) {
        WebSocketException cause = executionException.getCause();
        if (cause instanceof WebSocketException) {
            LOGGER.error("Got exception: {}", cause.getMessage());
            if (isAuthenticationException(cause)) {
                throw AuthenticationException.unauthorized(this.sessionId, cause);
            }
            if (isForbidden(cause)) {
                throw AuthenticationException.forbidden(this.sessionId, cause);
            }
        }
        throw AuthenticationException.of(this.sessionId, executionException);
    }

    private static boolean isAuthenticationException(WebSocketException webSocketException) {
        if (!(webSocketException instanceof OpeningHandshakeException)) {
            return false;
        }
        StatusLine statusLine = ((OpeningHandshakeException) webSocketException).getStatusLine();
        return statusLine.getStatusCode() == 401 && statusLine.getReasonPhrase().contains("nauthorized");
    }

    private static boolean isForbidden(WebSocketException webSocketException) {
        if (!(webSocketException instanceof OpeningHandshakeException)) {
            return false;
        }
        StatusLine statusLine = ((OpeningHandshakeException) webSocketException).getStatusLine();
        return statusLine.getStatusCode() == 403 && statusLine.getReasonPhrase().contains("orbidden");
    }

    private CompletableFuture<WebSocket> initiateConnection(WebSocket webSocket) {
        ConditionChecker.checkNotNull(webSocket, "ws");
        this.authenticationProvider.prepareAuthentication(webSocket);
        webSocket.addListener(this);
        LOGGER.info("Connecting WebSocket on endpoint <{}>", webSocket.getURI());
        ExecutorService createConnectionExecutor = createConnectionExecutor();
        return CompletableFuture.supplyAsync(() -> {
            try {
                WebSocket webSocket2 = (WebSocket) safeGet(webSocket.connect(createConnectionExecutor));
                createConnectionExecutor.shutdown();
                return webSocket2;
            } catch (Throwable th) {
                createConnectionExecutor.shutdown();
                throw th;
            }
        }, createConnectionExecutor);
    }

    private static ExecutorService createConnectionExecutor() {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 60L, TimeUnit.SECONDS, new SynchronousQueue(), new DefaultThreadFactory("ditto-client-connect"), new ThreadPoolExecutor.CallerRunsPolicy());
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        return threadPoolExecutor;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.eclipse.ditto.client.messaging.MessagingProvider
    public void send(Message<?> message, TopicPath.Channel channel) {
        Adaptable tryToConvertToAdaptable;
        DittoHeadersBuilder newBuilder = DittoHeaders.newBuilder();
        Optional correlationId = message.getCorrelationId();
        newBuilder.getClass();
        correlationId.ifPresent((v1) -> {
            r1.correlationId(v1);
        });
        DittoHeaders build = newBuilder.build();
        ThingId thingEntityId = message.getThingEntityId();
        Optional statusCode = message.getStatusCode();
        Optional featureId = message.getFeatureId();
        if (statusCode.isPresent()) {
            HttpStatusCode httpStatusCode = (HttpStatusCode) statusCode.get();
            tryToConvertToAdaptable = tryToConvertToAdaptable((MessageCommandResponse<?, ?>) (featureId.isPresent() ? SendFeatureMessageResponse.of(thingEntityId, (String) featureId.get(), message, httpStatusCode, build) : SendThingMessageResponse.of(thingEntityId, message, httpStatusCode, build)));
        } else {
            tryToConvertToAdaptable = tryToConvertToAdaptable((MessageCommand<?, ?>) (featureId.isPresent() ? SendFeatureMessage.of(thingEntityId, (String) featureId.get(), message, build) : SendThingMessage.of(thingEntityId, message, build)));
            Optional responseConsumer = message.getResponseConsumer();
            if (correlationId.isPresent() && responseConsumer.isPresent()) {
                this.messageCommandResponseConsumers.put(correlationId.get(), responseConsumer.get());
            }
        }
        doSendAdaptable(tryToConvertToAdaptable);
    }

    @Nullable
    private Adaptable tryToConvertToAdaptable(MessageCommandResponse<?, ?> messageCommandResponse) {
        try {
            return this.protocolAdapter.toAdaptable(messageCommandResponse);
        } catch (UnknownCommandException e) {
            logUnknownType(messageCommandResponse, e);
            return null;
        }
    }

    private <T> void logUnknownType(T t, Throwable th) {
        LOGGER.error("Client <{}>: Unknown {} type: <{}> - NOT sending via Ditto WebSocket!", new Object[]{this.sessionId, t.getClass().getSimpleName(), th.getMessage()});
    }

    @Nullable
    private Adaptable tryToConvertToAdaptable(MessageCommand<?, ?> messageCommand) {
        try {
            return this.protocolAdapter.toAdaptable(messageCommand);
        } catch (UnknownCommandException e) {
            logUnknownType(messageCommand, e);
            return null;
        }
    }

    @Override // org.eclipse.ditto.client.messaging.MessagingProvider
    public void sendCommand(Command<?> command, TopicPath.Channel channel) {
        doSendAdaptable(tryToConvertToAdaptable(command, channel));
    }

    @Nullable
    private Adaptable tryToConvertToAdaptable(Command<?> command, TopicPath.Channel channel) {
        try {
            return this.protocolAdapter.toAdaptable(command.setDittoHeaders(adjustHeadersForLive(command)), channel);
        } catch (UnknownCommandException e) {
            logUnknownType(command, e);
            return null;
        }
    }

    @Override // org.eclipse.ditto.client.messaging.MessagingProvider
    public void sendCommandResponse(CommandResponse<?> commandResponse, TopicPath.Channel channel) {
        doSendAdaptable(tryToConvertToAdaptable(commandResponse, channel));
    }

    @Nullable
    private Adaptable tryToConvertToAdaptable(CommandResponse<?> commandResponse, TopicPath.Channel channel) {
        try {
            return this.protocolAdapter.toAdaptable(commandResponse.setDittoHeaders(adjustHeadersForLive(commandResponse)), channel);
        } catch (UnknownCommandException e) {
            logUnknownType(commandResponse, e);
            return null;
        }
    }

    @Override // org.eclipse.ditto.client.messaging.MessagingProvider
    public void emitEvent(Event<?> event, TopicPath.Channel channel) {
        doSendAdaptable(tryToConvertToAdaptable(event, channel));
    }

    @Nullable
    private Adaptable tryToConvertToAdaptable(Event<?> event, TopicPath.Channel channel) {
        try {
            return this.protocolAdapter.toAdaptable(event.setDittoHeaders(adjustHeadersForLive(event)), channel);
        } catch (UnknownCommandException e) {
            logUnknownType(event, e);
            return null;
        }
    }

    private DittoHeaders adjustHeadersForLive(WithDittoHeaders<?> withDittoHeaders) {
        return withDittoHeaders.getDittoHeaders().toBuilder().removeHeader(DittoHeaderDefinition.READ_SUBJECTS.getKey()).removeHeader(DittoHeaderDefinition.AUTHORIZATION_SUBJECTS.getKey()).removeHeader(DittoHeaderDefinition.RESPONSE_REQUIRED.getKey()).removeHeader(DittoHeaderDefinition.SOURCE.getKey()).source(this.sessionId).build();
    }

    @Override // org.eclipse.ditto.client.messaging.MessagingProvider
    public CompletableFuture<Adaptable> sendAdaptable(Adaptable adaptable) {
        DittoHeaders dittoHeaders = (DittoHeaders) adaptable.getHeaders().orElseGet(DittoHeaders::empty);
        Adaptable adaptable2 = adaptable;
        if (!dittoHeaders.getCorrelationId().isPresent()) {
            dittoHeaders = dittoHeaders.toBuilder().correlationId(UUID.randomUUID().toString()).build();
            adaptable2 = (Adaptable) adaptable.setDittoHeaders(dittoHeaders);
        }
        String charSequence = getCorrelationIdOrThrow(dittoHeaders).toString();
        doSendAdaptable(adaptable2);
        CompletableFuture<Adaptable> completableFuture = new CompletableFuture<>();
        this.customAdaptableResponseFutures.put(charSequence, completableFuture);
        return completableFuture;
    }

    private void doSendAdaptable(@Nullable Adaptable adaptable) {
        if (null == adaptable) {
            return;
        }
        if (this.webSocket == null || !this.webSocket.isOpen()) {
            LOGGER.error("Client <{}>: WebSocket is not connected - going to discard Adaptable '{}'", this.sessionId, adaptable);
            return;
        }
        String jsonString = ProtocolFactory.wrapAsJsonifiableAdaptable(adaptable).toJsonString();
        LOGGER.debug("Client <{}>: Sending JSON: {}", this.sessionId, jsonString);
        this.webSocket.sendText(jsonString);
    }

    @Override // org.eclipse.ditto.client.messaging.MessagingProvider
    public void registerReplyHandler(Consumer<ThingCommandResponse> consumer) {
        this.commandResponseConsumer = consumer;
    }

    @Override // org.eclipse.ditto.client.messaging.MessagingProvider
    public boolean registerMessageHandler(String str, Map<String, String> map, Consumer<Message<?>> consumer, CompletableFuture<Void> completableFuture) {
        if (this.subscriptions.containsKey(str)) {
            LOGGER.info("Client <{}>: Handler {} already registered for client", this.sessionId, str);
            completableFuture.complete(null);
            return false;
        }
        LOGGER.trace("Client <{}>: Registering incoming message handler'", this.sessionId);
        this.subscriptions.put(str, consumer);
        this.registrationConfigs.put(str, map);
        if (this.webSocket == null) {
            return true;
        }
        if (TwinImpl.CONSUME_TWIN_EVENTS_HANDLER.equals(str)) {
            this.sendMeTwinEvents = true;
            askBackend(PROTOCOL_CMD_START_SEND_EVENTS, map, completableFuture);
            return true;
        }
        if (LiveImpl.CONSUME_LIVE_MESSAGES_HANDLER.equals(str)) {
            this.sendMeLiveMessages = true;
            askBackend(PROTOCOL_CMD_START_SEND_MESSAGES, map, completableFuture);
            return true;
        }
        if (LiveImpl.CONSUME_LIVE_COMMANDS_HANDLER.equals(str)) {
            this.sendMeLiveCommands = true;
            askBackend(PROTOCOL_CMD_START_SEND_LIVE_COMMANDS, map, completableFuture);
            return true;
        }
        if (!LiveImpl.CONSUME_LIVE_EVENTS_HANDLER.equals(str)) {
            return true;
        }
        this.sendMeLiveEvents = true;
        askBackend(PROTOCOL_CMD_START_SEND_LIVE_EVENTS, map, completableFuture);
        return true;
    }

    @Override // org.eclipse.ditto.client.messaging.MessagingProvider
    public synchronized void deregisterMessageHandler(String str, CompletableFuture<Void> completableFuture) {
        this.subscriptions.remove(str);
        if (TwinImpl.CONSUME_TWIN_EVENTS_HANDLER.equals(str)) {
            this.sendMeTwinEvents = false;
            askBackend(PROTOCOL_CMD_STOP_SEND_EVENTS, Collections.emptyMap(), completableFuture);
            return;
        }
        if (LiveImpl.CONSUME_LIVE_MESSAGES_HANDLER.equals(str)) {
            this.sendMeLiveMessages = false;
            askBackend(PROTOCOL_CMD_STOP_SEND_MESSAGES, Collections.emptyMap(), completableFuture);
        } else if (LiveImpl.CONSUME_LIVE_COMMANDS_HANDLER.equals(str)) {
            this.sendMeLiveCommands = false;
            askBackend(PROTOCOL_CMD_STOP_SEND_LIVE_COMMANDS, Collections.emptyMap(), completableFuture);
        } else if (LiveImpl.CONSUME_LIVE_EVENTS_HANDLER.equals(str)) {
            this.sendMeLiveEvents = false;
            askBackend(PROTOCOL_CMD_STOP_SEND_LIVE_EVENTS, Collections.emptyMap(), completableFuture);
        }
    }

    @Override // org.eclipse.ditto.client.messaging.MessagingProvider
    public void close() {
        try {
            LOGGER.debug("Client <{}>: Closing WebSocket client of endpoint <{}>.", this.sessionId, this.messagingConfiguration.getEndpointUri());
            if (null != this.reconnectExecutor) {
                this.reconnectExecutor.shutdownNow();
            }
            this.authenticationProvider.destroy();
            this.webSocket.disconnect();
            LOGGER.debug("Client <{}>: WebSocket destroyed.", this.sessionId);
        } catch (Exception e) {
            LOGGER.info("Client <{}>: Exception occurred while trying to shutdown http client.", this.sessionId, e);
        }
    }

    public void onConnected(WebSocket webSocket, Map<String, List<String>> map) {
        this.webSocket = webSocket;
        this.callbackExecutor.execute(() -> {
            LOGGER.info("Client <{}>: WebSocket connection is established", this.sessionId);
            if (this.initiallyConnected.get()) {
                LOGGER.info("Client <{}>: Subscribing again for messages from backend after reconnection", this.sessionId);
                CompletableFuture<Void> completableFuture = new CompletableFuture<>();
                if (this.sendMeTwinEvents) {
                    askBackend(PROTOCOL_CMD_START_SEND_EVENTS, this.registrationConfigs.get(TwinImpl.CONSUME_TWIN_EVENTS_HANDLER), completableFuture);
                }
                if (this.sendMeLiveMessages) {
                    askBackend(PROTOCOL_CMD_START_SEND_MESSAGES, this.registrationConfigs.get(LiveImpl.CONSUME_LIVE_MESSAGES_HANDLER), completableFuture);
                }
                if (this.sendMeLiveCommands) {
                    askBackend(PROTOCOL_CMD_START_SEND_LIVE_COMMANDS, this.registrationConfigs.get(LiveImpl.CONSUME_LIVE_COMMANDS_HANDLER), completableFuture);
                }
                if (this.sendMeLiveEvents) {
                    askBackend(PROTOCOL_CMD_START_SEND_LIVE_EVENTS, this.registrationConfigs.get(LiveImpl.CONSUME_LIVE_EVENTS_HANDLER), completableFuture);
                }
            }
            this.initiallyConnected.set(true);
        });
    }

    private void askBackend(String str, Map<String, String> map, CompletableFuture<Void> completableFuture) {
        LOGGER.info("Client <{}>: Requesting at backend that this client wants to <{}> with params <{}>", new Object[]{this.sessionId, str, map});
        if (this.webSocket != null) {
            String str2 = (String) map.entrySet().stream().map(entry -> {
                return urlEncode((String) entry.getKey()) + "=" + urlEncode((String) entry.getValue());
            }).collect(Collectors.joining("&"));
            String str3 = str2.isEmpty() ? str : str + "?" + str2;
            LOGGER.debug("Sending: {}", str3);
            this.webSocket.sendText(str3);
        }
        CompletableFuture<Void> completableFuture2 = new CompletableFuture<>();
        completableFuture2.thenAcceptAsync(r8 -> {
            LOGGER.debug("Client <{}>: Backend now <{}>.", this.sessionId, str);
            completableFuture.complete(r8);
        }, (Executor) this.callbackExecutor);
        this.subscriptionsAcks.put(str, completableFuture2);
    }

    private static String urlEncode(String str) {
        try {
            return URLEncoder.encode(str, StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("Missing standard charset UTF 8 for encoding.", e);
        }
    }

    public void onDisconnected(WebSocket webSocket, WebSocketFrame webSocketFrame, WebSocketFrame webSocketFrame2, boolean z) {
        this.callbackExecutor.execute(() -> {
            if (!z) {
                LOGGER.info("Client <{}>: WebSocket connection to endpoint <{}> was closed by client", this.sessionId, this.messagingConfiguration.getEndpointUri());
            } else {
                LOGGER.info("Client <{}>: WebSocket connection to endpoint <{}> was closed by Server with code <{}> and reason <{}>.", new Object[]{this.sessionId, this.messagingConfiguration.getEndpointUri(), Integer.valueOf(webSocketFrame.getCloseCode()), webSocketFrame.getCloseReason()});
                handleReconnectionIfEnabled();
            }
        });
    }

    public void onError(WebSocket webSocket, WebSocketException webSocketException) {
        this.callbackExecutor.execute(() -> {
            LOGGER.error("Client <{}>: Error in WebSocket: {}", this.sessionId, null != webSocketException ? webSocketException.getClass().getSimpleName() + ": " + webSocketException.getMessage() : "-");
            handleReconnectionIfEnabled();
        });
    }

    private void handleReconnectionIfEnabled() {
        if (!this.messagingConfiguration.isReconnectEnabled()) {
            LOGGER.info("Client <{}>: Reconnection is NOT enabled. Closing client ...", this.sessionId);
            close();
        } else if (this.initiallyConnected.get() && this.reconnecting.compareAndSet(false, true) && null != this.reconnectExecutor) {
            LOGGER.info("Client <{}>: Reconnection is enabled. Reconnecting in <{}> seconds ...", this.sessionId, Integer.valueOf(RECONNECTION_TIMEOUT_SECONDS));
            this.reconnectExecutor.schedule(this::initWebSocketConnectionWithReconnect, 5L, TimeUnit.SECONDS);
        }
    }

    private void initWebSocketConnectionWithReconnect() {
        int i = 0;
        WebSocket webSocket = null;
        while (null == webSocket) {
            if (0 < i) {
                waitUntilNextReconnectionAttempt();
            }
            i++;
            webSocket = tryToReconnect(i);
        }
        this.reconnecting.set(false);
    }

    private void waitUntilNextReconnectionAttempt() {
        try {
            LOGGER.info("Client <{}>: Retrying connection initiation again in <{}> seconds ...", this.sessionId, Integer.valueOf(RECONNECTION_TIMEOUT_SECONDS));
            TimeUnit.SECONDS.sleep(5L);
        } catch (InterruptedException e) {
            LOGGER.error("Client <{}>: Interrupted while waiting for reconnection.", this.sessionId, e);
            Thread.currentThread().interrupt();
        }
    }

    @Nullable
    private WebSocket tryToReconnect(int i) {
        try {
            LOGGER.info("Recreating Websocket..");
            this.webSocket.clearHeaders();
            this.webSocket.clearListeners();
            return (WebSocket) safeGet(initiateConnection(this.webSocket.recreate()));
        } catch (IOException | AuthenticationException e) {
            LOGGER.error("Client <{}>: Failed to establish connection ({}): {}", new Object[]{this.sessionId, Integer.valueOf(i), e.getMessage()});
            return null;
        }
    }

    public void onBinaryMessage(WebSocket webSocket, byte[] bArr) {
        this.callbackExecutor.execute(() -> {
            LOGGER.debug("Client <{}>: Received WebSocket byte array message <{}>, as string <{}> - don't know what to do with it!.", new Object[]{this.sessionId, bArr, new String(bArr, StandardCharsets.UTF_8)});
        });
    }

    public void onTextMessage(WebSocket webSocket, String str) {
        this.callbackExecutor.execute(() -> {
            LOGGER.trace("Client <{}>: Received WebSocket string message <{}>", this.sessionId, str);
            handleIncomingMessage(str);
        });
    }

    private void handleIncomingMessage(String str) {
        Adaptable tryToGetJsonifiableAdaptableFromMessageJson;
        Signal<?> tryToAdaptToSignal;
        boolean z = -1;
        switch (str.hashCode()) {
            case -1944772011:
                if (str.equals("STOP-SEND-MESSAGES:ACK")) {
                    z = 3;
                    break;
                }
                break;
            case -1561451934:
                if (str.equals("START-SEND-EVENTS:ACK")) {
                    z = false;
                    break;
                }
                break;
            case -841879842:
                if (str.equals("STOP-SEND-LIVE-COMMANDS:ACK")) {
                    z = RECONNECTION_TIMEOUT_SECONDS;
                    break;
                }
                break;
            case -1997137:
                if (str.equals("STOP-SEND-LIVE-EVENTS:ACK")) {
                    z = 7;
                    break;
                }
                break;
            case 163325346:
                if (str.equals("JWT-TOKEN:ACK")) {
                    z = 8;
                    break;
                }
                break;
            case 518282901:
                if (str.equals("START-SEND-MESSAGES:ACK")) {
                    z = 2;
                    break;
                }
                break;
            case 1492964898:
                if (str.equals("STOP-SEND-EVENTS:ACK")) {
                    z = true;
                    break;
                }
                break;
            case 1645601391:
                if (str.equals("START-SEND-LIVE-EVENTS:ACK")) {
                    z = 6;
                    break;
                }
                break;
            case 1952340638:
                if (str.equals("START-SEND-LIVE-COMMANDS:ACK")) {
                    z = 4;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                ackSubscription(PROTOCOL_CMD_START_SEND_EVENTS);
                return;
            case true:
                ackSubscription(PROTOCOL_CMD_STOP_SEND_EVENTS);
                return;
            case true:
                ackSubscription(PROTOCOL_CMD_START_SEND_MESSAGES);
                return;
            case true:
                ackSubscription(PROTOCOL_CMD_STOP_SEND_MESSAGES);
                return;
            case true:
                ackSubscription(PROTOCOL_CMD_START_SEND_LIVE_COMMANDS);
                return;
            case RECONNECTION_TIMEOUT_SECONDS /* 5 */:
                ackSubscription(PROTOCOL_CMD_STOP_SEND_LIVE_COMMANDS);
                return;
            case true:
                ackSubscription(PROTOCOL_CMD_START_SEND_LIVE_EVENTS);
                return;
            case true:
                ackSubscription(PROTOCOL_CMD_STOP_SEND_LIVE_EVENTS);
                return;
            case true:
                LOGGER.trace("Ack for JWT received.");
                return;
            default:
                JsonObject tryToGetMessageAsJsonObject = tryToGetMessageAsJsonObject(str);
                if (null == tryToGetMessageAsJsonObject || null == (tryToGetJsonifiableAdaptableFromMessageJson = tryToGetJsonifiableAdaptableFromMessageJson(tryToGetMessageAsJsonObject)) || null == (tryToAdaptToSignal = tryToAdaptToSignal(tryToGetJsonifiableAdaptableFromMessageJson, str))) {
                    return;
                }
                TopicPath.Channel channelOrNull = getChannelOrNull(tryToGetJsonifiableAdaptableFromMessageJson);
                String charSequence = getCorrelationIdOrThrow((DittoHeaders) tryToGetJsonifiableAdaptableFromMessageJson.getHeaders().orElseGet(DittoHeaders::empty)).toString();
                if (this.customAdaptableResponseFutures.containsKey(charSequence)) {
                    this.customAdaptableResponseFutures.remove(charSequence).complete(tryToGetJsonifiableAdaptableFromMessageJson);
                    return;
                }
                if (TopicPath.Channel.TWIN == channelOrNull) {
                    handleTwinMessage(str, charSequence, tryToAdaptToSignal);
                    return;
                } else if (TopicPath.Channel.LIVE == channelOrNull) {
                    handleLiveMessage(str, charSequence, tryToAdaptToSignal);
                    return;
                } else {
                    LOGGER.warn("Client <{}>: Got Signal on unknown channel <{}>: <{}>", new Object[]{this.sessionId, channelOrNull, tryToAdaptToSignal});
                    return;
                }
        }
    }

    private void ackSubscription(String str) {
        CompletableFuture<Void> remove = this.subscriptionsAcks.remove(str);
        if (null != remove) {
            LOGGER.trace("Acking pending <{}>.", str);
            remove.complete(null);
        }
    }

    @Nullable
    private JsonObject tryToGetMessageAsJsonObject(String str) {
        try {
            return getMessageAsJsonObject(str);
        } catch (JsonParseException e) {
            LOGGER.warn("Client <{}>: Got unknown non-JSON message on WebSocket: {}", new Object[]{this.sessionId, str, e});
            return null;
        }
    }

    private static JsonObject getMessageAsJsonObject(String str) {
        JsonValue readFrom = JsonFactory.readFrom(str);
        if (readFrom.isObject()) {
            return readFrom.asObject();
        }
        throw new JsonParseException(MessageFormat.format("The WebSocket message was not a JSON object as required:\n{0}", str));
    }

    @Nullable
    private JsonifiableAdaptable tryToGetJsonifiableAdaptableFromMessageJson(JsonObject jsonObject) {
        try {
            return getJsonifiableAdaptableFromMessageJson(jsonObject);
        } catch (JsonRuntimeException e) {
            LOGGER.warn("Client <{}>: Incoming message could not be parsed to JSON due to: <{}>:\n  <{}>", new Object[]{this.sessionId, e.getMessage(), jsonObject});
            return null;
        }
    }

    private static JsonifiableAdaptable getJsonifiableAdaptableFromMessageJson(JsonObject jsonObject) {
        return ProtocolFactory.jsonifiableAdaptableFromJson(jsonObject);
    }

    @Nullable
    private Signal<?> tryToAdaptToSignal(Adaptable adaptable, String str) {
        try {
            return adaptToSignal(adaptable);
        } catch (DittoRuntimeException e) {
            LOGGER.warn("Client <{}>: Incoming message could not be parsed to Signal due to: <{}>:\n <{}>", new Object[]{this.sessionId, e.getMessage(), str});
            return null;
        }
    }

    private Signal<?> adaptToSignal(Adaptable adaptable) {
        return this.protocolAdapter.fromAdaptable(adaptable);
    }

    @Nullable
    private static TopicPath.Channel getChannelOrNull(Adaptable adaptable) {
        TopicPath.Channel channel = adaptable.getTopicPath().getChannel();
        if (null == channel) {
            channel = (TopicPath.Channel) adaptable.getHeaders().flatMap((v0) -> {
                return v0.getChannel();
            }).flatMap(TopicPath.Channel::forName).orElse(null);
        }
        return channel;
    }

    private static CharSequence getCorrelationIdOrThrow(DittoHeaders dittoHeaders) {
        return (CharSequence) dittoHeaders.getCorrelationId().orElseThrow(() -> {
            return new JsonMissingFieldException(JsonifiableAdaptable.JsonFields.HEADERS.getPointer().append(JsonPointer.of(DittoHeaderDefinition.CORRELATION_ID.getKey())));
        });
    }

    private void handleTwinMessage(String str, CharSequence charSequence, Signal signal) {
        if (signal instanceof ThingCommandResponse) {
            LOGGER.debug("Client <{}>: Received TWIN Response JSON: {}", this.sessionId, str);
            if (signal instanceof ThingErrorResponse) {
                DittoRuntimeException dittoRuntimeException = ((ErrorResponse) signal).getDittoRuntimeException();
                LOGGER.warn("Client <{}>: Got TWIN ThingErrorResponse: <{}: {} - {}>", new Object[]{this.sessionId, dittoRuntimeException.getClass().getSimpleName(), dittoRuntimeException.getMessage(), (String) dittoRuntimeException.getDescription().orElse("")});
            }
            this.commandResponseConsumer.accept((ThingCommandResponse) signal);
            return;
        }
        if (signal instanceof ThingEvent) {
            LOGGER.debug("Client <{}>: Received TWIN Event JSON: {}", this.sessionId, str);
            handleThingEvent(charSequence, (ThingEvent) signal, TwinImpl.CONSUME_TWIN_EVENTS_HANDLER);
        } else {
            if (!(signal instanceof DittoRuntimeException)) {
                LOGGER.warn("Client <{}>: Got unknown message on WebSocket on TWIN channel: {}", this.sessionId, str);
                return;
            }
            ThingCommandResponse of = ThingErrorResponse.of((DittoRuntimeException) signal);
            LOGGER.debug("Client <{}>: Received TWIN Error JSON: {}", this.sessionId, str);
            this.commandResponseConsumer.accept(of);
        }
    }

    private void handleLiveMessage(String str, CharSequence charSequence, Jsonifiable<JsonObject> jsonifiable) {
        if (jsonifiable instanceof MessageCommand) {
            LOGGER.debug("Client <{}>: Received LIVE MessageCommand JSON: {}", this.sessionId, str);
            handleLiveMessage((MessageCommand) jsonifiable);
            return;
        }
        if (jsonifiable instanceof MessageCommandResponse) {
            LOGGER.debug("Client <{}>: Received LIVE MessageCommandResponse JSON: {}", this.sessionId, str);
            handleLiveMessageResponse((MessageCommandResponse) jsonifiable);
            return;
        }
        if (jsonifiable instanceof ThingCommand) {
            LOGGER.debug("Client <{}>: Received LIVE ThingCommand JSON: {}", this.sessionId, str);
            handleLiveCommand(LiveCommandFactory.getInstance().getLiveCommand((Command) jsonifiable));
            return;
        }
        if (jsonifiable instanceof ThingErrorResponse) {
            DittoRuntimeException dittoRuntimeException = ((ErrorResponse) jsonifiable).getDittoRuntimeException();
            LOGGER.warn("Client <{}>: Got LIVE ThingErrorResponse: <{}: {} - {}>", new Object[]{this.sessionId, dittoRuntimeException.getClass().getSimpleName(), dittoRuntimeException.getMessage(), (String) dittoRuntimeException.getDescription().orElse("")});
            if (this.messageCommandResponseConsumers.containsKey(charSequence.toString())) {
                handleLiveMessageResponse((ErrorResponse) jsonifiable);
                return;
            } else {
                handleLiveCommandResponse((ThingErrorResponse) jsonifiable);
                return;
            }
        }
        if (jsonifiable instanceof ThingCommandResponse) {
            LOGGER.debug("Client <{}>: Received LIVE ThingCommandResponse JSON: {}", this.sessionId, str);
            handleLiveCommandResponse((ThingCommandResponse) jsonifiable);
            return;
        }
        if (jsonifiable instanceof ThingEvent) {
            LOGGER.debug("Client <{}>: Received LIVE ThingEvent JSON: {}", this.sessionId, str);
            handleThingEvent(charSequence, (ThingEvent) jsonifiable, LiveImpl.CONSUME_LIVE_EVENTS_HANDLER);
        } else {
            if (!(jsonifiable instanceof DittoRuntimeException)) {
                LOGGER.warn("Client <{}>: Got unknown message on WebSocket on LIVE channel: {}", this.sessionId, str);
                return;
            }
            ThingErrorResponse of = ThingErrorResponse.of((DittoRuntimeException) jsonifiable);
            LOGGER.debug("Client <{}>: Received LIVE Error JSON: {}", this.sessionId, str);
            if (this.messageCommandResponseConsumers.containsKey(charSequence.toString())) {
                handleLiveMessageResponse((ErrorResponse) of);
            } else {
                handleLiveCommandResponse(of);
            }
        }
    }

    private void handleThingEvent(CharSequence charSequence, ThingEvent thingEvent, String str) {
        Message<?> build = Message.newBuilder(MessageHeaders.newBuilder(MessageDirection.FROM, thingEvent.getEntityId(), thingEvent.getType()).correlationId(charSequence).build()).payload(thingEvent).build();
        Consumer<Message<?>> consumer = this.subscriptions.get(str);
        if (consumer != null) {
            consumer.accept(build);
        } else {
            LOGGER.debug("Client <{}>: Dropping incoming event as no subscription for consuming events was registered. Did you call 'client.twin().startConsumption()' or 'client.live().startConsumption()' ?", this.sessionId);
        }
    }

    private void handleLiveMessage(MessageCommand messageCommand) {
        Message<?> message = messageCommand.getMessage();
        Consumer<Message<?>> consumer = this.subscriptions.get(LiveImpl.CONSUME_LIVE_MESSAGES_HANDLER);
        if (consumer != null) {
            consumer.accept(message);
        } else {
            LOGGER.warn("Client <{}>: Dropping incoming message as no subscription for consuming messages was registered. Did you call 'client.twin().startConsumption()' or 'client.live().startConsumption()'?", this.sessionId);
        }
    }

    private void handleLiveMessageResponse(MessageCommandResponse messageCommandResponse) {
        Message message = messageCommandResponse.getMessage();
        LOGGER.debug("Client <{}>: Received response message: {}", this.sessionId, message);
        Optional.ofNullable(this.messageCommandResponseConsumers.remove((String) messageCommandResponse.getDittoHeaders().getCorrelationId().orElse(""))).ifPresent(messageResponseConsumer -> {
            messageResponseConsumer.getResponseConsumer().accept(message, null);
        });
    }

    private void handleLiveMessageResponse(ErrorResponse errorResponse) {
        Optional.ofNullable(this.messageCommandResponseConsumers.remove((String) errorResponse.getDittoHeaders().getCorrelationId().orElse(""))).ifPresent(messageResponseConsumer -> {
            messageResponseConsumer.getResponseConsumer().accept(null, errorResponse.getDittoRuntimeException());
        });
    }

    private void handleLiveCommand(LiveCommand liveCommand) {
        DittoHeaders dittoHeaders = liveCommand.getDittoHeaders();
        Optional schemaVersion = dittoHeaders.getSchemaVersion();
        if (schemaVersion.isPresent() && !((JsonSchemaVersion) schemaVersion.get()).equals(this.messagingConfiguration.getJsonSchemaVersion())) {
            LOGGER.trace("Client <{}>: Received live command in other JsonSchemaVersion ({}) than the client uses ({}), not  delivering it: {}", new Object[]{this.sessionId, schemaVersion.get(), this.messagingConfiguration.getEndpointUri(), liveCommand});
            return;
        }
        Message<?> build = Message.newBuilder(MessageHeaders.newBuilder(MessageDirection.FROM, liveCommand.getThingEntityId(), liveCommand.getType()).correlationId((CharSequence) dittoHeaders.getCorrelationId().orElse(null)).build()).payload(liveCommand).build();
        Consumer<Message<?>> consumer = this.subscriptions.get(LiveImpl.CONSUME_LIVE_COMMANDS_HANDLER);
        if (consumer != null) {
            consumer.accept(build);
        } else {
            LOGGER.warn("Client <{}>: Dropping incoming live command as no subscription for consuming live commands was registered. Did you call 'client.live().startConsumption()' ?", this.sessionId);
        }
    }

    private void handleLiveCommandResponse(CommandResponse commandResponse) {
        this.commandResponseConsumer.accept((ThingCommandResponse) commandResponse);
    }
}
