/*
 * Decompiled with CFR 0.152.
 */
package com.hivemq.client.internal.mqtt.handler.publish.incoming;

import com.hivemq.client.internal.mqtt.MqttClientConfig;
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttGlobalIncomingPublishFlow;
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingPublishFlows;
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingQosHandler;
import com.hivemq.client.internal.mqtt.ioc.ClientComponent;
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.shaded.org.jetbrains.annotations.NotNull;
import io.reactivex.Flowable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * A {@link Flowable} of {@link Mqtt5Publish} messages that is parameterised by a
 * {@link MqttGlobalPublishFilter} and bound to one {@link MqttClientConfig}.
 *
 * <p>Each subscription creates its own {@link MqttGlobalIncomingPublishFlow} and, once that
 * flow reports a successful {@code init()}, hands it to the client's
 * {@link MqttIncomingPublishFlows} via {@code subscribeGlobal}.
 */
public class MqttGlobalIncomingPublishFlowable extends Flowable<Mqtt5Publish> {

    /** Filter that is passed on to every flow created by this flowable. */
    @NotNull
    private final MqttGlobalPublishFilter filter;

    /** Configuration of the client whose incoming QoS handler the flows are attached to. */
    @NotNull
    private final MqttClientConfig clientConfig;

    /**
     * Creates a flowable for the given filter and client configuration.
     *
     * @param filter       the global publish filter handed to each created flow.
     * @param clientConfig the client configuration used to look up the incoming QoS handler.
     */
    public MqttGlobalIncomingPublishFlowable(@NotNull MqttGlobalPublishFilter filter, @NotNull MqttClientConfig clientConfig) {
        this.filter = filter;
        this.clientConfig = clientConfig;
    }

    /**
     * Wires a new subscriber to a freshly created global incoming publish flow.
     *
     * <p>The flow is passed to the subscriber as its {@link Subscription} right away; the
     * registration of the flow itself is deferred to the executor returned by
     * {@code flow.getEventLoop()}.
     *
     * @param subscriber the subscriber that receives the flow as its subscription.
     */
    protected void subscribeActual(@NotNull Subscriber<? super Mqtt5Publish> subscriber) {
        final MqttIncomingQosHandler qosHandler = this.clientConfig.getClientComponent().incomingQosHandler();
        final MqttIncomingPublishFlows publishFlows = qosHandler.getIncomingPublishFlows();

        final MqttGlobalIncomingPublishFlow globalFlow =
                new MqttGlobalIncomingPublishFlow(subscriber, this.clientConfig, qosHandler, this.filter);

        // The subscriber gets its Subscription before any registration work is scheduled.
        subscriber.onSubscribe((Subscription)globalFlow);

        globalFlow.getEventLoop().execute(() -> registerIfInitialized(globalFlow, publishFlows));
    }

    /**
     * Registers the flow as a global subscription, but only when {@code init()} returns
     * {@code true}; otherwise nothing is registered.
     *
     * @param globalFlow   the flow to initialise and register.
     * @param publishFlows the registry the flow is added to.
     */
    private static void registerIfInitialized(@NotNull MqttGlobalIncomingPublishFlow globalFlow, @NotNull MqttIncomingPublishFlows publishFlows) {
        if (!globalFlow.init()) {
            return;
        }
        publishFlows.subscribeGlobal(globalFlow);
    }
}

