package org.apache.pulsar.compaction;

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.RawBatchConverter;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/compaction/TwoPhaseCompactor.class */
public class TwoPhaseCompactor extends Compactor {
    private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class);
    // Number of permits in the semaphore created for phase two. One permit is taken per write to
    // the compacted ledger, and all permits are acquired at the end to wait for in-flight writes.
    private static final int MAX_OUTSTANDING = 500;
    // Same text as the property key sent with the cumulative acknowledgement at the end of
    // phase two, whose value is the id of the compacted ledger.
    protected static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger";
    // Timeout applied to every readNextAsync() call issued by phaseOneLoop; configured in seconds.
    private final Duration phaseOneLoopReadTimeout;
    // When false, messages that carry no partition key are counted as removed and are left out
    // of the compacted output; when true they are kept.
    private final boolean topicCompactionRetainNullKey;

    /* loaded from: input_file:org/apache/pulsar/compaction/TwoPhaseCompactor$PhaseOneResult.class */
    /**
     * Immutable carrier for everything phase one hands over to phase two.
     *
     * <ul>
     *   <li>{@code from} - id of the first message phase two has to seek to</li>
     *   <li>{@code to} - id of the last message phase two has to consider</li>
     *   <li>{@code lastReadId} - id phase one read up to; acknowledged once compaction succeeds</li>
     *   <li>{@code latestForKey} - for every retained key, the id of the newest message carrying it</li>
     * </ul>
     */
    private static class PhaseOneResult {
        final MessageId from;
        final MessageId to;
        final MessageId lastReadId;
        final Map<String, MessageId> latestForKey;

        PhaseOneResult(MessageId firstId, MessageId toId, MessageId lastId, Map<String, MessageId> latestIds) {
            from = firstId;
            to = toId;
            lastReadId = lastId;
            latestForKey = latestIds;
        }
    }

    public TwoPhaseCompactor(ServiceConfiguration serviceConfiguration, PulsarClient pulsarClient, BookKeeper bookKeeper, ScheduledExecutorService scheduledExecutorService) {
        super(serviceConfiguration, pulsarClient, bookKeeper, scheduledExecutorService);
        this.phaseOneLoopReadTimeout = Duration.ofSeconds(serviceConfiguration.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
        this.topicCompactionRetainNullKey = serviceConfiguration.isTopicCompactionRetainNullKey();
    }

    /**
     * Runs phase one and then phase two against the given reader, unless the reader reports
     * that no message is available.
     *
     * @param rawReader reader positioned on the topic to compact
     * @param bookKeeper client used to create the compacted ledger
     * @return future holding the value produced by phase two, or {@code -1} when the topic is
     *         empty and compaction was skipped
     */
    @Override // org.apache.pulsar.compaction.Compactor
    protected CompletableFuture<Long> doCompaction(RawReader rawReader, BookKeeper bookKeeper) {
        return rawReader.hasMessageAvailableAsync().thenCompose(hasMessages -> {
            if (!hasMessages) {
                log.info("Skip compaction of the empty topic {}", rawReader.getTopic());
                return CompletableFuture.completedFuture(-1L);
            }
            return phaseOne(rawReader).thenCompose(result ->
                    phaseTwo(rawReader, result.from, result.to, result.lastReadId, result.latestForKey, bookKeeper));
        });
    }

    /**
     * Starts phase one: looks up the id of the last message of the topic and then reads the
     * topic message by message (see {@code phaseOneLoop}) to collect, per key, the id of the
     * newest message.
     *
     * @param rawReader reader to scan
     * @return future completed with the phase-one outcome, or completed exceptionally if the
     *         last-message lookup or any read fails
     */
    private CompletableFuture<PhaseOneResult> phaseOne(RawReader rawReader) {
        // Typed map instead of a raw HashMap so the compiler checks what phaseOneLoop stores in it.
        Map<String, MessageId> latestForKey = new HashMap<>();
        CompletableFuture<PhaseOneResult> loopPromise = new CompletableFuture<>();
        rawReader.getLastMessageIdAsync().thenAccept(lastMessageId -> {
            log.info("Commencing phase one of compaction for {}, reading to {}", rawReader.getTopic(), lastMessageId);
            MessageIdImpl lastImpl = (MessageIdImpl) lastMessageId;
            // Rebuilt as a plain MessageIdImpl from ledger/entry/partition only.
            // NOTE(review): presumably this strips batch information so that the equality check
            // in phaseOneLoop works at entry level - confirm.
            MessageId lastEntryId = new MessageIdImpl(lastImpl.getLedgerId(), lastImpl.getEntryId(), lastImpl.getPartitionIndex());
            phaseOneLoop(rawReader, Optional.empty(), Optional.empty(), lastEntryId, latestForKey, loopPromise);
        }).exceptionally(ex -> {
            loopPromise.completeExceptionally(ex);
            return null;
        });
        return loopPromise;
    }

    /**
     * One iteration of the phase-one scan. Reads the next message, updates {@code latestForKey}
     * and the removal metrics, and then either completes {@code loopPromise} (when the message
     * just read is {@code lastMessageId}) or schedules the next iteration.
     *
     * @param rawReader reader being scanned
     * @param firstMessageId id of the first message seen so far that was not dropped, if any
     * @param toMessageId id of the most recent message seen so far that was not dropped, if any
     * @param lastMessageId id at which the scan stops
     * @param latestForKey accumulator mapping each key to the id of its newest non-empty message
     * @param loopPromise completed with the result, or exceptionally on a read error or timeout
     */
    private void phaseOneLoop(RawReader rawReader, Optional<MessageId> firstMessageId, Optional<MessageId> toMessageId, MessageId lastMessageId, Map<String, MessageId> latestForKey, CompletableFuture<PhaseOneResult> loopPromise) {
        if (loopPromise.isDone()) {
            return;
        }
        CompletableFuture<RawMessage> readFuture = rawReader.readNextAsync();
        FutureUtil.addTimeoutHandling(readFuture, this.phaseOneLoopReadTimeout, this.scheduler, () -> {
            return FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)");
        });
        readFuture.thenAcceptAsync(rawMessage -> {
            try {
                MessageId id = rawMessage.getMessageId();
                // True when nothing of this entry survives compaction (marker, tombstone,
                // dropped null-key message, or a batch whose every message was dropped).
                boolean deletedMessage = false;
                // True when this message superseded an older message with the same key.
                boolean replaceMessage = false;
                this.mxBean.addCompactionReadOp(rawReader.getTopic(), rawMessage.getHeadersAndPayload().readableBytes());
                MessageMetadata metadata = Commands.parseMessageMetadata(rawMessage.getHeadersAndPayload());
                if (Markers.isServerOnlyMarker(metadata)) {
                    this.mxBean.addCompactionRemovedEvent(rawReader.getTopic());
                    deletedMessage = true;
                } else if (RawBatchConverter.isReadableBatch(metadata)) {
                    try {
                        int numMessagesInBatch = metadata.getNumMessagesInBatch();
                        int deleteCount = 0;
                        for (ImmutableTriple<MessageId, String, Integer> entry : RawBatchConverter.extractIdsAndKeysAndSize(rawMessage, true)) {
                            if (entry == null) {
                                continue;
                            }
                            String key = entry.getMiddle();
                            if (key == null) {
                                if (!this.topicCompactionRetainNullKey) {
                                    deleteCount++;
                                    this.mxBean.addCompactionRemovedEvent(rawReader.getTopic());
                                }
                            } else if (entry.getRight() <= 0) {
                                // Empty payload acts as a delete marker for the key.
                                latestForKey.remove(key);
                                deleteCount++;
                                this.mxBean.addCompactionRemovedEvent(rawReader.getTopic());
                            } else if (latestForKey.put(key, entry.getLeft()) != null) {
                                this.mxBean.addCompactionRemovedEvent(rawReader.getTopic());
                            }
                        }
                        deletedMessage = deleteCount == numMessagesInBatch;
                    } catch (IOException e) {
                        // deletedMessage stays false, so the undecodable batch is kept.
                        log.info("Error decoding batch for message {}. Whole batch will be included in output", id, e);
                    }
                } else {
                    Pair<String, Integer> keyAndSize = extractKeyAndSize(rawMessage);
                    if (keyAndSize != null) {
                        if (keyAndSize.getRight() > 0) {
                            replaceMessage = latestForKey.put(keyAndSize.getLeft(), id) != null;
                        } else {
                            deletedMessage = true;
                            latestForKey.remove(keyAndSize.getLeft());
                        }
                    } else if (!this.topicCompactionRetainNullKey) {
                        deletedMessage = true;
                    }
                    if (replaceMessage || deletedMessage) {
                        this.mxBean.addCompactionRemovedEvent(rawReader.getTopic());
                    }
                }
                // A dropped message can become neither the start nor the end of the phase-two range.
                MessageId first = firstMessageId.orElse(deletedMessage ? null : id);
                MessageId to = deletedMessage ? toMessageId.orElse(null) : id;
                if (id.compareTo(lastMessageId) == 0) {
                    // If every message was dropped, fall back to the current id for both bounds.
                    loopPromise.complete(new PhaseOneResult(first == null ? id : first, to == null ? id : to, lastMessageId, latestForKey));
                } else {
                    phaseOneLoop(rawReader, Optional.ofNullable(first), Optional.ofNullable(to), lastMessageId, latestForKey, loopPromise);
                }
            } finally {
                rawMessage.close();
            }
        }, this.scheduler).exceptionally(ex -> {
            loopPromise.completeExceptionally(ex);
            return null;
        });
    }

    /**
     * Starts phase two: creates the ledger that will hold the compacted data and then hands
     * over to {@code phaseTwoSeekThenLoop}.
     *
     * @param rawReader reader to re-read the topic with
     * @param from id to seek to before copying
     * @param to id of the last message to consider; also stored in the ledger metadata
     * @param lastReadId id to acknowledge once the ledger is written
     * @param latestForKey newest message id per key, as collected by phase one
     * @param bookKeeper client used to create the ledger
     * @return future produced by {@code phaseTwoSeekThenLoop}
     */
    private CompletableFuture<Long> phaseTwo(RawReader rawReader, MessageId from, MessageId to, MessageId lastReadId, Map<String, MessageId> latestForKey, BookKeeper bookKeeper) {
        String topic = rawReader.getTopic();
        Map<String, byte[]> ledgerMetadata = LedgerMetadataUtils.buildMetadataForCompactedLedger(topic, to.toByteArray());
        return createLedger(bookKeeper, ledgerMetadata).thenCompose(ledger -> {
            log.info("Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}",
                    rawReader.getTopic(), from, to, latestForKey.size(), ledger.getId());
            return phaseTwoSeekThenLoop(rawReader, from, to, lastReadId, latestForKey, bookKeeper, ledger);
        });
    }

    /**
     * Seeks the reader to {@code from}, copies the surviving messages into {@code ledger},
     * closes the ledger and finally acknowledges {@code lastReadId} with the ledger id attached
     * as a property.
     *
     * <p>If any of those steps fails, the ledger is deleted and the returned future is completed
     * with the failure that caused the abort. A failure of the delete itself is only logged.
     *
     * @param rawReader reader to re-read the topic with
     * @param from id to seek to before copying
     * @param to id of the last message to consider
     * @param lastReadId id acknowledged cumulatively after the ledger is closed
     * @param latestForKey newest message id per key, as collected by phase one
     * @param bookKeeper client used to delete the ledger on failure
     * @param ledger ledger receiving the compacted messages
     * @return future completed with the id of the compacted ledger
     */
    private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader rawReader, MessageId from, MessageId to, MessageId lastReadId, Map<String, MessageId> latestForKey, BookKeeper bookKeeper, LedgerHandle ledger) {
        CompletableFuture<Long> promise = new CompletableFuture<>();
        rawReader.seekAsync(from).thenCompose(ignore -> {
            Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
            CompletableFuture<Void> loopPromise = new CompletableFuture<>();
            phaseTwoLoop(rawReader, to, latestForKey, ledger, outstanding, loopPromise, MessageId.earliest);
            return loopPromise;
        }).thenCompose(ignore -> {
            return closeLedger(ledger);
        }).thenCompose(ignore -> {
            return rawReader.acknowledgeCumulativeAsync(lastReadId, Map.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId()));
        }).whenComplete((ignore, failure) -> {
            if (failure == null) {
                promise.complete(ledger.getId());
                return;
            }
            deleteLedger(bookKeeper, ledger).whenComplete((ignore2, cleanupFailure) -> {
                if (cleanupFailure != null) {
                    log.warn("Cleanup of ledger {} for failed", ledger, cleanupFailure);
                }
                // Always report the failure that aborted compaction, not the cleanup outcome.
                promise.completeExceptionally(failure);
            });
        });
        return promise;
    }

    /**
     * One iteration of the phase-two copy. Reads the next message, decides whether it (or a
     * re-batched subset of it) belongs in the compacted ledger, writes it, and then either
     * completes {@code promise} (when the message just read is {@code to}) or schedules the
     * next iteration.
     *
     * <p>Every message obtained from the reader is closed exactly once on every path. A message
     * produced by re-batching is a separate object and is closed separately.
     *
     * @param rawReader reader being copied from
     * @param to id of the last message to consider
     * @param latestForKey newest message id per key, as collected by phase one
     * @param ledger ledger receiving the compacted messages
     * @param outstanding semaphore with {@code MAX_OUTSTANDING} permits bounding in-flight writes
     * @param promise completed when the copy is finished, or exceptionally on any failure
     * @param lastCompactedMessageId id handled by the previous iteration; messages at or before
     *        it are skipped
     */
    private void phaseTwoLoop(RawReader rawReader, MessageId to, Map<String, MessageId> latestForKey, LedgerHandle ledger, Semaphore outstanding, CompletableFuture<Void> promise, MessageId lastCompactedMessageId) {
        if (promise.isDone()) {
            return;
        }
        rawReader.readNextAsync().thenAcceptAsync(m -> {
            if (promise.isDone()) {
                m.close();
                return;
            }
            if (m.getMessageId().compareTo(lastCompactedMessageId) <= 0) {
                // Already handled: release the message before reading the next one.
                m.close();
                phaseTwoLoop(rawReader, to, latestForKey, ledger, outstanding, promise, lastCompactedMessageId);
                return;
            }
            try {
                MessageId id = m.getMessageId();
                Optional<RawMessage> messageToAdd = Optional.empty();
                this.mxBean.addCompactionReadOp(rawReader.getTopic(), m.getHeadersAndPayload().readableBytes());
                MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload());
                if (Markers.isServerOnlyMarker(metadata)) {
                    messageToAdd = Optional.empty();
                } else if (RawBatchConverter.isReadableBatch(metadata)) {
                    try {
                        // Keep only the batch members that are the newest message for their key.
                        messageToAdd = RawBatchConverter.rebatchMessage(m, (key, subId) -> {
                            return subId.equals(latestForKey.get(key));
                        }, this.topicCompactionRetainNullKey);
                    } catch (IOException e) {
                        log.info("Error decoding batch for message {}. Whole batch will be included in output", id, e);
                        messageToAdd = Optional.of(m);
                    }
                } else {
                    Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
                    if (keyAndSize == null) {
                        messageToAdd = this.topicCompactionRetainNullKey ? Optional.of(m) : Optional.empty();
                    } else {
                        MessageId latest = latestForKey.get(keyAndSize.getLeft());
                        if (latest != null && latest.equals(id)) {
                            if (keyAndSize.getRight() <= 0) {
                                promise.completeExceptionally(new IllegalArgumentException("Compaction phase found empty record from sorted key-map"));
                            }
                            messageToAdd = Optional.of(m);
                        }
                    }
                }
                if (messageToAdd.isPresent()) {
                    RawMessage message = messageToAdd.get();
                    try {
                        outstanding.acquire();
                        CompletableFuture<Void> addFuture = addToCompactedLedger(ledger, message, rawReader.getTopic()).whenComplete((res, addFailure) -> {
                            outstanding.release();
                            if (addFailure != null) {
                                promise.completeExceptionally(addFailure);
                            }
                        });
                        if (to.equals(id)) {
                            // Taking every permit waits until all in-flight writes have finished.
                            outstanding.acquire(MAX_OUTSTANDING);
                            addFuture.whenComplete((res, addFailure) -> {
                                if (addFailure == null) {
                                    promise.complete(null);
                                }
                            });
                            return;
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        promise.completeExceptionally(e);
                    } finally {
                        // A re-batched message is a different object from the one read; the
                        // outer finally only closes the latter.
                        if (message != m) {
                            message.close();
                        }
                    }
                } else if (to.equals(id)) {
                    // The final message itself is not part of the output; just drain the writes.
                    try {
                        outstanding.acquire(MAX_OUTSTANDING);
                        promise.complete(null);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        promise.completeExceptionally(e);
                    }
                    return;
                }
                phaseTwoLoop(rawReader, to, latestForKey, ledger, outstanding, promise, id);
            } finally {
                m.close();
            }
        }, this.scheduler).exceptionally(ex -> {
            promise.completeExceptionally(ex);
            return null;
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Asynchronously creates a ledger using the ensemble size, write quorum and ack quorum from
     * the broker configuration and the digest type and password defined on {@code Compactor}.
     *
     * @param bookKeeper client to create the ledger with
     * @param map custom metadata to attach to the ledger
     * @return future completed with the new ledger handle, or exceptionally with the
     *         {@code BKException} for a non-zero return code or with whatever the create call
     *         threw synchronously
     */
    public CompletableFuture<LedgerHandle> createLedger(BookKeeper bookKeeper, Map<String, byte[]> map) {
        CompletableFuture<LedgerHandle> created = new CompletableFuture<>();
        try {
            int ensembleSize = this.conf.getManagedLedgerDefaultEnsembleSize();
            int writeQuorum = this.conf.getManagedLedgerDefaultWriteQuorum();
            int ackQuorum = this.conf.getManagedLedgerDefaultAckQuorum();
            bookKeeper.asyncCreateLedger(ensembleSize, writeQuorum, ackQuorum, Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD, (rc, handle, ctx) -> {
                if (rc == 0) {
                    created.complete(handle);
                } else {
                    created.completeExceptionally(BKException.create(rc));
                }
            }, (Object) null, map);
        } catch (Throwable t) {
            log.error("Encountered unexpected error when creating compaction ledger", t);
            return FutureUtil.failedFuture(t);
        }
        return created;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Asynchronously deletes the ledger behind {@code ledgerHandle}.
     *
     * @param bookKeeper client to delete the ledger with
     * @param ledgerHandle handle whose id identifies the ledger to delete
     * @return future completed with {@code null} on success, or exceptionally with the
     *         {@code BKException} for a non-zero return code or with whatever the delete call
     *         threw synchronously
     */
    public CompletableFuture<Void> deleteLedger(BookKeeper bookKeeper, LedgerHandle ledgerHandle) {
        CompletableFuture<Void> deleted = new CompletableFuture<>();
        try {
            long ledgerId = ledgerHandle.getId();
            bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> {
                if (rc == 0) {
                    deleted.complete(null);
                } else {
                    deleted.completeExceptionally(BKException.create(rc));
                }
            }, (Object) null);
        } catch (Throwable t) {
            return FutureUtil.failedFuture(t);
        }
        return deleted;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Asynchronously closes {@code ledgerHandle}.
     *
     * @param ledgerHandle ledger to close
     * @return future completed with {@code null} on success, or exceptionally with the
     *         {@code BKException} for a non-zero return code or with whatever the close call
     *         threw synchronously
     */
    public CompletableFuture<Void> closeLedger(LedgerHandle ledgerHandle) {
        CompletableFuture<Void> closed = new CompletableFuture<>();
        try {
            ledgerHandle.asyncClose((rc, handle, ctx) -> {
                if (rc == 0) {
                    closed.complete(null);
                } else {
                    closed.completeExceptionally(BKException.create(rc));
                }
            }, (Object) null);
        } catch (Throwable t) {
            return FutureUtil.failedFuture(t);
        }
        return closed;
    }

    /**
     * Appends the serialized form of {@code message} to {@code ledger} and records the write
     * size and the write latency for {@code topic}.
     *
     * @param ledger ledger to append to
     * @param message message to serialize and append
     * @param topic topic name the metrics are recorded under
     * @return future completed with {@code null} once the entry is added, or exceptionally with
     *         the {@code BKException} for a non-zero return code or with whatever was thrown
     *         synchronously after serialization
     */
    private CompletableFuture<Void> addToCompactedLedger(LedgerHandle ledger, RawMessage message, String topic) {
        CompletableFuture<Void> added = new CompletableFuture<>();
        // Serialization happens outside the try block, so an exception here propagates to the caller.
        ByteBuf serialized = message.serialize();
        try {
            int payloadBytes = message.getHeadersAndPayload().readableBytes();
            this.mxBean.addCompactionWriteOp(topic, payloadBytes);
            long startNanos = System.nanoTime();
            ledger.asyncAddEntry(serialized, (rc, handle, entryId, ctx) -> {
                long elapsedNanos = System.nanoTime() - startNanos;
                this.mxBean.addCompactionLatencyOp(topic, elapsedNanos, TimeUnit.NANOSECONDS);
                if (rc == 0) {
                    added.complete(null);
                } else {
                    added.completeExceptionally(BKException.create(rc));
                }
            }, (Object) null);
        } catch (Throwable t) {
            return FutureUtil.failedFuture(t);
        }
        return added;
    }

    /**
     * Reads the partition key and a size from a message's metadata.
     *
     * @param rawMessage message to inspect
     * @return the partition key paired with the uncompressed size when the metadata carries one,
     *         otherwise paired with the bytes still readable in the buffer after the metadata
     *         was parsed; {@code null} when the message has no partition key
     */
    private static Pair<String, Integer> extractKeyAndSize(RawMessage rawMessage) {
        ByteBuf buffer = rawMessage.getHeadersAndPayload();
        MessageMetadata metadata = Commands.parseMessageMetadata(buffer);
        if (metadata.hasPartitionKey()) {
            int size = metadata.hasUncompressedSize() ? metadata.getUncompressedSize() : buffer.readableBytes();
            return Pair.of(metadata.getPartitionKey(), size);
        }
        return null;
    }

    /**
     * Returns the per-read timeout used by phase one.
     *
     * @return the timeout in whole seconds
     */
    public long getPhaseOneLoopReadTimeoutInSeconds() {
        final Duration readTimeout = phaseOneLoopReadTimeout;
        return readTimeout.getSeconds();
    }
}
