package alluxio.master.backup;

import alluxio.ClientContext;
import alluxio.ProcessUtils;
import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.exception.AlluxioException;
import alluxio.exception.BackupException;
import alluxio.grpc.BackupPRequest;
import alluxio.grpc.BackupState;
import alluxio.grpc.BackupStatusPRequest;
import alluxio.grpc.GrpcService;
import alluxio.grpc.ServiceType;
import alluxio.master.CoreMasterContext;
import alluxio.master.MasterClientContext;
import alluxio.master.StateLockOptions;
import alluxio.master.journal.CatchupFuture;
import alluxio.master.transport.GrpcMessagingClient;
import alluxio.master.transport.GrpcMessagingConnection;
import alluxio.master.transport.Listener;
import alluxio.retry.ExponentialBackoffRetry;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.wire.BackupStatus;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:alluxio/master/backup/BackupWorkerRole.class */
/**
 * Backup role implementation that takes backups on behalf of a backup-leader.
 *
 * <p>On construction the role starts connecting to the primary master's RPC address. Once
 * connected it registers handlers for {@link BackupSuspendMessage} and
 * {@link BackupRequestMessage}, and introduces itself with a {@link BackupHandshakeMessage}.
 * While a backup is running, its status is reported back over the same connection with
 * {@link BackupHeartbeatMessage}s.
 *
 * <p>This role does not serve RPCs: {@link #backup} and {@link #getBackupStatus} always throw.
 */
public class BackupWorkerRole extends AbstractBackupRole {
    private static final Logger LOG = LoggerFactory.getLogger(BackupWorkerRole.class);

    /** Upper bound, in milliseconds, on waiting for journal catch-up before the backup fails. */
    private static final long BACKUP_ABORT_AFTER_TRANSITION_TIMEOUT_MS = 30000;

    /** Lower bound passed to the retry policy used while connecting to the backup-leader. */
    private final long mLeaderConnectionIntervalMin;
    /** Upper bound passed to the retry policy used while connecting to the backup-leader. */
    private final long mLeaderConnectionIntervalMax;
    /** Interval, in milliseconds, between heartbeats sent to the backup-leader. */
    private final long mBackupHeartbeatIntervalMs;
    /** Delay, in milliseconds, after which suspended journals are resumed if no request arrives. */
    private final long mBackupAbortSuspendTimeoutMs;

    /** Current connection to the backup-leader, or {@code null} when not connected. */
    private GrpcMessagingConnection mLeaderConnection;
    /** Registration of the close callback installed on the leader connection. */
    private Listener<GrpcMessagingConnection> mLeaderConnectionCloseListener;
    /** Handle of the task that takes the backup. */
    private Future<?> mBackupFuture;
    /** Handle of the task that sends heartbeats to the backup-leader. */
    private Future<?> mBackupProgressFuture;
    /** Handle of the task that resumes journals if a backup request does not follow a suspend. */
    private ScheduledFuture<?> mBackupTimeoutTask;

    /**
     * Creates the backup-worker role and schedules the connection attempt to the backup-leader
     * on the role's executor.
     *
     * @param coreMasterContext the master context handed to the parent role
     */
    public BackupWorkerRole(CoreMasterContext coreMasterContext) {
        super(coreMasterContext);
        LOG.info("Creating backup-worker role.");
        this.mBackupHeartbeatIntervalMs = ServerConfiguration.getMs(PropertyKey.MASTER_BACKUP_HEARTBEAT_INTERVAL);
        this.mLeaderConnectionIntervalMin = ServerConfiguration.getMs(PropertyKey.MASTER_BACKUP_CONNECT_INTERVAL_MIN);
        this.mLeaderConnectionIntervalMax = ServerConfiguration.getMs(PropertyKey.MASTER_BACKUP_CONNECT_INTERVAL_MAX);
        this.mBackupAbortSuspendTimeoutMs = ServerConfiguration.getMs(PropertyKey.MASTER_BACKUP_SUSPEND_TIMEOUT);
        this.mExecutorService.submit(this::establishConnectionToLeader);
    }

