package smartrics.iotics.space.twins;

import com.iotics.api.FeedID;
import com.iotics.api.FetchInterestResponse;
import com.iotics.api.SearchRequest;
import com.iotics.api.SearchResponse;
import com.iotics.api.TwinID;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
import java.util.Map;
import java.util.Timer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import smartrics.iotics.space.grpc.FeedDataBag;
import smartrics.iotics.space.grpc.IoticsApi;
import smartrics.iotics.space.grpc.NoopStreamObserver;
import smartrics.iotics.space.grpc.TwinDataBag;
import smartrics.iotics.space.twins.Follower;

/* loaded from: input_file:smartrics/iotics/space/twins/FindAndBindTwin.class */
public class FindAndBindTwin extends FindAndDoTwin implements Follower {
    private static final Logger LOGGER = LoggerFactory.getLogger(FindAndBindTwin.class);
    private final Map<FeedID, CompletableFuture<Void>> followFutures;
    private final AtomicLong feedsFollowed;
    private final AtomicLong datapointReceived;
    protected final Follower.RetryConf retryConf;

    public FindAndBindTwin(IoticsApi ioticsApi, String str, String str2, Executor executor, TwinID twinID, Timer timer, Duration duration, Follower.RetryConf retryConf) {
        super(ioticsApi, str, str2, executor, twinID, timer, duration);
        this.feedsFollowed = new AtomicLong(0L);
        this.datapointReceived = new AtomicLong(0L);
        this.followFutures = new ConcurrentHashMap();
        this.retryConf = retryConf;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // smartrics.iotics.space.twins.FindAndDoTwin
    public Map<String, Object> getShareStatus() {
        Map<String, Object> shareStatus = super.getShareStatus();
        shareStatus.put(FindAndDoTwin.FOLLOWING_FEEDS, Long.valueOf(this.feedsFollowed.get()));
        shareStatus.put(FindAndDoTwin.RECEIVED_DATA_POINTS, Long.valueOf(this.datapointReceived.get()));
        return shareStatus;
    }

    public CompletableFuture<Void> findAndBind(SearchRequest.Payload payload, StreamObserver<FeedDataBag> streamObserver) {
        return findAndBind(payload, new NoopStreamObserver(), streamObserver);
    }

    public CompletableFuture<Void> findAndBind(SearchRequest.Payload payload, final StreamObserver<TwinDataBag> streamObserver, final StreamObserver<FeedDataBag> streamObserver2) {
        try {
            updateMeta(payload);
            final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            search(payload, new StreamObserver<SearchResponse.TwinDetails>() { // from class: smartrics.iotics.space.twins.FindAndBindTwin.1
                public void onNext(SearchResponse.TwinDetails twinDetails) {
                    try {
                        final TwinDataBag from = TwinDataBag.from(twinDetails);
                        streamObserver.onNext(from);
                        FindAndBindTwin.this.twinsFound.incrementAndGet();
                        FindAndBindTwin.this.lastUpdateMs.set(System.currentTimeMillis());
                        for (final SearchResponse.FeedDetails feedDetails : twinDetails.getFeedsList()) {
                            FeedID feedId = feedDetails.getFeedId();
                            FindAndBindTwin.LOGGER.debug("[{}][{}] about to follow {}/{} in host {}", new Object[]{FindAndBindTwin.this.keyName, FindAndBindTwin.this.label, feedId.getTwinId(), feedId.getId(), feedId.getHostId()});
                            FindAndBindTwin.this.follow(feedId, FindAndBindTwin.this.retryConf, new StreamObserver<FetchInterestResponse>() { // from class: smartrics.iotics.space.twins.FindAndBindTwin.1.1
                                public void onNext(FetchInterestResponse fetchInterestResponse) {
                                    try {
                                        FindAndBindTwin.this.datapointReceived.incrementAndGet();
                                        FindAndBindTwin.this.lastUpdateMs.set(System.currentTimeMillis());
                                        streamObserver2.onNext(new FeedDataBag(from, feedDetails, fetchInterestResponse));
                                    } catch (RuntimeException e) {
                                        FindAndBindTwin.LOGGER.debug("[{}][{}] exception processing next feed data", new Object[]{FindAndBindTwin.this.keyName, FindAndBindTwin.this.label, e});
                                        throw e;
                                    }
                                }

                                public void onError(Throwable th) {
                                    FindAndBindTwin.this.errorsCount.incrementAndGet();
                                    FindAndBindTwin.this.feedsFollowed.decrementAndGet();
                                    FindAndBindTwin.this.lastUpdateMs.set(System.currentTimeMillis());
                                    streamObserver2.onError(th);
                                }

                                public void onCompleted() {
                                    FindAndBindTwin.this.feedsFollowed.decrementAndGet();
                                    FindAndBindTwin.this.lastUpdateMs.set(System.currentTimeMillis());
                                    streamObserver2.onCompleted();
                                }
                            });
                            FindAndBindTwin.this.feedsFollowed.incrementAndGet();
                            FindAndBindTwin.this.lastUpdateMs.set(System.currentTimeMillis());
                            FindAndBindTwin.this.followFutures.put(feedDetails.getFeedId(), new CompletableFuture<>());
                        }
                    } catch (RuntimeException e) {
                        FindAndBindTwin.LOGGER.debug("[{}][{}] exception processing next search response", new Object[]{FindAndBindTwin.this.keyName, FindAndBindTwin.this.label, e});
                        throw e;
                    }
                }

                public void onError(Throwable th) {
                    FindAndBindTwin.this.errorsCount.incrementAndGet();
                    streamObserver.onError(th);
                }

                public void onCompleted() {
                    FindAndBindTwin.this.followFutures.values().forEach(completableFuture2 -> {
                        completableFuture2.cancel(true);
                    });
                    completableFuture.complete(null);
                    FindAndBindTwin.this.followFutures.clear();
                }
            });
            return completableFuture;
        } catch (Exception e) {
            this.errorsCount.incrementAndGet();
            streamObserver.onError(e);
            CompletableFuture<Void> completableFuture2 = new CompletableFuture<>();
            completableFuture2.complete(null);
            return completableFuture2;
        }
    }
}
