/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client;

import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.base.Preconditions;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.collect.ImmutableMap;
import org.apache.pulsar.functions.runtime.shaded.io.netty.buffer.ByteBuf;
import org.apache.pulsar.functions.runtime.shaded.io.netty.util.Recycler;
import org.apache.pulsar.functions.runtime.shaded.io.netty.util.ReferenceCountUtil;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.AsyncCallback;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.BKException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.ClientContext;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.DistributionSchedule;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.LedgerHandle;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.BookieId;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.util.ByteBufList;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * State for one in-flight "add entry" operation against a ledger.
 *
 * <p>When run ({@link #safeRun()}), the op packages the payload through the ledger's
 * {@code macManager} and sends one write request per bookie in the entry's write set.
 * Each bookie response arrives through {@link #writeComplete}; once the ack set reports
 * an ack quorum (and, if configured, the placement policy is satisfied) the op is marked
 * {@code completed} and the ledger handle is asked to deliver success callbacks.
 *
 * <p>Instances are pooled through a Netty {@link Recycler}: obtain them with
 * {@link #create} and never hold on to one after its callback has fired, since its fields
 * are reset and the instance is reused.
 *
 * <p>NOTE(review): no synchronization is visible in this class; it presumably relies on
 * callers running on the ledger's ordered executor - confirm before calling from elsewhere.
 */
class PendingAddOp
extends org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.util.SafeRunnable
implements BookkeeperInternalCallbacks.WriteCallback {
    private static final Logger LOG = LoggerFactory.getLogger(PendingAddOp.class);

    /*
     * Result codes handled explicitly by this class. The numeric values are the ones that
     * were inlined into the compiled class.
     * NOTE(review): names follow the matching BKException.Code constants of upstream
     * BookKeeper - confirm against the BKException.Code shipped with this shaded jar.
     */
    private static final int RC_OK = 0;
    private static final int RC_CLIENT_CLOSED = -19;
    private static final int RC_ADD_ENTRY_QUORUM_TIMEOUT = -21;
    private static final int RC_ILLEGAL_OP = -100;
    private static final int RC_LEDGER_FENCED = -101;
    private static final int RC_UNAUTHORIZED_ACCESS = -102;
    private static final int RC_WRITE_ON_READ_ONLY_BOOKIE = -104;

    /** Request flags sent for a regular (non-recovery) add. */
    private static final int FLAGS_NONE = 0;
    /**
     * Request flags sent for a recovery add.
     * NOTE(review): presumably FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY of the bookie
     * protocol (2 | 4) - confirm against BookieProtocol.
     */
    private static final int FLAGS_RECOVERY_ADD = 6;

    // Raw entry payload; handed to the mac manager in safeRun() and nulled afterwards.
    ByteBuf payload;
    // Packaged (digest + payload) buffers actually sent to bookies; null until safeRun().
    ByteBufList toSend;
    AsyncCallback.AddCallbackWithLatency cb;
    Object ctx;
    long entryId;
    int entryLength;
    DistributionSchedule.AckSet ackSet;
    // True once the ack quorum (and placement policy, if enforced) has been satisfied.
    boolean completed = false;
    LedgerHandle lh;
    ClientContext clientCtx;
    boolean isRecoveryAdd = false;
    long requestTimeNanos;
    // Latency, in nanoseconds, from request start until the op was marked completed.
    long qwcLatency;
    Set<BookieId> addEntrySuccessBookies;
    // Timestamp of the first time completion was delayed by the placement policy; -1 if never.
    long writeDelayedStartTime;
    long currentLedgerLength;
    // Number of write requests sent whose writeComplete() has not been received yet.
    int pendingWriteRequests;
    boolean callbackTriggered;
    boolean hasRun;
    EnumSet<WriteFlag> writeFlags;
    boolean allowFailFast = false;
    List<BookieId> ensemble;
    private final Recycler.Handle<PendingAddOp> recyclerHandle;
    private static final Recycler<PendingAddOp> RECYCLER = new Recycler<PendingAddOp>(){

        @Override
        protected PendingAddOp newObject(Recycler.Handle<PendingAddOp> handle) {
            return new PendingAddOp(handle);
        }
    };

    /**
     * Obtains a (possibly recycled) op from the pool and initializes every field for a new add.
     *
     * @param lh         ledger handle the entry is added to; also supplies the ack set
     * @param clientCtx  client context (bookie client, configuration, stats, worker pool)
     * @param ensemble   bookies the entry is written to, indexed by bookie index
     * @param payload    entry payload; its readable byte count becomes {@code entryLength}
     * @param writeFlags write flags stored on the op
     * @param cb         callback invoked by {@link #submitCallback(int)}
     * @param ctx        opaque context passed back to {@code cb}
     * @return the initialized op
     */
    static PendingAddOp create(LedgerHandle lh, ClientContext clientCtx, List<BookieId> ensemble, ByteBuf payload, EnumSet<WriteFlag> writeFlags, AsyncCallback.AddCallbackWithLatency cb, Object ctx) {
        PendingAddOp op = RECYCLER.get();
        op.lh = lh;
        op.clientCtx = clientCtx;
        op.isRecoveryAdd = false;
        op.cb = cb;
        op.ctx = ctx;
        op.entryId = -1L;
        op.currentLedgerLength = -1L;
        op.payload = payload;
        op.entryLength = payload.readableBytes();
        op.completed = false;
        op.ensemble = ensemble;
        op.ackSet = lh.getDistributionSchedule().getAckSet();
        op.pendingWriteRequests = 0;
        op.callbackTriggered = false;
        op.hasRun = false;
        op.requestTimeNanos = Long.MAX_VALUE;
        op.allowFailFast = false;
        op.qwcLatency = 0L;
        op.writeFlags = writeFlags;
        // The success set survives recycling; allocate it only for a brand-new instance.
        if (op.addEntrySuccessBookies == null) {
            op.addEntrySuccessBookies = new HashSet<BookieId>();
        } else {
            op.addEntrySuccessBookies.clear();
        }
        op.writeDelayedStartTime = -1L;
        return op;
    }

    /** Marks this op as a recovery add, which changes the request flags sent to bookies. */
    PendingAddOp enableRecoveryAdd() {
        this.isRecoveryAdd = true;
        return this;
    }

    /** Sets the fail-fast flag that is forwarded with every write request of this op. */
    PendingAddOp allowFailFastOnUnwritableChannel() {
        this.allowFailFast = true;
        return this;
    }

    void setEntryId(long entryId) {
        this.entryId = entryId;
    }

    void setLedgerLength(long ledgerLength) {
        this.currentLedgerLength = ledgerLength;
    }

    long getEntryId() {
        return this.entryId;
    }

    /**
     * Sends the packaged entry to one bookie and counts the request as pending.
     *
     * @param ensemble    ensemble to resolve the bookie address from
     * @param bookieIndex index of the target bookie within {@code ensemble}; also passed as
     *                    the callback context and read back in {@link #writeComplete}
     */
    void sendWriteRequest(List<BookieId> ensemble, int bookieIndex) {
        int flags = this.isRecoveryAdd ? FLAGS_RECOVERY_ADD : FLAGS_NONE;
        this.clientCtx.getBookieClient().addEntry(ensemble.get(bookieIndex), this.lh.ledgerId, this.lh.ledgerKey, this.entryId, this.toSend, this, bookieIndex, flags, this.allowFailFast, this.lh.writeFlags);
        ++this.pendingWriteRequests;
    }

    /**
     * Triggers {@link #timeoutQuorumWait()} if the time elapsed since the request started
     * has reached the configured add-entry quorum timeout.
     *
     * @return {@code true} if the timeout handling was triggered
     */
    boolean maybeTimeout() {
        if (MathUtils.elapsedNanos(this.requestTimeNanos) >= this.clientCtx.getConf().addEntryQuorumTimeoutNanos) {
            this.timeoutQuorumWait();
            return true;
        }
        return false;
    }

    /**
     * Schedules, on the worker pool ordered by ledger id, a task that fails the add with the
     * quorum-timeout code unless the op has completed in the meantime.
     */
    void timeoutQuorumWait() {
        try {
            this.clientCtx.getMainWorkerPool().executeOrdered(this.lh.ledgerId, (SafeRunnable)new org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.util.SafeRunnable(){

                @Override
                public void safeRun() {
                    if (PendingAddOp.this.completed) {
                        return;
                    }
                    // Enough bookies acked but the op still is not complete: count it as a
                    // timeout caused by not enough fault domains.
                    if (PendingAddOp.this.addEntrySuccessBookies.size() >= PendingAddOp.this.lh.getLedgerMetadata().getAckQuorumSize()) {
                        PendingAddOp.this.clientCtx.getClientStats().getWriteTimedOutDueToNotEnoughFaultDomains().inc();
                    }
                    PendingAddOp.this.lh.handleUnrecoverableErrorDuringAdd(RC_ADD_ENTRY_QUORUM_TIMEOUT);
                }

                @Override
                public String toString() {
                    return String.format("AddEntryQuorumTimeout(lid=%d, eid=%d)", PendingAddOp.this.lh.ledgerId, PendingAddOp.this.entryId);
                }
            });
        }
        catch (RejectedExecutionException e) {
            // Best effort: the task could not be scheduled. Keep the cause in the log
            // (trailing Throwable argument) instead of dropping it.
            LOG.warn("Timeout add entry quorum wait failed {} entry: {}", new Object[]{this.lh.ledgerId, this.entryId, e});
        }
    }

    /**
     * Switches this op to a new ensemble and, if needed, re-sends the entry to the bookie at
     * {@code bookieIndex}.
     *
     * <p>Nothing is sent when the entry has not been packaged yet ({@code toSend == null}),
     * when {@code bookieIndex} is not in the entry's write set (the ledger handle is asked to
     * deliver success callbacks instead), or when the callback has already been triggered.
     *
     * @param ensemble    the new ensemble
     * @param bookieIndex index of the bookie whose previous result is discarded
     */
    void unsetSuccessAndSendWriteRequest(List<BookieId> ensemble, int bookieIndex) {
        this.ensemble = ensemble;
        if (this.toSend == null) {
            // Not run yet; safeRun() will send the writes using the updated ensemble.
            return;
        }
        DistributionSchedule.WriteSet writeSet = this.lh.distributionSchedule.getWriteSet(this.entryId);
        try {
            if (!writeSet.contains(bookieIndex)) {
                this.lh.sendAddSuccessCallbacks();
                return;
            }
        }
        finally {
            writeSet.recycle();
        }
        if (this.callbackTriggered) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Unsetting success for ledger: " + this.lh.ledgerId + " entry: " + this.entryId + " bookie index: " + bookieIndex);
        }
        // Removing this bookie's ack may cost us the ack quorum; if so the op is pending again.
        if (!this.ackSet.removeBookieAndCheck(bookieIndex)) {
            this.completed = false;
        }
        this.sendWriteRequest(ensemble, bookieIndex);
    }

    /**
     * Packages the payload and sends it to every bookie in the entry's write set.
     *
     * <p>If the callback was already triggered before the op ran, nothing is sent and the op
     * only attempts to recycle itself.
     */
    @Override
    public void safeRun() {
        this.hasRun = true;
        if (this.callbackTriggered) {
            this.maybeRecycle();
            return;
        }
        this.requestTimeNanos = MathUtils.nowInNano();
        Preconditions.checkNotNull(this.lh);
        Preconditions.checkNotNull(this.lh.macManager);
        this.toSend = this.lh.macManager.computeDigestAndPackageForSending(this.entryId, this.lh.lastAddConfirmed, this.currentLedgerLength, this.payload);
        // The payload reference is dropped here; from now on only toSend is used.
        this.payload = null;
        this.lh.maybeHandleDelayedWriteBookieFailure();
        DistributionSchedule.WriteSet writeSet = this.lh.distributionSchedule.getWriteSet(this.entryId);
        try {
            for (int i = 0; i < writeSet.size(); ++i) {
                this.sendWriteRequest(this.ensemble, writeSet.get(i));
            }
        }
        finally {
            writeSet.recycle();
        }
    }

    /**
     * Handles the response of one bookie for this entry.
     *
     * @param rc       result code of the write ({@code 0} means success)
     * @param ledgerId ledger id reported by the response (used for logging)
     * @param entryId  entry id reported by the response (used for logging)
     * @param addr     bookie that answered
     * @param ctx      the bookie index that was passed to {@link #sendWriteRequest}, boxed
     */
    @Override
    public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) {
        int bookieIndex = (Integer)ctx;
        --this.pendingWriteRequests;
        // The ensemble slot now holds a different bookie: this response is stale.
        if (!this.ensemble.get(bookieIndex).equals(addr)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Write did not succeed: " + ledgerId + ", " + entryId + ". But we have already fixed it.");
            }
            return;
        }
        boolean ackQuorum = false;
        if (rc == RC_OK) {
            ackQuorum = this.ackSet.completeBookieAndCheck(bookieIndex);
            this.addEntrySuccessBookies.add(this.ensemble.get(bookieIndex));
        }
        if (this.completed) {
            // Already completed: a late failure is only counted and, depending on the
            // configuration, reported to the ledger handle.
            if (rc != RC_OK) {
                this.clientCtx.getClientStats().getAddOpUrCounter().inc();
                if (!this.clientCtx.getConf().disableEnsembleChangeFeature.isAvailable() && !this.clientCtx.getConf().delayEnsembleChange) {
                    this.lh.notifyWriteFailed(bookieIndex, addr);
                }
            }
            this.sendAddSuccessCallbacks();
            this.maybeRecycle();
            return;
        }
        switch (rc) {
            case RC_OK: {
                break;
            }
            case RC_CLIENT_CLOSED: {
                this.lh.errorOutPendingAdds(rc);
                return;
            }
            case RC_ILLEGAL_OP: {
                this.lh.handleUnrecoverableErrorDuringAdd(rc);
                return;
            }
            case RC_LEDGER_FENCED: {
                LOG.warn("Fencing exception on write: L{} E{} on {}", new Object[]{ledgerId, entryId, addr});
                this.lh.handleUnrecoverableErrorDuringAdd(rc);
                return;
            }
            case RC_UNAUTHORIZED_ACCESS: {
                LOG.warn("Unauthorized access exception on write: L{} E{} on {}", new Object[]{ledgerId, entryId, addr});
                this.lh.handleUnrecoverableErrorDuringAdd(rc);
                return;
            }
            default: {
                this.handleBookieWriteFailure(rc, ledgerId, entryId, bookieIndex, addr);
                return;
            }
        }
        // Only successful responses reach this point.
        if (ackQuorum && !this.completed) {
            this.completeOrDelayOnAckQuorum(entryId);
        }
    }

    /**
     * Handles a write failure that has no dedicated result-code branch: either reports the
     * failed bookie(s) to the ledger handle or, with delayed ensemble change enabled, waits
     * until the failure breaks the ack quorum (or the bookie reported the read-only code).
     */
    private void handleBookieWriteFailure(int rc, long ledgerId, long entryId, int bookieIndex, BookieId addr) {
        if (this.clientCtx.getConf().delayEnsembleChange) {
            if (this.ackSet.failBookieAndCheck(bookieIndex, addr) || rc == RC_WRITE_ON_READ_ONLY_BOOKIE) {
                Map<Integer, BookieId> failedBookies = this.ackSet.getFailedBookies();
                LOG.warn("Failed to write entry ({}, {}) to bookies {}, handling failures.", new Object[]{ledgerId, entryId, failedBookies});
                this.lh.handleBookieFailure(failedBookies);
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("Failed to write entry ({}, {}) to bookie ({}, {}), but it didn't break ack quorum, delaying ensemble change : {}", new Object[]{ledgerId, entryId, bookieIndex, addr, BKException.getMessage(rc)});
            }
        } else {
            LOG.warn("Failed to write entry ({}, {}): {}", new Object[]{ledgerId, entryId, BKException.getMessage(rc)});
            this.lh.handleBookieFailure(ImmutableMap.of(bookieIndex, addr));
        }
    }

    /**
     * Called once the ack set reports an ack quorum: marks the op completed and triggers the
     * success callbacks, unless fault-domain enforcement is enabled and the acked bookies do
     * not yet satisfy the placement policy, in which case completion is delayed.
     */
    private void completeOrDelayOnAckQuorum(long entryId) {
        if (this.clientCtx.getConf().enforceMinNumFaultDomainsForWrite && !this.clientCtx.getPlacementPolicy().areAckedBookiesAdheringToPlacementPolicy(this.addEntrySuccessBookies, this.lh.getLedgerMetadata().getWriteQuorumSize(), this.lh.getLedgerMetadata().getAckQuorumSize())) {
            LOG.warn("Write success for entry ID {} delayed, not acknowledged by bookies in enough fault domains", entryId);
            this.clientCtx.getClientStats().getWriteDelayedDueToNotEnoughFaultDomains().inc();
            // Remember only the first time the write was delayed.
            if (this.writeDelayedStartTime == -1L) {
                this.writeDelayedStartTime = MathUtils.nowInNano();
            }
            return;
        }
        this.completed = true;
        this.qwcLatency = MathUtils.elapsedNanos(this.requestTimeNanos);
        if (this.writeDelayedStartTime != -1L) {
            this.clientCtx.getClientStats().getWriteDelayedDueToNotEnoughFaultDomainsLatency().registerSuccessfulEvent(MathUtils.elapsedNanos(this.writeDelayedStartTime), TimeUnit.NANOSECONDS);
        }
        this.sendAddSuccessCallbacks();
    }

    /** Asks the ledger handle to deliver success callbacks. */
    void sendAddSuccessCallbacks() {
        this.lh.sendAddSuccessCallbacks();
    }

    /**
     * Records the add latency, invokes the user callback with {@code rc} and then tries to
     * recycle this op.
     *
     * @param rc result code to report; {@code 0} is recorded as success, anything else as failure
     */
    void submitCallback(int rc) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Submit callback (lid:{}, eid: {}). rc:{}", new Object[]{this.lh.getId(), this.entryId, rc});
        }
        long latencyNanos = MathUtils.elapsedNanos(this.requestTimeNanos);
        if (rc != RC_OK) {
            this.clientCtx.getClientStats().getAddOpLogger().registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS);
            LOG.error("Write of ledger entry to quorum failed: L{} E{}", this.lh.getId(), this.entryId);
        } else {
            this.clientCtx.getClientStats().getAddOpLogger().registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS);
        }
        this.cb.addCompleteWithLatency(rc, this.lh, this.entryId, this.qwcLatency, this.ctx);
        this.callbackTriggered = true;
        this.maybeRecycle();
    }

    /**
     * Describes the op by ledger id, entry id and completion state. The ledger id is printed
     * as {@code null} when no ledger handle is attached (before {@link #create} fills the
     * instance or after it has been recycled) instead of throwing.
     */
    @Override
    public String toString() {
        LedgerHandle handle = this.lh;
        StringBuilder sb = new StringBuilder("PendingAddOp(lid:");
        if (handle != null) {
            sb.append(handle.ledgerId);
        } else {
            sb.append("null");
        }
        sb.append(", eid:").append(this.entryId).append(", completed:").append(this.completed).append(")");
        return sb.toString();
    }

    /** Hash is the entry id truncated to {@code int}, consistent with {@link #equals}. */
    @Override
    public int hashCode() {
        return (int)this.entryId;
    }

    /** Two ops are equal when they carry the same entry id. */
    @Override
    public boolean equals(Object o) {
        // A non-PendingAddOp can never be this instance, so no identity fallback is needed.
        return o instanceof PendingAddOp && this.entryId == ((PendingAddOp)o).entryId;
    }

    private PendingAddOp(Recycler.Handle<PendingAddOp> recyclerHandle) {
        this.recyclerHandle = recyclerHandle;
    }

    /**
     * Releases the packaged buffers once the op has run and its callback has fired, and
     * returns the op to the pool when no write responses are outstanding any more.
     */
    private void maybeRecycle() {
        if (this.hasRun && this.callbackTriggered) {
            ReferenceCountUtil.release(this.toSend);
            this.toSend = null;
        }
        if (this.hasRun && this.toSend == null && this.pendingWriteRequests == 0) {
            this.recyclePendAddOpObject();
        }
    }

    /** Resets the per-add state, releases any remaining payload and returns the op to the pool. */
    private void recyclePendAddOpObject() {
        this.entryId = -1L;
        this.currentLedgerLength = -1L;
        if (this.payload != null) {
            ReferenceCountUtil.release(this.payload);
            this.payload = null;
        }
        this.cb = null;
        this.ctx = null;
        this.ensemble = null;
        this.ackSet.recycle();
        this.ackSet = null;
        this.lh = null;
        this.clientCtx = null;
        this.isRecoveryAdd = false;
        this.completed = false;
        this.pendingWriteRequests = 0;
        this.callbackTriggered = false;
        this.hasRun = false;
        this.allowFailFast = false;
        this.writeFlags = null;
        this.addEntrySuccessBookies.clear();
        this.writeDelayedStartTime = -1L;
        this.recyclerHandle.recycle(this);
    }
}