    /**
     * Cancels outstanding backup tasks, closes the leader connection and then closes the parent
     * role. Does nothing if the role has already been closed.
     *
     * @throws IOException if closing the parent role fails
     */
    @Override // alluxio.master.backup.AbstractBackupRole, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.mRoleClosed) {
            return;
        }
        LOG.info("Closing backup-worker role.");
        if (this.mBackupTimeoutTask != null && !this.mBackupTimeoutTask.isDone()) {
            this.mBackupTimeoutTask.cancel(true);
        }
        if (this.mBackupFuture != null) {
            this.mBackupFuture.cancel(true);
        }
        if (this.mBackupProgressFuture != null) {
            this.mBackupProgressFuture.cancel(true);
        }
        // Unregister the close callback first so that closing the connection below does not
        // trigger a reconnect attempt.
        if (this.mLeaderConnectionCloseListener != null) {
            this.mLeaderConnectionCloseListener.close();
        }
        if (this.mLeaderConnection != null) {
            try {
                this.mLeaderConnection.close().get();
            } catch (Exception e) {
                // Best effort: a failed close must not prevent the rest of the shutdown.
                LOG.warn("Failed to close backup-leader connection: {}. Error: {}", this.mLeaderConnection, e);
            }
            this.mLeaderConnection = null;
        }
        super.close();
    }

    /**
     * @return an empty map, as this role exposes no gRPC services
     */
    @Override // alluxio.master.backup.BackupRole
    public Map<ServiceType, GrpcService> getRoleServices() {
        return Collections.emptyMap();
    }

    /**
     * Not supported by this role.
     *
     * @throws IllegalStateException always
     */
    @Override // alluxio.master.backup.BackupOps
    public BackupStatus backup(BackupPRequest backupPRequest, StateLockOptions stateLockOptions) throws AlluxioException {
        throw new IllegalStateException("Backup-worker role can't serve RPCs");
    }

    /**
     * Not supported by this role.
     *
     * @throws IllegalStateException always
     */
    @Override // alluxio.master.backup.BackupOps
    public BackupStatus getBackupStatus(BackupStatusPRequest backupStatusPRequest) throws AlluxioException {
        throw new IllegalStateException("Backup-worker role can't serve RPCs");
    }

    /**
     * Handles a suspend message: suspends the journals and schedules a task that resumes them
     * if no backup request arrives within the configured suspend timeout.
     *
     * @param backupSuspendMessage the received message
     * @return an already-completed future
     * @throws IllegalStateException if a backup is currently in progress
     * @throws RuntimeException if suspending the journals fails with an {@link IOException}
     */
    private CompletableFuture<Void> handleSuspendJournalsMessage(BackupSuspendMessage backupSuspendMessage) {
        LOG.info("Received suspend message: {}", backupSuspendMessage);
        Preconditions.checkState(!this.mBackupTracker.inProgress(), "Backup in progress");
        try {
            this.mJournalSystem.suspend(this::interruptBackup);
            LOG.info("Suspended journals for backup.");
            this.mBackupTimeoutTask = this.mTaskScheduler.schedule(() -> {
                LOG.info("Resuming journals as backup request hasn't been received.");
                enforceResumeJournals();
            }, this.mBackupAbortSuspendTimeoutMs, TimeUnit.MILLISECONDS);
            return CompletableFuture.completedFuture(null);
        } catch (IOException e) {
            LOG.error("Failed to suspended journals for backup.", e);
            throw new RuntimeException("Failed to suspended journals for backup.", e);
        }
    }

    /**
     * Callback handed to the journal system on suspend. Cancels the backup task if it is still
     * running, cancels the suspend-timeout task, and resumes journal application unless the
     * timeout task could not be cancelled.
     */
    private void interruptBackup() {
        LOG.info("Interrupting ongoing backup.");
        if (this.mBackupFuture != null && !this.mBackupFuture.isDone()) {
            LOG.info("Attempt to cancel backup task.");
            this.mBackupFuture.cancel(true);
        }
        // Resume only when there is no timeout task or it was successfully cancelled.
        boolean shouldResume = true;
        if (this.mBackupTimeoutTask != null) {
            LOG.info("Attempt to cancel backup timeout task.");
            shouldResume = this.mBackupTimeoutTask.cancel(true);
        }
        if (shouldResume) {
            try {
                LOG.info("Attempt to resume journal application.");
                this.mJournalSystem.resume();
            } catch (Exception e) {
                LOG.warn("Failed to resume journal application: {}", e.toString());
            }
        }
        LOG.warn("Backup interrupted successfully.");
    }

    /**
     * Handles a backup request: resets the tracker for the new backup id, starts the heartbeat
     * task and, if the suspend-timeout task can still be cancelled, submits the backup task.
     *
     * <p>If no suspend message has been handled yet, or the suspend timeout has already fired,
     * an error is recorded on the tracker instead of starting a backup.
     *
     * @param backupRequestMessage the received request
     * @return an already-completed future
     * @throws IllegalStateException if a backup is currently in progress
     */
    private CompletableFuture<Void> handleRequestMessage(BackupRequestMessage backupRequestMessage) {
        LOG.info("Received backup message: {}", backupRequestMessage);
        Preconditions.checkState(!this.mBackupTracker.inProgress(), "Backup in progress");
        CompletableFuture<Void> completedFuture = CompletableFuture.completedFuture(null);
        this.mBackupTracker.reset();
        this.mBackupTracker.update(new BackupStatus(backupRequestMessage.getBackupId(), BackupState.Initiating));
        this.mBackupTracker.updateHostname(NetworkAddressUtils.getLocalHostName((int) ServerConfiguration.global().getMs(PropertyKey.NETWORK_HOST_RESOLUTION_TIMEOUT_MS)));
        startHeartbeatThread();

        // The timeout task only exists once a suspend message has been handled. Without this
        // guard a request that arrives first would fail with a NullPointerException after the
        // tracker and heartbeat task were already started, leaving no error for the leader.
        ScheduledFuture<?> suspendTimeoutTask = this.mBackupTimeoutTask;
        if (suspendTimeoutTask == null) {
            LOG.warn("Received backup request before journals were suspended");
            this.mBackupTracker.updateError(new BackupException("Received backup request before journals were suspended"));
            return completedFuture;
        }
        // A failed cancel means the timeout task already ran (or was cancelled earlier).
        if (!suspendTimeoutTask.cancel(true)) {
            LOG.warn("Journal has been resumed due to a time-out");
            this.mBackupTracker.updateError(new BackupException("Journal has been resumed due to a time-out"));
            return completedFuture;
        }
        this.mBackupFuture = this.mExecutorService.submit(() -> runBackup(backupRequestMessage));
        return completedFuture;
    }

    /**
     * Body of the backup task: catches the journals up to the requested sequences, takes the
     * backup, waits for the heartbeat task to finish and finally resumes the journals. Failures
     * are recorded on the backup tracker.
     *
     * @param backupRequestMessage the request describing the backup to take
     */
    private void runBackup(BackupRequestMessage backupRequestMessage) {
        this.mBackupTracker.updateState(BackupState.Transitioning);
        try {
            LOG.info("Initiating catching up of journals to consistent sequences before starting backup. {}", backupRequestMessage.getJournalSequences());
            CatchupFuture catchup = this.mJournalSystem.catchup(backupRequestMessage.getJournalSequences());
            // Wait on a separate thread so that the catch-up can be bounded by a timeout.
            CompletableFuture.runAsync(() -> {
                catchup.waitTermination();
            }).get(BACKUP_ABORT_AFTER_TRANSITION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            LOG.info("Journal transition completed. Taking a backup.");
            this.mBackupTracker.updateState(BackupState.Running);
            this.mBackupTracker.updateBackupUri(takeBackup(backupRequestMessage.getBackupRequest(), this.mBackupTracker.getEntryCounter()));
            this.mBackupTracker.updateState(BackupState.Completed);
            try {
                this.mBackupProgressFuture.get();
            } catch (Exception e) {
                LOG.warn("Failed to wait for backup heartbeat completion. ", e);
            }
        } catch (InterruptedException e) {
            // NOTE(review): the interrupt flag is deliberately not restored here, matching the
            // previous flow, so that the journal resumption below runs on an uninterrupted thread.
            LOG.error("Backup interrupted at worker", e);
            this.mBackupTracker.updateError(new BackupException("Backup interrupted at worker", e));
        } catch (Exception e) {
            LOG.error("Backup failed at worker", e);
            this.mBackupTracker.updateError(new BackupException(String.format("Backup failed at worker: %s", e.getMessage()), e));
        } finally {
            // Journals must be resumed on every exit path.
            enforceResumeJournals();
        }
    }

    /**
     * Resumes the journals, reporting a fatal error through {@link ProcessUtils#fatalError} if
     * resumption fails.
     */
    private void enforceResumeJournals() {
        try {
            this.mJournalSystem.resume();
        } catch (Throwable th) {
            ProcessUtils.fatalError(LOG, th, "Failed to resume journals.", new Object[0]);
        }
    }

    /**
     * Starts the task that sends the current backup status to the backup-leader, cancelling a
     * previous heartbeat task that is still running. The task sends one heartbeat per wait on
     * the tracker and stops after the heartbeat that follows the tracker reporting completion.
     */
    private void startHeartbeatThread() {
        if (this.mBackupProgressFuture != null && !this.mBackupProgressFuture.isDone()) {
            this.mBackupProgressFuture.cancel(true);
        }
        this.mBackupProgressFuture = this.mExecutorService.submit(() -> {
            boolean finished;
            do {
                // The heartbeat is sent regardless of the result, so the final status is delivered too.
                finished = this.mBackupTracker.waitUntilFinished(this.mBackupHeartbeatIntervalMs, TimeUnit.MILLISECONDS);
                try {
                    sendMessageBlocking(this.mLeaderConnection, new BackupHeartbeatMessage(this.mBackupTracker.getCurrentStatus()));
                } catch (Exception e) {
                    LOG.warn("Failed to send heartbeat to backup-leader: {}. Error: {}", this.mLeaderConnection, e);
                }
            } while (!finished);
        });
    }

    /**
     * Prepares a freshly established leader connection: installs exception and close callbacks,
     * registers the message handlers and sends the handshake message.
     *
     * <p>When the connection closes, an unfinished backup task is cancelled, the tracker is
     * reset and a new connection attempt is scheduled.
     *
     * @param grpcMessagingConnection the connection to activate
     * @throws IOException if registering handlers or sending the handshake fails; the connection
     *         is closed in that case
     * @throws RuntimeException if the calling thread is interrupted while waiting; the thread's
     *         interrupt status is restored before throwing
     */
    private void activateLeaderConnection(GrpcMessagingConnection grpcMessagingConnection) throws IOException {
        grpcMessagingConnection.onException(th -> {
            LOG.warn("Backup-leader connection failed.", th);
        });
        this.mLeaderConnectionCloseListener = grpcMessagingConnection.onClose(closedConnection -> {
            LOG.info("Backup-leader connection closed. {}", closedConnection);
            if (this.mBackupFuture != null && !this.mBackupFuture.isDone()) {
                LOG.warn("Cancelling ongoing backup as backup-leader is lost.");
                this.mBackupFuture.cancel(true);
                this.mBackupTracker.reset();
            }
            this.mExecutorService.submit(this::establishConnectionToLeader);
        });
        try {
            this.mGrpcMessagingContext.execute(() -> {
                grpcMessagingConnection.handler(BackupSuspendMessage.class, this::handleSuspendJournalsMessage);
                grpcMessagingConnection.handler(BackupRequestMessage.class, this::handleRequestMessage);
                grpcMessagingConnection.sendAndReceive(new BackupHandshakeMessage(NetworkAddressUtils.getLocalHostName((int) ServerConfiguration.global().getMs(PropertyKey.NETWORK_HOST_RESOLUTION_TIMEOUT_MS))));
            }).get();
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers can observe the interruption, and keep the cause.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while activating backup-leader connection.", e);
        } catch (ExecutionException e) {
            grpcMessagingConnection.close();
            throw new IOException("Failed to activate backup-leader connection.", e.getCause());
        }
    }

    /**
     * Connects to the backup-leader at the primary master's RPC address and activates the
     * connection, retrying under an exponential back-off policy until an attempt succeeds or
     * the policy stops granting attempts. Every failure is logged and followed by a retry.
     */
    private void establishConnectionToLeader() {
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry((int) this.mLeaderConnectionIntervalMin, (int) this.mLeaderConnectionIntervalMax, Integer.MAX_VALUE);
        while (retryPolicy.attempt()) {
            try {
                // Resolve the primary's address on every attempt.
                InetSocketAddress primaryRpcAddress = MasterClientContext.newBuilder(ClientContext.create(ServerConfiguration.global())).build().getMasterInquireClient().getPrimaryRpcAddress();
                try {
                    GrpcMessagingClient grpcMessagingClient = new GrpcMessagingClient(ServerConfiguration.global(), this.mServerUserState, this.mExecutorService, "BackupWorker");
                    // The outer get() waits for the messaging context to start the connect,
                    // the inner get() waits for the connection itself.
                    this.mLeaderConnection = (GrpcMessagingConnection) ((CompletableFuture<?>) this.mGrpcMessagingContext.execute(() -> {
                        return grpcMessagingClient.connect(primaryRpcAddress);
                    }).get()).get();
                    activateLeaderConnection(this.mLeaderConnection);
                    LOG.info("Established connection to backup-leader: {}", primaryRpcAddress);
                    return;
                } catch (Throwable th) {
                    LOG.warn("Failed to establish connection to backup-leader: {}. Error:{}. Attempt:{}", primaryRpcAddress, th.toString(), retryPolicy.getAttemptCount());
                }
            } catch (Throwable th) {
                LOG.warn("Failed to get backup-leader address. Error:{}. Attempt:{}", th.toString(), retryPolicy.getAttemptCount());
            }
        }
    }
}
