package fr.sewatech.mqttra.connector.inbound;

import fr.sewatech.mqttra.api.Message;
import fr.sewatech.mqttra.api.MessageListener;
import fr.sewatech.mqttra.api.MqttListener;
import fr.sewatech.mqttra.api.Topic;
import java.lang.reflect.Method;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.resource.ResourceException;
import javax.resource.spi.ActivationSpec;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.Connector;
import javax.resource.spi.ResourceAdapter;
import javax.resource.spi.ResourceAdapterInternalException;
import javax.resource.spi.TransactionSupport;
import javax.resource.spi.UnavailableException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.transaction.xa.XAResource;
import org.fusesource.hawtdispatch.Task;
import org.fusesource.mqtt.client.CallbackConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;

@Connector(vendorName = "sewatech", version = "0.1", eisType = "MQTT Broker", transactionSupport = TransactionSupport.TransactionSupportLevel.NoTransaction)
/* loaded from: input_file:fr/sewatech/mqttra/connector/inbound/MqttResourceAdapter.class */
public class MqttResourceAdapter implements ResourceAdapter {
    private static final Logger logger = Logger.getLogger(MqttResourceAdapter.class.getName());
    Map<Key, CallbackConnection> connections = new HashMap();
    private BootstrapContext bootstrapContext;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:fr/sewatech/mqttra/connector/inbound/MqttResourceAdapter$Key.class */
    public class Key {
        private MessageEndpointFactory factory;
        private ActivationSpec activationSpec;
        private Method method;

        Key(MessageEndpointFactory messageEndpointFactory, ActivationSpec activationSpec, Method method) {
            this.factory = messageEndpointFactory;
            this.activationSpec = activationSpec;
            this.method = method;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            Key key = (Key) obj;
            return Objects.equals(this.factory, key.factory) && Objects.equals(this.activationSpec, key.activationSpec) && Objects.equals(this.method, key.method);
        }

        public int hashCode() {
            return Objects.hash(this.factory, this.activationSpec, this.method);
        }

        public String toString() {
            return this.factory.getClass().getSimpleName() + " / " + this.activationSpec;
        }
    }

