/*
 * Decompiled with CFR 0.152.
 */
package dlshade.org.apache.bookkeeper.client;

import dlshade.com.google.common.util.concurrent.ListenableFuture;
import dlshade.org.apache.bookkeeper.client.BKException;
import dlshade.org.apache.bookkeeper.client.ClientContext;
import dlshade.org.apache.bookkeeper.client.DistributionSchedule;
import dlshade.org.apache.bookkeeper.client.LedgerEntry;
import dlshade.org.apache.bookkeeper.client.LedgerHandle;
import dlshade.org.apache.bookkeeper.client.SpeculativeRequestExecutor;
import dlshade.org.apache.bookkeeper.client.api.LedgerMetadata;
import dlshade.org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import dlshade.org.apache.bookkeeper.net.BookieId;
import dlshade.org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import dlshade.org.apache.bookkeeper.proto.ReadLastConfirmedAndEntryContext;
import dlshade.org.apache.bookkeeper.util.MathUtils;
import io.netty.buffer.ByteBuf;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ReadLastConfirmedAndEntryOp
implements BookkeeperInternalCallbacks.ReadEntryCallback,
SpeculativeRequestExecutor {
    static final Logger LOG = LoggerFactory.getLogger(ReadLastConfirmedAndEntryOp.class);
    ReadLACAndEntryRequest request;
    final BitSet heardFromHostsBitSet;
    final BitSet emptyResponsesFromHostsBitSet;
    final int maxMissedReadsAllowed;
    boolean parallelRead = false;
    final AtomicBoolean requestComplete = new AtomicBoolean(false);
    final long requestTimeNano;
    private final LedgerHandle lh;
    private final ClientContext clientCtx;
    private final LastConfirmedAndEntryCallback cb;
    private int numResponsesPending;
    private final int numEmptyResponsesAllowed;
    private volatile boolean hasValidResponse = false;
    private final long prevEntryId;
    private long lastAddConfirmed;
    private long timeOutInMillis;
    private final List<BookieId> currentEnsemble;
    private ScheduledFuture<?> speculativeTask = null;

    ReadLastConfirmedAndEntryOp(LedgerHandle lh, ClientContext clientCtx, List<BookieId> ensemble, LastConfirmedAndEntryCallback cb, long prevEntryId, long timeOutInMillis) {
        this.lh = lh;
        this.clientCtx = clientCtx;
        this.cb = cb;
        this.prevEntryId = prevEntryId;
        this.lastAddConfirmed = lh.getLastAddConfirmed();
        this.timeOutInMillis = timeOutInMillis;
        this.numResponsesPending = 0;
        this.currentEnsemble = ensemble;
        this.numEmptyResponsesAllowed = this.getLedgerMetadata().getEnsembleSize() - this.getLedgerMetadata().getAckQuorumSize() + 1;
        this.requestTimeNano = MathUtils.nowInNano();
        this.maxMissedReadsAllowed = this.getLedgerMetadata().getEnsembleSize() - this.getLedgerMetadata().getAckQuorumSize();
        this.heardFromHostsBitSet = new BitSet(this.getLedgerMetadata().getEnsembleSize());
        this.emptyResponsesFromHostsBitSet = new BitSet(this.getLedgerMetadata().getEnsembleSize());
    }

    /**
     * Returns the metadata currently exposed by the ledger handle this operation reads from.
     *
     * @return the ledger handle's metadata
     */
    protected LedgerMetadata getLedgerMetadata() {
        final LedgerHandle handle = lh;
        return handle.getLedgerMetadata();
    }

    /**
     * Selects the read strategy used by {@link #initiate()}: a parallel request when
     * {@code enabled} is true, a sequential one otherwise.
     *
     * @param enabled whether reads should be sent to all bookies at once
     * @return this operation, to allow call chaining
     */
    ReadLastConfirmedAndEntryOp parallelRead(boolean enabled) {
        parallelRead = enabled;
        return this;
    }

    /**
     * Cancels the scheduled speculative-read task, if one was registered, and forgets it.
     *
     * @param mayInterruptIfRunning passed straight to {@link ScheduledFuture#cancel(boolean)}
     */
    protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) {
        if (speculativeTask == null) {
            return;
        }
        speculativeTask.cancel(mayInterruptIfRunning);
        speculativeTask = null;
    }

    /**
     * Submits a speculative-read attempt to the main worker pool, ordered by ledger id.
     *
     * <p>The attempt is skipped when either the operation or the underlying request has
     * already completed; otherwise the request decides whether another bookie is contacted.
     *
     * @return a future yielding {@code true} when a speculative read was sent, {@code false} otherwise
     */
    @Override
    public ListenableFuture<Boolean> issueSpeculativeRequest() {
        final Callable<Boolean> speculativeAttempt = new Callable<Boolean>(){

            @Override
            public Boolean call() throws Exception {
                // Same short-circuit order as a single && chain: op state, request state, then the send.
                if (ReadLastConfirmedAndEntryOp.this.requestComplete.get()) {
                    return false;
                }
                if (ReadLastConfirmedAndEntryOp.this.request.isComplete()) {
                    return false;
                }
                BookieId contacted = ReadLastConfirmedAndEntryOp.this.request.maybeSendSpeculativeRead(ReadLastConfirmedAndEntryOp.this.heardFromHostsBitSet);
                if (contacted == null) {
                    return false;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Send speculative ReadLAC {} for ledger {} (previousLAC: {}). Hosts heard are {}.", new Object[]{ReadLastConfirmedAndEntryOp.this.request, ReadLastConfirmedAndEntryOp.this.lh.getId(), ReadLastConfirmedAndEntryOp.this.lastAddConfirmed, ReadLastConfirmedAndEntryOp.this.heardFromHostsBitSet});
                }
                return true;
            }
        };
        return this.clientCtx.getMainWorkerPool().submitOrdered(this.lh.getId(), speculativeAttempt);
    }

    /**
     * Starts the operation: creates the parallel or sequential request for entry
     * {@code prevEntryId + 1}, issues its first read(s), and — for sequential reads only —
     * registers this operation with the configured speculative-request policy when one is present.
     */
    public void initiate() {
        final long ledgerId = lh.getId();
        final long targetEntryId = prevEntryId + 1L;
        if (parallelRead) {
            request = new ParallelReadRequest(currentEnsemble, ledgerId, targetEntryId);
        } else {
            request = new SequenceReadRequest(currentEnsemble, ledgerId, targetEntryId);
        }
        request.read();

        // Speculation only applies to sequential reads and only when a policy is configured.
        if (parallelRead || !clientCtx.getConf().readLACSpeculativeRequestPolicy.isPresent()) {
            return;
        }
        speculativeTask = clientCtx.getConf().readLACSpeculativeRequestPolicy.get().initiateSpeculativeRequest(clientCtx.getScheduler(), this);
    }

    /**
     * Sends one "read entry, wait for LAC update" request to a bookie and counts it as pending.
     *
     * @param bookieIndex index of the bookie within the ensemble; stored in the response context
     * @param to address of the bookie to contact
     * @param entry the request on whose behalf the read is sent (not used by this method)
     * @throws InterruptedException declared for the bookie client call
     */
    void sendReadTo(int bookieIndex, BookieId to, ReadLACAndEntryRequest entry) throws InterruptedException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Calling Read LAC and Entry with {} and long polling interval {} on Bookie {} - Parallel {}", new Object[]{prevEntryId, timeOutInMillis, to, parallelRead});
        }
        final ReadLastConfirmedAndEntryContext responseCtx = new ReadLastConfirmedAndEntryContext(bookieIndex, to);
        // This operation is the callback; the context carries the bookie identity back with the response.
        clientCtx.getBookieClient().readEntryWaitForLACUpdate(to, lh.getId(), -1L, prevEntryId, timeOutInMillis, true, this, responseCtx);
        numResponsesPending++;
    }

    /**
     * Finishes the operation: cancels speculation, records latency, closes the request and
     * invokes the user callback.
     *
     * <p>On success ({@code rc == 0}) the callback receives a {@link LedgerEntry} only when
     * the request actually holds an entry buffer; otherwise, and on any failure, it receives
     * {@code null} for the entry.
     *
     * @param rc result code handed to the callback; 0 is treated as success
     */
    private void submitCallback(int rc) {
        final long latencyMicros = MathUtils.elapsedMicroSec(requestTimeNano);
        cancelSpeculativeTask(true);

        LedgerEntry entry = null;
        if (rc == 0) {
            clientCtx.getClientStats().getReadLacAndEntryOpLogger().registerSuccessfulEvent(latencyMicros, TimeUnit.MICROSECONDS);
            if (request.entryImpl.getEntryBuffer() != null) {
                entry = new LedgerEntry(request.entryImpl);
            }
        } else {
            clientCtx.getClientStats().getReadLacAndEntryOpLogger().registerFailedEvent(latencyMicros, TimeUnit.MICROSECONDS);
        }

        request.close();
        cb.readLastConfirmedAndEntryComplete(rc, lastAddConfirmed, entry);
    }

    /**
     * Handles one bookie response for this operation.
     *
     * <p>Successful responses ({@code rc == 0}) may advance the known last-add-confirmed
     * value. A response carrying an entry ({@code entryId != -1}) is handed to the request
     * and, if accepted, completes the operation. A response without an entry completes the
     * operation when the LAC has advanced past {@code prevEntryId} or when enough bookies
     * have answered empty; otherwise the request is asked to try another bookie.
     *
     * <p>Return code -102 (presumably {@code BKException.Code.UnauthorizedAccessException} —
     * confirm against {@code BKException.Code}) fails the operation immediately. Any other
     * error is logged and passed to the request for a re-attempt.
     *
     * @param rc result code of the read; 0 means success
     * @param ledgerId ledger the response belongs to
     * @param entryId entry id in the response, or -1 when the response carries no entry
     * @param buffer response payload; retained here while it is offered to the request
     * @param ctx the {@link ReadLastConfirmedAndEntryContext} created when the read was sent
     */
    @Override
    public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffer, Object ctx) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("{} received response for (lid={}, eid={}) : {}", new Object[]{this.getClass().getName(), ledgerId, entryId, rc});
        }
        ReadLastConfirmedAndEntryContext rCtx = (ReadLastConfirmedAndEntryContext)ctx;
        BookieId bookie = rCtx.getBookieAddress();
        --this.numResponsesPending;

        if (0 == rc) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Received lastAddConfirmed (lac={}) from bookie({}) for (lid={}).", new Object[]{rCtx.getLastAddConfirmed(), bookie, ledgerId});
            }
            if (rCtx.getLastAddConfirmed() > this.lastAddConfirmed) {
                this.lastAddConfirmed = rCtx.getLastAddConfirmed();
                this.lh.updateLastConfirmed(rCtx.getLastAddConfirmed(), 0L);
            }
            this.hasValidResponse = true;

            if (entryId == -1L) {
                // Response without an entry: every branch below returns.
                this.emptyResponsesFromHostsBitSet.set(rCtx.getBookieIndex(), true);
                if (this.lastAddConfirmed > this.prevEntryId) {
                    // The LAC moved forward, which is a useful result on its own.
                    this.completeRequest();
                    return;
                }
                if (this.emptyResponsesFromHostsBitSet.cardinality() >= this.numEmptyResponsesAllowed) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Completed readLACAndEntry(lid = {}, previousEntryId = {}) after received {} empty responses ('{}').", new Object[]{ledgerId, this.prevEntryId, this.emptyResponsesFromHostsBitSet.cardinality(), this.emptyResponsesFromHostsBitSet});
                    }
                    this.completeRequest();
                    return;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Received empty response for readLACAndEntry(lid = {}, previousEntryId = {}) from bookie {} @ {}, reattempting reading next bookie : lac = {}", new Object[]{ledgerId, this.prevEntryId, rCtx.getBookieAddress(), rCtx.getBookieAddress(), this.lastAddConfirmed});
                }
                this.request.logErrorAndReattemptRead(rCtx.getBookieIndex(), bookie, "Empty Response", rc);
                return;
            }

            // Response with an entry: take a reference while the request decides whether to keep it.
            buffer.retain();
            if (!this.requestComplete.get() && this.request.complete(rCtx.getBookieIndex(), bookie, buffer, entryId)) {
                if (rCtx.getLacUpdateTimestamp().isPresent()) {
                    long elapsedMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis() - rCtx.getLacUpdateTimestamp().get());
                    elapsedMicros = Math.max(elapsedMicros, 0L);
                    this.clientCtx.getClientStats().getReadLacAndEntryRespLogger().registerSuccessfulEvent(elapsedMicros, TimeUnit.MICROSECONDS);
                }
                if (!this.completeRequest()) {
                    // Someone else completed the operation first; the buffer will not be consumed.
                    buffer.release();
                }
                this.heardFromHostsBitSet.set(rCtx.getBookieIndex(), true);
            } else {
                buffer.release();
            }
        } else if (-102 == rc && this.requestComplete.compareAndSet(false, true)) {
            // Claim completion atomically BEFORE calling back. A separate get()/set(true)
            // around the callback lets another completion path slip in between and invoke
            // the user callback a second time.
            this.submitCallback(rc);
        } else {
            this.request.logErrorAndReattemptRead(rCtx.getBookieIndex(), bookie, "Error: " + BKException.getMessage(rc), rc);
            return;
        }

        // No more responses outstanding: make sure the callback fires (no-op if already completed).
        if (this.numResponsesPending <= 0) {
            this.completeRequest();
        }
    }

    /**
     * Completes the operation at most once.
     *
     * <p>The first caller to flip {@code requestComplete} submits the callback: with result
     * code 0 when at least one valid response was seen, otherwise with the request's first
     * recorded error.
     *
     * @return {@code true} if this call performed the completion, {@code false} if the
     *         operation had already been completed
     */
    private boolean completeRequest() {
        if (!requestComplete.compareAndSet(false, true)) {
            return false;
        }
        final int resultCode = hasValidResponse ? 0 : request.getFirstError();
        submitCallback(resultCode);
        return true;
    }

    /**
     * Describes this operation by ledger id and the previous entry id it was started with.
     *
     * @return a string of the form {@code ReadLastConfirmedAndEntryOp(lid=<id>, prevEntryId=<id>)}
     */
    @Override
    public String toString() {
        // The format previously ended in "])", leaving an unbalanced ']' in the output.
        return String.format("ReadLastConfirmedAndEntryOp(lid=%d, prevEntryId=%d)", this.lh.getId(), this.prevEntryId);
    }

    static interface LastConfirmedAndEntryCallback {
        public void readLastConfirmedAndEntryComplete(int var1, long var2, LedgerEntry var4);
    }

    /**
     * Request that contacts bookies one at a time, in {@code orderedEnsemble} order.
     *
     * <p>A further bookie is contacted when all reads sent so far have answered with an error
     * or an empty response, or when a speculative read is requested and none of the bookies
     * contacted so far has been heard from.
     *
     * <p>"Replica index" below means a position in {@code orderedEnsemble}; "bookie index"
     * means the value stored at that position, which indexes into {@code ensemble}.
     */
    class SequenceReadRequest
    extends ReadLACAndEntryRequest {
        /** Value returned by {@link #getReplicaIndex(int)} when the bookie is not part of the read order. */
        static final int NOT_FOUND = -1;
        /** Replica index of the next bookie to contact; also the number of bookies tried so far. */
        int nextReplicaIndexToReadFrom;
        /** Replica indexes a read has been sent to. */
        final BitSet sentReplicas;
        /** Replica indexes that answered with an error. */
        final BitSet erroredReplicas;
        /** Replica indexes that answered successfully but without an entry. */
        final BitSet emptyResponseReplicas;

        SequenceReadRequest(List<BookieId> ensemble, long lId, long eId) {
            super(ensemble, lId, eId);
            this.nextReplicaIndexToReadFrom = 0;
            this.sentReplicas = new BitSet(this.orderedEnsemble.size());
            this.erroredReplicas = new BitSet(this.orderedEnsemble.size());
            this.emptyResponseReplicas = new BitSet(this.orderedEnsemble.size());
        }

        private synchronized int getNextReplicaIndexToReadFrom() {
            return this.nextReplicaIndexToReadFrom;
        }

        /** Maps a bookie index to its replica index, or {@link #NOT_FOUND}. */
        private int getReplicaIndex(int bookieIndex) {
            return this.orderedEnsemble.indexOf(bookieIndex);
        }

        /** Translates {@code sentReplicas} (replica indexes) into a bit set of bookie indexes. */
        private BitSet getSentToBitSet() {
            BitSet b = new BitSet(this.ensemble.size());
            for (int i = this.sentReplicas.nextSetBit(0); i >= 0; i = this.sentReplicas.nextSetBit(i + 1)) {
                b.set(this.orderedEnsemble.get(i));
            }
            return b;
        }

        /** True while at least one sent read has produced neither an error nor an empty response. */
        private boolean readsOutstanding() {
            return this.sentReplicas.cardinality() - this.erroredReplicas.cardinality() - this.emptyResponseReplicas.cardinality() > 0;
        }

        /**
         * Sends a read to the next bookie when none of the bookies contacted so far appears
         * in {@code heardFrom}.
         *
         * @param heardFrom bookie indexes that have already produced an accepted response
         * @return the bookie contacted, or {@code null} when no read was sent
         */
        @Override
        synchronized BookieId maybeSendSpeculativeRead(BitSet heardFrom) {
            if (this.nextReplicaIndexToReadFrom >= ReadLastConfirmedAndEntryOp.this.getLedgerMetadata().getEnsembleSize()) {
                return null;
            }
            BitSet sentTo = this.getSentToBitSet();
            sentTo.and(heardFrom);
            if (sentTo.cardinality() == 0) {
                return this.sendNextRead();
            }
            return null;
        }

        @Override
        void read() {
            this.sendNextRead();
        }

        /**
         * Contacts the next bookie in read order, or fails the request once every bookie has
         * been tried.
         *
         * @return the bookie contacted, or {@code null} when the request was failed instead
         */
        synchronized BookieId sendNextRead() {
            if (this.nextReplicaIndexToReadFrom >= ReadLastConfirmedAndEntryOp.this.getLedgerMetadata().getEnsembleSize()) {
                // NOTE(review): -8 / -13 look like BKException.Code.BookieHandleNotAvailableException /
                // NoSuchEntryException — confirm. When too many bookies reported the entry as
                // missing, that is reported in preference to -8.
                if (-8 == this.firstError && this.numMissedEntryReads > ReadLastConfirmedAndEntryOp.this.maxMissedReadsAllowed) {
                    this.firstError = -13;
                }
                this.fail(this.firstError);
                return null;
            }
            int replica = this.nextReplicaIndexToReadFrom;
            int bookieIndex = this.orderedEnsemble.get(this.nextReplicaIndexToReadFrom);
            ++this.nextReplicaIndexToReadFrom;
            try {
                BookieId to = this.ensemble.get(bookieIndex);
                ReadLastConfirmedAndEntryOp.this.sendReadTo(bookieIndex, to, this);
                this.sentReplicas.set(replica);
                return to;
            }
            catch (InterruptedException ie) {
                LOG.error("Interrupted reading entry " + this, (Throwable)ie);
                Thread.currentThread().interrupt();
                // NOTE(review): -15 is presumably BKException.Code.InterruptedException — confirm.
                this.fail(-15);
                return null;
            }
        }

        /**
         * Records an error or empty response from a bookie and, when no other read is still
         * outstanding, moves on to the next bookie.
         *
         * @param bookieIndex index of the responding bookie within the ensemble
         * @param host address of the responding bookie
         * @param errMsg text used in the debug log
         * @param rc result code of the response; 0 marks an empty (but successful) response
         */
        @Override
        synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) {
            super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc);
            if (this.isComplete() || ReadLastConfirmedAndEntryOp.this.requestComplete.get()) {
                // A late response for a finished request/operation: completion calls
                // orderedEnsemble.recycle(), so it must not be consulted any more, and
                // contacting further bookies would serve no purpose.
                return;
            }
            int replica = this.getReplicaIndex(bookieIndex);
            if (replica == NOT_FOUND) {
                LOG.error("Received error from a host which is not in the ensemble {} {}.", (Object)host, (Object)this.ensemble);
                return;
            }
            if (0 == rc) {
                this.emptyResponseReplicas.set(replica);
            } else {
                this.erroredReplicas.set(replica);
            }
            if (!this.readsOutstanding()) {
                this.sendNextRead();
            }
        }

        /**
         * Offers an entry response to the request and, when it is accepted, reports every
         * bookie tried so far to the placement policy via {@code registerSlowBookie}.
         *
         * @return {@code true} when this response completed the request
         */
        @Override
        boolean complete(int bookieIndex, BookieId host, ByteBuf buffer, long entryId) {
            // super.complete() calls orderedEnsemble.recycle() on success (presumably handing
            // the set back to a pool — confirm), so the tried bookies have to be resolved
            // before it runs rather than read from orderedEnsemble afterwards.
            BookieId[] triedBookies = this.snapshotTriedBookies();
            boolean completed = super.complete(bookieIndex, host, buffer, entryId);
            if (completed) {
                for (BookieId triedBookie : triedBookies) {
                    ReadLastConfirmedAndEntryOp.this.clientCtx.getPlacementPolicy().registerSlowBookie(triedBookie, entryId);
                }
            }
            return completed;
        }

        /**
         * Resolves the addresses of all bookies contacted so far, in read order. Returns an
         * empty array when the request has already completed, because {@code orderedEnsemble}
         * has been recycled by then and the result would not be used anyway.
         */
        private synchronized BookieId[] snapshotTriedBookies() {
            if (this.isComplete()) {
                return new BookieId[0];
            }
            BookieId[] tried = new BookieId[this.nextReplicaIndexToReadFrom];
            for (int i = 0; i < tried.length; ++i) {
                tried[i] = this.ensemble.get(this.orderedEnsemble.get(i));
            }
            return tried;
        }
    }

    /**
     * Request that sends the read to every bookie in {@code orderedEnsemble} up front and
     * never issues speculative reads.
     */
    class ParallelReadRequest
    extends ReadLACAndEntryRequest {
        /** Number of bookies that have not yet reported an error or empty response. */
        int numPendings;

        ParallelReadRequest(List<BookieId> ensemble, long lId, long eId) {
            super(ensemble, lId, eId);
            numPendings = orderedEnsemble.size();
        }

        /**
         * Sends one read per bookie, in read order. If the thread is interrupted while
         * sending, the interrupt flag is restored, the request is failed with -15
         * (presumably BKException.Code.InterruptedException — confirm) and the remaining
         * bookies are not contacted.
         */
        @Override
        void read() {
            int position = 0;
            // The size is re-read on every pass, exactly like an index-based for loop.
            while (position < orderedEnsemble.size()) {
                final int bookieIndex = orderedEnsemble.get(position);
                final BookieId target = ensemble.get(bookieIndex);
                try {
                    ReadLastConfirmedAndEntryOp.this.sendReadTo(bookieIndex, target, this);
                }
                catch (InterruptedException ie) {
                    LOG.error("Interrupted reading entry {} : ", (Object)this, (Object)ie);
                    Thread.currentThread().interrupt();
                    fail(-15);
                    return;
                }
                position++;
            }
        }

        /**
         * Records an error or empty response and fails the request once either too many
         * bookies reported the entry as missing or no bookie is left to answer.
         */
        @Override
        synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) {
            super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc);
            numPendings--;

            final boolean tooManyMisses = numMissedEntryReads > ReadLastConfirmedAndEntryOp.this.maxMissedReadsAllowed;
            if (!tooManyMisses && numPendings != 0) {
                return;
            }
            // NOTE(review): -8 / -13 look like BookieHandleNotAvailableException /
            // NoSuchEntryException — confirm against BKException.Code.
            if (tooManyMisses && -8 == firstError) {
                firstError = -13;
            }
            fail(firstError);
        }

        /** Parallel requests already contacted everyone, so there is never anything to send. */
        @Override
        BookieId maybeSendSpeculativeRead(BitSet heardFromHostsBitSet) {
            return null;
        }
    }

    /**
     * Base class for the per-entry read request driven by the enclosing operation.
     *
     * <p>It owns the entry being filled in, the write set for the target entry and the order
     * in which bookies are contacted, and it tracks the first relevant error plus the number
     * of "entry missing" responses. Subclasses decide how reads are distributed.
     *
     * <p>NOTE(review): the numeric return codes used below appear to be BKException.Code
     * values (-5 DigestMatchException, -7 NoSuchLedgerExistsException,
     * -8 BookieHandleNotAvailableException, -13 NoSuchEntryException) — confirm.
     */
    abstract class ReadLACAndEntryRequest
    implements AutoCloseable {
        /** Set once, by whichever of {@link #complete} or {@link #fail} wins. */
        final AtomicBoolean complete = new AtomicBoolean(false);
        int rc = 0;
        int firstError = 0;
        int numMissedEntryReads = 0;
        final List<BookieId> ensemble;
        final DistributionSchedule.WriteSet writeSet;
        final DistributionSchedule.WriteSet orderedEnsemble;
        final LedgerEntryImpl entryImpl;

        /**
         * @param ensemble bookies of the ledger fragment being read
         * @param lId ledger id
         * @param eId id of the entry this request tries to read
         */
        ReadLACAndEntryRequest(List<BookieId> ensemble, long lId, long eId) {
            this.entryImpl = LedgerEntryImpl.create(lId, eId);
            this.ensemble = ensemble;
            this.writeSet = ReadLastConfirmedAndEntryOp.this.lh.getDistributionSchedule().getEnsembleSet(eId);
            // The read order is either the placement policy's reordering of a copy of the
            // write set, or simply a copy of the write set when reordering is disabled.
            if (ReadLastConfirmedAndEntryOp.this.clientCtx.getConf().enableReorderReadSequence) {
                this.orderedEnsemble = ReadLastConfirmedAndEntryOp.this.clientCtx.getPlacementPolicy().reorderReadLACSequence(ensemble, ReadLastConfirmedAndEntryOp.this.lh.getBookiesHealthInfo(), this.writeSet.copy());
            } else {
                this.orderedEnsemble = this.writeSet.copy();
            }
        }

        /** Closes the entry held by this request. */
        @Override
        public void close() {
            entryImpl.close();
        }

        synchronized int getFirstError() {
            return firstError;
        }

        /** Issues the initial read(s) for this request. */
        abstract void read();

        /**
         * Verifies the digest of a response and, if this is the first accepted response,
         * stores its length and content in the entry.
         *
         * @param bookieIndex index of the responding bookie within the ensemble
         * @param host address of the responding bookie
         * @param buffer raw response buffer
         * @param entryId entry id reported by the response
         * @return {@code true} when this call completed the request; {@code false} on a
         *         digest mismatch or when the request was already complete
         */
        boolean complete(int bookieIndex, BookieId host, ByteBuf buffer, long entryId) {
            ByteBuf payload;
            try {
                payload = ReadLastConfirmedAndEntryOp.this.lh.getDigestManager().verifyDigestAndReturnData(entryId, buffer);
            }
            catch (BKException.BKDigestMatchException e) {
                logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", -5);
                return false;
            }
            if (complete.getAndSet(true)) {
                return false;
            }
            writeSet.recycle();
            orderedEnsemble.recycle();
            this.rc = 0;
            // NOTE(review): offset 24 is presumably where the entry length sits in the
            // response metadata header — confirm against the digest manager's layout.
            entryImpl.setLength(buffer.getLong(24));
            entryImpl.setEntryBuf(payload);
            return true;
        }

        /**
         * Fails the request with {@code rc} unless it is already complete, then asks the
         * enclosing operation to complete.
         *
         * @return {@code true} when this call performed the failure
         */
        boolean fail(int rc) {
            if (!complete.compareAndSet(false, true)) {
                return false;
            }
            writeSet.recycle();
            orderedEnsemble.recycle();
            this.rc = rc;
            translateAndSetFirstError(rc);
            ReadLastConfirmedAndEntryOp.this.completeRequest();
            return true;
        }

        /**
         * Updates {@code firstError} according to a fixed precedence: a stored 0, -13 or -7
         * is always replaced by the new code; a stored -8 is replaced only by a code other
         * than -13 and -7; any other stored code is kept.
         */
        private synchronized void translateAndSetFirstError(int rc) {
            final boolean storedIsReplaceable = firstError == 0 || firstError == -13 || firstError == -7;
            final boolean storedHandleErrorOutranked = firstError == -8 && rc != -13 && rc != -7;
            if (storedIsReplaceable || storedHandleErrorOutranked) {
                firstError = rc;
            }
        }

        /**
         * Records a failed or empty response. Responses with code -13 or -7 count as a
         * missed entry read and, when the bookie belongs to the write set, are reported to
         * the ledger handle as an operation failure on that bookie. Subclasses extend this
         * to decide whether to contact another bookie.
         */
        synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) {
            translateAndSetFirstError(rc);
            final boolean entryMissing = rc == -13 || rc == -7;
            if (entryMissing) {
                if (writeSet.contains(bookieIndex)) {
                    ReadLastConfirmedAndEntryOp.this.lh.registerOperationFailureOnBookie(host, entryImpl.getEntryId());
                }
                numMissedEntryReads++;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} while reading entry: {} ledgerId: {} from bookie: {}", new Object[]{errMsg, entryImpl.getEntryId(), ReadLastConfirmedAndEntryOp.this.lh.getId(), host});
            }
        }

        /**
         * Gives the request a chance to contact one more bookie.
         *
         * @param var1 bookie indexes that have already produced an accepted response
         * @return the bookie contacted, or {@code null} when no read was sent
         */
        abstract BookieId maybeSendSpeculativeRead(BitSet var1);

        boolean isComplete() {
            return complete.get();
        }

        int getRc() {
            return rc;
        }

        @Override
        public String toString() {
            final long ledgerId = entryImpl.getLedgerId();
            final long entryId = entryImpl.getEntryId();
            return String.format("L%d-E%d", ledgerId, entryId);
        }
    }
}

