/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.delayed;

import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.time.Clock;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link DelayedDeliveryTracker} that stores delayed messages as
 * {@code (deliverAt, ledgerId, entryId)} triples in a {@link TripleLongPriorityQueue} and arms a
 * single {@link Timer} timeout for the earliest delivery time. When that timeout fires,
 * {@link #run(Timeout)} asks the dispatcher to read more entries.
 *
 * <p>Only {@link #run(Timeout)} synchronizes (on the dispatcher); the other methods do no locking
 * of their own. NOTE(review): presumably callers invoke them while holding the dispatcher's
 * monitor -- confirm against the dispatcher implementation.
 */
public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, TimerTask {
    private static final Logger log = LoggerFactory.getLogger(InMemoryDelayedDeliveryTracker.class);

    /** Queue size at or above which {@link #shouldPauseAllDeliveries()} may report {@code true}. */
    public static final long DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES = 50000L;

    /** Tracked messages, ordered by the first component (the delivery timestamp). */
    private final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue();
    private final PersistentDispatcherMultipleConsumers dispatcher;
    private final Timer timer;
    private final Clock clock;
    private final boolean isDelayedDeliveryDeliverAtTimeStrict;

    /** Currently armed timeout, or {@code null} when no timer is pending. */
    private Timeout timeout;
    /** Delivery timestamp the armed timeout was scheduled for; {@code -1} once reset. */
    private long currentTimeoutTarget;
    /** Clock reading taken the last time {@link #run(Timeout)} executed. */
    private long lastTickRun;
    private long tickTimeMillis;

    /** Largest {@code deliverAt} accepted since the queue was last drained. */
    private long highestDeliveryTimeTracked = 0L;
    /** Stays {@code true} while accepted messages arrive in (roughly) non-decreasing deliverAt order. */
    private boolean messagesHaveFixedDelay = true;

    /**
     * Creates a tracker that reads the current time from {@link Clock#systemUTC()}.
     *
     * @param dispatcher dispatcher to notify when delayed messages become due
     * @param timer timer used to schedule the wake-up
     * @param tickTimeMillis tick time in milliseconds
     * @param isDelayedDeliveryDeliverAtTimeStrict when {@code true} the cutoff is exactly "now";
     *        otherwise it is "now + tickTimeMillis"
     */
    InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer,
                                   long tickTimeMillis, boolean isDelayedDeliveryDeliverAtTimeStrict) {
        this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict);
    }

    /**
     * Creates a tracker with an explicit clock.
     *
     * @param dispatcher dispatcher to notify when delayed messages become due
     * @param timer timer used to schedule the wake-up
     * @param tickTimeMillis tick time in milliseconds
     * @param clock source of the current time in milliseconds
     * @param isDelayedDeliveryDeliverAtTimeStrict when {@code true} the cutoff is exactly "now";
     *        otherwise it is "now + tickTimeMillis"
     */
    InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer,
                                   long tickTimeMillis, Clock clock,
                                   boolean isDelayedDeliveryDeliverAtTimeStrict) {
        this.dispatcher = dispatcher;
        this.timer = timer;
        this.tickTimeMillis = tickTimeMillis;
        this.clock = clock;
        this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict;
    }

    /**
     * Returns the timestamp up to which (inclusive) a message counts as due: the current clock
     * time in strict mode, otherwise the current clock time plus one tick.
     */
    private long getCutoffTime() {
        long now = clock.millis();
        return isDelayedDeliveryDeliverAtTimeStrict ? now : now + tickTimeMillis;
    }

    /**
     * Starts tracking a message unless it is already due.
     *
     * @param ledgerId ledger id of the message
     * @param entryId entry id of the message
     * @param deliverAt delivery timestamp in milliseconds
     * @return {@code true} if the message was queued; {@code false} if {@code deliverAt} is
     *         negative or not after the cutoff time, in which case nothing is queued
     */
    @Override
    public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
        if (deliverAt < 0L || deliverAt <= getCutoffTime()) {
            // A rejected message also clears the fixed-delay assumption.
            messagesHaveFixedDelay = false;
            return false;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", dispatcher.getName(), ledgerId, entryId,
                    deliverAt - clock.millis());
        }
        priorityQueue.add(deliverAt, ledgerId, entryId);
        updateTimer();
        // A message more than one tick earlier than the highest seen so far means the delays
        // are not uniform.
        if (deliverAt < highestDeliveryTimeTracked - tickTimeMillis) {
            messagesHaveFixedDelay = false;
        }
        highestDeliveryTimeTracked = Math.max(highestDeliveryTimeTracked, deliverAt);
        return true;
    }

    /**
     * Reports whether the earliest tracked message is due. When nothing is due the timer is
     * (re)armed via {@link #updateTimer()}.
     *
     * @return {@code true} if the queue is non-empty and its head is at or before the cutoff time
     */
    @Override
    public boolean hasMessageAvailable() {
        boolean available = !priorityQueue.isEmpty() && priorityQueue.peekN1() <= getCutoffTime();
        if (!available) {
            updateTimer();
        }
        return available;
    }

    /**
     * Removes and returns up to {@code maxMessages} messages whose delivery time is at or before
     * the cutoff time.
     *
     * @param maxMessages maximum number of queue entries to pop; values {@code <= 0} pop nothing
     * @return the positions of the popped messages, as a sorted set
     */
    @Override
    public Set<PositionImpl> getScheduledMessages(int maxMessages) {
        Set<PositionImpl> positions = new TreeSet<>();
        long cutoffTime = getCutoffTime();
        int remaining = maxMessages;
        while (remaining > 0 && !priorityQueue.isEmpty() && priorityQueue.peekN1() <= cutoffTime) {
            positions.add(new PositionImpl(priorityQueue.peekN2(), priorityQueue.peekN3()));
            priorityQueue.pop();
            --remaining;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Get scheduled messages - found {}", dispatcher.getName(), positions.size());
        }
        if (priorityQueue.isEmpty()) {
            // Fully drained: start the fixed-delay detection from scratch.
            highestDeliveryTimeTracked = 0L;
            messagesHaveFixedDelay = true;
        }
        updateTimer();
        return positions;
    }

    /**
     * Replaces the tick time used for the cutoff and timer calculations.
     *
     * @param tickTime new tick time in milliseconds
     */
    @Override
    public void resetTickTime(long tickTime) {
        tickTimeMillis = tickTime;
    }

    /** Removes every tracked message from the queue. */
    @Override
    public void clear() {
        priorityQueue.clear();
    }

    /**
     * @return the number of messages currently in the queue
     */
    @Override
    public long getNumberOfDelayedMessages() {
        return priorityQueue.size();
    }

    /**
     * Brings the pending timeout in line with the head of the queue: cancels it when the queue is
     * empty, leaves it alone when it already targets the head timestamp, and otherwise replaces
     * it. No timer is armed when the head is already in the past.
     */
    private void updateTimer() {
        if (priorityQueue.isEmpty()) {
            if (timeout != null) {
                currentTimeoutTarget = -1L;
                timeout.cancel();
                timeout = null;
            }
            return;
        }

        long timestamp = priorityQueue.peekN1();
        if (timestamp == currentTimeoutTarget) {
            // A timer for this exact timestamp is already pending.
            return;
        }

        if (timeout != null) {
            timeout.cancel();
            // Forget the cancelled timer and its target. Otherwise, if we return below without
            // re-arming, a later call whose head timestamp equals the stale target would take the
            // early return above with no timer pending.
            timeout = null;
            currentTimeoutTarget = -1L;
        }

        long now = clock.millis();
        long delayMillis = timestamp - now;
        if (delayMillis < 0L) {
            // The head is already due, so there is nothing to wait for.
            return;
        }

        // Never fire sooner than one tick after the previous timer run.
        long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
        long calculatedDelayMillis = Math.max(delayMillis, remainingTickDelayMillis);
        if (log.isDebugEnabled()) {
            log.debug("[{}] Start timer in {} millis", dispatcher.getName(), calculatedDelayMillis);
        }
        currentTimeoutTarget = timestamp;
        timeout = timer.newTimeout(this, calculatedDelayMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Timer callback. Unless the timeout is {@code null} or was cancelled, records the tick time,
     * clears the pending-timer state and asks the dispatcher to read more entries, all while
     * holding the dispatcher's monitor.
     *
     * @param timeout the timeout handle that fired
     */
    @Override
    public void run(Timeout timeout) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Timer triggered", dispatcher.getName());
        }
        if (timeout == null || timeout.isCancelled()) {
            return;
        }
        synchronized (dispatcher) {
            lastTickRun = clock.millis();
            currentTimeoutTarget = -1L;
            // Clear the field, not the parameter that shadows it.
            this.timeout = null;
            dispatcher.readMoreEntries();
        }
    }

    /** Closes the queue and cancels the pending timeout, if any. */
    @Override
    public void close() {
        priorityQueue.close();
        if (timeout != null) {
            timeout.cancel();
            timeout = null;
        }
    }

    /**
     * @return {@code true} when the fixed-delay assumption still holds, at least
     *         {@link #DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES} messages are queued and none of
     *         them is due yet
     */
    @Override
    public boolean shouldPauseAllDeliveries() {
        return messagesHaveFixedDelay
                && priorityQueue.size() >= DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES
                && !hasMessageAvailable();
    }
}

