package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.leases.LeasePendingDeletion;
import com.amazonaws.services.kinesis.leases.exceptions.CustomerApplicationException;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.leases.impl.UpdateField;
import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.util.CollectionUtils;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:lib/amazon-kinesis-client-1.15.1.jar:com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShutdownTask.class */
/**
 * Task that shuts down the record processor of a single shard.
 *
 * <p>When the shutdown reason is {@link ShutdownReason#TERMINATE}, {@link #call()} runs the shard-end flow
 * ({@code takeShardEndAction}): it creates leases for the supplied child shards, records the child shard ids
 * on the current lease, lets the record processor checkpoint and verifies that the persisted checkpoint is
 * {@link ExtendedSequenceNumber#SHARD_END}. For every other reason, or when the shard-end flow fails with an
 * {@link InvalidStateException}, the record processor is shut down with {@link ShutdownReason#ZOMBIE}.
 *
 * <p>{@link #call()} never throws: failures are logged, followed by a backoff sleep, and returned inside the
 * {@link TaskResult}.
 */
public class KinesisShutdownTask implements ITask {
    private static final Log LOG = LogFactory.getLog(KinesisShutdownTask.class);

    /**
     * The N passed to {@link #isOneInNProbability(int)} when a single child shard has only one of its two
     * parent leases in the lease table.
     */
    @VisibleForTesting
    static final int RETRY_RANDOM_MAX_RANGE = 50;
    private final ShardInfo shardInfo;
    private final IRecordProcessor recordProcessor;
    private final RecordProcessorCheckpointer recordProcessorCheckpointer;
    private final ShutdownReason reason;
    private final IKinesisProxy kinesisProxy;
    private final KinesisClientLibLeaseCoordinator leaseCoordinator;
    private final InitialPositionInStreamExtended initialPositionInStream;
    private final boolean cleanupLeasesOfCompletedShards;
    private final boolean ignoreUnexpectedChildShards;
    private final TaskType taskType = TaskType.SHUTDOWN;
    private final long backoffTimeMillis;
    private final GetRecordsCache getRecordsCache;
    private final ShardSyncer shardSyncer;
    private final ShardSyncStrategy shardSyncStrategy;
    private final List<ChildShard> childShards;
    private final LeaseCleanupManager leaseCleanupManager;

    /**
     * Creates a shutdown task. All arguments are stored as-is; no validation or copying is performed.
     *
     * @param shardInfo shard this task shuts down; its shard id is used for lease lookups and log messages
     * @param iRecordProcessor record processor whose {@code shutdown} is invoked
     * @param recordProcessorCheckpointer checkpointer handed to the record processor on shutdown
     * @param shutdownReason original shutdown reason; {@link ShutdownReason#TERMINATE} selects the shard-end flow
     * @param iKinesisProxy stored; not read by this class
     * @param initialPositionInStreamExtended stored; not read by this class
     * @param z stored as {@code cleanupLeasesOfCompletedShards}; not read by this class
     * @param z2 stored as {@code ignoreUnexpectedChildShards}; not read by this class
     * @param kinesisClientLibLeaseCoordinator source of the currently held lease and of the lease manager
     * @param j backoff, in milliseconds, slept after a failed shutdown attempt
     * @param getRecordsCache retrieval cache that is shut down once the record processor has been shut down
     * @param shardSyncer stored; not read by this class
     * @param shardSyncStrategy stored; not read by this class
     * @param list child shards of the shard being shut down; may be {@code null} or empty
     * @param leaseCleanupManager manager the finished lease is enqueued on for deletion
     */
    public KinesisShutdownTask(
            ShardInfo shardInfo,
            IRecordProcessor iRecordProcessor,
            RecordProcessorCheckpointer recordProcessorCheckpointer,
            ShutdownReason shutdownReason,
            IKinesisProxy iKinesisProxy,
            InitialPositionInStreamExtended initialPositionInStreamExtended,
            boolean z,
            boolean z2,
            KinesisClientLibLeaseCoordinator kinesisClientLibLeaseCoordinator,
            long j,
            GetRecordsCache getRecordsCache,
            ShardSyncer shardSyncer,
            ShardSyncStrategy shardSyncStrategy,
            List<ChildShard> list,
            LeaseCleanupManager leaseCleanupManager) {
        this.shardInfo = shardInfo;
        this.recordProcessor = iRecordProcessor;
        this.recordProcessorCheckpointer = recordProcessorCheckpointer;
        this.reason = shutdownReason;
        this.kinesisProxy = iKinesisProxy;
        this.initialPositionInStream = initialPositionInStreamExtended;
        this.cleanupLeasesOfCompletedShards = z;
        this.ignoreUnexpectedChildShards = z2;
        this.leaseCoordinator = kinesisClientLibLeaseCoordinator;
        this.backoffTimeMillis = j;
        this.getRecordsCache = getRecordsCache;
        this.shardSyncer = shardSyncer;
        this.shardSyncStrategy = shardSyncStrategy;
        this.childShards = list;
        this.leaseCleanupManager = leaseCleanupManager;
    }

