package org.apache.sentry.provider.db.service.persistent;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jdo.JDODataStoreException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.sentry.api.common.SentryServiceUtil;
import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
import org.apache.sentry.core.common.utils.PubSub;
import org.apache.sentry.service.thrift.HMSFollowerState;
import org.apache.sentry.service.thrift.HiveConnectionFactory;
import org.apache.sentry.service.thrift.HiveNotificationFetcher;
import org.apache.sentry.service.thrift.SentryHMSClient;
import org.apache.sentry.service.thrift.SentryServiceState;
import org.apache.sentry.service.thrift.SentryStateBank;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/sentry/provider/db/service/persistent/HMSFollower.class */
public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber {
    private static final String FULL_UPDATE_TRIGGER = "FULL UPDATE TRIGGER: ";
    private SentryHMSClient client;
    private final Configuration authzConf;
    private final SentryStoreInterface sentryStore;
    private final NotificationProcessor notificationProcessor;
    private boolean readyToServe;
    private final HiveNotificationFetcher notificationFetcher;
    private final boolean hdfsSyncEnabled;
    private final AtomicBoolean fullUpdateHMS;
    private final LeaderStatusMonitor leaderMonitor;
    private int sentryHMSFetchSize;
    private long hmsImageId;
    private static final Logger LOGGER = LoggerFactory.getLogger(HMSFollower.class);
    private static boolean connectedToHms = false;

    public HMSFollower(Configuration configuration, SentryStoreInterface sentryStoreInterface, LeaderStatusMonitor leaderStatusMonitor, HiveConnectionFactory hiveConnectionFactory) {
        this(configuration, sentryStoreInterface, leaderStatusMonitor, hiveConnectionFactory, null);
    }

    @VisibleForTesting
    public HMSFollower(Configuration configuration, SentryStoreInterface sentryStoreInterface, LeaderStatusMonitor leaderStatusMonitor, HiveConnectionFactory hiveConnectionFactory, String str) {
        this.fullUpdateHMS = new AtomicBoolean(false);
        this.hmsImageId = 0L;
        LOGGER.info("HMSFollower is being initialized");
        this.readyToServe = false;
        this.authzConf = configuration;
        this.leaderMonitor = leaderStatusMonitor;
        this.sentryStore = sentryStoreInterface;
        this.notificationProcessor = new NotificationProcessor(this.sentryStore, str == null ? configuration.get(HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME.getVar(), configuration.get(HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME_DEPRECATED.getVar(), HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME_DEPRECATED.getDefault())) : str, this.authzConf);
        this.client = new SentryHMSClient(this.authzConf, hiveConnectionFactory);
        this.hdfsSyncEnabled = SentryServiceUtil.isHDFSSyncEnabledNoCache(this.authzConf);
        this.notificationFetcher = new HiveNotificationFetcher(this.sentryStore, hiveConnectionFactory);
        if (configuration.getBoolean("sentry.hdfs.sync.full-update-pubsub", false)) {
            LOGGER.info("FULL UPDATE TRIGGER: subscribing to topic " + PubSub.Topic.HDFS_SYNC_HMS.getName());
            PubSub.getInstance().subscribe(PubSub.Topic.HDFS_SYNC_HMS, this);
        }
        this.sentryHMSFetchSize = configuration.getInt("sentry.hms.fetch.size", -1);
        if (this.sentryHMSFetchSize < 0) {
            LOGGER.info("Sentry will fetch from HMS max depth");
        } else {
            LOGGER.info("Sentry will fetch from HMS with depth of {}", Integer.valueOf(this.sentryHMSFetchSize));
        }
        if (this.hdfsSyncEnabled) {
            return;
        }
        try {
            sentryStoreInterface.clearHmsPathInformation();
        } catch (Exception e) {
            LOGGER.error("Failed to clear HMS path info", e);
            LOGGER.error("Please manually clear data from SENTRY_PATH_CHANGE/AUTHZ_PATH/AUTHZ_PATHS_MAPPING tables.If not, HDFS ACL's will be inconsistent when HDFS sync feature is enabled back.");
        }
    }

    @VisibleForTesting
    public static boolean isConnectedToHms() {
        return connectedToHms;
    }

    @VisibleForTesting
    void setSentryHmsClient(SentryHMSClient sentryHMSClient) {
        this.client = sentryHMSClient;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        if (this.client != null) {
            try {
                this.client.disconnect();
                SentryStateBank.disableState(HMSFollowerState.COMPONENT, HMSFollowerState.CONNECTED);
            } catch (Exception e) {
                LOGGER.error("Failed to close the Sentry Hms Client", e);
            }
        }
        this.notificationFetcher.close();
    }

    @Override // java.lang.Runnable
    public void run() {
        SentryStateBank.enableState(HMSFollowerState.COMPONENT, HMSFollowerState.STARTED);
        try {
            try {
                long longValue = this.sentryStore.getLastProcessedNotificationID().longValue();
                wakeUpWaitingClientsForSync(longValue);
                if (isLeader()) {
                    syncupWithHms(longValue);
                    SentryStateBank.disableState(HMSFollowerState.COMPONENT, HMSFollowerState.STARTED);
                } else {
                    close();
                    SentryStateBank.disableState(HMSFollowerState.COMPONENT, HMSFollowerState.STARTED);
                }
            } catch (Exception e) {
                LOGGER.error("Failed to get the last processed notification id from sentry store, Skipping the processing", e);
                SentryStateBank.disableState(HMSFollowerState.COMPONENT, HMSFollowerState.STARTED);
            }
        } catch (Throwable th) {
            SentryStateBank.disableState(HMSFollowerState.COMPONENT, HMSFollowerState.STARTED);
            throw th;
        }
    }

    private boolean isLeader() {
        return this.leaderMonitor == null || this.leaderMonitor.isLeader();
    }

    @VisibleForTesting
    String getAuthServerName() {
        return this.notificationProcessor.getAuthServerName();
    }

    void syncupWithHms(long j) {
        try {
            this.client.connect();
            connectedToHms = true;
            SentryStateBank.enableState(HMSFollowerState.COMPONENT, HMSFollowerState.CONNECTED);
            try {
                if (this.hdfsSyncEnabled) {
                    if (isFullSnapshotRequired(j)) {
                        createFullSnapshot();
                        return;
                    }
                } else if (isSentryOutOfSync(j)) {
                    this.sentryStore.setLastProcessedNotificationID(0L);
                    j = 0;
                }
                List<NotificationEvent> fetchNotifications = this.sentryHMSFetchSize < 0 ? this.notificationFetcher.fetchNotifications(j) : this.notificationFetcher.fetchNotifications(j, this.sentryHMSFetchSize);
                if (this.hdfsSyncEnabled && areNotificationsOutOfSync(fetchNotifications, j)) {
                    createFullSnapshot();
                    return;
                }
                if (!this.readyToServe) {
                    System.out.println("Sentry HMS support is ready");
                    this.readyToServe = true;
                }
                processNotifications(fetchNotifications);
            } catch (TException e) {
                LOGGER.error("An error occurred while fetching HMS notifications: ", e);
                close();
            } catch (Throwable th) {
                LOGGER.error("Exception in HMSFollower! Caused by: " + th.getMessage(), th);
                close();
            }
        } catch (Throwable th2) {
            LOGGER.error("HMSFollower cannot connect to HMS!!", th2);
        }
    }

    private boolean isFullSnapshotRequired(long j) throws Exception {
        if (this.sentryStore.isHmsNotificationEmpty()) {
            String format = String.format("Sentry Store has no HMS Notifications. Create Full HMS Snapshot. latest sentry notification Id = %d", Long.valueOf(j));
            LOGGER.debug(format);
            System.out.println(SentryServiceUtil.getCurrentTimeStampWithMessage(format));
            return true;
        }
        if (this.sentryStore.isAuthzPathsSnapshotEmpty()) {
            String format2 = String.format("HDFSSync is enabled and MAuthzPathsMapping table is empty. Need to request a full snapshot", Long.valueOf(j));
            LOGGER.debug(format2);
            System.out.println(SentryServiceUtil.getCurrentTimeStampWithMessage(format2));
            return true;
        }
        if (isSentryOutOfSync(j)) {
            return true;
        }
        if (!this.fullUpdateHMS.compareAndSet(true, false)) {
            return false;
        }
        LOGGER.info("FULL UPDATE TRIGGER: initiating full HMS snapshot request");
        System.out.println(SentryServiceUtil.getCurrentTimeStampWithMessage("FULL UPDATE TRIGGER: initiating full HMS snapshot request"));
        return true;
    }

    private boolean isSentryOutOfSync(long j) throws Exception {
        long currentNotificationId = this.notificationFetcher.getCurrentNotificationId();
        if (currentNotificationId >= j) {
            return false;
        }
        LOGGER.info("The current notification ID on HMS = {} is less than the latest processed Sentry notification ID = {}. Sentry, Out-of-sync", Long.valueOf(currentNotificationId), Long.valueOf(j));
        return true;
    }

    private boolean areNotificationsOutOfSync(Collection<NotificationEvent> collection, long j) {
        if (collection.isEmpty()) {
            return false;
        }
        long eventId = ((NotificationEvent) ((List) collection).get(0)).getEventId();
        if (eventId <= j + 1) {
            return false;
        }
        String format = String.format("First HMS event notification Id = %d is greater than latest Sentry processednotification Id = %d + 1. Need to request a full HMS snapshot.", Long.valueOf(eventId), Long.valueOf(j));
        LOGGER.info(format);
        System.out.println(SentryServiceUtil.getCurrentTimeStampWithMessage(format));
        return true;
    }

    /* JADX WARN: Finally extract failed */
    private long createFullSnapshot() throws Exception {
        LOGGER.debug("Attempting to take full HMS snapshot");
        Preconditions.checkState(!SentryStateBank.isEnabled(SentryServiceState.COMPONENT, SentryServiceState.FULL_UPDATE_RUNNING), "HMSFollower shown loading full snapshot when it should not be.");
        try {
            try {
                SentryStateBank.enableState(SentryServiceState.COMPONENT, SentryServiceState.FULL_UPDATE_RUNNING);
                PathsImage fullSnapshot = this.client.getFullSnapshot();
                if (fullSnapshot.getPathImage().isEmpty()) {
                    LOGGER.debug("Received empty path image from HMS while taking a full snapshot");
                    long id = fullSnapshot.getId();
                    SentryStateBank.disableState(SentryServiceState.COMPONENT, SentryServiceState.FULL_UPDATE_RUNNING);
                    return id;
                }
                if (!isLeader()) {
                    LOGGER.info("Not persisting full snapshot since not a leader");
                    SentryStateBank.disableState(SentryServiceState.COMPONENT, SentryServiceState.FULL_UPDATE_RUNNING);
                    return 0L;
                }
                try {
                    if (this.hdfsSyncEnabled) {
                        String format = String.format("Persisting full snapshot for notification Id = %d", Long.valueOf(fullSnapshot.getId()));
                        LOGGER.info(format);
                        System.out.println(SentryServiceUtil.getCurrentTimeStampWithMessage(format));
                        this.sentryStore.persistFullPathsImage(fullSnapshot.getPathImage(), fullSnapshot.getId());
                    } else {
                        LOGGER.info("HDFSSync is disabled. Not Persisting full snapshot, but only setting last processed notification Id = {}", Long.valueOf(fullSnapshot.getId()));
                        this.sentryStore.setLastProcessedNotificationID(Long.valueOf(fullSnapshot.getId()));
                    }
                    wakeUpWaitingClientsForSync(fullSnapshot.getId());
                    String format2 = String.format("Create full snapshot process is complete: snapshot Id %d", Long.valueOf(fullSnapshot.getId()));
                    LOGGER.info(format2);
                    System.out.println(SentryServiceUtil.getCurrentTimeStampWithMessage(format2));
                    long id2 = fullSnapshot.getId();
                    SentryStateBank.disableState(SentryServiceState.COMPONENT, SentryServiceState.FULL_UPDATE_RUNNING);
                    return id2;
                } catch (Exception e) {
                    LOGGER.error("Received exception while persisting HMS path full snapshot ");
                    throw e;
                }
            } catch (Exception e2) {
                LOGGER.error("Received exception while creating HMS path full snapshot ");
                throw e2;
            }
        } catch (Throwable th) {
            SentryStateBank.disableState(SentryServiceState.COMPONENT, SentryServiceState.FULL_UPDATE_RUNNING);
            throw th;
        }
    }

    public void processNotifications(Collection<NotificationEvent> collection) throws Exception {
        if (collection.isEmpty()) {
            return;
        }
        for (NotificationEvent notificationEvent : collection) {
            boolean z = false;
            try {
            } catch (Exception e) {
                if (e.getCause() instanceof JDODataStoreException) {
                    LOGGER.info("Received JDO Storage Exception, Could be because of processing duplicate notification");
                    if (notificationEvent.getEventId() <= this.sentryStore.getLastProcessedNotificationID().longValue()) {
                        LOGGER.error("Received event with Id: {} which is smaller then the ID persisted in store", Long.valueOf(notificationEvent.getEventId()));
                        return;
                    }
                } else {
                    LOGGER.error("Processing the notification with ID:{} failed with exception {}", Long.valueOf(notificationEvent.getEventId()), e);
                }
            }
            if (!isLeader()) {
                LOGGER.debug("Not processing notifications since not a leader");
                return;
            }
            z = this.notificationProcessor.processNotificationEvent(notificationEvent);
            if (!z) {
                try {
                    LOGGER.debug("Explicitly Persisting Notification ID = {} ", Long.valueOf(notificationEvent.getEventId()));
                    this.sentryStore.persistLastProcessedNotificationID(Long.valueOf(notificationEvent.getEventId()));
                } catch (Exception e2) {
                    LOGGER.error("Received exception while persisting the notification ID = {}", Long.valueOf(notificationEvent.getEventId()));
                    throw e2;
                }
            }
            wakeUpWaitingClientsForSync(notificationEvent.getEventId());
        }
    }

    private void wakeUpWaitingClientsForSync(long j) {
        CounterWait counterWait = this.sentryStore.getCounterWait();
        LOGGER.debug("wakeUpWaitingClientsForSync: eventId = {}, hmsImageId = {}", Long.valueOf(j), Long.valueOf(this.hmsImageId));
        if (counterWait == null) {
            return;
        }
        long j2 = this.hmsImageId;
        try {
            long lastProcessedImageID = this.sentryStore.getLastProcessedImageID();
            LOGGER.debug("wakeUpWaitingClientsForSync: lastHMSSnapshotId = {}", Long.valueOf(lastProcessedImageID));
            if (lastProcessedImageID > this.hmsImageId) {
                counterWait.reset(j);
                this.hmsImageId = lastProcessedImageID;
                LOGGER.debug("wakeUpWaitingClientsForSync: reset counterWait with eventId = {}, new hmsImageId = {}", Long.valueOf(j), Long.valueOf(this.hmsImageId));
            }
            LOGGER.debug("wakeUpWaitingClientsForSync: update counterWait with eventId = {}, hmsImageId = {}", Long.valueOf(j), Long.valueOf(this.hmsImageId));
            counterWait.update(j);
        } catch (Exception e) {
            counterWait.update(j);
            LOGGER.error("Failed to get the last processed HMS image id from sentry store");
        }
    }

    public void onMessage(PubSub.Topic topic, String str) {
        Preconditions.checkArgument(topic == PubSub.Topic.HDFS_SYNC_HMS, "Unexpected topic %s instead of %s", new Object[]{topic, PubSub.Topic.HDFS_SYNC_HMS});
        LOGGER.info("FULL UPDATE TRIGGER: Received [{}, {}] notification", topic, str);
        this.fullUpdateHMS.set(true);
    }
}
