/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.time;

import cz.o2.proxima.direct.time.NotProgressingWatermarkIdlePolicy;
import cz.o2.proxima.direct.time.TimestampSupplier;
import cz.o2.proxima.direct.time.WatermarkConfiguration;
import cz.o2.proxima.internal.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.internal.shaded.com.google.common.base.Preconditions;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.time.AbstractWatermarkEstimator;
import cz.o2.proxima.time.WatermarkEstimator;
import cz.o2.proxima.time.WatermarkEstimatorFactory;
import cz.o2.proxima.time.WatermarkIdlePolicy;
import cz.o2.proxima.time.WatermarkIdlePolicyFactory;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnboundedOutOfOrdernessWatermarkEstimator
extends AbstractWatermarkEstimator {
    /** Logger of this class; used for the periodic debug dump of the delay statistics. */
    @Generated
    private static final Logger log = LoggerFactory.getLogger(UnboundedOutOfOrdernessWatermarkEstimator.class);
    private static final long serialVersionUID = 1L;
    /**
     * Default initial (minimal) watermark. The literal equals
     * {@code Long.MIN_VALUE + 31536000000}, i.e. {@code Long.MIN_VALUE} plus
     * 365 * 24 * 3600 * 1000.
     */
    static final long DEFAULT_MIN_WATERMARK = -9223372005318775808L;
    /** Configuration key (before prefixing by {@code WatermarkConfiguration.prefixedKey}) of the minimal watermark. */
    public static final String MIN_WATERMARK = "min-watermark";
    /** Configuration key (before prefixing) of the length of the estimation window. */
    public static final String ESTIMATE_DURATION_MS = "estimate-duration";
    /** Configuration key (before prefixing) of the length of a single step of the window. */
    public static final String STEP_MS = "step";
    /** Configuration key (before prefixing) of the allowed timestamp skew. */
    public static final String ALLOWED_TIMESTAMP_SKEW = "allowed-timestamp-skew";
    /** Default length of the estimation window. */
    public static final long DEFAULT_ESTIMATE_DURATION_MS = 10000L;
    /** Default length of a single step. */
    public static final long DEFAULT_STEP_MS = 200L;
    /** Default allowed timestamp skew (presumably milliseconds, like the values it is compared with - confirm). */
    public static final long DEFAULT_ALLOWED_TIMESTAMP_SKEW = 200L;
    /** Length of one step; a rotation of {@link #stepDiffs} happens once more than this much supplier time elapsed. */
    private final long stepMs;
    /** Length of the whole estimation window; always a multiple of {@link #stepMs}. */
    private final long estimateDurationMs;
    /** Largest (supplier time - element stamp) difference that is still not treated as a backlog. */
    private final long allowedTimestampSkew;
    /** Source of the current time used for all computations in this class. */
    private final TimestampSupplier timestampSupplier;
    /**
     * Largest observed (supplier time - element stamp) difference per step. Index 0 is the step
     * currently being filled, higher indices hold older steps. The length is
     * {@code estimateDurationMs / stepMs + 1}.
     */
    private final long[] stepDiffs;
    /** Supplier time at which {@link #stepDiffs} was rotated for the last time. */
    private final AtomicLong lastRotate;
    /** Number of step rotations that still have to happen before the watermark may be advanced. */
    private final AtomicInteger rotatesToInitialize;
    /** Current watermark; it is only ever raised (all updates go through {@code Math::max}). */
    private final AtomicLong watermark;
    /** Supplier time at which the delay statistics were logged for the last time. */
    private final AtomicLong lastStatLogged = new AtomicLong();

    /**
     * Creates a builder pre-populated with the default settings.
     *
     * @return a fresh {@link Builder}
     */
    public static Builder newBuilder() {
        return new Builder();
    }

    /**
     * Creates the estimator.
     *
     * @param estimateDurationMs length of the estimation window; must be positive and divisible
     *     by {@code stepMs}
     * @param stepMs length of a single step of the window; must be positive
     * @param allowedTimestampSkew largest difference between the supplier time and an element
     *     stamp that is not treated as a backlog
     * @param minWatermark initial value of the watermark
     * @param supplier source of the current time; must not be {@code null}
     * @param idlePolicy idle policy handed over to the super class
     * @throws NullPointerException when {@code supplier} is {@code null}
     * @throws IllegalArgumentException when the duration/step constraints are violated
     */
    @VisibleForTesting
    UnboundedOutOfOrdernessWatermarkEstimator(long estimateDurationMs, long stepMs, long allowedTimestampSkew, long minWatermark, TimestampSupplier supplier, WatermarkIdlePolicy idlePolicy) {
        super(idlePolicy);
        // Validation order matters for which exception the caller sees: null check first.
        this.timestampSupplier = Objects.requireNonNull(supplier);
        Preconditions.checkArgument(estimateDurationMs > 0L, "durationMs must be positive");
        Preconditions.checkArgument(stepMs > 0L, "stepMs must be positive");
        Preconditions.checkArgument(estimateDurationMs % stepMs == 0L, "durationMs must be divisible by stepMs");

        this.estimateDurationMs = estimateDurationMs;
        this.stepMs = stepMs;
        this.allowedTimestampSkew = allowedTimestampSkew;
        this.watermark = new AtomicLong(minWatermark);

        // One slot per step plus the slot being filled; a new long[] is already all zeros.
        this.stepDiffs = new long[(int)(estimateDurationMs / stepMs) + 1];
        // Every slot except the current one has to be rotated through before estimating.
        this.rotatesToInitialize = new AtomicInteger(this.stepDiffs.length - 1);
        // Pretend the last rotation happened one step ago.
        this.lastRotate = new AtomicLong(supplier.get() - stepMs);
    }

    /**
     * Computes the current watermark.
     *
     * <p>The watermark is advanced to (supplier time - allowed skew) only when the whole window
     * has been initialized and no step of the window recorded a difference larger than the
     * allowed skew. Otherwise the previously stored watermark is returned unchanged.
     *
     * @return the current watermark
     */
    protected long estimateWatermark() {
        rotateIfNeeded();
        if (rotatesToInitialize.get() > 0) {
            // The window has not been fully rotated through yet.
            return watermark.get();
        }
        boolean processingBacklog = false;
        for (long diff : stepDiffs) {
            if (diff > allowedTimestampSkew) {
                processingBacklog = true;
                break;
            }
        }
        if (!processingBacklog) {
            // Math::max guarantees the watermark never moves backwards.
            watermark.accumulateAndGet(timestampSupplier.get() - allowedTimestampSkew, Math::max);
        }
        return watermark.get();
    }

    /**
     * Records the stamp of the given element in the delay statistics.
     *
     * @param element element whose stamp is taken into account
     */
    public void updateWatermark(StreamElement element) {
        final long stamp = element.getStamp();
        add(stamp);
    }

    /**
     * Raises the stored watermark to at least the given value; a lower value has no effect.
     *
     * @param minWatermark lower bound to apply to the watermark
     */
    public void setMinWatermark(long minWatermark) {
        watermark.updateAndGet(current -> Math.max(current, minWatermark));
    }

    /**
     * Accounts for a single observed stamp: after rotating the window if needed, remembers the
     * difference between the supplier time and the stamp in the current step, provided it is
     * larger than what the step holds so far.
     *
     * @param stamp stamp to account for
     */
    @VisibleForTesting
    void add(long stamp) {
        rotateIfNeeded();
        final long lag = timestampSupplier.get() - stamp;
        // Slot 0 keeps the maximum lag seen in the current step.
        if (lag > stepDiffs[0]) {
            stepDiffs[0] = lag;
        }
    }

    /**
     * Rotates the window of step statistics when more than one step elapsed since the last
     * rotation and, when debug logging is enabled, dumps the statistics at most once per
     * 10000 units of supplier time.
     */
    private void rotateIfNeeded() {
        long now = this.timestampSupplier.get();
        long last = this.lastRotate.get();
        if (now > last + this.stepMs) {
            long elapsedSteps = (now - last) / this.stepMs;
            // Clamp while the value is still a long. Narrowing an arbitrarily large step count
            // to int first could wrap to a negative (or otherwise wrong) number, which would
            // either make System.arraycopy in rotate() throw or rotate fewer slots than needed.
            // Rotating by (length - 1) already clears every slot that can be cleared.
            int moveCount = (int) Math.min(elapsedSteps, (long) this.stepDiffs.length - 1L);
            this.rotate(now, moveCount);
        }
        if (log.isDebugEnabled() && now - this.lastStatLogged.get() > 10000L) {
            log.debug("Watermark delay stats: {} with allowedTimestampSkew {}", (Object)Arrays.toString(this.stepDiffs), (Object)this.allowedTimestampSkew);
            this.lastStatLogged.set(now);
        }
    }

    /**
     * Shifts the step statistics towards higher indices and clears the freed slots.
     *
     * @param now supplier time to remember as the time of this rotation
     * @param moveCount requested number of steps to shift by; capped at {@code stepDiffs.length - 1}
     */
    private void rotate(long now, int moveCount) {
        final int slots = stepDiffs.length;
        final int shift = Math.min(slots - 1, moveCount);
        // Older steps move up by 'shift'; whatever falls off the end is discarded.
        System.arraycopy(stepDiffs, 0, stepDiffs, shift, slots - shift);
        if (rotatesToInitialize.get() > 0) {
            rotatesToInitialize.addAndGet(-shift);
        }
        // The newest 'shift' slots start empty.
        Arrays.fill(stepDiffs, 0, shift, 0L);
        lastRotate.set(now);
    }

    /**
     * @return the length of a single step of the estimation window
     */
    @Generated
    public long getStepMs() {
        return stepMs;
    }

    /**
     * @return the length of the whole estimation window
     */
    @Generated
    public long getEstimateDurationMs() {
        return estimateDurationMs;
    }

    /**
     * @return the largest difference between the supplier time and an element stamp that is not
     *     treated as a backlog
     */
    @Generated
    public long getAllowedTimestampSkew() {
        return allowedTimestampSkew;
    }

    /**
     * Immutable builder of {@link UnboundedOutOfOrdernessWatermarkEstimator}. Every {@code with*}
     * method returns a new builder instance and leaves the receiver untouched.
     */
    public static class Builder {
        private final long durationMs;
        private final long stepMs;
        private final long allowedTimestampSkew;
        private final long minWatermark;
        private final TimestampSupplier timestampSupplier;
        private final WatermarkIdlePolicy watermarkIdlePolicy;

        /**
         * Creates a builder holding the defaults: duration 10000, step 200, allowed skew 200,
         * the default minimal watermark, the system clock and a
         * {@link NotProgressingWatermarkIdlePolicy}.
         */
        Builder() {
            this(10000L, 200L, 200L, -9223372005318775808L, System::currentTimeMillis, new NotProgressingWatermarkIdlePolicy());
        }

        /** Creates a builder holding exactly the given values. */
        private Builder(long duration, long step, long skew, long minStamp, TimestampSupplier supplier, WatermarkIdlePolicy idlePolicy) {
            durationMs = duration;
            stepMs = step;
            allowedTimestampSkew = skew;
            minWatermark = minStamp;
            timestampSupplier = supplier;
            watermarkIdlePolicy = idlePolicy;
        }

        /**
         * @param durationMs length of the estimation window
         * @return a copy of this builder with the duration replaced
         */
        public Builder withDurationMs(long durationMs) {
            return new Builder(durationMs, stepMs, allowedTimestampSkew, minWatermark, timestampSupplier, watermarkIdlePolicy);
        }

        /**
         * @param stepMs length of a single step
         * @return a copy of this builder with the step replaced
         */
        public Builder withStepMs(long stepMs) {
            return new Builder(durationMs, stepMs, allowedTimestampSkew, minWatermark, timestampSupplier, watermarkIdlePolicy);
        }

        /**
         * @param allowedTimestampSkew allowed timestamp skew
         * @return a copy of this builder with the allowed skew replaced
         */
        public Builder withAllowedTimestampSkew(long allowedTimestampSkew) {
            return new Builder(durationMs, stepMs, allowedTimestampSkew, minWatermark, timestampSupplier, watermarkIdlePolicy);
        }

        /**
         * @param minWatermark initial value of the watermark
         * @return a copy of this builder with the minimal watermark replaced
         */
        public Builder withMinWatermark(long minWatermark) {
            return new Builder(durationMs, stepMs, allowedTimestampSkew, minWatermark, timestampSupplier, watermarkIdlePolicy);
        }

        /**
         * @param timestampSupplier source of the current time
         * @return a copy of this builder with the timestamp supplier replaced
         */
        public Builder withTimestampSupplier(TimestampSupplier timestampSupplier) {
            return new Builder(durationMs, stepMs, allowedTimestampSkew, minWatermark, timestampSupplier, watermarkIdlePolicy);
        }

        /**
         * @param watermarkIdlePolicy idle policy of the estimator
         * @return a copy of this builder with the idle policy replaced
         */
        public Builder withWatermarkIdlePolicy(WatermarkIdlePolicy watermarkIdlePolicy) {
            return new Builder(durationMs, stepMs, allowedTimestampSkew, minWatermark, timestampSupplier, watermarkIdlePolicy);
        }

        /**
         * Creates the estimator from the collected values; the values are validated by the
         * estimator's constructor.
         *
         * @return the configured estimator
         */
        public UnboundedOutOfOrdernessWatermarkEstimator build() {
            return new UnboundedOutOfOrdernessWatermarkEstimator(durationMs, stepMs, allowedTimestampSkew, minWatermark, timestampSupplier, watermarkIdlePolicy);
        }
    }

    /**
     * Factory creating {@link UnboundedOutOfOrdernessWatermarkEstimator} from a configuration map.
     */
    public static class Factory
    implements WatermarkEstimatorFactory {
        /**
         * Creates the estimator from the given configuration. Settings missing in the map fall
         * back to the defaults 10000 (duration), 200 (step), 200 (allowed skew) and the default
         * minimal watermark.
         *
         * @param cfg configuration map, looked up by prefixed keys
         * @param idlePolicyFactory factory of the idle policy, invoked with the same map
         * @return the configured estimator
         */
        public WatermarkEstimator create(Map<String, Object> cfg, WatermarkIdlePolicyFactory idlePolicyFactory) {
            final long durationMs = getConfiguration(UnboundedOutOfOrdernessWatermarkEstimator.ESTIMATE_DURATION_MS, cfg, 10000L);
            final long stepMs = getConfiguration(UnboundedOutOfOrdernessWatermarkEstimator.STEP_MS, cfg, 200L);
            final long allowedTimestampSkew = getConfiguration(UnboundedOutOfOrdernessWatermarkEstimator.ALLOWED_TIMESTAMP_SKEW, cfg, 200L);
            final long minWatermark = getConfiguration(UnboundedOutOfOrdernessWatermarkEstimator.MIN_WATERMARK, cfg, -9223372005318775808L);

            Builder builder = UnboundedOutOfOrdernessWatermarkEstimator.newBuilder();
            builder = builder.withAllowedTimestampSkew(allowedTimestampSkew);
            builder = builder.withDurationMs(durationMs);
            builder = builder.withStepMs(stepMs);
            builder = builder.withMinWatermark(minWatermark);
            builder = builder.withWatermarkIdlePolicy(idlePolicyFactory.create(cfg));
            return builder.build();
        }

        /**
         * Reads a single long setting.
         *
         * @param configuration un-prefixed name of the setting
         * @param cfg configuration map
         * @param defaultValue value returned when the map holds no (or a {@code null}) entry
         * @return the parsed value of the entry's string form, or {@code defaultValue}
         * @throws NumberFormatException when the entry's string form is not a valid long
         */
        private long getConfiguration(String configuration, Map<String, Object> cfg, long defaultValue) {
            final Object raw = cfg.get(WatermarkConfiguration.prefixedKey(configuration));
            if (raw == null) {
                return defaultValue;
            }
            return Long.parseLong(raw.toString());
        }
    }
}

