package io.joynr.messaging.mqtt;

import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.joynr.messaging.JoynrMessageProcessor;
import io.joynr.messaging.RawMessagingPreprocessor;
import io.joynr.messaging.mqtt.settings.LimitAndBackpressureSettings;
import io.joynr.messaging.mqtt.statusmetrics.MqttStatusReceiver;
import io.joynr.messaging.routing.MessageRouter;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import joynr.system.RoutingTypes.MqttAddress;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/joynr-mqtt-client-1.3.2.jar:io/joynr/messaging/mqtt/SharedSubscriptionsMqttMessagingSkeleton.class */
/**
 * MQTT messaging skeleton that subscribes to a shared subscriptions topic in addition to the
 * reply-to topic.
 *
 * <p>When backpressure is enabled, the skeleton unsubscribes from the shared subscriptions topic
 * once the number of unprocessed MQTT requests reaches the unsubscribe threshold, and subscribes
 * again once that number drops below the resubscribe threshold. Both thresholds are derived from
 * the configured maximum of incoming MQTT requests and the configured percentage values.
 */
public class SharedSubscriptionsMqttMessagingSkeleton extends MqttMessagingSkeleton {
    private static final Logger LOG = LoggerFactory.getLogger(SharedSubscriptionsMqttMessagingSkeleton.class);
    private static final String NON_ALPHA_REGEX_PATTERN = "[^a-zA-Z]";
    private final String channelId;
    private final String sharedSubscriptionsTopic;
    // Tracks whether this skeleton currently holds the shared subscription; toggled via CAS so
    // that only one caller performs the matching subscribe/unsubscribe call.
    private final AtomicBoolean subscribedToSharedSubscriptionsTopic;
    private final MqttAddress replyToAddress;
    private boolean backpressureEnabled;
    private final int backpressureIncomingMqttRequestsUpperThreshold;
    private final int backpressureIncomingMqttRequestsLowerThreshold;
    private final int unsubscribeThreshold;
    private final int resubscribeThreshold;

    /**
     * Creates the skeleton, derives the shared subscriptions topic from the channel ID and
     * validates the backpressure configuration.
     *
     * @param ownAddress global MQTT address, passed on to the superclass
     * @param maxIncomingMqttRequests maximum number of incoming MQTT requests, passed on to the
     *        superclass and used as the base for the backpressure thresholds
     * @param backpressureEnabled whether the backpressure mechanism is requested
     * @param backpressureIncomingMqttRequestsUpperThreshold percentage of
     *        {@code maxIncomingMqttRequests} at which the shared subscription is dropped
     * @param backpressureIncomingMqttRequestsLowerThreshold percentage of
     *        {@code maxIncomingMqttRequests} below which the shared subscription is re-established
     * @param replyToAddress MQTT address whose topic is subscribed to for replies
     * @param messageRouter passed on to the superclass
     * @param mqttClientFactory passed on to the superclass
     * @param channelId channel ID used to build the shared subscriptions topic
     * @param mqttTopicPrefixProvider passed on to the superclass
     * @param rawMessagingPreprocessor passed on to the superclass
     * @param messageProcessors passed on to the superclass
     * @param mqttStatusReceiver passed on to the superclass
     * @throws IllegalArgumentException if the channel ID contains no ASCII letters, or if
     *         backpressure is enabled and any of its settings is invalid
     */
    @Inject
    public SharedSubscriptionsMqttMessagingSkeleton(@Named("property_mqtt_global_address") MqttAddress ownAddress,
                                                    @Named("joynr.messaging.maxincomingmqttrequests") int maxIncomingMqttRequests,
                                                    @Named("joynr.messaging.backpressure.enabled") boolean backpressureEnabled,
                                                    @Named("joynr.messaging.backpressure.incomingmqttrequests.upperthreshold") int backpressureIncomingMqttRequestsUpperThreshold,
                                                    @Named("joynr.messaging.backpressure.incomingmqttrequests.lowerthreshold") int backpressureIncomingMqttRequestsLowerThreshold,
                                                    @Named("property_mqtt_reply_to_address") MqttAddress replyToAddress,
                                                    MessageRouter messageRouter,
                                                    MqttClientFactory mqttClientFactory,
                                                    @Named("joynr.messaging.channelid") String channelId,
                                                    MqttTopicPrefixProvider mqttTopicPrefixProvider,
                                                    RawMessagingPreprocessor rawMessagingPreprocessor,
                                                    Set<JoynrMessageProcessor> messageProcessors,
                                                    MqttStatusReceiver mqttStatusReceiver) {
        super(ownAddress,
              maxIncomingMqttRequests,
              messageRouter,
              mqttClientFactory,
              mqttTopicPrefixProvider,
              rawMessagingPreprocessor,
              messageProcessors,
              mqttStatusReceiver);
        this.replyToAddress = replyToAddress;
        this.channelId = channelId;
        this.sharedSubscriptionsTopic = createSharedSubscriptionsTopic();
        this.subscribedToSharedSubscriptionsTopic = new AtomicBoolean(false);
        this.backpressureEnabled = backpressureEnabled;
        this.backpressureIncomingMqttRequestsUpperThreshold = backpressureIncomingMqttRequestsUpperThreshold;
        this.backpressureIncomingMqttRequestsLowerThreshold = backpressureIncomingMqttRequestsLowerThreshold;
        validateBackpressureValues();
        this.unsubscribeThreshold = percentageOf(maxIncomingMqttRequests,
                                                 backpressureIncomingMqttRequestsUpperThreshold);
        // NOTE(review): a lower threshold of 0 (or integer division rounding down to 0) yields a
        // resubscribe threshold of 0, so the "count < threshold" check in requestProcessed can
        // only pass for a negative count - confirm whether that configuration is intended.
        this.resubscribeThreshold = percentageOf(maxIncomingMqttRequests,
                                                 backpressureIncomingMqttRequestsLowerThreshold);
    }

