package org.correomqtt.business.mqtt;

import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedContext;
import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedListener;
import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedContext;
import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedListener;
import com.hivemq.client.mqtt.lifecycle.MqttDisconnectSource;
import com.hivemq.client.util.KeyStoreUtil;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLException;
import javax.net.ssl.TrustManagerFactory;
import net.schmizz.sshj.SSHClient;
import net.schmizz.sshj.connection.channel.direct.LocalPortForwarder;
import net.schmizz.sshj.transport.verification.PromiscuousVerifier;
import org.correomqtt.business.dispatcher.ConnectionLifecycleDispatcher;
import org.correomqtt.business.exception.CorreoMqttAlreadySubscribedException;
import org.correomqtt.business.exception.CorreoMqttNoRetriesLeftException;
import org.correomqtt.business.exception.CorreoMqttSshFailedException;
import org.correomqtt.business.model.Auth;
import org.correomqtt.business.model.ConnectionConfigDTO;
import org.correomqtt.business.model.MessageDTO;
import org.correomqtt.business.model.Proxy;
import org.correomqtt.business.model.SubscriptionDTO;
import org.slf4j.Logger;
import org.slf4j.MarkerFactory;

/* loaded from: input_file:org/correomqtt/business/mqtt/BaseCorreoMqttClient.class */
abstract class BaseCorreoMqttClient implements CorreoMqttClient, MqttClientDisconnectedListener, MqttClientConnectedListener {
    private static final int MAX_RECONNECTS = 5;
    private final ConnectionConfigDTO configDTO;
    private final AtomicBoolean wasConnectedBefore = new AtomicBoolean(false);
    private final AtomicBoolean tryToReconnect = new AtomicBoolean(false);
    private final AtomicInteger triedReconnects = new AtomicInteger(0);
    private final Set<SubscriptionDTO> subscriptions = new HashSet();
    private SSHClient sshClient;
    private LocalPortForwarder localPortforwarder;