    /**
     * Shuts down the record processor and the records cache for the shard.
     *
     * @return a {@link TaskResult} carrying {@code null} on success, or the exception that made the shutdown
     *         fail (after sleeping for the configured backoff)
     */
    @Override
    public TaskResult call() {
        LOG.info("Invoking shutdown() for shard " + this.shardInfo.getShardId() + ", concurrencyToken: " + this.shardInfo.getConcurrencyToken() + ", original Shutdown reason: " + this.reason + ". childShards:" + this.childShards);
        try {
            KinesisClientLease currentLease = this.leaseCoordinator.getCurrentlyHeldLease(this.shardInfo.getShardId());
            Runnable leaseLostAction = this::takeLeaseLostAction;
            if (this.reason == ShutdownReason.TERMINATE) {
                try {
                    takeShardEndAction(currentLease);
                } catch (InvalidStateException e) {
                    // The shard-end flow cannot be completed by this worker: give the lease up and
                    // fall back to a ZOMBIE shutdown of the record processor.
                    LOG.warn("Lease " + this.shardInfo.getShardId() + ": Invalid state encountered while shutting down shardConsumer with TERMINATE reason. Dropping the lease and shutting down shardConsumer using ZOMBIE reason. ", e);
                    dropLease(currentLease);
                    throwOnApplicationException(leaseLostAction);
                }
            } else {
                throwOnApplicationException(leaseLostAction);
            }
            LOG.debug("Shutting down retrieval strategy.");
            this.getRecordsCache.shutdown();
            LOG.debug("Record processor completed shutdown() for shard " + this.shardInfo.getShardId());
            return new TaskResult((Exception) null);
        } catch (Exception e) {
            if (e instanceof CustomerApplicationException) {
                LOG.error("Shard " + this.shardInfo.getShardId() + ": Application exception: ", e);
            } else {
                LOG.error("Shard " + this.shardInfo.getShardId() + ": Caught exception: ", e);
            }
            try {
                Thread.sleep(this.backoffTimeMillis);
            } catch (InterruptedException ie) {
                LOG.debug("Interrupted sleep", ie);
                // Restore the interrupt status so the thread's owner can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            return new TaskResult(e);
        }
    }

    /**
     * Shard-end flow: creates child shard leases, records them on the current lease, checkpoints at shard end
     * and enqueues the lease for deletion.
     *
     * @param kinesisClientLease lease currently held for the shard, or {@code null} if none is held
     * @throws InvalidStateException if {@code kinesisClientLease} is {@code null}, or propagated from the steps below
     */
    private void takeShardEndAction(KinesisClientLease kinesisClientLease) throws InvalidStateException, DependencyException, ProvisionedThroughputException, CustomerApplicationException {
        if (kinesisClientLease == null) {
            throw new InvalidStateException("Shard " + this.shardInfo.getShardId() + ": Lease not owned by the current worker. Leaving ShardEnd handling to new owner.");
        }
        if (CollectionUtils.isNullOrEmpty(this.childShards)) {
            LOG.warn("Shard " + this.shardInfo.getShardId() + ": Shutting down consumer with SHARD_END reason without creating leases for child shards.");
        } else {
            createLeasesForChildShardsIfNotExist();
            updateCurrentLeaseWithChildShards(kinesisClientLease);
        }
        LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(kinesisClientLease, this.shardInfo);
        if (this.leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
            return;
        }
        boolean checkpointedAtShardEnd = false;
        try {
            checkpointedAtShardEnd = attemptShardEndCheckpointing();
        } finally {
            // Enqueue on success, and also (even when checkpointing failed) when there are no child shards.
            if (checkpointedAtShardEnd || CollectionUtils.isNullOrEmpty(this.childShards)) {
                this.leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
            }
        }
    }

    /** Shuts the record processor down with {@link ShutdownReason#ZOMBIE}. */
    private void takeLeaseLostAction() {
        ShutdownInput zombieShutdown = new ShutdownInput()
                .withShutdownReason(ShutdownReason.ZOMBIE)
                .withCheckpointer(this.recordProcessorCheckpointer);
        this.recordProcessor.shutdown(zombieShutdown);
    }

    /**
     * Ensures the lease of the shard is checkpointed at {@link ExtendedSequenceNumber#SHARD_END}, invoking the
     * record processor's shutdown only if the stored checkpoint is not there yet.
     *
     * @return always {@code true}; every failure is reported through an exception
     * @throws InvalidStateException if the lease manager has no lease for the shard
     * @throws CustomerApplicationException if the application shutdown/checkpoint verification throws
     */
    private boolean attemptShardEndCheckpointing() throws DependencyException, ProvisionedThroughputException, InvalidStateException, CustomerApplicationException {
        KinesisClientLease storedLease = Optional.ofNullable(this.leaseCoordinator.getLeaseManager().getLease(this.shardInfo.getShardId()))
                .orElseThrow(() -> new InvalidStateException("Lease for shard " + this.shardInfo.getShardId() + " does not exist."));
        if (!storedLease.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
            throwOnApplicationException(this::applicationCheckpointAndVerification);
        }
        return true;
    }

    /**
     * Invokes the record processor's shutdown with {@link ShutdownReason#TERMINATE} and then re-reads the
     * lease to verify that the application checkpointed at {@link ExtendedSequenceNumber#SHARD_END}.
     *
     * @throws IllegalArgumentException if the re-read lease is missing, has no checkpoint, or its checkpoint
     *         is not {@code SHARD_END}
     */
    private void applicationCheckpointAndVerification() {
        this.recordProcessorCheckpointer.setSequenceNumberAtShardEnd(this.recordProcessorCheckpointer.getLargestPermittedCheckpointValue());
        this.recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
        ShutdownInput terminateShutdown = new ShutdownInput()
                .withShutdownReason(ShutdownReason.TERMINATE)
                .withCheckpointer(this.recordProcessorCheckpointer);
        this.recordProcessor.shutdown(terminateShutdown);

        boolean checkpointedAtShardEnd = false;
        KinesisClientLease storedLease = null;
        try {
            storedLease = this.leaseCoordinator.getLeaseManager().getLease(this.shardInfo.getShardId());
        } catch (Exception e) {
            // Deliberately not rethrown: a failed lookup is reported below as "did not checkpoint".
            LOG.error("Shard " + this.shardInfo.getShardId() + " : Unable to get lease entry for shard to verify shard end checkpointing.", e);
        }
        if (storedLease == null || storedLease.getCheckpoint() == null) {
            LOG.error("Shard " + this.shardInfo.getShardId() + " : No lease checkpoint entry for shard to verify shard end checkpointing. Lease Entry : " + storedLease);
        } else {
            checkpointedAtShardEnd = storedLease.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END);
            ExtendedSequenceNumber cachedCheckpoint = this.recordProcessorCheckpointer.getLastCheckpointValue();
            if (!storedLease.getCheckpoint().equals(cachedCheckpoint)) {
                LOG.error("Shard " + this.shardInfo.getShardId() + " : Checkpoint information mismatch between authoritative source and local cache. This does not affect the application flow, but cut a ticket to Kinesis when you see this. Authoritative entry : " + storedLease.getCheckpoint() + " Cache entry : " + cachedCheckpoint);
            }
        }
        if (!checkpointedAtShardEnd) {
            throw new IllegalArgumentException("Application didn't checkpoint at end of shard " + this.shardInfo.getShardId() + ". Application must checkpoint upon shutdown. See IRecordProcessor.shutdown javadocs for more information.");
        }
    }