    /**
     * Computes {@code total * percent / 100} without intermediate {@code int} overflow.
     *
     * <p>The product is evaluated as {@code long}; a plain {@code int} multiplication wraps around
     * for totals above {@code Integer.MAX_VALUE / 100} and would produce a wrong (possibly
     * negative) threshold. For inputs whose {@code int} product does not overflow the result is
     * identical to {@code (total * percent) / 100}.
     *
     * @param total the base value
     * @param percent the percentage to take of {@code total}
     * @return the truncated percentage of {@code total}
     */
    private static int percentageOf(int total, int percent) {
        return (int) ((long) total * percent / 100);
    }

    /**
     * Checks the backpressure settings if backpressure is enabled; does nothing otherwise.
     *
     * <p>Every violated constraint is logged individually so that all configuration problems are
     * reported at once before the method fails.
     *
     * @throws IllegalArgumentException if at least one setting is invalid
     */
    private void validateBackpressureValues() {
        if (!this.backpressureEnabled) {
            return;
        }

        boolean invalidPropertyValueDetected = false;

        if (this.maxIncomingMqttRequests <= 0) {
            invalidPropertyValueDetected = true;
            LOG.error("Invalid value {} for {}, expecting a limit greater than 0 when backpressure is activated",
                      this.maxIncomingMqttRequests,
                      LimitAndBackpressureSettings.PROPERTY_MAX_INCOMING_MQTT_REQUESTS);
        }

        if (this.backpressureIncomingMqttRequestsUpperThreshold <= 0
                || this.backpressureIncomingMqttRequestsUpperThreshold > 100) {
            invalidPropertyValueDetected = true;
            LOG.error("Invalid value {} for {}, expecting percentage value in range (0,100]",
                      this.backpressureIncomingMqttRequestsUpperThreshold,
                      LimitAndBackpressureSettings.PROPERTY_BACKPRESSURE_INCOMING_MQTT_REQUESTS_UPPER_THRESHOLD);
        }

        if (this.backpressureIncomingMqttRequestsLowerThreshold < 0
                || this.backpressureIncomingMqttRequestsLowerThreshold >= 100) {
            invalidPropertyValueDetected = true;
            LOG.error("Invalid value {} for {}, expecting percentage value in range [0,100)",
                      this.backpressureIncomingMqttRequestsLowerThreshold,
                      LimitAndBackpressureSettings.PROPERTY_BACKPRESSURE_INCOMING_MQTT_REQUESTS_LOWER_THRESHOLD);
        }

        if (this.backpressureIncomingMqttRequestsLowerThreshold >= this.backpressureIncomingMqttRequestsUpperThreshold) {
            invalidPropertyValueDetected = true;
            LOG.error("Lower threshold percentage {} must be stricly below the upper threshold percentage {}. Change the value of {} or {}",
                      this.backpressureIncomingMqttRequestsLowerThreshold,
                      this.backpressureIncomingMqttRequestsUpperThreshold,
                      LimitAndBackpressureSettings.PROPERTY_BACKPRESSURE_INCOMING_MQTT_REQUESTS_LOWER_THRESHOLD,
                      LimitAndBackpressureSettings.PROPERTY_BACKPRESSURE_INCOMING_MQTT_REQUESTS_UPPER_THRESHOLD);
        }

        if (invalidPropertyValueDetected) {
            final String disablingBackpressureMessage = "Disabling backpressure mechanism because of invalid property settings";
            this.backpressureEnabled = false;
            LOG.error(disablingBackpressureMessage);
            throw new IllegalArgumentException(disablingBackpressureMessage);
        }
    }

