package io.confluent.parallelconsumer.state;

import io.confluent.csid.utils.KafkaUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.PollContextInternal;
import io.confluent.parallelconsumer.RecordContext;
import io.confluent.parallelconsumer.internal.PCModule;
import io.confluent.parallelconsumer.internal.ProducerManager;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.function.Function;
import lombok.NonNull;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/parallelconsumer/state/WorkContainer.class */
public class WorkContainer<K, V> implements Comparable<WorkContainer<K, V>> {
    private static final Logger log = LoggerFactory.getLogger(WorkContainer.class);
    static final String DEFAULT_TYPE = "DEFAULT";

    @NonNull
    private final PCModule<K, V> module;
    private final long epoch;
    private String workType;
    private final ConsumerRecord<K, V> cr;
    private int numberOfFailedAttempts;
    private Optional<Instant> lastFailedAt;
    private Optional<Instant> succeededAt;
    private Optional<Throwable> lastFailureReason;
    private boolean inFlight;
    private Optional<Boolean> maybeUserFunctionSucceeded;
    private Future<List<?>> future;
    private Optional<Long> timeTakenAsWorkMs;
    private Optional<Instant> retryDueAt;
    private Comparator<WorkContainer<?, ?>> comparator;

    public WorkContainer(long j, ConsumerRecord<K, V> consumerRecord, @NonNull PCModule<K, V> pCModule, @NonNull String str) {
        this.numberOfFailedAttempts = 0;
        this.lastFailedAt = Optional.empty();
        this.succeededAt = Optional.empty();
        this.inFlight = false;
        this.maybeUserFunctionSucceeded = Optional.empty();
        this.timeTakenAsWorkMs = Optional.empty();
        this.retryDueAt = Optional.empty();
        this.comparator = Comparator.comparing(workContainer -> {
            TopicPartition topicPartition = workContainer.getTopicPartition();
            return topicPartition.topic() + topicPartition.partition();
        }).thenComparing((v0) -> {
            return v0.offset();
        });
        if (pCModule == null) {
            throw new NullPointerException("module is marked non-null but is null");
        }
        if (str == null) {
            throw new NullPointerException("workType is marked non-null but is null");
        }
        this.epoch = j;
        this.cr = consumerRecord;
        this.workType = str;
        this.module = pCModule;
    }

    public WorkContainer(long j, ConsumerRecord<K, V> consumerRecord, PCModule<K, V> pCModule) {
        this(j, consumerRecord, pCModule, DEFAULT_TYPE);
    }

    public void endFlight() {
        log.trace("Ending flight {}", this);
        this.inFlight = false;
    }

    public boolean isDelayPassed() {
        if (!hasPreviouslyFailed()) {
            return true;
        }
        Duration delayUntilRetryDue = getDelayUntilRetryDue();
        return delayUntilRetryDue.isNegative() || delayUntilRetryDue.isZero();
    }

    public Duration getDelayUntilRetryDue() {
        return Duration.between(this.module.clock().instant(), getRetryDueAt());
    }

    public Instant getRetryDueAt() {
        return this.retryDueAt.orElse(Instant.MIN);
    }

    public Duration getRetryDelayConfig() {
        ParallelConsumerOptions<K, V> options = this.module.options();
        Function<RecordContext<K, V>, Duration> retryDelayProvider = options.getRetryDelayProvider();
        return retryDelayProvider != null ? retryDelayProvider.apply(new RecordContext<>(this)) : options.getDefaultMessageRetryDelay();
    }

    @Override // java.lang.Comparable
    public int compareTo(WorkContainer workContainer) {
        return this.comparator.compare(this, workContainer);
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        WorkContainer workContainer = (WorkContainer) obj;
        return getTopicPartition().topic().equals(workContainer.getTopicPartition().topic()) && getTopicPartition().partition() == workContainer.getTopicPartition().partition() && getCr().offset() == workContainer.getCr().offset();
    }

    public int hashCode() {
        return Objects.hash(getTopicPartition().topic(), Integer.valueOf(getTopicPartition().partition()), Long.valueOf(this.cr.offset()));
    }

    public boolean isNotInFlight() {
        return !isInFlight();
    }

    public boolean isInFlight() {
        return this.inFlight;
    }

    public void onQueueingForExecution() {
        log.trace("Queueing for execution: {}", this);
        this.inFlight = true;
        this.timeTakenAsWorkMs = Optional.of(Long.valueOf(System.currentTimeMillis()));
    }

    public TopicPartition getTopicPartition() {
        return KafkaUtils.toTopicPartition(getCr());
    }

    public void onUserFunctionSuccess() {
        this.succeededAt = Optional.of(this.module.clock().instant());
        this.maybeUserFunctionSucceeded = Optional.of(true);
    }

    public void onUserFunctionFailure(Throwable th) {
        log.trace("Failing {}", this);
        updateFailureHistory(th);
        this.maybeUserFunctionSucceeded = Optional.of(false);
    }

    private void updateFailureHistory(Throwable th) {
        this.numberOfFailedAttempts++;
        this.lastFailedAt = Optional.of(Instant.now(this.module.clock()));
        this.lastFailureReason = Optional.ofNullable(th);
        this.retryDueAt = Optional.of(this.lastFailedAt.get().plus((TemporalAmount) getRetryDelayConfig()));
    }

    public boolean isUserFunctionComplete() {
        return getMaybeUserFunctionSucceeded().isPresent();
    }

    public boolean isUserFunctionSucceeded() {
        return getMaybeUserFunctionSucceeded().orElse(false).booleanValue();
    }

    public String toString() {
        return "WorkContainer(tp:" + KafkaUtils.toTopicPartition(this.cr) + ":o:" + this.cr.offset() + ":k:" + this.cr.key() + ")";
    }

    public Duration getTimeInFlight() {
        return !this.timeTakenAsWorkMs.isPresent() ? Duration.ZERO : Duration.ofMillis(System.currentTimeMillis() - this.timeTakenAsWorkMs.get().longValue());
    }

    public long offset() {
        return getCr().offset();
    }

    public boolean hasPreviouslyFailed() {
        return getNumberOfFailedAttempts() > 0;
    }

    public boolean isAvailableToTakeAsWork() {
        return isNotInFlight() && !isUserFunctionSucceeded() && isDelayPassed();
    }

    public void onPostAddToMailBox(PollContextInternal<K, V> pollContextInternal, Optional<ProducerManager<K, V>> optional) {
        optional.ifPresent(producerManager -> {
            Optional<ProducerManager<K, V>.ProducingLock> producingLock = pollContextInternal.getProducingLock();
            Objects.requireNonNull(producerManager);
            producingLock.ifPresent(producerManager::finishProducing);
        });
    }

    public long getEpoch() {
        return this.epoch;
    }

    public String getWorkType() {
        return this.workType;
    }

    public void setWorkType(String str) {
        this.workType = str;
    }

    public ConsumerRecord<K, V> getCr() {
        return this.cr;
    }

    public int getNumberOfFailedAttempts() {
        return this.numberOfFailedAttempts;
    }

    public Optional<Instant> getLastFailedAt() {
        return this.lastFailedAt;
    }

    public Optional<Instant> getSucceededAt() {
        return this.succeededAt;
    }

    public Optional<Throwable> getLastFailureReason() {
        return this.lastFailureReason;
    }

    public Optional<Boolean> getMaybeUserFunctionSucceeded() {
        return this.maybeUserFunctionSucceeded;
    }

    public Future<List<?>> getFuture() {
        return this.future;
    }

    public void setFuture(Future<List<?>> future) {
        this.future = future;
    }
}
