/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.windowing.triggers;

import java.util.List;
import org.apache.pulsar.functions.windowing.DefaultEvictionContext;
import org.apache.pulsar.functions.windowing.Event;
import org.apache.pulsar.functions.windowing.EvictionPolicy;
import org.apache.pulsar.functions.windowing.TriggerHandler;
import org.apache.pulsar.functions.windowing.TriggerPolicy;
import org.apache.pulsar.functions.windowing.WindowManager;

/**
 * A {@link TriggerPolicy} that fires in response to watermark events.
 *
 * <p>Once {@link #start()} has been called, every tracked event that reports
 * itself as a watermark causes the policy to ask the {@link WindowManager} for
 * the sliding-count timestamps between the last processed timestamp and the
 * watermark's timestamp. For each timestamp returned, a new eviction context is
 * installed on the {@link EvictionPolicy} and the {@link TriggerHandler} is
 * invoked. The policy's state is the last timestamp processed this way.
 *
 * @param <T> the payload type of the tracked events
 */
public class WatermarkCountTriggerPolicy<T> implements TriggerPolicy<T, Long> {
    /** Count passed to the window manager lookup and to every eviction context. */
    private final int count;
    /** Callback invoked once per timestamp produced for a watermark. */
    private final TriggerHandler handler;
    /** Receives a fresh eviction context immediately before each trigger. */
    private final EvictionPolicy<T, ?> evictionPolicy;
    /** Supplies the timestamps to trigger on. */
    private final WindowManager<T> windowManager;
    /** Timestamp of the most recent trigger; exposed through {@link #getState()}. */
    private volatile long lastProcessedTs;
    /** {@code false} until {@link #start()} is called; watermarks are ignored before that. */
    private boolean started;

    /**
     * Creates a policy that is not yet started.
     *
     * @param count          the count forwarded to the window manager and eviction contexts
     * @param handler        the handler to invoke for each trigger
     * @param evictionPolicy the eviction policy that receives a context before each trigger
     * @param windowManager  the window manager queried for sliding-count timestamps
     */
    public WatermarkCountTriggerPolicy(int count, TriggerHandler handler, EvictionPolicy<T, ?> evictionPolicy, WindowManager<T> windowManager) {
        this.windowManager = windowManager;
        this.evictionPolicy = evictionPolicy;
        this.handler = handler;
        this.count = count;
        // 'started' keeps its default value of false until start() is called.
    }

    /**
     * Processes {@code event} if the policy has been started and the event is a
     * watermark; any other event is ignored.
     *
     * @param event the event to inspect
     */
    @Override
    public void track(Event<T> event) {
        if (!started) {
            return;
        }
        if (!event.isWatermark()) {
            return;
        }
        handleWaterMarkEvent(event);
    }

    /** Does nothing. */
    @Override
    public void reset() {
        // intentionally empty
    }

    /** Marks the policy as started so that subsequent watermark events are handled. */
    @Override
    public void start() {
        started = true;
    }

    /** Does nothing. */
    @Override
    public void shutdown() {
        // intentionally empty
    }

    /**
     * Fires the handler once for every timestamp the window manager reports
     * between the last processed timestamp and the watermark's timestamp.
     *
     * @param waterMarkEvent the watermark event supplying the upper timestamp
     */
    private void handleWaterMarkEvent(Event<T> waterMarkEvent) {
        final long upperBoundTs = waterMarkEvent.getTimestamp();
        final List<Long> triggerTimestamps =
                windowManager.getSlidingCountTimestamps(lastProcessedTs, upperBoundTs, count);
        for (long triggerTs : triggerTimestamps) {
            DefaultEvictionContext context = new DefaultEvictionContext(triggerTs, null, Long.valueOf(count));
            evictionPolicy.setContext(context);
            handler.onTrigger();
            // Recorded only after the handler returns for this timestamp.
            lastProcessedTs = triggerTs;
        }
    }

    /**
     * Returns the last processed timestamp.
     *
     * @return the current value of the last processed timestamp
     */
    @Override
    public Long getState() {
        return lastProcessedTs;
    }

    /**
     * Replaces the last processed timestamp with {@code state}.
     *
     * @param state the timestamp to restore; it is unboxed on assignment
     */
    @Override
    public void restoreState(Long state) {
        lastProcessedTs = state;
    }

    /**
     * Renders the count, the last processed timestamp and the started flag.
     *
     * @return a human-readable description of this policy
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("WatermarkCountTriggerPolicy{count=");
        text.append(count);
        text.append(", lastProcessedTs=").append(lastProcessedTs);
        text.append(", started=").append(started);
        text.append('}');
        return text.toString();
    }
}

