package com.github.castorm.kafka.connect.throttle;

import com.github.castorm.kafka.connect.http.model.Offset;
import com.github.castorm.kafka.connect.throttle.spi.Throttler;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

/* loaded from: input_file:com/github/castorm/kafka/connect/throttle/AdaptableIntervalThrottler.class */
/**
 * {@link Throttler} that delegates each call to one of two configured throttlers.
 *
 * <p>The catch-up throttler is used for the very first call (while no offset has been
 * recorded yet) and whenever the incoming offset carries a timestamp that differs from the
 * previously recorded one and lies further in the past than the tail interval. In every
 * other case the tail throttler is used.
 *
 * <p>{@link #configure(Map)} populates the delegates and the interval; until it has been
 * called those fields are {@code null}.
 */
public class AdaptableIntervalThrottler implements Throttler {

    /** Sentinel meaning "no offset has been recorded yet". */
    private static final Offset EMPTY_OFFSET = Offset.of(Collections.emptyMap());

    /** Builds the typed configuration out of the raw settings handed to {@link #configure(Map)}. */
    private final Function<Map<String, ?>, AdaptableIntervalThrottlerConfig> configFactory;

    /** Delegate used when the offset is neither the first one nor lagging behind. */
    private Throttler tailThrottler;

    /** Delegate used on the first call and while the offset lags behind. */
    private Throttler catchupThrottler;

    /** Interval, in milliseconds, read from the tail throttler supplied by the configuration. */
    private Long intervalMillis;

    /** Offset passed to the most recent successful {@link #throttle(Offset)} call. */
    private Offset lastOffset = EMPTY_OFFSET;

    /** Creates a throttler whose configuration is built by {@code AdaptableIntervalThrottlerConfig::new}. */
    public AdaptableIntervalThrottler() {
        this(AdaptableIntervalThrottlerConfig::new);
    }

    /**
     * Creates a throttler with a custom configuration factory.
     *
     * @param function maps the raw settings to an {@link AdaptableIntervalThrottlerConfig}
     */
    public AdaptableIntervalThrottler(Function<Map<String, ?>, AdaptableIntervalThrottlerConfig> function) {
        this.configFactory = function;
    }

    /**
     * Resolves the configuration and stores both delegates plus the tail interval.
     *
     * @param map raw settings forwarded to the configuration factory
     */
    public void configure(Map<String, ?> map) {
        AdaptableIntervalThrottlerConfig config = this.configFactory.apply(map);
        this.tailThrottler = config.getTailThrottler();
        this.catchupThrottler = config.getCatchupThrottler();
        this.intervalMillis = config.getTailThrottler().getIntervalMillis();
    }

    /**
     * Throttles through the delegate selected for {@code offset}, then records the offset.
     *
     * <p>The offset is recorded only after the delegate returns, so it is left unchanged
     * when the delegate throws.
     *
     * @param offset offset of the current poll
     * @throws InterruptedException if the selected delegate throws it
     */
    @Override
    public void throttle(Offset offset) throws InterruptedException {
        Throttler delegate = resolveThrottler(offset);
        delegate.throttle(offset);
        this.lastOffset = offset;
    }

    /** Selects the catch-up throttler for the first call or a lagging offset, else the tail throttler. */
    private Throttler resolveThrottler(Offset current) {
        if (isFirst() || isCatchingUp(current)) {
            return this.catchupThrottler;
        }
        return this.tailThrottler;
    }

    /** Returns whether no offset has been recorded so far. */
    private boolean isFirst() {
        return EMPTY_OFFSET.equals(this.lastOffset);
    }

    /**
     * Returns whether {@code current} has a timestamp different from the recorded one and
     * that timestamp (the epoch when absent) is older than "now minus the tail interval".
     */
    private boolean isCatchingUp(Offset current) {
        // An unchanged timestamp never counts as catching up; the clock is not consulted.
        if (this.lastOffset.getTimestamp().equals(current.getTimestamp())) {
            return false;
        }
        Instant currentInstant = current.getTimestamp().orElse(Instant.EPOCH);
        Instant tailBoundary = Instant.now().minus(this.intervalMillis, ChronoUnit.MILLIS);
        return currentInstant.isBefore(tailBoundary);
    }
}
