/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.proto;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.base.Stopwatch;
import org.apache.pulsar.functions.runtime.shaded.io.netty.channel.Channel;
import org.apache.pulsar.functions.runtime.shaded.io.netty.util.HashedWheelTimer;
import org.apache.pulsar.functions.runtime.shaded.io.netty.util.Timeout;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.bookie.Bookie;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.util.Watcher;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.proto.BookieRequestProcessor;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.proto.BookkeeperProtocol;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.proto.ReadEntryProcessorV3;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.proto.RequestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LongPollReadEntryProcessorV3
extends ReadEntryProcessorV3
implements Watcher<LastAddConfirmedUpdateNotification> {
    private static final Logger logger = LoggerFactory.getLogger(LongPollReadEntryProcessorV3.class);
    private final Long previousLAC;
    private Optional<Long> lastAddConfirmedUpdateTime = Optional.empty();
    private final ExecutorService longPollThreadPool;
    private final HashedWheelTimer requestTimer;
    private Timeout expirationTimerTask = null;
    private Future<?> deferredTask = null;
    private boolean shouldReadEntry = false;

    LongPollReadEntryProcessorV3(BookkeeperProtocol.Request request, Channel channel, BookieRequestProcessor requestProcessor, ExecutorService fenceThreadPool, ExecutorService longPollThreadPool, HashedWheelTimer requestTimer) {
        super(request, channel, requestProcessor, fenceThreadPool);
        this.previousLAC = this.readRequest.getPreviousLAC();
        this.longPollThreadPool = longPollThreadPool;
        this.requestTimer = requestTimer;
    }

    @Override
    protected Long getPreviousLAC() {
        return this.previousLAC;
    }

    private synchronized boolean shouldReadEntry() {
        return this.shouldReadEntry;
    }

    @Override
    protected BookkeeperProtocol.ReadResponse readEntry(BookkeeperProtocol.ReadResponse.Builder readResponseBuilder, long entryId, Stopwatch startTimeSw) throws IOException {
        if (RequestUtils.shouldPiggybackEntry(this.readRequest)) {
            if (!this.readRequest.hasPreviousLAC() || -1L != entryId) {
                logger.error("Incorrect read request, entry piggyback requested incorrectly for ledgerId {} entryId {}", (Object)this.ledgerId, (Object)entryId);
                return this.buildResponse(readResponseBuilder, BookkeeperProtocol.StatusCode.EBADREQ, startTimeSw);
            }
            long knownLAC = this.requestProcessor.bookie.readLastAddConfirmed(this.ledgerId);
            readResponseBuilder.setMaxLAC(knownLAC);
            if (knownLAC > this.previousLAC) {
                entryId = this.previousLAC + 1L;
                readResponseBuilder.setMaxLAC(knownLAC);
                if (this.lastAddConfirmedUpdateTime.isPresent()) {
                    readResponseBuilder.setLacUpdateTimestamp(this.lastAddConfirmedUpdateTime.get());
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("ReadLAC Piggy Back reading entry:{} from ledger: {}", (Object)entryId, (Object)this.ledgerId);
                }
                try {
                    return super.readEntry(readResponseBuilder, entryId, true, startTimeSw);
                }
                catch (Bookie.NoEntryException e) {
                    this.requestProcessor.getRequestStats().getReadLastEntryNoEntryErrorCounter().inc();
                    logger.info("No entry found while piggyback reading entry {} from ledger {} : previous lac = {}", new Object[]{entryId, this.ledgerId, this.previousLAC});
                    return this.buildResponse(readResponseBuilder, BookkeeperProtocol.StatusCode.EOK, startTimeSw);
                }
            }
            if (knownLAC < this.previousLAC && logger.isDebugEnabled()) {
                logger.debug("Found smaller lac when piggy back reading lac and entry from ledger {} : previous lac = {}, known lac = {}", new Object[]{this.ledgerId, this.previousLAC, knownLAC});
            }
            return this.buildResponse(readResponseBuilder, BookkeeperProtocol.StatusCode.EOK, startTimeSw);
        }
        return super.readEntry(readResponseBuilder, entryId, false, startTimeSw);
    }

    private BookkeeperProtocol.ReadResponse buildErrorResponse(BookkeeperProtocol.StatusCode statusCode, Stopwatch sw) {
        BookkeeperProtocol.ReadResponse.Builder builder = BookkeeperProtocol.ReadResponse.newBuilder().setLedgerId(this.ledgerId).setEntryId(this.entryId);
        return this.buildResponse(builder, statusCode, sw);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private BookkeeperProtocol.ReadResponse getLongPollReadResponse() {
        if (!this.shouldReadEntry() && this.readRequest.hasTimeOut()) {
            boolean watched;
            if (logger.isTraceEnabled()) {
                logger.trace("Waiting For LAC Update {}", (Object)this.previousLAC);
            }
            Stopwatch startTimeSw = Stopwatch.createStarted();
            try {
                watched = this.requestProcessor.getBookie().waitForLastAddConfirmedUpdate(this.ledgerId, this.previousLAC, this);
            }
            catch (Bookie.NoLedgerException e) {
                logger.info("No ledger found while longpoll reading ledger {}, previous lac = {}.", (Object)this.ledgerId, (Object)this.previousLAC);
                return this.buildErrorResponse(BookkeeperProtocol.StatusCode.ENOLEDGER, startTimeSw);
            }
            catch (IOException ioe) {
                logger.error("IOException while longpoll reading ledger {}, previous lac = {} : ", new Object[]{this.ledgerId, this.previousLAC, ioe});
                return this.buildErrorResponse(BookkeeperProtocol.StatusCode.EIO, startTimeSw);
            }
            this.registerSuccessfulEvent(this.requestProcessor.getRequestStats().getLongPollPreWaitStats(), startTimeSw);
            this.lastPhaseStartTime.reset().start();
            if (watched) {
                if (logger.isTraceEnabled()) {
                    logger.trace("Waiting For LAC Update {}: Timeout {}", (Object)this.previousLAC, (Object)this.readRequest.getTimeOut());
                }
                LongPollReadEntryProcessorV3 longPollReadEntryProcessorV3 = this;
                synchronized (longPollReadEntryProcessorV3) {
                    this.expirationTimerTask = this.requestTimer.newTimeout(timeout -> {
                        this.requestProcessor.getBookie().cancelWaitForLastAddConfirmedUpdate(this.ledgerId, this);
                        this.scheduleDeferredRead(true);
                    }, this.readRequest.getTimeOut(), TimeUnit.MILLISECONDS);
                }
                return null;
            }
        }
        return this.getReadResponse();
    }

    @Override
    protected void executeOp() {
        BookkeeperProtocol.ReadResponse readResponse = this.getLongPollReadResponse();
        if (null != readResponse) {
            this.sendResponse(readResponse);
        }
    }

    @Override
    public void update(LastAddConfirmedUpdateNotification newLACNotification) {
        if (newLACNotification.getLastAddConfirmed() > this.previousLAC) {
            if (newLACNotification.getLastAddConfirmed() != Long.MAX_VALUE && !this.lastAddConfirmedUpdateTime.isPresent()) {
                this.lastAddConfirmedUpdateTime = Optional.of(newLACNotification.getTimestamp());
            }
            if (logger.isTraceEnabled()) {
                logger.trace("Last Add Confirmed Advanced to {} for request {}", (Object)newLACNotification.getLastAddConfirmed(), (Object)this.request);
            }
            this.scheduleDeferredRead(false);
        }
        newLACNotification.recycle();
    }

    private synchronized void scheduleDeferredRead(boolean timeout) {
        if (null == this.deferredTask) {
            if (logger.isTraceEnabled()) {
                logger.trace("Deferred Task, expired: {}, request: {}", (Object)timeout, (Object)this.request);
            }
            try {
                this.shouldReadEntry = true;
                this.deferredTask = this.longPollThreadPool.submit(this);
            }
            catch (RejectedExecutionException rejectedExecutionException) {
                // empty catch block
            }
            if (null != this.expirationTimerTask) {
                this.expirationTimerTask.cancel();
            }
            this.registerEvent(timeout, this.requestProcessor.getRequestStats().getLongPollWaitStats(), this.lastPhaseStartTime);
            this.lastPhaseStartTime.reset().start();
        }
    }
}

