/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.publish;

import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import io.mantisrx.publish.StreamManager;
import io.mantisrx.publish.SubscriptionTracker;
import io.mantisrx.publish.config.MrePublishConfiguration;
import io.mantisrx.publish.core.Subscription;
import io.mantisrx.publish.core.SubscriptionFactory;
import io.mantisrx.publish.internal.discovery.MantisJobDiscovery;
import io.mantisrx.publish.internal.metrics.SpectatorUtils;
import io.mantisrx.publish.proto.MantisServerSubscription;
import io.mantisrx.publish.proto.MantisServerSubscriptionEnvelope;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base {@link SubscriptionTracker} that reconciles the subscriptions held by the
 * {@link StreamManager} with the subscriptions returned by {@link #fetchSubscriptions(String)}.
 *
 * <p>Each {@link #refreshSubscriptions()} call fetches subscriptions for the job clusters mapped
 * to the registered streams, adds the ones the stream manager does not have yet and removes the
 * ones that are no longer reported. Subscriptions previously seen from a job cluster whose fetch
 * failed in the current refresh are kept alive from an internal cache until they are older than
 * {@code subscriptionExpiryIntervalSec}.
 */
public abstract class AbstractSubscriptionTracker
implements SubscriptionTracker {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractSubscriptionTracker.class);
    /** Stream key whose job cluster is always fetched, whether or not that stream is registered. */
    private static final String DEFAULT_STREAM_KEY = "__default__";
    /** {@code long} on purpose: forces the seconds-to-millis conversion into 64-bit arithmetic. */
    private static final long MILLIS_PER_SECOND = 1000L;
    /** Cache entries are evicted once they are older than this many expiry intervals. */
    private static final long CACHE_EVICTION_EXPIRY_MULTIPLE = 10L;

    private final MrePublishConfiguration mrePublishConfiguration;
    private final Registry registry;
    private final MantisJobDiscovery jobDiscovery;
    private final StreamManager streamManager;
    private final Counter refreshSubscriptionInvokedCount;
    private final Counter refreshSubscriptionSuccessCount;
    private final Counter refreshSubscriptionFailedCount;
    private final Counter staleSubscriptionRemovedCount;
    /** Last successfully fetched subscriptions, keyed by subscription id. */
    private final Map<String, SubscriptionCacheEntry> subscriptionCache = new ConcurrentHashMap<>();

    /**
     * Creates a tracker and builds its counters through {@link SpectatorUtils}.
     *
     * @param mrePublishConfiguration source of the enabled flag, app name and expiry interval
     * @param registry registry handed to {@link SpectatorUtils} when building the counters
     * @param jobDiscovery provides the stream name to job cluster mapping for the app
     * @param streamManager holder of the registered streams and their subscriptions
     */
    public AbstractSubscriptionTracker(MrePublishConfiguration mrePublishConfiguration, Registry registry, MantisJobDiscovery jobDiscovery, StreamManager streamManager) {
        this.mrePublishConfiguration = mrePublishConfiguration;
        this.registry = registry;
        this.jobDiscovery = jobDiscovery;
        this.streamManager = streamManager;
        this.refreshSubscriptionInvokedCount = SpectatorUtils.buildAndRegisterCounter(registry, "refreshSubscriptionInvokedCount");
        this.refreshSubscriptionSuccessCount = SpectatorUtils.buildAndRegisterCounter(registry, "refreshSubscriptionSuccessCount");
        this.refreshSubscriptionFailedCount = SpectatorUtils.buildAndRegisterCounter(registry, "refreshSubscriptionFailedCount");
        this.staleSubscriptionRemovedCount = SpectatorUtils.buildAndRegisterCounter(registry, "staleSubscriptionRemovedCount");
    }

    /**
     * Applies the difference between the stream manager's subscriptions and the given sets.
     *
     * <p>Every subscription in {@code currentSubscriptions} whose id is not already held by the
     * stream manager is added. Every subscription held by the stream manager whose id appears in
     * neither {@code currentSubscriptions} nor {@code extension} is removed. Failures on an
     * individual subscription are logged and do not stop the remaining ones from being processed.
     *
     * @param currentSubscriptions subscriptions fetched in the current refresh
     * @param extension subscriptions that must be retained but are never added
     */
    void propagateSubscriptionChanges(Set<MantisServerSubscription> currentSubscriptions, Set<MantisServerSubscription> extension) {
        Set<Subscription> previousSubscriptions = this.getCurrentSubscriptions();
        // Built once up front rather than once per candidate subscription.
        Set<String> previousIds = previousSubscriptions.stream()
                .map(Subscription::getSubscriptionId)
                .collect(Collectors.toSet());

        for (MantisServerSubscription candidate : currentSubscriptions) {
            if (!previousIds.contains(candidate.getSubscriptionId())) {
                this.addSubscription(candidate);
            }
        }

        Set<String> idsToKeep = new HashSet<>();
        for (MantisServerSubscription current : currentSubscriptions) {
            idsToKeep.add(current.getSubscriptionId());
        }
        for (MantisServerSubscription extended : extension) {
            idsToKeep.add(extended.getSubscriptionId());
        }

        for (Subscription previous : previousSubscriptions) {
            if (!idsToKeep.contains(previous.getSubscriptionId())) {
                this.removeSubscription(previous);
            }
        }
    }

    /** Builds a client-side subscription from {@code newSub} and registers it; never throws. */
    private void addSubscription(MantisServerSubscription newSub) {
        try {
            Optional<Subscription> subscription = SubscriptionFactory.getSubscription(newSub.getSubscriptionId(), newSub.getQuery());
            if (subscription.isPresent()) {
                this.streamManager.addStreamSubscription(subscription.get());
            } else {
                LOG.info("will not add invalid subscription {}", newSub);
            }
        } catch (Throwable t) {
            // Best effort: one bad subscription must not block the others.
            LOG.debug("failed to add subscription {}", newSub, t);
        }
    }

    /** Removes {@code stale} from the stream manager; never throws. */
    private void removeSubscription(Subscription stale) {
        try {
            this.streamManager.removeStreamSubscription(stale.getSubscriptionId());
        } catch (Throwable t) {
            // Best effort as above; the cause is passed along so it is not silently lost.
            LOG.debug("failed to remove subscription {}", stale.getSubscriptionId(), t);
        }
    }

    /**
     * Fetches the subscriptions of one job cluster.
     *
     * @param jobCluster job cluster to fetch subscriptions for
     * @return the fetched envelope; an empty result is counted as a failed fetch by
     *     {@link #refreshSubscriptions()}
     */
    public abstract Optional<MantisServerSubscriptionEnvelope> fetchSubscriptions(String jobCluster);

    /**
     * Selects the job clusters to fetch: those mapped from a registered stream, plus the one
     * mapped from the default stream key.
     *
     * @param streamJobClusterMap stream name to job cluster mapping
     * @param registeredStreams names of the currently registered streams
     * @return the distinct job clusters to fetch subscriptions from
     */
    private Set<String> getRelevantJobClusters(Map<String, String> streamJobClusterMap, Set<String> registeredStreams) {
        Set<String> jobClustersToFetch = new HashSet<>();
        for (Map.Entry<String, String> mapping : streamJobClusterMap.entrySet()) {
            String streamName = mapping.getKey();
            LOG.debug("processing stream {} and currently registered Streams {}", streamName, registeredStreams);
            if (registeredStreams.contains(streamName) || DEFAULT_STREAM_KEY.equals(streamName)) {
                jobClustersToFetch.add(mapping.getValue());
            } else {
                LOG.debug("No server side mappings found for one or more streams {} ", registeredStreams);
                LOG.debug("will not fetch subscriptions for un-registered stream {}", streamName);
            }
        }
        return jobClustersToFetch;
    }

    /**
     * Fetches subscriptions for all relevant job clusters and propagates the changes to the
     * stream manager.
     *
     * <p>Nothing is fetched when the client is disabled or no stream is registered. Successfully
     * fetched subscriptions are cached with the refresh timestamp. For job clusters whose fetch
     * failed or returned nothing, cached subscriptions younger than the expiry interval are
     * retained. Cache entries older than ten expiry intervals are evicted at the end.
     */
    @Override
    public void refreshSubscriptions() {
        this.refreshSubscriptionInvokedCount.increment();
        boolean mantisPublishEnabled = this.mrePublishConfiguration.isMREClientEnabled();
        Set<String> registeredStreams = this.streamManager.getRegisteredStreams();
        if (!mantisPublishEnabled || registeredStreams.isEmpty()) {
            LOG.debug("subscription refresh skipped (client enabled {} registered streams {})", mantisPublishEnabled, registeredStreams);
            return;
        }

        Map<String, String> streamJobClusterMap = this.jobDiscovery.getStreamNameToJobClusterMapping(this.mrePublishConfiguration.appName());
        Set<String> jobClustersToFetch = this.getRelevantJobClusters(streamJobClusterMap, registeredStreams);
        Set<MantisServerSubscription> allSubscriptions = new HashSet<>();
        Set<String> failedJobClusters = new HashSet<>();
        long currentTimestamp = System.currentTimeMillis();

        for (String jobCluster : jobClustersToFetch) {
            try {
                Optional<MantisServerSubscriptionEnvelope> fetched = this.fetchSubscriptions(jobCluster);
                if (fetched.isPresent()) {
                    MantisServerSubscriptionEnvelope envelope = fetched.get();
                    for (MantisServerSubscription sub : envelope.getSubscriptions()) {
                        this.subscriptionCache.put(sub.getSubscriptionId(), new SubscriptionCacheEntry(currentTimestamp, jobCluster, sub));
                    }
                    allSubscriptions.addAll(envelope.getSubscriptions());
                    this.refreshSubscriptionSuccessCount.increment();
                } else {
                    failedJobClusters.add(jobCluster);
                    this.refreshSubscriptionFailedCount.increment();
                }
            } catch (Exception ex) {
                failedJobClusters.add(jobCluster);
                LOG.info("refresh subscriptions failed for {}", jobCluster, ex);
                this.refreshSubscriptionFailedCount.increment();
            }
        }

        // Multiply in long: int arithmetic overflows for large intervals, most easily on the
        // ten-fold eviction threshold, and would produce a negative threshold.
        long expiryMillis = this.mrePublishConfiguration.subscriptionExpiryIntervalSec() * MILLIS_PER_SECOND;
        long evictionAgeMillis = expiryMillis * CACHE_EVICTION_EXPIRY_MULTIPLE;

        Set<MantisServerSubscription> subscriptionsToExtend = this.subscriptionCache.values().stream()
                .filter(entry -> failedJobClusters.contains(entry.sourceJob))
                .filter(entry -> currentTimestamp - entry.timestamp < expiryMillis)
                .map(entry -> entry.sub)
                .collect(Collectors.toSet());
        this.propagateSubscriptionChanges(allSubscriptions, subscriptionsToExtend);

        this.subscriptionCache.values().removeIf(entry -> currentTimestamp - entry.timestamp > evictionAgeMillis);
    }

    /**
     * Collects the subscriptions of every registered stream from the stream manager.
     *
     * @return a newly collected set of the subscriptions across all registered streams
     */
    protected Set<Subscription> getCurrentSubscriptions() {
        return this.streamManager.getRegisteredStreams().stream()
                .flatMap(streamName -> this.streamManager.getStreamSubscriptions(streamName).stream())
                .collect(Collectors.toSet());
    }

    /** Immutable record of a fetched subscription, when it was fetched and from which job cluster. */
    private static final class SubscriptionCacheEntry {
        /** Refresh time, in epoch milliseconds, at which the subscription was last fetched. */
        private final long timestamp;
        /** Job cluster the subscription was fetched from. */
        private final String sourceJob;
        /** The fetched subscription. */
        private final MantisServerSubscription sub;

        SubscriptionCacheEntry(long timestamp, String sourceJob, MantisServerSubscription sub) {
            this.timestamp = timestamp;
            this.sourceJob = sourceJob;
            this.sub = sub;
        }
    }
}

