/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.connector.kafka.source.checkpoint.trigger;

import io.mantisrx.connector.kafka.source.checkpoint.trigger.CheckpointTrigger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action1;

/**
 * A {@link CheckpointTrigger} that signals a checkpoint when either of two
 * conditions holds:
 * <ul>
 *   <li>the running count accumulated through {@link #update(int)} is strictly
 *       greater than the configured threshold, or</li>
 *   <li>the periodic timer started in the constructor has fired since the last
 *       {@link #reset()}.</li>
 * </ul>
 */
public class CountingCheckpointTrigger implements CheckpointTrigger {
    /** Count that must be exceeded for the count-based condition to hold. */
    private final int threshold;
    /** Running total of the counts reported via {@link #update(int)} since the last reset. */
    private final AtomicInteger counter;
    /** Set to {@code true} by the timer callback; cleared by {@link #reset()}. */
    private final AtomicBoolean checkpoint = new AtomicBoolean(false);
    /** {@code true} until {@link #shutdown()} has been invoked. */
    private final AtomicBoolean isActive;
    /** Handle on the periodic timer, kept so that it can be cancelled on shutdown. */
    private final Subscription checkpointOffsetsTimer;

    /**
     * Creates an active trigger and starts its periodic timer.
     *
     * @param threshold         count that the running total must exceed for
     *                          {@link #shouldCheckpoint()} to return {@code true}
     * @param triggerIntervalMs period, in milliseconds, at which the timer raises
     *                          the time-based checkpoint flag
     */
    public CountingCheckpointTrigger(int threshold, int triggerIntervalMs) {
        this.threshold = threshold;
        this.counter = new AtomicInteger(0);
        this.isActive = new AtomicBoolean(true);
        this.checkpointOffsetsTimer = Observable.interval(triggerIntervalMs, TimeUnit.MILLISECONDS)
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long tick) {
                        // The tick value itself is irrelevant; each emission just raises the flag.
                        checkpoint.set(true);
                    }
                });
    }

    /**
     * Reports whether a checkpoint is currently due.
     *
     * @return {@code true} if the running count is strictly greater than the
     *         threshold or the timer flag is set; {@code false} otherwise
     */
    @Override
    public boolean shouldCheckpoint() {
        return this.counter.get() > this.threshold || this.checkpoint.get();
    }

    /**
     * Adds {@code count} to the running total.
     *
     * @param count amount to add to the counter
     */
    @Override
    public void update(int count) {
        this.counter.addAndGet(count);
    }

    /**
     * Sets the running count back to zero and clears the timer flag.
     */
    @Override
    public void reset() {
        this.counter.set(0);
        this.checkpoint.set(false);
    }

    /**
     * @return {@code true} until {@link #shutdown()} has been called
     */
    @Override
    public boolean isActive() {
        return this.isActive.get();
    }

    /**
     * Cancels the periodic timer and resets the trigger state. Calls made after
     * the first one are no-ops.
     */
    @Override
    public void shutdown() {
        // Flip the flag atomically and use the outcome as the guard, so that when
        // several threads call shutdown() only one of them performs the teardown.
        if (this.isActive.compareAndSet(true, false)) {
            this.checkpointOffsetsTimer.unsubscribe();
            this.reset();
        }
    }
}

