package org.apache.sentry.service.thrift;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jdo.JDODataStoreException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
import org.apache.sentry.core.common.utils.PubSub;
import org.apache.sentry.provider.db.service.persistent.PathsImage;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/sentry/service/thrift/HMSFollower.class */
public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber {
    private static final String FULL_UPDATE_TRIGGER = "FULL UPDATE TRIGGER: ";
    private SentryHMSClient client;
    private final Configuration authzConf;
    private final SentryStore sentryStore;
    private final NotificationProcessor notificationProcessor;
    private boolean readyToServe;
    private final HiveNotificationFetcher notificationFetcher;
    private final boolean hdfsSyncEnabled;
    private final AtomicBoolean fullUpdateHMS;
    private final LeaderStatusMonitor leaderMonitor;
    private long hmsImageId;
    private static final Logger LOGGER = LoggerFactory.getLogger(HMSFollower.class);
    private static boolean connectedToHms = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    public HMSFollower(Configuration configuration, SentryStore sentryStore, LeaderStatusMonitor leaderStatusMonitor, HiveConnectionFactory hiveConnectionFactory) {
        this(configuration, sentryStore, leaderStatusMonitor, hiveConnectionFactory, null);
    }

    @VisibleForTesting
    public HMSFollower(Configuration configuration, SentryStore sentryStore, LeaderStatusMonitor leaderStatusMonitor, HiveConnectionFactory hiveConnectionFactory, String str) {
        this.fullUpdateHMS = new AtomicBoolean(false);
        this.hmsImageId = 0L;
        LOGGER.info("HMSFollower is being initialized");
        this.readyToServe = false;
        this.authzConf = configuration;
        this.leaderMonitor = leaderStatusMonitor;
        this.sentryStore = sentryStore;
        this.notificationProcessor = new NotificationProcessor(this.sentryStore, str == null ? configuration.get(HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME.getVar(), configuration.get(HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME_DEPRECATED.getVar(), HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME_DEPRECATED.getDefault())) : str, this.authzConf);
        this.client = new SentryHMSClient(this.authzConf, hiveConnectionFactory);
        this.hdfsSyncEnabled = SentryServiceUtil.isHDFSSyncEnabledNoCache(this.authzConf);
        this.notificationFetcher = new HiveNotificationFetcher(this.sentryStore, hiveConnectionFactory);
        if (configuration.getBoolean("sentry.hdfs.sync.full-update-pubsub", false)) {
            LOGGER.info("FULL UPDATE TRIGGER: subscribing to topic " + PubSub.Topic.HDFS_SYNC_HMS.getName());
            PubSub.getInstance().subscribe(PubSub.Topic.HDFS_SYNC_HMS, this);
        }
    }

    @VisibleForTesting
    public static boolean isConnectedToHms() {
        return connectedToHms;
    }

    @VisibleForTesting
    void setSentryHmsClient(SentryHMSClient sentryHMSClient) {
        this.client = sentryHMSClient;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        if (this.client != null) {
            try {
                this.client.disconnect();
                SentryStateBank.disableState(HMSFollowerState.COMPONENT, HMSFollowerState.CONNECTED);
            } catch (Exception e) {
                LOGGER.error("Failed to close the Sentry Hms Client", e);
            }
        }
        this.notificationFetcher.close();
    }

    @Override // java.lang.Runnable
    public void run() {
        SentryStateBank.enableState(HMSFollowerState.COMPONENT, HMSFollowerState.STARTED);
        try {
            try {
                long longValue = this.sentryStore.getLastProcessedNotificationID().longValue();
                wakeUpWaitingClientsForSync(longValue);
                if (isLeader()) {
                    syncupWithHms(longValue);
                    SentryStateBank.disableState(HMSFollowerState.COMPONENT, HMSFollowerState.STARTED);
                } else {
                    close();
                    SentryStateBank.disableState(HMSFollowerState.COMPONENT, HMSFollowerState.STARTED);
                }
            } catch (Exception e) {
                LOGGER.error("Failed to get the last processed notification id from sentry store, Skipping the processing", e);
                SentryStateBank.disableState(HMSFollowerState.COMPONENT, HMSFollowerState.STARTED);
            }
        } catch (Throwable th) {
            SentryStateBank.disableState(HMSFollowerState.COMPONENT, HMSFollowerState.STARTED);
            throw th;
        }
    }

    private boolean isLeader() {
        return this.leaderMonitor == null || this.leaderMonitor.isLeader();
    }

    @VisibleForTesting
    String getAuthServerName() {
        return this.notificationProcessor.getAuthServerName();
    }

    private void syncupWithHms(long j) {
        try {
            this.client.connect();
            connectedToHms = true;
            SentryStateBank.enableState(HMSFollowerState.COMPONENT, HMSFollowerState.CONNECTED);
            try {
                if (isFullSnapshotRequired(j)) {
                    createFullSnapshot();
                    return;
                }
                List<NotificationEvent> fetchNotifications = this.notificationFetcher.fetchNotifications(j);
                if (areNotificationsOutOfSync(fetchNotifications, j)) {
                    createFullSnapshot();
                    return;
                }
                if (!this.readyToServe) {
                    System.out.println("Sentry HMS support is ready");
                    this.readyToServe = true;
                }
                processNotifications(fetchNotifications);
            } catch (TException e) {
                LOGGER.error("An error occurred while fetching HMS notifications: {}", e.getMessage());
                close();
            } catch (Throwable th) {
                LOGGER.error("Exception in HMSFollower! Caused by: " + th.getMessage(), th);
                close();
            }
        } catch (Throwable th2) {
            LOGGER.error("HMSFollower cannot connect to HMS!!", th2);
        }
    }

    private boolean isFullSnapshotRequired(long j) throws Exception {
        if (this.sentryStore.isHmsNotificationEmpty()) {
            return true;
        }
        if (this.hdfsSyncEnabled && this.sentryStore.isAuthzPathsSnapshotEmpty()) {
            LOGGER.debug("HDFSSync is enabled and MAuthzPathsSnapshotId table is empty. Need to request a full snapshot");
            return true;
        }
        if (this.notificationFetcher.getCurrentNotificationId() < j) {
            LOGGER.info("The latest notification ID on HMS is less than the latest notification ID processed by Sentry. Need to request a full HMS snapshot.");
            return true;
        }
        if (!this.fullUpdateHMS.compareAndSet(true, false)) {
            return false;
        }
        LOGGER.info("FULL UPDATE TRIGGER: initiating full HMS snapshot request");
        return true;
    }

    private boolean areNotificationsOutOfSync(Collection<NotificationEvent> collection, long j) {
        if (collection.isEmpty() || ((NotificationEvent) ((List) collection).get(0)).getEventId() <= j + 1) {
            return false;
        }
        LOGGER.info("Current HMS notifications are out-of-sync with latest Sentry processednotifications. Need to request a full HMS snapshot.");
        return true;
    }

    private long createFullSnapshot() throws Exception {
        LOGGER.debug("Attempting to take full HMS snapshot");
        Preconditions.checkState(!SentryStateBank.isEnabled(SentryServiceState.COMPONENT, SentryServiceState.FULL_UPDATE_RUNNING), "HMSFollower shown loading full snapshot when it should not be.");
        try {
            SentryStateBank.enableState(SentryServiceState.COMPONENT, SentryServiceState.FULL_UPDATE_RUNNING);
            PathsImage fullSnapshot = this.client.getFullSnapshot();
            if (fullSnapshot.getPathImage().isEmpty()) {
                long id = fullSnapshot.getId();
                SentryStateBank.disableState(SentryServiceState.COMPONENT, SentryServiceState.FULL_UPDATE_RUNNING);
                return id;
            }
            if (!isLeader()) {
                return 0L;
            }
            try {
                LOGGER.debug("Persisting HMS path full snapshot");
                if (this.hdfsSyncEnabled) {
                    this.sentryStore.persistFullPathsImage(fullSnapshot.getPathImage(), fullSnapshot.getId());
                } else {
                    this.sentryStore.setLastProcessedNotificationID(Long.valueOf(fullSnapshot.getId()));
                }
                wakeUpWaitingClientsForSync(fullSnapshot.getId());
                LOGGER.info("Sentry HMS support is ready");
                long id2 = fullSnapshot.getId();
                SentryStateBank.disableState(SentryServiceState.COMPONENT, SentryServiceState.FULL_UPDATE_RUNNING);
                return id2;
            } catch (Exception e) {
                LOGGER.error("Received exception while persisting HMS path full snapshot ");
                throw e;
            }
        } finally {
            SentryStateBank.disableState(SentryServiceState.COMPONENT, SentryServiceState.FULL_UPDATE_RUNNING);
        }
    }

    public void processNotifications(Collection<NotificationEvent> collection) throws Exception {
        if (collection.isEmpty()) {
            return;
        }
        for (NotificationEvent notificationEvent : collection) {
            boolean z = false;
            try {
            } catch (Exception e) {
                if (e.getCause() instanceof JDODataStoreException) {
                    LOGGER.info("Received JDO Storage Exception, Could be because of processing duplicate notification");
                    if (notificationEvent.getEventId() <= this.sentryStore.getLastProcessedNotificationID().longValue()) {
                        LOGGER.error("Received event with Id: {} which is smaller then the ID persisted in store", Long.valueOf(notificationEvent.getEventId()));
                        return;
                    }
                } else {
                    LOGGER.error("Processing the notification with ID:{} failed with exception {}", Long.valueOf(notificationEvent.getEventId()), e);
                }
            }
            if (!isLeader()) {
                return;
            }
            z = this.notificationProcessor.processNotificationEvent(notificationEvent);
            if (!z) {
                try {
                    LOGGER.debug("Explicitly Persisting Notification ID:{}", Long.valueOf(notificationEvent.getEventId()));
                    this.sentryStore.persistLastProcessedNotificationID(Long.valueOf(notificationEvent.getEventId()));
                } catch (Exception e2) {
                    LOGGER.error("Received exception while persisting the notification ID " + notificationEvent.getEventId());
                    throw e2;
                }
            }
            wakeUpWaitingClientsForSync(notificationEvent.getEventId());
        }
    }

    private void wakeUpWaitingClientsForSync(long j) {
        CounterWait counterWait = this.sentryStore.getCounterWait();
        if (counterWait == null) {
            return;
        }
        long j2 = this.hmsImageId;
        try {
            long lastProcessedImageID = this.sentryStore.getLastProcessedImageID();
            if (lastProcessedImageID > this.hmsImageId) {
                counterWait.reset(j);
                this.hmsImageId = lastProcessedImageID;
            }
            counterWait.update(j);
        } catch (Exception e) {
            counterWait.update(j);
            LOGGER.error("Failed to get the last processed HMS image id from sentry store");
        }
    }

    public void onMessage(PubSub.Topic topic, String str) {
        Preconditions.checkArgument(topic == PubSub.Topic.HDFS_SYNC_HMS, "Unexpected topic %s instead of %s", new Object[]{topic, PubSub.Topic.HDFS_SYNC_HMS});
        LOGGER.info("FULL UPDATE TRIGGER: Received [{}, {}] notification", topic, str);
        this.fullUpdateHMS.set(true);
    }
}