    public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
        this.bootstrapContext = bootstrapContext;
    }

    public void stop() {
    }

    public void endpointActivation(MessageEndpointFactory messageEndpointFactory, ActivationSpec activationSpec) throws ResourceException {
        logger.fine("endpoint activation");
        ActivationSpecBean activationSpecBean = (ActivationSpecBean) ActivationSpecBean.class.cast(activationSpec);
        try {
            MqttListenerProxy createEndPointProxy = createEndPointProxy(initializeEndpointsPool(messageEndpointFactory, activationSpecBean));
            Class<?> endpointClass = messageEndpointFactory.getEndpointClass();
            if (isOldFashion(endpointClass)) {
                Method declaredMethod = endpointClass.getDeclaredMethod("onMessage", Message.class);
                createConnection(messageEndpointFactory, declaredMethod, activationSpecBean).listener(new MqttClientListener(createEndPointProxy, declaredMethod));
            }
            for (Method method : endpointClass.getMethods()) {
                if (method.isAnnotationPresent(Topic.class)) {
                    createConnection(messageEndpointFactory, method, activationSpecBean).listener(new MqttClientListener(createEndPointProxy, method));
                }
            }
        } catch (Exception e) {
            throw new ResourceException(e);
        }
    }

    public void endpointDeactivation(MessageEndpointFactory messageEndpointFactory, ActivationSpec activationSpec) {
        logger.fine("Endpoint deactivation");
        try {
            Class<?> endpointClass = messageEndpointFactory.getEndpointClass();
            if (isOldFashion(endpointClass)) {
                unregisterConnection(new Key(messageEndpointFactory, activationSpec, endpointClass.getDeclaredMethod("onMessage", Message.class)));
            }
            for (Method method : endpointClass.getMethods()) {
                if (method.isAnnotationPresent(Topic.class)) {
                    unregisterConnection(new Key(messageEndpointFactory, activationSpec, method));
                }
            }
        } catch (Throwable th) {
            logger.log(Level.WARNING, "Unable to deactivate an endpoint", th);
        }
    }

    private boolean isOldFashion(Class<?> cls) {
        return MessageListener.class.isAssignableFrom(cls);
    }

    private void unregisterConnection(Key key) {
        final CallbackConnection remove = this.connections.remove(key);
        if (remove == null) {
            logger.fine("Cannot find connection for key " + key);
            return;
        }
        logger.fine("Connection found for key " + key);
        remove.suspend();
        remove.getDispatchQueue().execute(new Task() { // from class: fr.sewatech.mqttra.connector.inbound.MqttResourceAdapter.1
            public void run() {
                remove.kill(new LoggingCallback("disconnect"));
            }
        });
    }

    private BlockingQueue<MqttListener> initializeEndpointsPool(MessageEndpointFactory messageEndpointFactory, ActivationSpecBean activationSpecBean) throws UnavailableException {
        int poolSize = activationSpecBean.getPoolSize();
        logger.fine("Initializing pool with " + poolSize + " connections");
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(poolSize);
        for (int i = 0; i < poolSize; i++) {
            arrayBlockingQueue.add(MqttListener.class.cast(messageEndpointFactory.createEndpoint((XAResource) null)));
        }
        return arrayBlockingQueue;
    }

    private MqttListenerProxy createEndPointProxy(BlockingQueue<MqttListener> blockingQueue) {
        return new MqttListenerProxy(this.bootstrapContext, blockingQueue);
    }

    private CallbackConnection createConnection(final MessageEndpointFactory messageEndpointFactory, final Method method, final ActivationSpecBean activationSpecBean) throws URISyntaxException {
        logger.fine("Creating connection to " + activationSpecBean.getUserName() + " with login " + activationSpecBean.getUserName());
        MQTT mqtt = new MQTT();
        mqtt.setUserName(activationSpecBean.getUserName());
        mqtt.setPassword(activationSpecBean.getPassword());
        mqtt.setHost(activationSpecBean.getServerUrl());
        final CallbackConnection callbackConnection = mqtt.callbackConnection();
        callbackConnection.connect(new LoggingCallback<Void>("connect") { // from class: fr.sewatech.mqttra.connector.inbound.MqttResourceAdapter.2
            @Override // fr.sewatech.mqttra.connector.inbound.LoggingCallback
            public void onSuccess(Void r10) {
                String topicName;
                QoS qos;
                super.onSuccess((AnonymousClass2) r10);
                Topic annotation = method.getAnnotation(Topic.class);
                if (annotation == null) {
                    topicName = activationSpecBean.getTopicName();
                    qos = activationSpecBean.getQos();
                } else {
                    topicName = annotation.name() == null ? activationSpecBean.getTopicName() : annotation.name();
                    qos = annotation.qos() == null ? activationSpecBean.getQos() : annotation.qos();
                }
                callbackConnection.subscribe(new org.fusesource.mqtt.client.Topic[]{new org.fusesource.mqtt.client.Topic(topicName, qos)}, new LoggingCallback("subscribe"));
                Key key = new Key(messageEndpointFactory, activationSpecBean, method);
                MqttResourceAdapter.this.connections.put(key, callbackConnection);
                MqttResourceAdapter.logger.fine("Connection registered under key " + key);
            }
        });
        return callbackConnection;
    }

    public XAResource[] getXAResources(ActivationSpec[] activationSpecArr) throws ResourceException {
        logger.fine("Asked fir XA resources, but none to provide");
        return new XAResource[0];
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        return Objects.equals(this.bootstrapContext, ((MqttResourceAdapter) obj).bootstrapContext);
    }

    public int hashCode() {
        return Objects.hash(this.bootstrapContext);
    }
}
