/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.time;

import cz.o2.proxima.direct.time.TimestampSupplier;
import cz.o2.proxima.direct.time.WatermarkConfiguration;
import cz.o2.proxima.time.WatermarkIdlePolicy;
import cz.o2.proxima.time.WatermarkIdlePolicyFactory;
import java.util.Map;
import java.util.Optional;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Idle policy whose watermark is the current time reported by a {@link TimestampSupplier}
 * minus a fixed skew.
 *
 * <p>The idle watermark starts at {@link Long#MIN_VALUE} and is recomputed on every call to
 * {@link #idle(long)}. With the default supplier ({@code System::currentTimeMillis}) the skew is
 * expressed in milliseconds.
 */
public class SkewedProcessingTimeIdlePolicy implements WatermarkIdlePolicy {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(SkewedProcessingTimeIdlePolicy.class);
    private static final long serialVersionUID = 1L;

    /** Configuration key (legacy, un-prefixed form) holding the timestamp skew. */
    public static final String TIMESTAMP_SKEW = "timestamp-skew";

    /** Skew applied when the configuration does not provide a value. */
    public static final long DEFAULT_TIMESTAMP_SKEW = 100L;

    private final long timestampSkew;
    private final TimestampSupplier timestampSupplier;
    private long currentWatermark = Long.MIN_VALUE;

    /**
     * Creates a policy that reads time from {@code System.currentTimeMillis()}.
     *
     * @param timestampSkew value subtracted from the current time to obtain the idle watermark
     */
    SkewedProcessingTimeIdlePolicy(long timestampSkew) {
        this(timestampSkew, System::currentTimeMillis);
    }

    /**
     * Creates a policy with an explicit time source.
     *
     * @param timestampSkew value subtracted from the supplied time to obtain the idle watermark
     * @param timestampSupplier source of the current time, queried on every {@link #idle(long)}
     */
    SkewedProcessingTimeIdlePolicy(long timestampSkew, TimestampSupplier timestampSupplier) {
        this.timestampSkew = timestampSkew;
        this.timestampSupplier = timestampSupplier;
    }

    /**
     * Returns the watermark computed by the most recent {@link #idle(long)} call.
     *
     * @return the last computed idle watermark, or {@link Long#MIN_VALUE} if {@link #idle(long)}
     *     has not been called yet
     */
    public long getIdleWatermark() {
        return this.currentWatermark;
    }

    /**
     * Recomputes the idle watermark as {@code timestampSupplier.get() - timestampSkew}.
     *
     * <p>The result depends solely on the time source; no comparison with the previously stored
     * value is made here.
     *
     * @param currentWatermark ignored by this policy
     */
    public void idle(long currentWatermark) {
        this.currentWatermark = this.timestampSupplier.get() - this.timestampSkew;
    }

    /**
     * Returns the skew this policy was configured with.
     *
     * @return the configured timestamp skew
     */
    @Generated
    public long getTimestampSkew() {
        return this.timestampSkew;
    }

    /**
     * Factory building {@link SkewedProcessingTimeIdlePolicy} instances from a configuration map.
     */
    public static class Factory implements WatermarkIdlePolicyFactory {

        /**
         * Creates the policy, reading the skew from {@code cfg}.
         *
         * <p>If the legacy key {@link #TIMESTAMP_SKEW} is present it takes precedence (and a
         * warning is logged); otherwise the key produced by
         * {@code WatermarkConfiguration.prefixedKey(TIMESTAMP_SKEW)} is consulted. A missing or
         * {@code null} value yields {@link #DEFAULT_TIMESTAMP_SKEW}.
         *
         * @param cfg configuration map; values are converted via {@code toString()} and parsed
         *     as a decimal {@code long}
         * @return a new policy using the resolved skew
         * @throws NumberFormatException if the configured value cannot be parsed as a {@code long}
         */
        public WatermarkIdlePolicy create(Map<String, Object> cfg) {
            final String prefixedKey = WatermarkConfiguration.prefixedKey(TIMESTAMP_SKEW);
            final String effectiveKey;
            if (cfg.containsKey(TIMESTAMP_SKEW)) {
                // Parameterized logging: message is only assembled when WARN is enabled.
                log.warn(
                    "Legacy configuration being used '{}' prefer to use configuration '{}'",
                    TIMESTAMP_SKEW,
                    prefixedKey);
                effectiveKey = TIMESTAMP_SKEW;
            } else {
                effectiveKey = prefixedKey;
            }
            return new SkewedProcessingTimeIdlePolicy(parseSkew(cfg.get(effectiveKey)));
        }

        /** Parses a raw configuration value into a skew, falling back to the default on null. */
        private static long parseSkew(Object rawValue) {
            if (rawValue == null) {
                return DEFAULT_TIMESTAMP_SKEW;
            }
            return Long.parseLong(rawValue.toString());
        }
    }
}

