package org.apache.pulsar.compaction;

import io.netty.buffer.ByteBuf;
import java.time.Duration;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.CompactionReaderImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.RawBatchMessageContainerImpl;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.class */
public class StrategicTwoPhaseCompactor extends PublishingOrderCompactor {
    private static final Logger log = LoggerFactory.getLogger(StrategicTwoPhaseCompactor.class);
    private static final int MAX_OUTSTANDING = 500;
    private static final int MAX_READER_RECONNECT_WAITING_TIME_IN_MILLIS = 20000;
    private final Duration phaseOneLoopReadTimeout;
    private final RawBatchMessageContainerImpl batchMessageContainer;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/compaction/StrategicTwoPhaseCompactor$PhaseOneResult.class */
    public static class PhaseOneResult<T> {
        MessageId firstId;
        MessageId lastId;
        Map<String, Message<T>> cache = new LinkedHashMap();
        AtomicInteger invalidCompactionCount = new AtomicInteger();
        AtomicInteger validCompactionCount = new AtomicInteger();
        AtomicInteger numReadMessages = new AtomicInteger();
        String topic;

        PhaseOneResult(String str) {
            this.topic = str;
        }

        public String toString() {
            Object[] objArr = new Object[7];
            objArr[0] = this.topic;
            objArr[1] = this.firstId != null ? this.firstId.toString() : "";
            objArr[2] = this.lastId != null ? this.lastId.toString() : "";
            objArr[3] = Integer.valueOf(this.cache.size());
            objArr[4] = Integer.valueOf(this.invalidCompactionCount.get());
            objArr[5] = Integer.valueOf(this.validCompactionCount.get());
            objArr[6] = Integer.valueOf(this.numReadMessages.get());
            return String.format("{Topic:%s, firstId:%s, lastId:%s, cache.size:%d, invalidCompactionCount:%d, validCompactionCount:%d, numReadMessages:%d}", objArr);
        }
    }