    /**
     * Subscribes to the shared subscriptions topic, records that the shared subscription is
     * active, and subscribes to the reply-to topic extended by the multi-level wildcard pattern.
     */
    @Override // io.joynr.messaging.mqtt.MqttMessagingSkeleton
    protected void subscribe() {
        getClient().subscribe(this.sharedSubscriptionsTopic);
        this.subscribedToSharedSubscriptionsTopic.set(true);
        getClient().subscribe(this.replyToAddress.getTopic() + MqttTopic.MULTI_LEVEL_WILDCARD_PATTERN);
    }

    /**
     * Delegates to the superclass and then, if backpressure is enabled and the count of
     * unprocessed MQTT requests has reached the unsubscribe threshold, unsubscribes from the
     * shared subscriptions topic. The compare-and-set on the subscription flag ensures the
     * unsubscribe is issued only by the caller that flips the flag.
     *
     * @param str passed on unchanged to the superclass implementation
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.joynr.messaging.mqtt.MqttMessagingSkeleton
    public void requestAccepted(String str) {
        super.requestAccepted(str);
        if (!this.backpressureEnabled) {
            return;
        }
        if (getCurrentCountOfUnprocessedMqttRequests() >= this.unsubscribeThreshold
                && this.subscribedToSharedSubscriptionsTopic.compareAndSet(true, false)) {
            getClient().unsubscribe(this.sharedSubscriptionsTopic);
            LOG.info("Unsubscribed from topic {} due to enabled backpressure mechanism and passed upper threshold of unprocessed MQTT requests",
                     this.sharedSubscriptionsTopic);
        }
    }

    /**
     * Delegates to the superclass and then, if backpressure is enabled and the count of
     * unprocessed MQTT requests has dropped below the resubscribe threshold, subscribes to the
     * shared subscriptions topic again. The compare-and-set on the subscription flag ensures the
     * subscribe is issued only by the caller that flips the flag.
     *
     * @param str passed on unchanged to the superclass implementation
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.joynr.messaging.mqtt.MqttMessagingSkeleton
    public void requestProcessed(String str) {
        super.requestProcessed(str);
        if (!this.backpressureEnabled) {
            return;
        }
        if (getCurrentCountOfUnprocessedMqttRequests() < this.resubscribeThreshold
                && this.subscribedToSharedSubscriptionsTopic.compareAndSet(false, true)) {
            getClient().subscribe(this.sharedSubscriptionsTopic);
            LOG.info("Subscribed again to topic {} due to enabled backpressure mechanism and passed lower threshold of unprocessed MQTT requests",
                     this.sharedSubscriptionsTopic);
        }
    }

    /**
     * Builds the shared subscriptions topic as
     * {@code "$share:" + <sanitised channel ID> + ":" + <own topic> + <multi-level wildcard pattern>}.
     *
     * @return the topic string used for the shared subscription
     * @throws IllegalArgumentException if the channel ID contains no ASCII letters
     */
    private String createSharedSubscriptionsTopic() {
        return "$share:" + sanitiseChannelIdForUseAsTopic() + ":" + getOwnAddress().getTopic()
                + MqttTopic.MULTI_LEVEL_WILDCARD_PATTERN;
    }

    /**
     * Strips every character that is not an ASCII letter from the channel ID.
     *
     * @return the channel ID reduced to its ASCII letters
     * @throws IllegalArgumentException if nothing remains after stripping
     */
    private String sanitiseChannelIdForUseAsTopic() {
        String sanitisedChannelId = this.channelId.replaceAll(NON_ALPHA_REGEX_PATTERN, "");
        if (sanitisedChannelId.isEmpty()) {
            throw new IllegalArgumentException(String.format("The channel ID %s cannot be converted to a valid MQTT topic fragment because it does not contain any alpha characters.", this.channelId));
        }
        return sanitisedChannelId;
    }
}
