package it.netgrid.bauer.impl.impl;

import it.netgrid.bauer.EventHandler;
import it.netgrid.bauer.helpers.TopicUtils;
import it.netgrid.bauer.impl.MqttMessageConsumer;
import it.netgrid.bauer.impl.MqttMessageFactory;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:it/netgrid/bauer/impl/impl/SimpleMqttMessageConsumer.class */
/**
 * Queue-backed {@link MqttMessageConsumer} that decouples message reception from handling.
 *
 * <p>{@link #consume(String, MqttMessage)} converts a matching MQTT message into an event of the
 * handler's event class and enqueues it; {@link #run()} drains the queue and hands each event to
 * the {@link EventHandler}. The two methods are meant to run on different threads, with the
 * internal {@link LinkedBlockingQueue} as the hand-off point.
 *
 * @param <E> type of the events delivered to the handler
 */
public class SimpleMqttMessageConsumer<E> implements MqttMessageConsumer {
    private static final Logger log = LoggerFactory.getLogger(SimpleMqttMessageConsumer.class);

    /** Prefix that marks a topic string as carrying a shared-subscription group. */
    private static final String SHARED_PREFIX = "$share/";

    /** Events accepted by {@link #consume} and not yet processed by {@link #run}. */
    private final LinkedBlockingQueue<Nullable<E>> incomingEvents = new LinkedBlockingQueue<>();
    private final EventHandler<E> handler;
    private final String mqttPattern;
    private final boolean sharedSubscription;
    private final MqttMessageFactory factory;
    private final String name;

    /**
     * Queue entry pairing a topic with its decoded payload.
     *
     * <p>The wrapper exists because {@link LinkedBlockingQueue} rejects {@code null} elements,
     * while the payload stored here is allowed to be {@code null}.
     */
    private static final class Nullable<T> {
        final T payload;
        final String topic;

        Nullable(String topic, T payload) {
            this.payload = payload;
            this.topic = topic;
        }
    }

    /**
     * Creates a consumer bound to one handler.
     *
     * @param mqttMessageFactory factory used to decode MQTT messages into events
     * @param name               value used as the subscription topic filter when
     *                           {@code sharedSubscription} is {@code true}
     * @param mqttPattern        topic pattern incoming topics are matched against; also the
     *                           subscription topic filter when {@code sharedSubscription} is
     *                           {@code false}
     * @param sharedSubscription selects whether {@code name} or {@code mqttPattern} is used by
     *                           {@link #getMqttSubscription()}
     * @param eventHandler       handler receiving the decoded events
     */
    public SimpleMqttMessageConsumer(MqttMessageFactory mqttMessageFactory, String name, String mqttPattern, boolean sharedSubscription, EventHandler<E> eventHandler) {
        this.name = name;
        this.mqttPattern = mqttPattern;
        this.sharedSubscription = sharedSubscription;
        this.factory = mqttMessageFactory;
        this.handler = eventHandler;
    }

    /**
     * Drains the queue until the running thread is interrupted, dispatching every event to the
     * handler.
     *
     * <p>A failing handler invocation is logged (with stack trace) and the loop moves on to the
     * next event, so one bad message cannot silently stop consumption while {@link #consume}
     * keeps filling the unbounded queue. Interruption is the only way to end the loop; the
     * interrupt flag is restored before returning.
     */
    @Override // java.lang.Runnable
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            final Nullable<E> event;
            try {
                event = this.incomingEvents.take();
            } catch (InterruptedException e) {
                // take() clears the interrupt flag when it throws; restore it for the thread owner.
                Thread.currentThread().interrupt();
                log.debug("{} consumer interrupted, stopping", this.mqttPattern);
                return;
            }
            try {
                this.handler.handle(event.topic, event.payload);
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // The handler was interrupted while blocking: treat it as a stop request.
                    Thread.currentThread().interrupt();
                    return;
                }
                // Trailing throwable argument makes SLF4J print the stack trace as well.
                log.warn("{} handler: {}", this.mqttPattern, e.getMessage(), e);
            }
        }
    }

    /**
     * Decodes and enqueues the message if its topic matches this consumer's pattern.
     *
     * @param topic       topic the message was received on
     * @param mqttMessage raw MQTT message
     * @return {@code false} when the topic does not match (nothing is enqueued), {@code true}
     *         once the decoded event has been queued
     * @throws IOException wrapping any failure raised while decoding or enqueueing; if the
     *                     calling thread was interrupted its interrupt flag is restored first
     */
    @Override // it.netgrid.bauer.impl.MqttMessageConsumer
    public boolean consume(String topic, MqttMessage mqttMessage) throws IOException {
        if (!matches(topic)) {
            return false;
        }
        try {
            E event = this.factory.getEvent(mqttMessage, this.handler.getEventClass());
            this.incomingEvents.put(new Nullable<>(topic, event));
            return true;
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller's thread even though we translate it.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    /**
     * Strips a leading {@code $share/<group>/} from a topic.
     *
     * <p>Topics not starting with {@code $share/} are returned unchanged. Otherwise the string is
     * split on {@code /} into at most three parts and the last part is returned, so
     * {@code $share/g/a/b} yields {@code a/b}. With fewer than three parts the last available
     * part is returned (e.g. {@code $share/g} yields {@code g}).
     *
     * @param topic topic string, possibly carrying a shared-subscription prefix
     * @return the topic without the shared-subscription prefix
     */
    public String normalizedTopicFrom(String topic) {
        if (!topic.startsWith(SHARED_PREFIX)) {
            return topic;
        }
        String[] parts = topic.split("/", 3);
        return parts[parts.length - 1];
    }

    /**
     * Tells whether a topic, after {@link #normalizedTopicFrom(String) normalization}, matches
     * this consumer's pattern according to {@link TopicUtils#match}.
     *
     * @param topic topic to test
     * @return the result of the pattern match
     */
    public boolean matches(String topic) {
        return TopicUtils.match(this.mqttPattern, normalizedTopicFrom(topic));
    }

    /**
     * Renders this consumer as {@code pattern->handlerName[EventClassSimpleName]}.
     */
    @Override
    public String toString() {
        return String.format("%s->%s[%s]", this.mqttPattern, this.handler.getName(), this.handler.getEventClass().getSimpleName());
    }

    /**
     * Builds the subscription for this consumer.
     *
     * <p>The topic filter is {@code name} for shared subscriptions and {@code mqttPattern}
     * otherwise. NOTE(review): this presumes {@code name} already holds the complete
     * {@code $share/...} filter when {@code sharedSubscription} is set — confirm against the
     * caller that constructs this object.
     *
     * @return a new {@link MqttSubscription} for the selected topic filter
     */
    @Override // it.netgrid.bauer.impl.MqttMessageConsumer
    public MqttSubscription getMqttSubscription() {
        return new MqttSubscription(this.sharedSubscription ? this.name : this.mqttPattern);
    }
}