    public StrategicTwoPhaseCompactor(ServiceConfiguration serviceConfiguration, PulsarClient pulsarClient, BookKeeper bookKeeper, ScheduledExecutorService scheduledExecutorService) {
        super(serviceConfiguration, pulsarClient, bookKeeper, scheduledExecutorService);
        this.batchMessageContainer = new RawBatchMessageContainerImpl();
        this.phaseOneLoopReadTimeout = Duration.ofSeconds(serviceConfiguration.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
    }

    @Override // org.apache.pulsar.compaction.Compactor
    public CompletableFuture<Long> compact(String str) {
        throw new UnsupportedOperationException();
    }

    public <T> CompletableFuture<Long> compact(String str, TopicCompactionStrategy<T> topicCompactionStrategy) {
        return compact(str, topicCompactionStrategy, null);
    }

    public <T> CompletableFuture<Long> compact(String str, TopicCompactionStrategy<T> topicCompactionStrategy, CryptoKeyReader cryptoKeyReader) {
        CompletableFuture completableFuture = new CompletableFuture();
        if (cryptoKeyReader != null) {
            this.batchMessageContainer.setCryptoKeyReader(cryptoKeyReader);
        }
        CompactionReaderImpl create = CompactionReaderImpl.create(this.pulsar, topicCompactionStrategy.getSchema(), str, completableFuture, cryptoKeyReader);
        return completableFuture.thenComposeAsync(consumer -> {
            return compactAndCloseReader(create, topicCompactionStrategy);
        }, (Executor) this.scheduler);
    }

    <T> CompletableFuture<Long> doCompaction(Reader<T> reader, TopicCompactionStrategy topicCompactionStrategy) {
        return !(reader instanceof CompactionReaderImpl) ? CompletableFuture.failedFuture(new IllegalStateException("reader has to be CompactionReaderImpl")) : reader.hasMessageAvailableAsync().thenCompose(bool -> {
            if (bool.booleanValue()) {
                return phaseOne(reader, topicCompactionStrategy).thenCompose(phaseOneResult -> {
                    return phaseTwo(phaseOneResult, reader, this.bk);
                });
            }
            log.info("Skip compaction of the empty topic {}", reader.getTopic());
            return CompletableFuture.completedFuture(-1L);
        });
    }

    <T> CompletableFuture<Long> compactAndCloseReader(Reader<T> reader, TopicCompactionStrategy topicCompactionStrategy) {
        CompletableFuture<Long> completableFuture = new CompletableFuture<>();
        this.mxBean.addCompactionStartOp(reader.getTopic());
        doCompaction(reader, topicCompactionStrategy).whenComplete((l, th) -> {
            log.info("Completed doCompaction ledgerId:{}", l);
            reader.closeAsync().whenComplete((r10, th) -> {
                if (th != null) {
                    log.warn("Error closing reader handle {}, ignoring", reader, th);
                }
                if (th != null) {
                    this.mxBean.addCompactionEndOp(reader.getTopic(), false);
                    completableFuture.completeExceptionally(th);
                } else {
                    this.mxBean.addCompactionEndOp(reader.getTopic(), true);
                    completableFuture.complete(l);
                }
            });
        });
        return completableFuture;
    }

    private <T> boolean doCompactMessage(Message<T> message, PhaseOneResult<T> phaseOneResult, TopicCompactionStrategy<T> topicCompactionStrategy) {
        Map<String, Message<T>> map = phaseOneResult.cache;
        String key = message.getKey();
        if (key == null) {
            message.release();
            return true;
        }
        Object value = message.getValue();
        Message<T> message2 = map.get(key);
        if (topicCompactionStrategy.shouldKeepLeft(message2 == null ? null : message2.getValue(), value)) {
            message.release();
            phaseOneResult.invalidCompactionCount.incrementAndGet();
            return false;
        }
        if (value == null || message.size() <= 0) {
            map.remove(key);
            message.release();
        } else {
            map.remove(key);
            map.put(key, message);
        }
        if (message2 != null) {
            message2.release();
        }
        phaseOneResult.validCompactionCount.incrementAndGet();
        return true;
    }

    private <T> CompletableFuture<PhaseOneResult> phaseOne(Reader<T> reader, TopicCompactionStrategy topicCompactionStrategy) {
        CompletableFuture<PhaseOneResult> completableFuture = new CompletableFuture<>();
        PhaseOneResult phaseOneResult = new PhaseOneResult(reader.getTopic());
        ((CompactionReaderImpl) reader).getLastMessageIdAsync().thenAccept(messageId -> {
            log.info("Commencing phase one of compaction for {}, reading to {}", reader.getTopic(), messageId);
            phaseOneResult.lastId = copyMessageId(messageId);
            phaseOneLoop(reader, completableFuture, phaseOneResult, topicCompactionStrategy);
        }).exceptionally(th -> {
            completableFuture.completeExceptionally(th);
            return null;
        });
        return completableFuture;
    }

    private static MessageId copyMessageId(MessageId messageId) {
        if (messageId instanceof BatchMessageIdImpl) {
            return new BatchMessageIdImpl((BatchMessageIdImpl) messageId);
        }
        if (!(messageId instanceof MessageIdImpl)) {
            throw new IllegalStateException("Unknown lastMessageId type");
        }
        MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
        return new MessageIdImpl(messageIdImpl.getLedgerId(), messageIdImpl.getEntryId(), messageIdImpl.getPartitionIndex());
    }

    private <T> void phaseOneLoop(Reader<T> reader, CompletableFuture<PhaseOneResult> completableFuture, PhaseOneResult<T> phaseOneResult, TopicCompactionStrategy<T> topicCompactionStrategy) {
        if (completableFuture.isDone()) {
            return;
        }
        CompletableFuture readNextAsync = reader.readNextAsync();
        FutureUtil.addTimeoutHandling(readNextAsync, this.phaseOneLoopReadTimeout, this.scheduler, () -> {
            return FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)");
        });
        readNextAsync.thenAcceptAsync((Consumer) message -> {
            MessageId messageId = message.getMessageId();
            boolean z = false;
            if (phaseOneResult.lastId.compareTo(messageId) == 0) {
                z = true;
            }
            phaseOneResult.numReadMessages.incrementAndGet();
            this.mxBean.addCompactionReadOp(reader.getTopic(), message.size());
            if (doCompactMessage(message, phaseOneResult, topicCompactionStrategy)) {
                this.mxBean.addCompactionRemovedEvent(reader.getTopic());
            }
            if (phaseOneResult.firstId == null) {
                phaseOneResult.firstId = copyMessageId(messageId);
                log.info("Resetting cursor to firstId:{}", phaseOneResult.firstId);
                try {
                    reader.seek(phaseOneResult.firstId);
                    waitForReconnection(reader);
                } catch (Throwable th) {
                    throw new RuntimeException(String.format("Failed while resetting the cursor to firstId:%s", phaseOneResult.firstId), th);
                }
            }
            if (z) {
                completableFuture.complete(phaseOneResult);
            } else {
                phaseOneLoop(reader, completableFuture, phaseOneResult, topicCompactionStrategy);
            }
        }, (Executor) this.scheduler).exceptionally(th -> {
            completableFuture.completeExceptionally(th);
            return null;
        });
    }

