package org.apache.pulsar.broker.delayed.bucket;

import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import javax.validation.constraints.NotNull;
import lombok.Generated;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.class */
/**
 * {@link BucketSnapshotStorage} implementation that persists each bucket snapshot as one
 * BookKeeper ledger.
 *
 * <p>Ledger layout, as written by {@link #createBucketSnapshot}: entry {@code 0} holds the
 * serialized {@link SnapshotMetadata}; the following entries hold the {@link SnapshotSegment}s
 * in list order. The ledger is closed once all entries are written, and its id is used as the
 * snapshot id by the other methods of this class.
 *
 * <p>Ledger handles opened for reading are cached per ledger id in
 * {@link #ledgerHandleFutureCache}; failed opens are evicted so a later call can retry.
 */
public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
    /** Empty password used for both creating and opening snapshot ledgers. */
    private static final byte[] LedgerPassword = new byte[0];
    private final PulsarService pulsar;
    private final ServiceConfiguration config;
    /** BookKeeper client; assigned in {@link #start()} and {@code null} before that. */
    private BookKeeper bookKeeper;
    /** Cache of in-flight / completed ledger-open operations, keyed by ledger id. */
    private final Map<Long, CompletableFuture<LedgerHandle>> ledgerHandleFutureCache = new ConcurrentHashMap<>();

    /**
     * @param pulsarService broker service that supplies the configuration and, in
     *                      {@link #start()}, the BookKeeper client factory
     */
    public BookkeeperBucketSnapshotStorage(PulsarService pulsarService) {
        this.pulsar = pulsarService;
        this.config = pulsarService.getConfig();
    }

    /**
     * Creates a new ledger, writes the metadata as the first entry followed by every segment,
     * closes the ledger and returns its id.
     *
     * @param snapshotMetadata metadata written as entry 0
     * @param list             segments written after the metadata, in list order
     * @param str              first argument forwarded to
     *                         {@code LedgerMetadataUtils.buildMetadataForDelayedIndexBucket}
     * @param str2             second argument forwarded to the same call
     * @param str3             third argument forwarded to the same call
     * @return future completed with the id of the created ledger, or failed with the first error
     *         of the create / add / close chain
     */
    @Override // org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage
    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata, List<SnapshotSegment> list, String str, String str2, String str3) {
        ByteBuf metadataBuffer = Unpooled.wrappedBuffer(snapshotMetadata.toByteArray());
        return createLedger(str, str2, str3).thenCompose(ledgerHandle ->
                addEntry(ledgerHandle, metadataBuffer)
                        .thenCompose(ignored -> addSnapshotSegments(ledgerHandle, list))
                        .thenCompose(ignored -> closeLedger(ledgerHandle))
                        .thenApply(ignored -> ledgerHandle.getId()));
    }

    /**
     * Reads entry 0 of the given ledger and parses it as {@link SnapshotMetadata}.
     *
     * @param j ledger id of the snapshot
     * @return future completed with the parsed metadata
     */
    @Override // org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage
    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long j) {
        return getLedgerHandle(j).thenCompose(ledgerHandle ->
                getLedgerEntry(ledgerHandle, 0L, 0L)
                        .thenApply(entries -> parseSnapshotMetadataEntry(entries.nextElement())));
    }

    /**
     * Reads the entries {@code j2..j3} of the given ledger and parses each as a
     * {@link SnapshotSegment}.
     *
     * @param j  ledger id of the snapshot
     * @param j2 first entry id passed to the BookKeeper read
     * @param j3 last entry id passed to the BookKeeper read
     * @return future completed with the parsed segments in read order
     */
    @Override // org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage
    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long j, long j2, long j3) {
        return getLedgerHandle(j).thenCompose(ledgerHandle ->
                getLedgerEntry(ledgerHandle, j2, j3).thenApply(this::parseSnapshotSegmentEntries));
    }

    /**
     * @param j ledger id of the snapshot
     * @return future completed with {@code LedgerHandle.getLength()} of the opened ledger
     */
    @Override // org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage
    public CompletableFuture<Long> getBucketSnapshotLength(long j) {
        return getLedgerHandle(j).thenApply(LedgerHandle::getLength);
    }

    /**
     * Evicts the cached handle for the ledger (closing it if it was opened successfully) and
     * deletes the ledger.
     *
     * @param j ledger id of the snapshot
     * @return future of the delete operation; the close of the cached handle is not awaited
     */
    @Override // org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage
    public CompletableFuture<Void> deleteBucketSnapshot(long j) {
        CompletableFuture<LedgerHandle> cachedHandleFuture = this.ledgerHandleFutureCache.remove(j);
        if (cachedHandleFuture != null) {
            cachedHandleFuture.whenComplete((ledgerHandle, th) -> {
                // When the cached open failed there is no handle to close; calling
                // closeLedger(null) would only raise a NullPointerException here.
                if (ledgerHandle != null) {
                    closeLedger(ledgerHandle);
                }
            });
        }
        return deleteLedger(j);
    }

    /**
     * Creates the BookKeeper client through the broker's client factory and stores it for use
     * by all other operations.
     *
     * @throws Exception if the client cannot be created
     */
    @Override // org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage
    public void start() throws Exception {
        this.bookKeeper = this.pulsar.getBookKeeperClientFactory().create(this.pulsar.getConfiguration(), this.pulsar.getLocalMetadataStore(), this.pulsar.getIoEventLoopGroup(), Optional.empty(), null).get();
    }

    /**
     * Closes the BookKeeper client if {@link #start()} created one.
     *
     * @throws Exception if closing the client fails
     */
    @Override // org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage
    public void close() throws Exception {
        if (this.bookKeeper != null) {
            this.bookKeeper.close();
        }
    }

    /**
     * Serializes every segment into a direct buffer and appends it to the ledger.
     *
     * @return future that completes once all add operations have completed
     */
    private CompletableFuture<Void> addSnapshotSegments(LedgerHandle ledgerHandle, List<SnapshotSegment> segments) {
        List<CompletableFuture<Void>> addFutures = new ArrayList<>();
        for (SnapshotSegment segment : segments) {
            ByteBuf segmentBuffer = PulsarByteBufAllocator.DEFAULT.directBuffer(segment.getSerializedSize());
            try {
                segment.writeTo(segmentBuffer);
                addFutures.add(addEntry(ledgerHandle, segmentBuffer));
            } catch (Exception e) {
                // Release the buffer allocated above before propagating the failure.
                segmentBuffer.release();
                throw e;
            }
        }
        return FutureUtil.waitForAll(addFutures);
    }

    /**
     * Parses a ledger entry as {@link SnapshotMetadata}, always releasing the entry buffer.
     *
     * @throws BucketSnapshotSerializationException if the entry is not a valid message
     */
    private SnapshotMetadata parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) {
        ByteBuf entryBuffer = null;
        try {
            entryBuffer = ledgerEntry.getEntryBuffer();
            return SnapshotMetadata.parseFrom(entryBuffer.nioBuffer());
        } catch (InvalidProtocolBufferException e) {
            throw new BucketSnapshotSerializationException(e);
        } finally {
            if (entryBuffer != null) {
                entryBuffer.release();
            }
        }
    }

    /**
     * Parses every entry of the enumeration as a {@link SnapshotSegment}, releasing each entry
     * buffer after it has been parsed (or failed to parse).
     */
    private List<SnapshotSegment> parseSnapshotSegmentEntries(Enumeration<LedgerEntry> entries) {
        List<SnapshotSegment> segments = new ArrayList<>();
        while (entries.hasMoreElements()) {
            LedgerEntry entry = entries.nextElement();
            SnapshotSegment segment = new SnapshotSegment();
            ByteBuf entryBuffer = entry.getEntryBuffer();
            try {
                segment.parseFrom(entryBuffer, entryBuffer.readableBytes());
            } finally {
                entryBuffer.release();
            }
            segments.add(segment);
        }
        return segments;
    }

    /**
     * Asynchronously creates a ledger using the managed-ledger default ensemble / quorum / digest
     * settings from the broker configuration. The three strings are forwarded, in order, to
     * {@code LedgerMetadataUtils.buildMetadataForDelayedIndexBucket} to build the custom metadata.
     */
    @NotNull
    private CompletableFuture<LedgerHandle> createLedger(String str, String str2, String str3) {
        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
        this.bookKeeper.asyncCreateLedger(this.config.getManagedLedgerDefaultEnsembleSize(), this.config.getManagedLedgerDefaultWriteQuorum(), this.config.getManagedLedgerDefaultAckQuorum(), BookKeeper.DigestType.fromApiDigestType(this.config.getManagedLedgerDigestType()), LedgerPassword, (rc, ledgerHandle, ctx) -> {
            if (rc != BKException.Code.OK) {
                // No ledger id is available when creation fails, hence -1.
                future.completeExceptionally(bkException("Create ledger", rc, -1L));
            } else {
                future.complete(ledgerHandle);
            }
        }, (Object) null, LedgerMetadataUtils.buildMetadataForDelayedIndexBucket(str, str2, str3));
        return future;
    }

    /**
     * Returns the cached open operation for the ledger, starting one if none is cached. A failed
     * open removes itself from the cache so the next call opens the ledger again.
     */
    private CompletableFuture<LedgerHandle> getLedgerHandle(Long ledgerId) {
        CompletableFuture<LedgerHandle> handleFuture =
                this.ledgerHandleFutureCache.computeIfAbsent(ledgerId, this::openLedger);
        handleFuture.whenComplete((ledgerHandle, th) -> {
            if (th != null) {
                // Two-argument remove: only evict this exact future, not a newer replacement.
                this.ledgerHandleFutureCache.remove(ledgerId, handleFuture);
            }
        });
        return handleFuture;
    }

    /**
     * Asynchronously opens the ledger. A missing ledger fails the future with
     * {@link BucketNotExistException}; any other error with
     * {@link BucketSnapshotPersistenceException}.
     */
    private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
        this.bookKeeper.asyncOpenLedger(ledgerId, BookKeeper.DigestType.fromApiDigestType(this.config.getManagedLedgerDigestType()), LedgerPassword, (rc, ledgerHandle, ctx) -> {
            if (rc == BKException.Code.NoSuchLedgerExistsException) {
                future.completeExceptionally(noSuchLedgerException("Open ledger", ledgerId));
            } else if (rc != BKException.Code.OK) {
                future.completeExceptionally(bkException("Open ledger", rc, ledgerId));
            } else {
                future.complete(ledgerHandle);
            }
        }, (Object) null);
        return future;
    }

    /**
     * Asynchronously closes the ledger handle; a failure is logged and also reported through the
     * returned future.
     */
    private CompletableFuture<Void> closeLedger(LedgerHandle ledgerHandle) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        ledgerHandle.asyncClose((rc, handle, ctx) -> {
            if (rc == BKException.Code.OK) {
                future.complete(null);
            } else {
                log.warn("Failed to close a Ledger Handle: {}", ledgerHandle.getId());
                future.completeExceptionally(bkException("Close ledger", rc, ledgerHandle.getId()));
            }
        }, (Object) null);
        return future;
    }

    /**
     * Asynchronously appends one entry to the ledger. If the add fails, deletion of the whole
     * ledger is triggered (without waiting for it) so that no partial snapshot is left behind.
     */
    private CompletableFuture<Void> addEntry(LedgerHandle ledgerHandle, ByteBuf data) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        ledgerHandle.asyncAddEntry(data, (rc, handle, entryId, ctx) -> {
            if (rc != BKException.Code.OK) {
                future.completeExceptionally(bkException("Add entry", rc, ledgerHandle.getId()));
            } else {
                future.complete(null);
            }
        }, (Object) null);
        return future.whenComplete((ignored, th) -> {
            if (th != null) {
                deleteLedger(ledgerHandle.getId());
            }
        });
    }

    /**
     * Asynchronously reads the entries {@code j..j2} from the ledger.
     *
     * @param ledgerHandle handle of the ledger to read from
     * @param j            first entry id passed to {@code asyncReadEntries}
     * @param j2           last entry id passed to {@code asyncReadEntries}
     * @return future completed with the entries read, or failed with
     *         {@link BucketSnapshotPersistenceException}
     */
    CompletableFuture<Enumeration<LedgerEntry>> getLedgerEntry(LedgerHandle ledgerHandle, long j, long j2) {
        CompletableFuture<Enumeration<LedgerEntry>> future = new CompletableFuture<>();
        ledgerHandle.asyncReadEntries(j, j2, (rc, handle, entries, ctx) -> {
            if (rc != BKException.Code.OK) {
                future.completeExceptionally(bkException("Read entry", rc, ledgerHandle.getId()));
            } else {
                future.complete(entries);
            }
        }, (Object) null);
        return future;
    }

    /**
     * Asynchronously deletes the ledger. A ledger that does not exist is treated as already
     * deleted, so the returned future completes normally in that case too.
     */
    private CompletableFuture<Void> deleteLedger(long ledgerId) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        this.bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> {
            if (rc == BKException.Code.NoSuchLedgerExistsException || rc == BKException.Code.OK) {
                future.complete(null);
            } else {
                future.completeExceptionally(bkException("Delete ledger", rc, ledgerId));
            }
        }, (Object) null);
        return future;
    }

    /**
     * Builds the exception for a failed BookKeeper call.
     *
     * @param operation human-readable name of the failed operation, included in the message
     * @param rc        BookKeeper return code, translated with {@code BKException.getMessage}
     * @param ledgerId  id of the ledger involved
     */
    private static BucketSnapshotPersistenceException bkException(String operation, int rc, long ledgerId) {
        String message = BKException.getMessage(rc) + " -  ledger=" + ledgerId + " - operation=" + operation;
        return new BucketSnapshotPersistenceException(message);
    }

    /**
     * Builds the exception reported when the requested ledger does not exist.
     *
     * @param operation human-readable name of the failed operation, included in the message
     * @param ledgerId  id of the missing ledger
     */
    private static BucketNotExistException noSuchLedgerException(String operation, long ledgerId) {
        String message = BKException.getMessage(BKException.Code.NoSuchLedgerExistsException)
                + " - ledger=" + ledgerId + " - operation=" + operation;
        return new BucketNotExistException(message);
    }
}
