/*
 * Decompiled with CFR 0.152.
 */
package com.hivemq.client.internal.mqtt.handler.publish.incoming;

import com.hivemq.client.internal.mqtt.MqttClientConfig;
import com.hivemq.client.internal.mqtt.exceptions.MqttClientStateExceptions;
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingQosHandler;
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttSubscribedPublishFlow;
import com.hivemq.client.internal.mqtt.handler.subscribe.MqttSubscriptionHandler;
import com.hivemq.client.internal.mqtt.ioc.ClientComponent;
import com.hivemq.client.internal.mqtt.message.subscribe.MqttSubscribe;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
import com.hivemq.client.rx.FlowableWithSingle;
import com.hivemq.client.rx.reactivestreams.WithSingleSubscriber;
import com.hivemq.shaded.org.jetbrains.annotations.NotNull;
import io.reactivex.internal.subscriptions.EmptySubscription;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class MqttSubscribedPublishFlowable
extends FlowableWithSingle<Mqtt5Publish, Mqtt5SubAck> {
    @NotNull
    private final MqttSubscribe subscribe;
    @NotNull
    private final MqttClientConfig clientConfig;

    public MqttSubscribedPublishFlowable(@NotNull MqttSubscribe subscribe, @NotNull MqttClientConfig clientConfig) {
        this.subscribe = subscribe;
        this.clientConfig = clientConfig;
    }

    protected void subscribeActual(@NotNull Subscriber<? super Mqtt5Publish> subscriber) {
        if (this.clientConfig.getState().isConnectedOrReconnect()) {
            ClientComponent clientComponent = this.clientConfig.getClientComponent();
            MqttIncomingQosHandler incomingQosHandler = clientComponent.incomingQosHandler();
            MqttSubscriptionHandler subscriptionHandler = clientComponent.subscriptionHandler();
            MqttSubscribedPublishFlow flow = new MqttSubscribedPublishFlow(subscriber, this.clientConfig, incomingQosHandler);
            subscriber.onSubscribe((Subscription)flow);
            subscriptionHandler.subscribe(this.subscribe, flow);
        } else {
            EmptySubscription.error((Throwable)MqttClientStateExceptions.notConnected(), subscriber);
        }
    }

    @Override
    protected void subscribeBothActual(@NotNull WithSingleSubscriber<? super Mqtt5Publish, ? super Mqtt5SubAck> subscriber) {
        this.subscribeActual(subscriber);
    }
}