    /**
     * Runs {@code runnable} and wraps any {@link Exception} it throws in a {@link CustomerApplicationException}.
     *
     * @param runnable action to run
     * @throws CustomerApplicationException wrapping the exception thrown by {@code runnable}
     */
    private void throwOnApplicationException(Runnable runnable) throws CustomerApplicationException {
        try {
            runnable.run();
        } catch (Exception e) {
            throw new CustomerApplicationException("Customer application throws exception for shard " + this.shardInfo.getShardId(), e);
        }
    }

    /**
     * Creates a lease for every child shard that does not have one yet.
     *
     * <p>When there is exactly one child shard, it must list exactly two parent shards, and the leases of both
     * parents must be either both present or both absent in the lease manager. If only one of them is present,
     * lease creation is refused: with probability 1 in {@link #RETRY_RANDOM_MAX_RANGE} by an
     * {@link InvalidStateException}, otherwise by a {@link BlockedOnParentShardException}.
     *
     * @throws InvalidStateException if the single child shard does not list two parents, or (see above) when
     *         parent lease information is partial
     * @throws BlockedOnParentShardException when parent lease information is partial (see above)
     */
    private void createLeasesForChildShardsIfNotExist() throws InvalidStateException, DependencyException, ProvisionedThroughputException {
        if (!CollectionUtils.isNullOrEmpty(this.childShards) && this.childShards.size() == 1) {
            ChildShard onlyChild = this.childShards.get(0);
            List<String> parentShards = onlyChild.getParentShards();
            if (parentShards.size() != 2) {
                throw new InvalidStateException("Shard " + this.shardInfo.getShardId() + "'s only child shard " + onlyChild + " does not contain other parent information.");
            }
            boolean firstParentMissing = Objects.isNull(this.leaseCoordinator.getLeaseManager().getLease(parentShards.get(0)));
            boolean secondParentMissing = Objects.isNull(this.leaseCoordinator.getLeaseManager().getLease(parentShards.get(1)));
            if (firstParentMissing != secondParentMissing) {
                if (!isOneInNProbability(RETRY_RANDOM_MAX_RANGE)) {
                    throw new BlockedOnParentShardException("Shard " + this.shardInfo.getShardId() + "'s only child shard " + onlyChild + " has partial parent information in lease table. Hence deferring lease creation of child shard.");
                }
                throw new InvalidStateException("Shard " + this.shardInfo.getShardId() + "'s only child shard " + onlyChild + " has partial parent information in lease table.");
            }
        }
        for (ChildShard childShard : this.childShards) {
            if (this.leaseCoordinator.getLeaseManager().getLease(childShard.getShardId()) == null) {
                KinesisClientLease childLease = KinesisShardSyncer.newKCLLeaseForChildShard(childShard);
                this.leaseCoordinator.getLeaseManager().createLeaseIfNotExists(childLease);
                LOG.info("Shard " + this.shardInfo.getShardId() + " : Created child shard lease: " + childLease.getLeaseKey());
            }
        }
    }

