package org.eclipse.ditto.client.twin.internal;

import java.time.Duration;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.eclipse.ditto.client.internal.AbstractHandle;
import org.eclipse.ditto.client.messaging.MessagingProvider;
import org.eclipse.ditto.client.streaming.MapPublisher;
import org.eclipse.ditto.client.streaming.SpliteratorSubscriber;
import org.eclipse.ditto.client.streaming.ThingSearchPublisher;
import org.eclipse.ditto.client.twin.SearchQueryBuilder;
import org.eclipse.ditto.client.twin.TwinSearchHandle;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.model.base.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.model.things.Thing;
import org.eclipse.ditto.model.things.ThingsModelFactory;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.signals.commands.thingsearch.subscription.CreateSubscription;
import org.eclipse.ditto.signals.events.thingsearch.SubscriptionHasNextPage;
import org.reactivestreams.Publisher;

/* loaded from: input_file:org/eclipse/ditto/client/twin/internal/TwinSearchHandleImpl.class */
/**
 * Package-internal implementation of {@link TwinSearchHandle}.
 *
 * <p>Search results are obtained from a {@link ThingSearchPublisher} created from the
 * {@link CreateSubscription} command that the caller configures through a
 * {@link SearchQueryBuilder}. Each {@link SubscriptionHasNextPage} received is converted into
 * {@link Thing} instances via {@link ThingsModelFactory#newThing}.
 */
final class TwinSearchHandleImpl extends AbstractHandle implements TwinSearchHandle {

    /**
     * Creates a search handle bound to the {@link TopicPath.Channel#TWIN} channel.
     *
     * @param messagingProvider the messaging provider handed to the superclass and later used to
     *     create search publishers.
     */
    public TwinSearchHandleImpl(MessagingProvider messagingProvider) {
        super(messagingProvider, TopicPath.Channel.TWIN);
    }

    /**
     * Builds a publisher that emits one list of things per search result page.
     *
     * @param consumer callback that configures the search query on the supplied builder.
     * @return a publisher mapping every received page to the list of things it contains.
     */
    @Override // org.eclipse.ditto.client.twin.TwinSearchHandle
    public Publisher<List<Thing>> publisher(Consumer<SearchQueryBuilder> consumer) {
        SearchQueryBuilderImpl queryBuilder = new SearchQueryBuilderImpl();
        consumer.accept(queryBuilder);
        return MapPublisher.of(
                ThingSearchPublisher.of(queryBuilder.createSubscription(), PROTOCOL_ADAPTER, this.messagingProvider),
                TwinSearchHandleImpl::pageToThingList);
    }

    /**
     * Builds a stream over all things of all search result pages.
     *
     * @param consumer callback that configures the search query on the supplied builder.
     * @return a stream in which every received page is flattened into its things.
     */
    @Override // org.eclipse.ditto.client.twin.TwinSearchHandle
    public Stream<Thing> stream(Consumer<SearchQueryBuilder> consumer) {
        return internalSpliterator(consumer).asStream().flatMap(TwinSearchHandleImpl::streamAsThings);
    }

    /**
     * Creates the search publisher for the configured query and subscribes a new
     * {@link SpliteratorSubscriber} to it.
     *
     * <p>The subscriber is created with the timeout of the messaging configuration and with the
     * initial demand and demand configured on the query builder.
     *
     * @param consumer callback that configures the search query on the supplied builder.
     * @return the subscriber, already subscribed to the search publisher.
     */
    private SpliteratorSubscriber<SubscriptionHasNextPage> internalSpliterator(Consumer<SearchQueryBuilder> consumer) {
        SearchQueryBuilderImpl queryBuilder = new SearchQueryBuilderImpl();
        consumer.accept(queryBuilder);
        CreateSubscription createSubscription = queryBuilder.createSubscription();
        Duration timeout = this.messagingProvider.getMessagingConfiguration().getTimeout();
        int initialDemand = queryBuilder.getInitialDemand();
        int demand = queryBuilder.getDemand();
        Publisher<SubscriptionHasNextPage> pagePublisher =
                ThingSearchPublisher.of(createSubscription, PROTOCOL_ADAPTER, this.messagingProvider);
        SpliteratorSubscriber<SubscriptionHasNextPage> subscriber =
                SpliteratorSubscriber.of(timeout, initialDemand, demand);
        pagePublisher.subscribe(subscriber);
        return subscriber;
    }

    /**
     * Converts the items of one result page into a stream of things.
     *
     * @param subscriptionHasNextPage the received result page.
     * @return a stream with one thing per item of the page, each item being read as a JSON object.
     */
    private static Stream<Thing> streamAsThings(SubscriptionHasNextPage subscriptionHasNextPage) {
        return subscriptionHasNextPage.getItems()
                .stream()
                .map(item -> item.asObject())
                .map(ThingsModelFactory::newThing);
    }

    /**
     * Converts the items of one result page into a list of things.
     *
     * @param subscriptionHasNextPage the received result page.
     * @return the things of the page, collected into a list.
     */
    private static List<Thing> pageToThingList(SubscriptionHasNextPage subscriptionHasNextPage) {
        // Reuses the stream conversion so both entry points share one mapping pipeline; the
        // collector already produces a List<Thing>, so no raw cast is required.
        return streamAsThings(subscriptionHasNextPage).collect(Collectors.toList());
    }

    /**
     * Returns the acknowledgement label used for thing responses of this handle.
     *
     * @return {@link DittoAcknowledgementLabel#TWIN_PERSISTED}.
     */
    @Override // org.eclipse.ditto.client.internal.AbstractHandle
    protected AcknowledgementLabel getThingResponseAcknowledgementLabel() {
        return DittoAcknowledgementLabel.TWIN_PERSISTED;
    }
}
