package org.apache.kafka.streams.processor.internals;

import io.vertx.core.net.impl.KeyStoreHelper;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.FixedOrderMap;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.RecordConverter;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.class */
public class GlobalStateManagerImpl implements GlobalStateManager {
    private static final long NO_DEADLINE = -1;
    private final Logger log;
    private final Time time;
    private final Consumer<byte[], byte[]> globalConsumer;
    private final File baseDir;
    private final StateDirectory stateDirectory;
    private final StateRestoreListener stateRestoreListener;
    private InternalProcessorContext globalProcessorContext;
    private final Duration pollMsPlusRequestTimeout;
    private final long taskTimeoutMs;
    private final OffsetCheckpoint checkpointFile;
    private final Map<String, String> storeToChangelogTopic;
    private final List<StateStore> globalStateStores;
    private final Set<String> globalStoreNames = new HashSet();
    private final FixedOrderMap<String, Optional<StateStore>> globalStores = new FixedOrderMap<>();
    private final Set<String> globalNonPersistentStoresTopics = new HashSet();
    private final Map<TopicPartition, Long> checkpointFileCache = new HashMap();

    public GlobalStateManagerImpl(LogContext logContext, Time time, ProcessorTopology processorTopology, Consumer<byte[], byte[]> consumer, StateDirectory stateDirectory, StateRestoreListener stateRestoreListener, StreamsConfig streamsConfig) {
        this.time = time;
        this.storeToChangelogTopic = processorTopology.storeToChangelogTopic();
        this.globalStateStores = processorTopology.globalStateStores();
        this.baseDir = stateDirectory.globalStateDir();
        this.checkpointFile = new OffsetCheckpoint(new File(this.baseDir, ".checkpoint"));
        for (StateStore stateStore : this.globalStateStores) {
            if (!stateStore.persistent()) {
                this.globalNonPersistentStoresTopics.add(changelogFor(stateStore.name()));
            }
        }
        this.log = logContext.logger(GlobalStateManagerImpl.class);
        this.globalConsumer = consumer;
        this.stateDirectory = stateDirectory;
        this.stateRestoreListener = stateRestoreListener;
        Map<String, Object> globalConsumerConfigs = streamsConfig.getGlobalConsumerConfigs(KeyStoreHelper.DUMMY_PASSWORD);
        globalConsumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        globalConsumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        this.pollMsPlusRequestTimeout = Duration.ofMillis(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG).longValue() + new ClientUtils.QuietConsumerConfig(globalConsumerConfigs).getInt("request.timeout.ms").intValue());
        this.taskTimeoutMs = streamsConfig.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG).longValue();
    }

    @Override // org.apache.kafka.streams.processor.internals.GlobalStateManager
    public void setGlobalProcessorContext(InternalProcessorContext internalProcessorContext) {
        this.globalProcessorContext = internalProcessorContext;
    }

    @Override // org.apache.kafka.streams.processor.internals.GlobalStateManager
    public Set<String> initialize() {
        try {
            if (!this.stateDirectory.lockGlobalState()) {
                throw new LockException(String.format("Failed to lock the global state directory: %s", this.baseDir));
            }
            try {
                this.checkpointFileCache.putAll(this.checkpointFile.read());
                HashSet hashSet = new HashSet();
                for (StateStore stateStore : this.globalStateStores) {
                    this.globalStoreNames.add(stateStore.name());
                    hashSet.add(this.storeToChangelogTopic.get(stateStore.name()));
                    stateStore.init((StateStoreContext) this.globalProcessorContext, stateStore);
                }
                this.checkpointFileCache.keySet().forEach(topicPartition -> {
                    if (hashSet.contains(topicPartition.topic())) {
                        return;
                    }
                    this.log.error("Encountered a topic-partition in the global checkpoint file not associated with any global state store, topic-partition: {}, checkpoint file: {}. If this topic-partition is no longer valid, an application reset and state store directory cleanup will be required.", topicPartition.topic(), this.checkpointFile.toString());
                    try {
                        this.stateDirectory.unlockGlobalState();
                    } catch (IOException e) {
                        this.log.error("Failed to unlock the global state directory", (Throwable) e);
                    }
                    throw new StreamsException("Encountered a topic-partition not associated with any global state store");
                });
                return Collections.unmodifiableSet(this.globalStoreNames);
            } catch (IOException e) {
                try {
                    this.stateDirectory.unlockGlobalState();
                } catch (IOException e2) {
                    this.log.error("Failed to unlock the global state directory", (Throwable) e);
                }
                throw new StreamsException("Failed to read checkpoints for global state globalStores", e);
            }
        } catch (IOException e3) {
            throw new LockException(String.format("Failed to lock the global state directory: %s", this.baseDir), e3);
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public StateStore getGlobalStore(String str) {
        return this.globalStores.getOrDefault(str, Optional.empty()).orElse(null);
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public StateStore getStore(String str) {
        return getGlobalStore(str);
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public File baseDir() {
        return this.baseDir;
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public void registerStore(StateStore stateStore, StateRestoreCallback stateRestoreCallback) {
        if (this.globalStores.containsKey(stateStore.name())) {
            throw new IllegalArgumentException(String.format("Global Store %s has already been registered", stateStore.name()));
        }
        if (!this.globalStoreNames.contains(stateStore.name())) {
            throw new IllegalArgumentException(String.format("Trying to register store %s that is not a known global store", stateStore.name()));
        }
        if (stateRestoreCallback == null) {
            throw new IllegalArgumentException(String.format("The stateRestoreCallback provided for store %s was null", stateStore.name()));
        }
        this.log.info("Restoring state for global store {}", stateStore.name());
        List<TopicPartition> list = topicPartitionsForStore(stateStore);
        try {
            restoreState(stateRestoreCallback, list, (Map) retryUntilSuccessOrThrowOnTaskTimeout(() -> {
                return this.globalConsumer.endOffsets(list);
            }, String.format("Failed to get offsets for partitions %s. The broker may be transiently unavailable at the moment.", list)), stateStore.name(), StateManagerUtil.converterForStore(stateStore));
            this.globalStores.put(stateStore.name(), Optional.of(stateStore));
            this.globalConsumer.unsubscribe();
        } catch (Throwable th) {
            this.globalConsumer.unsubscribe();
            throw th;
        }
    }

    private List<TopicPartition> topicPartitionsForStore(StateStore stateStore) {
        String str = this.storeToChangelogTopic.get(stateStore.name());
        List<PartitionInfo> list = (List) retryUntilSuccessOrThrowOnTaskTimeout(() -> {
            return this.globalConsumer.partitionsFor(str);
        }, String.format("Failed to get partitions for topic %s. The broker may be transiently unavailable at the moment.", str));
        if (list == null || list.isEmpty()) {
            throw new StreamsException(String.format("There are no partitions available for topic %s when initializing global store %s", str, stateStore.name()));
        }
        ArrayList arrayList = new ArrayList();
        for (PartitionInfo partitionInfo : list) {
            arrayList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
        }
        return arrayList;
    }

    private void restoreState(StateRestoreCallback stateRestoreCallback, List<TopicPartition> list, Map<TopicPartition, Long> map, String str, RecordConverter recordConverter) {
        long globalConsumerOffset;
        long j;
        for (TopicPartition topicPartition : list) {
            long j2 = -1;
            this.globalConsumer.assign(Collections.singletonList(topicPartition));
            Long l = this.checkpointFileCache.get(topicPartition);
            if (l != null) {
                this.globalConsumer.seek(topicPartition, l.longValue());
                globalConsumerOffset = l.longValue();
            } else {
                this.globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
                globalConsumerOffset = getGlobalConsumerOffset(topicPartition);
            }
            Long l2 = map.get(topicPartition);
            RecordBatchingStateRestoreCallback adapt = StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
            this.stateRestoreListener.onRestoreStart(topicPartition, str, globalConsumerOffset, l2.longValue());
            long j3 = 0;
            while (true) {
                j = j3;
                if (globalConsumerOffset < l2.longValue()) {
                    ConsumerRecords<byte[], byte[]> poll = this.globalConsumer.poll(this.pollMsPlusRequestTimeout);
                    j2 = poll.isEmpty() ? maybeUpdateDeadlineOrThrow(j2) : -1L;
                    ArrayList arrayList = new ArrayList();
                    for (ConsumerRecord<byte[], byte[]> consumerRecord : poll.records(topicPartition)) {
                        if (consumerRecord.key() != null) {
                            arrayList.add(recordConverter.convert(consumerRecord));
                        }
                    }
                    globalConsumerOffset = getGlobalConsumerOffset(topicPartition);
                    adapt.restoreBatch(arrayList);
                    this.stateRestoreListener.onBatchRestored(topicPartition, str, globalConsumerOffset, arrayList.size());
                    j3 = j + arrayList.size();
                }
            }
            this.stateRestoreListener.onRestoreEnd(topicPartition, str, j);
            this.checkpointFileCache.put(topicPartition, Long.valueOf(globalConsumerOffset));
        }
    }

    private long getGlobalConsumerOffset(TopicPartition topicPartition) {
        return ((Long) retryUntilSuccessOrThrowOnTaskTimeout(() -> {
            return Long.valueOf(this.globalConsumer.position(topicPartition));
        }, String.format("Failed to get position for partition %s. The broker may be transiently unavailable at the moment.", topicPartition))).longValue();
    }

    private <R> R retryUntilSuccessOrThrowOnTaskTimeout(Supplier<R> supplier, String str) {
        long j = -1;
        while (true) {
            try {
                return supplier.get();
            } catch (TimeoutException e) {
                if (this.taskTimeoutMs == 0) {
                    throw new StreamsException(String.format("Retrying is disabled. You can enable it by setting `%s` to a value larger than zero.", StreamsConfig.TASK_TIMEOUT_MS_CONFIG), e);
                }
                j = maybeUpdateDeadlineOrThrow(j);
                this.log.warn(str, (Throwable) e);
            }
        }
    }

    private long maybeUpdateDeadlineOrThrow(long j) {
        long milliseconds = this.time.milliseconds();
        if (j != -1) {
            if (milliseconds >= j) {
                throw new TimeoutException(String.format("Global task did not make progress to restore state within %d ms. Adjust `%s` if needed.", Long.valueOf((milliseconds - j) + this.taskTimeoutMs), StreamsConfig.TASK_TIMEOUT_MS_CONFIG));
            }
            return j;
        }
        long j2 = milliseconds + this.taskTimeoutMs;
        if (j2 < 0) {
            return Long.MAX_VALUE;
        }
        return j2;
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public void flush() {
        this.log.debug("Flushing all global globalStores registered in the state manager");
        for (Map.Entry<String, Optional<StateStore>> entry : this.globalStores.entrySet()) {
            if (!entry.getValue().isPresent()) {
                throw new IllegalStateException("Expected " + entry.getKey() + " to have been initialized");
            }
            StateStore stateStore = entry.getValue().get();
            try {
                this.log.trace("Flushing global store={}", stateStore.name());
                stateStore.flush();
            } catch (RuntimeException e) {
                throw new ProcessorStateException(String.format("Failed to flush global state store %s", stateStore.name()), e);
            }
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public void close() throws IOException {
        try {
            if (this.globalStores.isEmpty()) {
                return;
            }
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<String, Optional<StateStore>> entry : this.globalStores.entrySet()) {
                if (entry.getValue().isPresent()) {
                    this.log.debug("Closing global storage engine {}", entry.getKey());
                    try {
                        entry.getValue().get().close();
                    } catch (RuntimeException e) {
                        this.log.error("Failed to close global state store {}", entry.getKey(), e);
                        sb.append("Failed to close global state store:").append(entry.getKey()).append(". Reason: ").append(e).append("\n");
                    }
                    this.globalStores.put(entry.getKey(), Optional.empty());
                } else {
                    this.log.info("Skipping to close non-initialized store {}", entry.getKey());
                }
            }
            if (sb.length() > 0) {
                throw new ProcessorStateException("Exceptions caught during close of 1 or more global state globalStores\n" + ((Object) sb));
            }
            this.stateDirectory.unlockGlobalState();
        } finally {
            this.stateDirectory.unlockGlobalState();
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public void updateChangelogOffsets(Map<TopicPartition, Long> map) {
        this.checkpointFileCache.putAll(map);
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public void checkpoint() {
        HashMap hashMap = new HashMap();
        for (Map.Entry<TopicPartition, Long> entry : this.checkpointFileCache.entrySet()) {
            if (!this.globalNonPersistentStoresTopics.contains(entry.getKey().topic())) {
                hashMap.put(entry.getKey(), entry.getValue());
            }
        }
        try {
            this.checkpointFile.write(hashMap);
        } catch (IOException e) {
            this.log.warn("Failed to write offset checkpoint file to {} for global stores: {}. This may occur if OS cleaned the state.dir in case when it is located in the (default) ${java.io.tmpdir}/kafka-streams directory. Changing the location of state.dir may resolve the problem", this.checkpointFile, e);
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public Task.TaskType taskType() {
        return Task.TaskType.GLOBAL;
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public Map<TopicPartition, Long> changelogOffsets() {
        return Collections.unmodifiableMap(this.checkpointFileCache);
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public String changelogFor(String str) {
        return this.storeToChangelogTopic.get(str);
    }
}