    /**
     * Returns {@code true} with probability 1 in {@code i}.
     *
     * @param i number of equally likely outcomes; must be positive
     * @return {@code true} for exactly one of the {@code i} outcomes
     * @throws IllegalArgumentException if {@code i} is not positive (thrown by {@link Random#nextInt(int)})
     */
    @VisibleForTesting
    boolean isOneInNProbability(int i) {
        return new Random().nextInt(i) == 0;
    }

    /**
     * Stores the ids of the child shards on the given lease and persists that field through the lease manager.
     *
     * @param kinesisClientLease lease of the shard being shut down
     */
    private void updateCurrentLeaseWithChildShards(KinesisClientLease kinesisClientLease) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
        Set<String> childShardIds = this.childShards.stream()
                .map(ChildShard::getShardId)
                .collect(Collectors.toSet());
        kinesisClientLease.setChildShardIds(childShardIds);
        this.leaseCoordinator.getLeaseManager().updateLeaseWithMetaInfo(kinesisClientLease, UpdateField.CHILD_SHARDS);
        LOG.info("Shard " + this.shardInfo.getShardId() + ": Updated current lease with child shard information: " + kinesisClientLease.getLeaseKey());
    }

    /** @return the type of this task, always {@link TaskType#SHUTDOWN} */
    @Override
    public TaskType getTaskType() {
        return this.taskType;
    }

    /** @return the shutdown reason this task was created with */
    @VisibleForTesting
    ShutdownReason getReason() {
        return this.reason;
    }

    /**
     * Drops the given lease through the lease coordinator; only logs a warning when there is no lease.
     *
     * @param kinesisClientLease lease to drop, or {@code null}
     */
    private void dropLease(KinesisClientLease kinesisClientLease) {
        if (kinesisClientLease == null) {
            LOG.warn("Shard " + this.shardInfo.getShardId() + ": Unable to find the lease for shard. Will shutdown the shardConsumer directly.");
            return;
        }
        this.leaseCoordinator.dropLease(kinesisClientLease);
        LOG.warn("Dropped lease for shutting down ShardConsumer: " + kinesisClientLease.getLeaseKey());
    }
}