    public BaseCorreoMqttClient(ConnectionConfigDTO connectionConfigDTO) {
        this.configDTO = connectionConfigDTO;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ConnectionConfigDTO getConfigDTO() {
        return this.configDTO;
    }

    abstract Logger getLogger();

    @Override // org.correomqtt.business.mqtt.CorreoMqttClient
    public Set<SubscriptionDTO> getSubscriptions() {
        return new HashSet(this.subscriptions);
    }

    @Override // org.correomqtt.business.mqtt.CorreoMqttClient
    public synchronized void connect() throws InterruptedException, ExecutionException, TimeoutException, SSLException {
        if (this.configDTO.getProxy().equals(Proxy.SSH)) {
            try {
                setupSsh();
            } catch (IOException e) {
                getLogger().error(MarkerFactory.getMarker(this.configDTO.getName()), "Error while creating ssh connection. ", e);
                disconnect(false);
                throw new CorreoMqttSshFailedException(e);
            }
        }
        getLogger().info(MarkerFactory.getMarker(this.configDTO.getName()), "Connecting to Broker using {}", this.configDTO.getMqttVersion().getDescription());
        executeConnect();
        this.wasConnectedBefore.set(true);
    }

    private void setupSsh() throws IOException, InterruptedException {
        getLogger().info(MarkerFactory.getMarker(this.configDTO.getName()), "Creating SSH tunnel to {}:{}.", this.configDTO.getSshHost(), Integer.valueOf(this.configDTO.getPort()));
        this.sshClient = new SSHClient();
        this.sshClient.addHostKeyVerifier(new PromiscuousVerifier());
        this.sshClient.connect(this.configDTO.getSshHost(), this.configDTO.getSshPort());
        if (this.configDTO.getAuth().equals(Auth.PASSWORD)) {
            this.sshClient.authPassword(this.configDTO.getAuthUsername(), this.configDTO.getAuthPassword());
        } else if (this.configDTO.getAuth().equals(Auth.KEYFILE)) {
            this.sshClient.authPublickey(this.configDTO.getAuthUsername(), new String[]{this.configDTO.getAuthKeyfile()});
        }
        LocalPortForwarder.Parameters parameters = new LocalPortForwarder.Parameters("localhost", this.configDTO.getLocalPort(), this.configDTO.getUrl(), this.configDTO.getPort());
        CompletableFuture.runAsync(() -> {
            try {
                ServerSocket serverSocket = new ServerSocket();
                try {
                    serverSocket.setReuseAddress(true);
                    serverSocket.bind(new InetSocketAddress(parameters.getLocalHost(), parameters.getLocalPort()));
                    this.localPortforwarder = this.sshClient.newLocalPortForwarder(parameters, serverSocket);
                    this.localPortforwarder.listen();
                    serverSocket.close();
                } finally {
                }
            } catch (Exception e) {
                getLogger().error(MarkerFactory.getMarker(this.configDTO.getName()), "SSH socket to {}:{} failed.", this.configDTO.getSshHost(), Integer.valueOf(this.configDTO.getPort()));
                throw new CorreoMqttSshFailedException(e);
            }
        }).whenCompleteAsync((r6, th) -> {
            if (th != null) {
                getLogger().error(MarkerFactory.getMarker(this.configDTO.getName()), "SSH tunnel broke. Connection to broker disconnected by system. ", th);
                disconnect(false);
            }
        });
        int i = 0;
        while (!this.sshClient.isConnected()) {
            if (i >= MAX_RECONNECTS) {
                getLogger().error(MarkerFactory.getMarker(this.configDTO.getName()), "SSH tunnel to {}:{} failed.", this.configDTO.getSshHost(), Integer.valueOf(this.configDTO.getPort()));
                return;
            } else {
                i++;
                TimeUnit.SECONDS.sleep(1L);
            }
        }
        getLogger().info(MarkerFactory.getMarker(this.configDTO.getName()), "SSH tunnel to {}:{} established.", this.configDTO.getSshHost(), Integer.valueOf(this.configDTO.getPort()));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int getDestinationPort() {
        return this.configDTO.getProxy().equals(Proxy.SSH) ? this.configDTO.getLocalPort() : this.configDTO.getPort();
    }

    abstract void executeConnect() throws SSLException, InterruptedException, ExecutionException, TimeoutException;

    public void onDisconnected(MqttClientDisconnectedContext mqttClientDisconnectedContext) {
        if (this.tryToReconnect.get()) {
            if (mqttClientDisconnectedContext.getSource() == MqttDisconnectSource.SERVER) {
                getLogger().info(MarkerFactory.getMarker(this.configDTO.getName()), "Disconnected by {}. Connection to broker lost.", mqttClientDisconnectedContext.getSource());
                reconnect(mqttClientDisconnectedContext);
            } else if (mqttClientDisconnectedContext.getSource() == MqttDisconnectSource.CLIENT) {
                getLogger().info(MarkerFactory.getMarker(this.configDTO.getName()), "Disconnected by {}. Connection to broker not possible.", mqttClientDisconnectedContext.getSource());
                reconnect(mqttClientDisconnectedContext);
            } else if (mqttClientDisconnectedContext.getSource() == MqttDisconnectSource.USER) {
                try {
                    this.sshClient.disconnect();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                getLogger().info(MarkerFactory.getMarker(this.configDTO.getName()), "Disconnected by {}. Connection to broker disconnected by user.", mqttClientDisconnectedContext.getSource());
            }
        }
    }

    public void onConnected(MqttClientConnectedContext mqttClientConnectedContext) {
        this.triedReconnects.set(0);
        this.tryToReconnect.set(true);
        if (this.wasConnectedBefore.get()) {
            ConnectionLifecycleDispatcher.getInstance().onConnectionReconnected(this.configDTO.getId());
            getLogger().info(MarkerFactory.getMarker(this.configDTO.getName()), "Reconnected to broker successfully");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KeyManagerFactory getKeyManagerFactory() throws SSLException {
        return KeyStoreUtil.keyManagerFromKeystore(new File(this.configDTO.getSslKeystore()), this.configDTO.getSslKeystorePassword(), this.configDTO.getSslKeystorePassword());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public TrustManagerFactory getTrustManagerFactory() throws SSLException {
        return KeyStoreUtil.trustManagerFromKeystore(new File(this.configDTO.getSslKeystore()), this.configDTO.getSslKeystorePassword());
    }

    @Override // org.correomqtt.business.mqtt.CorreoMqttClient
    public synchronized void unsubscribe(SubscriptionDTO subscriptionDTO) {
        doUnsubscribe(subscriptionDTO);
        this.subscriptions.remove(subscriptionDTO);
    }

    abstract void doUnsubscribe(SubscriptionDTO subscriptionDTO);

    @Override // org.correomqtt.business.mqtt.CorreoMqttClient
    public synchronized void publish(MessageDTO messageDTO) throws InterruptedException, ExecutionException, TimeoutException {
        doPublish(messageDTO);
    }

    abstract void doPublish(MessageDTO messageDTO) throws InterruptedException, ExecutionException, TimeoutException;

    abstract void doSubscribe(SubscriptionDTO subscriptionDTO, Consumer<MessageDTO> consumer) throws InterruptedException, ExecutionException, TimeoutException;

    @Override // org.correomqtt.business.mqtt.CorreoMqttClient
    public synchronized void subscribe(SubscriptionDTO subscriptionDTO, Consumer<MessageDTO> consumer) throws InterruptedException, ExecutionException, TimeoutException {
        if (this.subscriptions.contains(subscriptionDTO)) {
            throw new CorreoMqttAlreadySubscribedException(getConfigDTO().getId(), subscriptionDTO);
        }
        doSubscribe(subscriptionDTO, consumer);
        this.subscriptions.add(subscriptionDTO);
    }

    private synchronized void reconnect(MqttClientDisconnectedContext mqttClientDisconnectedContext) {
        getLogger().info(MarkerFactory.getMarker(this.configDTO.getName()), "Reconnecting connect to Broker.");
        if (!this.tryToReconnect.get() || this.triedReconnects.get() >= MAX_RECONNECTS || mqttClientDisconnectedContext.getSource() == MqttDisconnectSource.USER) {
            getLogger().error(MarkerFactory.getMarker(this.configDTO.getName()), "Maximum number of reconnects reached.");
            ConnectionLifecycleDispatcher.getInstance().onConnectionFailed(this.configDTO.getId(), new CorreoMqttNoRetriesLeftException());
        } else {
            doReconnect(mqttClientDisconnectedContext);
            ConnectionLifecycleDispatcher.getInstance().onReconnectFailed(this.configDTO.getId(), this.triedReconnects, MAX_RECONNECTS);
            this.triedReconnects.incrementAndGet();
        }
    }

    abstract void doReconnect(MqttClientDisconnectedContext mqttClientDisconnectedContext);

    @Override // org.correomqtt.business.mqtt.CorreoMqttClient
    public synchronized void disconnect(boolean z) {
        this.tryToReconnect.set(false);
        if (isConnected()) {
            doDisconnect();
            getLogger().info(MarkerFactory.getMarker(this.configDTO.getName()), "Disconnected from broker.");
        } else {
            getLogger().info("Disconnecting client was not possible, cause was not connected.");
        }
        if (this.localPortforwarder != null) {
            getLogger().debug(MarkerFactory.getMarker(this.configDTO.getName()), "Disconnecting SSH socket for {}:{}.", this.configDTO.getSshHost(), Integer.valueOf(this.configDTO.getPort()));
            try {
                this.localPortforwarder.close();
                getLogger().info(MarkerFactory.getMarker(this.configDTO.getName()), "SSH socket for {}:{} closed", this.configDTO.getSshHost(), Integer.valueOf(this.configDTO.getPort()));
            } catch (IOException e) {
                getLogger().error(MarkerFactory.getMarker(this.configDTO.getName()), "Disconnecting SSH socket for {}:{} failed", this.configDTO.getSshHost(), Integer.valueOf(this.configDTO.getPort()));
            }
            this.localPortforwarder = null;
        }
        if (this.sshClient == null || !this.sshClient.isConnected()) {
            return;
        }
        getLogger().debug(MarkerFactory.getMarker(this.configDTO.getName()), "Disconnecting SSH tunnel for {}:{}.", this.configDTO.getSshHost(), Integer.valueOf(this.configDTO.getPort()));
        try {
            this.sshClient.disconnect();
            this.sshClient.close();
            getLogger().info(MarkerFactory.getMarker(this.configDTO.getName()), "SSH tunnel for {}:{} closed", this.configDTO.getSshHost(), Integer.valueOf(this.configDTO.getPort()));
        } catch (IOException e2) {
            getLogger().error(MarkerFactory.getMarker(this.configDTO.getName()), "Disconnecting SSH tunnel for {}:{} failed", this.configDTO.getSshHost(), Integer.valueOf(this.configDTO.getPort()));
        }
        this.sshClient = null;
    }

    abstract void doDisconnect();

    abstract boolean isConnected();
}
