/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.transaction.buffer.impl;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.netty.buffer.ByteBuf;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionBufferHandlerImpl
implements TransactionBufferHandler {
    private static final Logger log = LoggerFactory.getLogger(TransactionBufferHandlerImpl.class);
    // Requests that have been handed to a connection and are awaiting a response or a timeout,
    // keyed by request id (inserted in endTxn, removed by the response handlers and the timeout task).
    private final ConcurrentSkipListMap<Long, OpRequestSend> outstandingRequests;
    // Requests parked because no request credit was available, or because other requests were
    // already queued ahead of them; drained by checkPendingRequests.
    private final GrowableArrayBlockingQueue<OpRequestSend> pendingRequests;
    // Source of the request ids embedded in the end-txn commands.
    private final AtomicLong requestIdGenerator = new AtomicLong();
    // Delay, in milliseconds, after which an outstanding request is failed with a timeout.
    private final long operationTimeoutInMills;
    // Timer used to schedule the per-request timeout tasks; stopped by close().
    private final HashedWheelTimer timer;
    // Client from which connections are obtained (cast to PulsarClientImpl in getClientCnx).
    private final PulsarClient pulsarClient;
    // Atomic accessor for requestCredits; the field is located reflectively by its name.
    private static final AtomicIntegerFieldUpdater<TransactionBufferHandlerImpl> REQUEST_CREDITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(TransactionBufferHandlerImpl.class, "requestCredits");
    // Number of requests that may still be admitted; decremented when a request is admitted and
    // incremented again in onResponse. Only modified through REQUEST_CREDITS_UPDATER after construction.
    private volatile int requestCredits;
    /**
     * Cache of connection futures keyed by topic, holding at most 100000 entries that expire
     * 30 minutes after their last access. A future that completes exceptionally removes its
     * own topic entry so that a later request performs a fresh lookup.
     */
    private final LoadingCache<String, CompletableFuture<ClientCnx>> lookupCache = CacheBuilder.newBuilder()
            .maximumSize(100000L)
            .expireAfterAccess(30L, TimeUnit.MINUTES)
            .build(new CacheLoader<String, CompletableFuture<ClientCnx>>() {

                @Override
                public CompletableFuture<ClientCnx> load(String topic) {
                    CompletableFuture<ClientCnx> connectionFuture = TransactionBufferHandlerImpl.this.getClientCnx(topic);
                    connectionFuture.whenComplete((connection, failure) -> {
                        if (failure == null) {
                            return;
                        }
                        // A failed lookup must not stay cached.
                        TransactionBufferHandlerImpl.this.lookupCache.invalidate(topic);
                    });
                    return connectionFuture;
                }
            });

    /**
     * Creates a handler that fails outstanding requests after the default timeout of
     * 3000 milliseconds.
     *
     * @param pulsarClient client from which connections are obtained
     * @param timer timer on which request timeouts are scheduled
     * @param maxConcurrentRequests requested number of request credits; values below 100 are
     *        raised to 100
     */
    public TransactionBufferHandlerImpl(PulsarClient pulsarClient, HashedWheelTimer timer, int maxConcurrentRequests) {
        this(pulsarClient, timer, maxConcurrentRequests, 3000L);
    }

    /**
     * Creates a handler with an explicit request timeout.
     *
     * @param pulsarClient client from which connections are obtained
     * @param timer timer on which request timeouts are scheduled
     * @param maxConcurrentRequests requested number of request credits; values below 100 are
     *        raised to 100
     * @param operationTimeoutInMills delay in milliseconds after which an outstanding request
     *        is failed with a timeout; must be positive
     * @throws IllegalArgumentException if {@code operationTimeoutInMills} is not positive
     */
    public TransactionBufferHandlerImpl(PulsarClient pulsarClient, HashedWheelTimer timer, int maxConcurrentRequests, long operationTimeoutInMills) {
        if (operationTimeoutInMills <= 0L) {
            throw new IllegalArgumentException("operationTimeoutInMills must be positive: " + operationTimeoutInMills);
        }
        this.pulsarClient = pulsarClient;
        this.outstandingRequests = new ConcurrentSkipListMap<>();
        this.pendingRequests = new GrowableArrayBlockingQueue<>();
        this.operationTimeoutInMills = operationTimeoutInMills;
        this.timer = timer;
        // Never start with fewer than 100 credits, whatever the caller asked for.
        this.requestCredits = Math.max(100, maxConcurrentRequests);
    }

    /**
     * Builds an end-transaction-on-partition command for {@code topic} and either sends it
     * immediately or queues it until a request credit is available.
     *
     * @param topic topic whose connection is looked up and to which the command refers
     * @param txnIdMostBits most significant bits of the transaction id
     * @param txnIdLeastBits least significant bits of the transaction id
     * @param action transaction action carried by the command
     * @param lowWaterMark low water mark carried by the command
     * @return a future completed with the transaction id from the response, or completed
     *         exceptionally on lookup failure, inactive channel, error response or timeout
     */
    public CompletableFuture<TxnID> endTxnOnTopic(String topic, long txnIdMostBits, long txnIdLeastBits, TxnAction action, long lowWaterMark) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] endTxnOnTopic txnId: [{}], txnAction: [{}]", topic, new TxnID(txnIdMostBits, txnIdLeastBits), action.getValue());
        }
        CompletableFuture<TxnID> cb = new CompletableFuture<>();
        long requestId = this.requestIdGenerator.getAndIncrement();
        ByteBuf cmd = Commands.newEndTxnOnPartition(requestId, txnIdLeastBits, txnIdMostBits, topic, action, lowWaterMark);
        CompletableFuture<ClientCnx> cnx;
        try {
            cnx = this.lookupCache.get(topic);
        } catch (ExecutionException e) {
            log.error("[{}] failed to get client cnx from lookup cache", topic, e);
            this.lookupCache.invalidate(topic);
            // No request object owns the command yet, so it has to be released here.
            ReferenceCountUtil.safeRelease(cmd);
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            cb.completeExceptionally(new PulsarClientException.LookupException(cause.getMessage()));
            return cb;
        } catch (RuntimeException e) {
            // Unchecked loader failures still propagate to the caller as before, but the
            // command buffer must not be leaked on the way out.
            ReferenceCountUtil.safeRelease(cmd);
            throw e;
        }
        OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb, cnx);
        if (this.checkRequestCredits(op)) {
            this.endTxn(op);
        }
        return cb;
    }

    /**
     * Builds an end-transaction-on-subscription command for {@code topic} and
     * {@code subscription} and either sends it immediately or queues it until a request
     * credit is available.
     *
     * @param topic topic whose connection is looked up and to which the command refers
     * @param subscription subscription carried by the command
     * @param txnIdMostBits most significant bits of the transaction id
     * @param txnIdLeastBits least significant bits of the transaction id
     * @param action transaction action carried by the command
     * @param lowWaterMark low water mark carried by the command
     * @return a future completed with the transaction id from the response, or completed
     *         exceptionally on lookup failure, inactive channel, error response or timeout
     */
    public CompletableFuture<TxnID> endTxnOnSubscription(String topic, String subscription, long txnIdMostBits, long txnIdLeastBits, TxnAction action, long lowWaterMark) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] endTxnOnSubscription txnId: [{}], txnAction: [{}]", topic, new TxnID(txnIdMostBits, txnIdLeastBits), action.getValue());
        }
        CompletableFuture<TxnID> cb = new CompletableFuture<>();
        long requestId = this.requestIdGenerator.getAndIncrement();
        ByteBuf cmd = Commands.newEndTxnOnSubscription(requestId, txnIdLeastBits, txnIdMostBits, topic, subscription, action, lowWaterMark);
        CompletableFuture<ClientCnx> cnx;
        try {
            cnx = this.lookupCache.get(topic);
        } catch (ExecutionException e) {
            log.error("[{}] failed to get client cnx from lookup cache", topic, e);
            this.lookupCache.invalidate(topic);
            // No request object owns the command yet, so it has to be released here.
            ReferenceCountUtil.safeRelease(cmd);
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            cb.completeExceptionally(new PulsarClientException.LookupException(cause.getMessage()));
            return cb;
        } catch (RuntimeException e) {
            // Unchecked loader failures still propagate to the caller as before, but the
            // command buffer must not be leaked on the way out.
            ReferenceCountUtil.safeRelease(cmd);
            throw e;
        }
        OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb, cnx);
        if (this.checkRequestCredits(op)) {
            this.endTxn(op);
        }
        return cb;
    }

    /**
     * Tries to claim one request credit for {@code op}.
     *
     * @param op request to admit; it is appended to the pending queue when it cannot be
     *        admitted
     * @return {@code true} if a credit was claimed and the caller should send the request
     *         now, {@code false} if the request was queued instead
     */
    private boolean checkRequestCredits(OpRequestSend op) {
        while (true) {
            int available = REQUEST_CREDITS_UPDATER.get(this);
            // Queue when there are no credits left, or when earlier requests are still waiting.
            if (available <= 0 || this.pendingRequests.peek() != null) {
                this.pendingRequests.add(op);
                return false;
            }
            if (REQUEST_CREDITS_UPDATER.compareAndSet(this, available, available - 1)) {
                return true;
            }
            // Lost the race for the credit counter: re-read it and try again.
        }
    }

    /**
     * Sends {@code op} once its connection future completes. On a failed lookup or an
     * inactive channel the request's future is failed with a lookup exception and the
     * request is finished through {@link #onResponse(OpRequestSend)}; otherwise the request
     * is registered as outstanding, a timeout is scheduled and the command is written.
     *
     * @param op request to send; a request credit must already have been claimed for it
     */
    public void endTxn(OpRequestSend op) {
        op.cnx.whenComplete((clientCnx, throwable) -> {
            if (throwable != null) {
                log.error("endTxn error topic: [{}]", op.topic, throwable);
                this.invalidateLookupCache(op);
                op.cb.completeExceptionally(new PulsarClientException.LookupException(throwable.getMessage()));
                this.onResponse(op);
                return;
            }
            if (!clientCnx.ctx().channel().isActive()) {
                this.invalidateLookupCache(op);
                op.cb.completeExceptionally(new PulsarClientException.LookupException(op.topic + " endTxn channel is not active"));
                this.onResponse(op);
                return;
            }
            clientCnx.registerTransactionBufferHandler(this);
            // Capture the id now: op is a pooled object, so by the time the timeout fires it
            // may have been recycled and reused for a different request.
            final long requestId = op.requestId;
            this.outstandingRequests.put(requestId, op);
            this.timer.newTimeout(timeout -> {
                OpRequestSend timedOut = this.outstandingRequests.remove(requestId);
                if (timedOut != null) {
                    // Whoever removes the request from the map owns its cleanup, so the credit
                    // and the buffer are returned even if the future was already completed
                    // (completeExceptionally is then a no-op).
                    timedOut.cb.completeExceptionally(new TransactionBufferClientException.RequestTimeoutException());
                    this.onResponse(timedOut);
                }
            }, this.operationTimeoutInMills, TimeUnit.MILLISECONDS);
            // Keep one reference for onResponse; the write consumes the other.
            op.cmd.retain();
            clientCnx.ctx().writeAndFlush(op.cmd, clientCnx.ctx().voidPromise());
        });
    }

    /**
     * Completes the outstanding end-txn-on-topic request identified by {@code requestId}
     * from the given response, then returns its request credit.
     *
     * <p>If no outstanding request has that id (it was already removed, for example by the
     * timeout task) the response is ignored.
     *
     * @param requestId id of the request the response belongs to
     * @param response response carrying either the transaction id or an error
     */
    public void handleEndTxnOnTopicResponse(long requestId, CommandEndTxnOnPartitionResponse response) {
        OpRequestSend op = this.outstandingRequests.remove(requestId);
        if (op == null) {
            if (log.isDebugEnabled()) {
                log.debug("Got end txn on topic response for timeout {} - {}", response.getTxnidMostBits(), response.getTxnidLeastBits());
            }
            return;
        }
        try {
            if (response.hasError()) {
                log.error("[{}] Got end txn on topic response for request {} error {}", op.topic, response.getRequestId(), response.getError());
                this.invalidateLookupCache(op);
                op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(), response.getMessage()));
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Got end txn on topic response for for request {}", op.topic, response.getRequestId());
                }
                op.cb.complete(new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits()));
            }
        } catch (Exception e) {
            // The request id fills the second placeholder; the exception is logged as the throwable.
            log.error("[{}] Got exception when complete EndTxnOnTopic op for request {}", op.topic, requestId, e);
            // Make sure the caller is not left waiting; this is a no-op if already completed.
            op.cb.completeExceptionally(e);
        } finally {
            this.onResponse(op);
        }
    }

    /**
     * Completes the outstanding end-txn-on-subscription request identified by
     * {@code requestId} from the given response, then returns its request credit.
     *
     * <p>If no outstanding request has that id (it was already removed, for example by the
     * timeout task) the response is ignored.
     *
     * @param requestId id of the request the response belongs to
     * @param response response carrying either the transaction id or an error
     */
    public void handleEndTxnOnSubscriptionResponse(long requestId, CommandEndTxnOnSubscriptionResponse response) {
        OpRequestSend op = this.outstandingRequests.remove(requestId);
        if (op == null) {
            if (log.isDebugEnabled()) {
                log.debug("Got end txn on subscription response for timeout {} - {}", response.getTxnidMostBits(), response.getTxnidLeastBits());
            }
            return;
        }
        try {
            if (response.hasError()) {
                log.error("[{}] Got end txn on subscription response for request {} error {}", op.topic, response.getRequestId(), response.getError());
                this.invalidateLookupCache(op);
                op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(), response.getMessage()));
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Got end txn on subscription response for for request {}", op.topic, response.getRequestId());
                }
                op.cb.complete(new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits()));
            }
        } catch (Exception e) {
            // The request id fills the second placeholder; the exception is logged as the throwable.
            log.error("[{}] Got exception when complete EndTxnOnSub op for request {}", op.topic, requestId, e);
            // Make sure the caller is not left waiting; this is a no-op if already completed.
            op.cb.completeExceptionally(e);
        } finally {
            this.onResponse(op);
        }
    }

    /**
     * Finishes the bookkeeping for a request: returns one request credit, releases the
     * request's command buffer, recycles the request object and then tries to send
     * queued requests.
     *
     * @param op finished request; may be {@code null}, in which case only the credit is
     *        returned and the pending queue is checked
     */
    public void onResponse(OpRequestSend op) {
        // The credit goes back first so the queue check below can hand it out again.
        REQUEST_CREDITS_UPDATER.incrementAndGet(this);
        if (null != op) {
            ByteBuf command = op.cmd;
            ReferenceCountUtil.safeRelease(command);
            op.recycle();
        }
        checkPendingRequests();
    }

    /**
     * Sends queued requests for as long as request credits are available. Each queued
     * request is re-bound to the connection future currently cached for its topic before
     * it is sent. A request whose lookup fails is failed with a lookup exception, its
     * command buffer is released and its credit is returned.
     */
    private void checkPendingRequests() {
        int permits;
        while ((permits = REQUEST_CREDITS_UPDATER.get(this)) > 0 && this.pendingRequests.peek() != null) {
            if (!REQUEST_CREDITS_UPDATER.compareAndSet(this, permits, permits - 1)) {
                continue;
            }
            OpRequestSend polled = this.pendingRequests.poll();
            if (polled == null) {
                // Another thread drained the queue after the peek: give the credit back.
                REQUEST_CREDITS_UPDATER.incrementAndGet(this);
                continue;
            }
            CompletableFuture<ClientCnx> currentCnx;
            try {
                // Look the connection up once so the comparison and the re-bind agree.
                currentCnx = this.lookupCache.get(polled.topic);
            } catch (ExecutionException | RuntimeException e) {
                log.error("[{}] failed to get client cnx from lookup cache", polled.topic, e);
                this.lookupCache.invalidate(polled.topic);
                Throwable cause = e.getCause() != null ? e.getCause() : e;
                polled.cb.completeExceptionally(new PulsarClientException.LookupException(cause.getMessage()));
                // The request will never be sent, so its buffer and pooled object are released here.
                ReferenceCountUtil.safeRelease(polled.cmd);
                polled.recycle();
                REQUEST_CREDITS_UPDATER.incrementAndGet(this);
                continue;
            }
            if (polled.cnx != currentCnx) {
                // The cached connection changed while the request was queued.
                OpRequestSend stale = polled;
                polled = OpRequestSend.create(stale.requestId, stale.topic, stale.cmd, stale.cb, currentCnx);
                stale.recycle();
            }
            this.endTxn(polled);
        }
    }

    /**
     * Removes the cached connection future for the request's topic, but only if the cache
     * still holds the same future the request was bound to. A newer entry is left in place.
     *
     * @param op request whose topic entry should be dropped
     */
    private void invalidateLookupCache(OpRequestSend op) {
        // getIfPresent never runs the cache loader, so this check cannot trigger a new
        // connection lookup and cannot fail with a loader exception.
        if (this.lookupCache.getIfPresent(op.topic) == op.cnx) {
            this.lookupCache.invalidate(op.topic);
        }
    }

    /**
     * Obtains a connection future for {@code topic} from the underlying client.
     *
     * @param topic topic to obtain a connection for
     * @return the future returned by {@code PulsarClientImpl.getConnection}
     * @throws ClassCastException if the configured client is not a {@code PulsarClientImpl}
     */
    public CompletableFuture<ClientCnx> getClientCnx(String topic) {
        PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
        return clientImpl.getConnection(topic);
    }

    /**
     * Stops the timer on which request timeouts are scheduled.
     */
    public void close() {
        HashedWheelTimer timeoutTimer = this.timer;
        timeoutTimer.stop();
    }

    /**
     * Reports the current value of the request credit counter.
     *
     * @return the number of request credits currently available
     */
    public int getAvailableRequestCredits() {
        // A plain volatile read returns the same value as reading through the updater.
        return this.requestCredits;
    }

    /**
     * Reports how many requests are currently waiting for a request credit.
     *
     * @return the size of the pending request queue
     */
    public int getPendingRequestsCount() {
        int queued = this.pendingRequests.size();
        return queued;
    }

    public static final class OpRequestSend {
        long requestId;
        String topic;
        ByteBuf cmd;
        CompletableFuture<TxnID> cb;
        long createdAt;
        CompletableFuture<ClientCnx> cnx;
        private final Recycler.Handle<OpRequestSend> recyclerHandle;
        private static final Recycler<OpRequestSend> RECYCLER = new Recycler<OpRequestSend>(){

            protected OpRequestSend newObject(Recycler.Handle<OpRequestSend> handle) {
                return new OpRequestSend(handle);
            }
        };

        static OpRequestSend create(long requestId, String topic, ByteBuf cmd, CompletableFuture<TxnID> cb, CompletableFuture<ClientCnx> cnx) {
            OpRequestSend op = (OpRequestSend)RECYCLER.get();
            op.requestId = requestId;
            op.topic = topic;
            op.cmd = cmd;
            op.cb = cb;
            op.createdAt = System.currentTimeMillis();
            op.cnx = cnx;
            return op;
        }

        void recycle() {
            this.recyclerHandle.recycle((Object)this);
        }

        private OpRequestSend(Recycler.Handle<OpRequestSend> recyclerHandle) {
            this.recyclerHandle = recyclerHandle;
        }
    }
}

