package com.github.christophersmith.summer.mqtt.core.service;

import com.github.christophersmith.summer.mqtt.core.MqttClientConfiguration;
import com.github.christophersmith.summer.mqtt.core.MqttClientConnectionType;
import com.github.christophersmith.summer.mqtt.core.MqttQualityOfService;
import com.github.christophersmith.summer.mqtt.core.TopicSubscription;
import com.github.christophersmith.summer.mqtt.core.util.MqttClientEventPublisher;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.ReentrantLock;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;

/* loaded from: input_file:com/github/christophersmith/summer/mqtt/core/service/AbstractMqttClientService.class */
/**
 * Base class for {@link MqttClientService} implementations.
 *
 * <p>Holds the state shared by concrete clients: the connection type, the client configuration,
 * the list of topic subscriptions (guarded by {@link #reentrantLock}), the inbound message
 * channel, and the collaborators used for reconnecting and event publishing. Subclasses supply
 * {@link #getClientId()} and {@link #subscribe(String, MqttQualityOfService)}.
 */
public abstract class AbstractMqttClientService implements MqttClientService {
    /** Connection type fixed at construction time; never {@code null}. */
    protected final transient MqttClientConnectionType connectionType;
    /** Spring event publisher supplied through {@link #setApplicationEventPublisher}. */
    protected ApplicationEventPublisher applicationEventPublisher;
    /** Reconnect collaborator supplied through {@link #setReconnectDetails}; may be {@code null}. */
    protected ReconnectService reconnectService;
    /** Scheduler supplied through {@link #setReconnectDetails}; may be {@code null}. */
    protected TaskScheduler taskScheduler;
    /** Not written by this class; presumably the pending reconnect task managed by subclasses. */
    protected ScheduledFuture<?> scheduledFuture;
    /** Not written by this class; presumably set by subclasses once a first start has happened. */
    protected boolean firstStartOccurred;
    /** Value reported by {@link #isStarted()}; not written by this class. */
    protected boolean started;
    /** Channel supplied through {@link #setInboundMessageChannel}. */
    protected MessageChannel inboundMessageChannel;
    /** Fair lock; this class holds it while reading {@link #topicSubscriptions}. */
    protected final transient ReentrantLock reentrantLock = new ReentrantLock(true);
    /** Current topic subscriptions; copied under {@link #reentrantLock} by {@link #getTopicSubscriptions()}. */
    protected final transient List<TopicSubscription> topicSubscriptions = new ArrayList<>();
    /** Client configuration instance, created with its default constructor. */
    protected final transient MqttClientConfiguration mqttClientConfiguration = new MqttClientConfiguration();
    /** Helper for publishing client events, created with its default constructor. */
    protected final transient MqttClientEventPublisher mqttClientEventPublisher = new MqttClientEventPublisher();

    /**
     * Creates the service for the given connection type.
     *
     * @param mqttClientConnectionType the connection type; must not be {@code null}
     * @throws IllegalArgumentException if {@code mqttClientConnectionType} is {@code null}
     */
    protected AbstractMqttClientService(MqttClientConnectionType mqttClientConnectionType) {
        Assert.notNull(mqttClientConnectionType, "'connectionType' must be set!");
        this.connectionType = mqttClientConnectionType;
    }

    /**
     * Returns the connection type this service was constructed with.
     *
     * @return the connection type, never {@code null}
     */
    @Override
    public MqttClientConnectionType getConnectionType() {
        return this.connectionType;
    }

    /**
     * Returns the client ID of this service.
     *
     * @return the client ID
     */
    @Override
    public abstract String getClientId();

    /**
     * Returns the current value of the {@link #started} flag.
     *
     * @return {@code true} if the flag is set
     */
    @Override
    public boolean isStarted() {
        return this.started;
    }

    /**
     * Returns the configuration object owned by this service.
     *
     * <p>The internal instance itself is returned, not a copy.
     *
     * @return the client configuration, never {@code null}
     */
    public MqttClientConfiguration getMqttClientConfiguration() {
        return this.mqttClientConfiguration;
    }

    /**
     * Stores the Spring event publisher.
     *
     * @param applicationEventPublisher the publisher to store; not validated here
     */
    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    /**
     * Sets the channel used for inbound messages.
     *
     * <p>The connection type is checked before the argument, so a {@code PUBLISHER} client always
     * fails with {@link IllegalStateException}, even when {@code messageChannel} is {@code null}.
     *
     * @param messageChannel the inbound channel; must not be {@code null}
     * @throws IllegalStateException if this client's connection type is {@code PUBLISHER}
     * @throws IllegalArgumentException if {@code messageChannel} is {@code null}
     */
    public void setInboundMessageChannel(MessageChannel messageChannel) {
        if (MqttClientConnectionType.PUBLISHER == this.connectionType) {
            throw new IllegalStateException(String.format("Client ID %s is setup as a PUBLISHER and cannot receive messages from the Broker!", getClientId()));
        }
        Assert.notNull(messageChannel, "'inboundMessageChannel' must be set!");
        this.inboundMessageChannel = messageChannel;
    }

    /**
     * Stores the collaborators used for reconnecting.
     *
     * <p>Neither argument is validated; {@code null} values are stored as given.
     *
     * @param reconnectService the reconnect service to store
     * @param taskScheduler the task scheduler to store
     */
    public void setReconnectDetails(ReconnectService reconnectService, TaskScheduler taskScheduler) {
        this.reconnectService = reconnectService;
        this.taskScheduler = taskScheduler;
    }

    /**
     * Returns a snapshot of the current topic subscriptions.
     *
     * <p>The returned list is a new list holding a clone of every subscription, built while
     * holding {@link #reentrantLock}.
     *
     * @return a new list of cloned subscriptions, never {@code null}
     */
    @Override
    public List<TopicSubscription> getTopicSubscriptions() {
        this.reentrantLock.lock();
        try {
            List<TopicSubscription> copies = new ArrayList<>(this.topicSubscriptions.size());
            for (TopicSubscription topicSubscription : this.topicSubscriptions) {
                // NOTE(review): m2clone() looks like a decompiler-renamed clone() - confirm.
                copies.add(topicSubscription.m2clone());
            }
            return copies;
        } finally {
            // Always release the lock, even if cloning throws.
            this.reentrantLock.unlock();
        }
    }

    /**
     * Subscribes to a topic filter using the default quality of service from
     * {@link #mqttClientConfiguration}.
     *
     * @param str the topic filter to subscribe to
     */
    @Override
    public void subscribe(String str) {
        subscribe(str, this.mqttClientConfiguration.getDefaultQualityOfService());
    }

    /**
     * Subscribes to a topic filter with the given quality of service.
     *
     * @param str the topic filter to subscribe to
     * @param mqttQualityOfService the quality of service to request
     */
    @Override
    public abstract void subscribe(String str, MqttQualityOfService mqttQualityOfService);
}