    private <T> void waitForReconnection(Reader<T> reader) {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
        }
        while (!reader.isConnected()) {
            long currentTimeMillis2 = System.currentTimeMillis();
            if (currentTimeMillis2 - currentTimeMillis > 20000) {
                String format = String.format("Reader has not been reconnected for %d secs. Stopping the compaction.", 20);
                log.error(format);
                throw new RuntimeException(format);
            }
            log.warn("Reader has not been reconnected after the cursor reset. elapsed :{} ms. Retrying soon.", Long.valueOf(currentTimeMillis2 - currentTimeMillis));
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e2) {
                log.warn("The thread got interrupted while waiting. continuing", e2);
            }
        }
    }

    private <T> CompletableFuture<Long> phaseTwo(PhaseOneResult<T> phaseOneResult, Reader<T> reader, BookKeeper bookKeeper) {
        log.info("Completed phase one. Result:{}. ", phaseOneResult);
        return createLedger(bookKeeper, LedgerMetadataUtils.buildMetadataForCompactedLedger(phaseOneResult.topic, phaseOneResult.lastId.toByteArray())).thenCompose(ledgerHandle -> {
            log.info("Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}", new Object[]{phaseOneResult.topic, phaseOneResult.firstId, phaseOneResult.lastId, Integer.valueOf(phaseOneResult.cache.size()), Long.valueOf(ledgerHandle.getId())});
            return runPhaseTwo(phaseOneResult, reader, ledgerHandle, bookKeeper);
        });
    }

    private <T> CompletableFuture<Long> runPhaseTwo(PhaseOneResult<T> phaseOneResult, Reader<T> reader, LedgerHandle ledgerHandle, BookKeeper bookKeeper) {
        CompletableFuture<Long> completableFuture = new CompletableFuture<>();
        Semaphore semaphore = new Semaphore(MAX_OUTSTANDING);
        CompletableFuture<Void> completableFuture2 = new CompletableFuture<>();
        phaseTwoLoop(phaseOneResult.topic, phaseOneResult.cache.values().iterator(), ledgerHandle, semaphore, completableFuture2);
        completableFuture2.thenCompose(r11 -> {
            log.info("Flushing batch container numMessagesInBatch:{}", Integer.valueOf(this.batchMessageContainer.getNumMessagesInBatch()));
            return addToCompactedLedger(ledgerHandle, null, reader.getTopic(), semaphore).whenComplete((bool, th) -> {
                if (th != null) {
                    completableFuture.completeExceptionally(th);
                }
            });
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) bool -> {
            log.info("Acking ledger id {}", phaseOneResult.lastId);
            return ((CompactionReaderImpl) reader).acknowledgeCumulativeAsync(phaseOneResult.lastId, Map.of(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY, Long.valueOf(ledgerHandle.getId())));
        }).thenCompose(r5 -> {
            return closeLedger(ledgerHandle);
        }).whenComplete((BiConsumer) (r9, th) -> {
            if (th != null) {
                deleteLedger(bookKeeper, ledgerHandle).whenComplete((r8, th) -> {
                    if (th != null) {
                        log.error("Cleanup of ledger {} for failed", ledgerHandle, th);
                    }
                    completableFuture.completeExceptionally(th);
                });
            } else {
                log.info("kept ledger:{}", Long.valueOf(ledgerHandle.getId()));
                completableFuture.complete(Long.valueOf(ledgerHandle.getId()));
            }
        });
        return completableFuture;
    }

    private <T> void phaseTwoLoop(String str, Iterator<Message<T>> it, LedgerHandle ledgerHandle, Semaphore semaphore, CompletableFuture<Void> completableFuture) {
        if (completableFuture.isDone()) {
            return;
        }
        CompletableFuture.runAsync(() -> {
            if (it.hasNext()) {
                Message message = (Message) it.next();
                this.mxBean.addCompactionReadOp(str, message.size());
                addToCompactedLedger(ledgerHandle, message, str, semaphore).whenComplete((bool, th) -> {
                    if (th != null) {
                        completableFuture.completeExceptionally(th);
                    }
                });
                phaseTwoLoop(str, it, ledgerHandle, semaphore, completableFuture);
                return;
            }
            try {
                semaphore.acquire(MAX_OUTSTANDING);
                semaphore.release(MAX_OUTSTANDING);
                completableFuture.complete(null);
            } catch (InterruptedException e) {
                completableFuture.completeExceptionally(e);
            }
        }, this.scheduler).exceptionally(th -> {
            completableFuture.completeExceptionally(th);
            return null;
        });
    }

    <T> CompletableFuture<Boolean> addToCompactedLedger(LedgerHandle ledgerHandle, Message<T> message, String str, Semaphore semaphore) {
        if (message == null) {
            return flushBatchMessage(ledgerHandle, str, semaphore);
        }
        if (this.batchMessageContainer.haveEnoughSpace((MessageImpl) message)) {
            this.batchMessageContainer.add((MessageImpl) message, null);
            return CompletableFuture.completedFuture(false);
        }
        CompletableFuture<Boolean> flushBatchMessage = flushBatchMessage(ledgerHandle, str, semaphore);
        this.batchMessageContainer.add((MessageImpl) message, null);
        return flushBatchMessage;
    }

    private CompletableFuture<Boolean> flushBatchMessage(LedgerHandle ledgerHandle, String str, Semaphore semaphore) {
        if (this.batchMessageContainer.getNumMessagesInBatch() <= 0) {
            return CompletableFuture.completedFuture(false);
        }
        CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
        try {
            ByteBuf byteBuf = this.batchMessageContainer.toByteBuf();
            semaphore.acquire();
            this.mxBean.addCompactionWriteOp(str, byteBuf.readableBytes());
            long nanoTime = System.nanoTime();
            ledgerHandle.asyncAddEntry(byteBuf, (i, ledgerHandle2, j, obj) -> {
                semaphore.release();
                this.mxBean.addCompactionLatencyOp(str, System.nanoTime() - nanoTime, TimeUnit.NANOSECONDS);
                if (i != 0) {
                    completableFuture.completeExceptionally(BKException.create(i));
                } else {
                    completableFuture.complete(true);
                }
            }, (Object) null);
            return completableFuture;
        } catch (Throwable th) {
            log.error("Failed to add entry", th);
            this.batchMessageContainer.discard((Exception) th);
            completableFuture.completeExceptionally(th);
            return completableFuture;
        }
    }
}
