package org.eclipse.ditto.client.streaming;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.client.internal.bus.AdaptableBus;
import org.eclipse.ditto.client.internal.bus.Classification;
import org.eclipse.ditto.client.messaging.MessagingProvider;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
import org.eclipse.ditto.thingsearch.model.signals.commands.subscription.CancelSubscription;
import org.eclipse.ditto.thingsearch.model.signals.commands.subscription.RequestFromSubscription;
import org.eclipse.ditto.thingsearch.model.signals.events.SubscriptionComplete;
import org.eclipse.ditto.thingsearch.model.signals.events.SubscriptionCreated;
import org.eclipse.ditto.thingsearch.model.signals.events.SubscriptionFailed;
import org.eclipse.ditto.thingsearch.model.signals.events.SubscriptionHasNextPage;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/ditto/client/streaming/ThingSearchSubscription.class */
public final class ThingSearchSubscription implements Subscription {
    private static final Logger LOGGER = LoggerFactory.getLogger(ThingSearchSubscription.class);
    private final String subscriptionId;
    private final ProtocolAdapter protocolAdapter;
    private final MessagingProvider messagingProvider;
    private final Subscriber<? super SubscriptionHasNextPage> subscriber;
    private final AtomicBoolean cancelled = new AtomicBoolean(false);
    private final AtomicReference<AdaptableBus.SubscriptionId> busSubscription = new AtomicReference<>();
    private final ExecutorService singleThreadedExecutorService = Executors.newSingleThreadExecutor();

    /* renamed from: org.eclipse.ditto.client.streaming.ThingSearchSubscription$1, reason: invalid class name */
    /* loaded from: input_file:org/eclipse/ditto/client/streaming/ThingSearchSubscription$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$eclipse$ditto$protocol$TopicPath$SearchAction = new int[TopicPath.SearchAction.values().length];

        static {
            try {
                $SwitchMap$org$eclipse$ditto$protocol$TopicPath$SearchAction[TopicPath.SearchAction.COMPLETE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$eclipse$ditto$protocol$TopicPath$SearchAction[TopicPath.SearchAction.FAILED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    private ThingSearchSubscription(String str, ProtocolAdapter protocolAdapter, MessagingProvider messagingProvider, Subscriber<? super SubscriptionHasNextPage> subscriber) {
        this.subscriptionId = str;
        this.protocolAdapter = protocolAdapter;
        this.messagingProvider = messagingProvider;
        this.subscriber = subscriber;
    }

    public static void start(SubscriptionCreated subscriptionCreated, ProtocolAdapter protocolAdapter, MessagingProvider messagingProvider, Subscriber<? super SubscriptionHasNextPage> subscriber) {
        ThingSearchSubscription thingSearchSubscription = new ThingSearchSubscription(subscriptionCreated.getSubscriptionId(), protocolAdapter, messagingProvider, subscriber);
        subscriber.onSubscribe(thingSearchSubscription);
        thingSearchSubscription.startForwarding();
    }

    public void request(long j) {
        this.singleThreadedExecutorService.submit(() -> {
            if (j <= 0) {
                doCancel();
                this.subscriber.onError(new IllegalArgumentException("Expect positive demand, got: " + j));
            } else {
                if (this.cancelled.get()) {
                    return;
                }
                ensureBusSubscription();
                this.messagingProvider.emitAdaptable(this.protocolAdapter.toAdaptable(RequestFromSubscription.of(this.subscriptionId, j, DittoHeaders.newBuilder().randomCorrelationId().build())));
            }
        });
    }

    public void cancel() {
        this.singleThreadedExecutorService.submit(this::doCancel);
    }

    private void doCancel() {
        if (this.cancelled.getAndSet(true)) {
            return;
        }
        cancelBusSubscription();
        sendCancelSubscription();
    }

    private void onTimeout(Throwable th) {
        this.singleThreadedExecutorService.submit(() -> {
            if (this.cancelled.getAndSet(true)) {
                return;
            }
            this.subscriber.onError(th);
        });
    }

    private void onNext(Adaptable adaptable) {
        this.singleThreadedExecutorService.submit(() -> {
            LOGGER.trace("Received from bus: <{}>", adaptable);
            handleAdaptable(adaptable);
        });
    }

    private boolean isTermination(Adaptable adaptable) {
        return adaptable.getTopicPath().getSearchAction().filter(this::isTerminationAction).isPresent();
    }

    private void sendCancelSubscription() {
        this.messagingProvider.emitAdaptable(this.protocolAdapter.toAdaptable(CancelSubscription.of(this.subscriptionId, DittoHeaders.empty())));
    }

    private boolean isTerminationAction(TopicPath.SearchAction searchAction) {
        switch (AnonymousClass1.$SwitchMap$org$eclipse$ditto$protocol$TopicPath$SearchAction[searchAction.ordinal()]) {
            case 1:
            case 2:
                return true;
            default:
                return false;
        }
    }

    private void handleAdaptable(Adaptable adaptable) {
        SubscriptionHasNextPage fromAdaptable = this.protocolAdapter.fromAdaptable(adaptable);
        LOGGER.trace("Notifying subscriber of: <{}>", fromAdaptable);
        if (fromAdaptable instanceof SubscriptionHasNextPage) {
            this.subscriber.onNext(fromAdaptable);
            return;
        }
        if (fromAdaptable instanceof SubscriptionComplete) {
            cancelDueToUpstreamTermination();
            this.subscriber.onComplete();
        } else if (fromAdaptable instanceof SubscriptionFailed) {
            cancelDueToUpstreamTermination();
            this.subscriber.onError(((SubscriptionFailed) fromAdaptable).getError());
        } else {
            doCancel();
            this.subscriber.onError(new ClassCastException("Expect SubscriptionEvent, got " + fromAdaptable));
        }
    }

    private void cancelDueToUpstreamTermination() {
        this.cancelled.set(true);
        cancelBusSubscription();
    }

    private void cancelBusSubscription() {
        AdaptableBus.SubscriptionId subscriptionId = this.busSubscription.get();
        if (subscriptionId != null) {
            this.messagingProvider.getAdaptableBus().unsubscribe(subscriptionId);
        }
    }

    private void startForwarding() {
        LOGGER.trace("Returned from subscriber.onSubscribe()");
        ensureBusSubscription();
    }

    private void ensureBusSubscription() {
        synchronized (this.busSubscription) {
            if (this.busSubscription.get() == null) {
                this.busSubscription.set(this.messagingProvider.getAdaptableBus().subscribeForAdaptableWithTimeout(Classification.forThingsSearch(this.subscriptionId), this.messagingProvider.getMessagingConfiguration().getTimeout(), this::onNext, this::isTermination, this::onTimeout));
            }
        }
    }
}
