/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentMessageFinder
implements AsyncCallbacks.FindEntryCallback {
    private final ManagedCursor cursor;
    private final String subName;
    private final String topicName;
    private long timestamp = 0L;
    private static final int FALSE = 0;
    private static final int TRUE = 1;
    private volatile int messageFindInProgress = 0;
    private static final AtomicIntegerFieldUpdater<PersistentMessageFinder> messageFindInProgressUpdater = AtomicIntegerFieldUpdater.newUpdater(PersistentMessageFinder.class, "messageFindInProgress");
    private static final Logger log = LoggerFactory.getLogger(PersistentMessageFinder.class);

    public PersistentMessageFinder(String topicName, ManagedCursor cursor) {
        this.topicName = topicName;
        this.cursor = cursor;
        this.subName = Codec.decode((String)cursor.getName());
    }

    public void findMessages(long timestamp, AsyncCallbacks.FindEntryCallback callback) {
        this.timestamp = timestamp;
        if (messageFindInProgressUpdater.compareAndSet(this, 0, 1)) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Starting message position find at timestamp {}", (Object)this.subName, (Object)timestamp);
            }
            this.cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, entry -> {
                try {
                    long entryTimestamp = Commands.getEntryTimestamp((ByteBuf)entry.getDataBuffer());
                    boolean bl = MessageImpl.isEntryPublishedEarlierThan((long)entryTimestamp, (long)timestamp);
                    return bl;
                }
                catch (Exception e) {
                    log.error("[{}][{}] Error deserializing message for message position find", new Object[]{this.topicName, this.subName, e});
                }
                finally {
                    entry.release();
                }
                return false;
            }, (AsyncCallbacks.FindEntryCallback)this, (Object)callback);
        } else {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Ignore message position find scheduled task, last find is still running", (Object)this.topicName, (Object)this.subName);
            }
            callback.findEntryFailed((ManagedLedgerException)new ManagedLedgerException.ConcurrentFindCursorPositionException("last find is still running"), Optional.empty(), null);
        }
    }

    public void findEntryComplete(Position position, Object ctx) {
        Preconditions.checkArgument((boolean)(ctx instanceof AsyncCallbacks.FindEntryCallback));
        AsyncCallbacks.FindEntryCallback callback = (AsyncCallbacks.FindEntryCallback)ctx;
        if (position != null) {
            log.info("[{}][{}] Found position {} closest to provided timestamp {}", new Object[]{this.topicName, this.subName, position, this.timestamp});
        } else if (log.isDebugEnabled()) {
            log.debug("[{}][{}] No position found closest to provided timestamp {}", new Object[]{this.topicName, this.subName, this.timestamp});
        }
        this.messageFindInProgress = 0;
        callback.findEntryComplete(position, null);
    }

    public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
        Preconditions.checkArgument((boolean)(ctx instanceof AsyncCallbacks.FindEntryCallback));
        AsyncCallbacks.FindEntryCallback callback = (AsyncCallbacks.FindEntryCallback)ctx;
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] message position find operation failed for provided timestamp {}", new Object[]{this.topicName, this.subName, this.timestamp, exception});
        }
        this.messageFindInProgress = 0;
        callback.findEntryFailed(exception, failedReadPosition, null);
    }
}

