/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.AbstractStateManager;
import org.apache.kafka.streams.processor.internals.GlobalStateManager;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter;
import org.apache.kafka.streams.state.internals.RecordConverter;
import org.slf4j.Logger;

public class GlobalStateManagerImpl
extends AbstractStateManager
implements GlobalStateManager {
    private final Logger log;
    private final ProcessorTopology topology;
    private final Consumer<byte[], byte[]> globalConsumer;
    private final StateDirectory stateDirectory;
    private final Set<String> globalStoreNames = new HashSet<String>();
    private final StateRestoreListener stateRestoreListener;
    private InternalProcessorContext processorContext;
    private final int retries;
    private final long retryBackoffMs;
    private final Duration pollTime;
    private final Set<String> globalNonPersistentStoresTopics = new HashSet<String>();

    public GlobalStateManagerImpl(LogContext logContext, ProcessorTopology topology, Consumer<byte[], byte[]> globalConsumer, StateDirectory stateDirectory, StateRestoreListener stateRestoreListener, StreamsConfig config) {
        super(stateDirectory.globalStateDir(), "exactly_once".equals(config.getString("processing.guarantee")));
        Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic();
        for (StateStore store : topology.globalStateStores()) {
            if (store.persistent()) continue;
            this.globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name()));
        }
        this.log = logContext.logger(GlobalStateManagerImpl.class);
        this.topology = topology;
        this.globalConsumer = globalConsumer;
        this.stateDirectory = stateDirectory;
        this.stateRestoreListener = stateRestoreListener;
        this.retries = config.getInt("retries");
        this.retryBackoffMs = config.getLong("retry.backoff.ms");
        this.pollTime = Duration.ofMillis(config.getLong("poll.ms"));
    }

    @Override
    public void setGlobalProcessorContext(InternalProcessorContext processorContext) {
        this.processorContext = processorContext;
    }

    @Override
    public Set<String> initialize() {
        try {
            if (!this.stateDirectory.lockGlobalState()) {
                throw new LockException(String.format("Failed to lock the global state directory: %s", this.baseDir));
            }
        }
        catch (IOException e) {
            throw new LockException(String.format("Failed to lock the global state directory: %s", this.baseDir));
        }
        try {
            this.checkpointableOffsets.putAll(this.checkpoint.read());
        }
        catch (IOException e) {
            try {
                this.stateDirectory.unlockGlobalState();
            }
            catch (IOException e1) {
                this.log.error("Failed to unlock the global state directory", e);
            }
            throw new StreamsException("Failed to read checkpoints for global state globalStores", e);
        }
        List<StateStore> stateStores = this.topology.globalStateStores();
        for (StateStore stateStore : stateStores) {
            this.globalStoreNames.add(stateStore.name());
            stateStore.init(this.processorContext, stateStore);
        }
        return Collections.unmodifiableSet(this.globalStoreNames);
    }

    @Override
    public void reinitializeStateStoresForPartitions(Collection<TopicPartition> partitions, InternalProcessorContext processorContext) {
        super.reinitializeStateStoresForPartitions(this.log, this.globalStores, this.topology.storeToChangelogTopic(), partitions, processorContext);
        this.globalConsumer.assign(partitions);
        this.globalConsumer.seekToBeginning(partitions);
    }

    @Override
    public StateStore getGlobalStore(String name) {
        return this.globalStores.getOrDefault(name, Optional.empty()).orElse(null);
    }

    @Override
    public StateStore getStore(String name) {
        return this.getGlobalStore(name);
    }

    @Override
    public File baseDir() {
        return this.baseDir;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
        if (this.globalStores.containsKey(store.name())) {
            throw new IllegalArgumentException(String.format("Global Store %s has already been registered", store.name()));
        }
        if (!this.globalStoreNames.contains(store.name())) {
            throw new IllegalArgumentException(String.format("Trying to register store %s that is not a known global store", store.name()));
        }
        if (stateRestoreCallback == null) {
            throw new IllegalArgumentException(String.format("The stateRestoreCallback provided for store %s was null", store.name()));
        }
        this.log.info("Restoring state for global store {}", (Object)store.name());
        List<TopicPartition> topicPartitions = this.topicPartitionsForStore(store);
        Map<TopicPartition, Long> highWatermarks = null;
        int attempts = 0;
        while (highWatermarks == null) {
            try {
                highWatermarks = this.globalConsumer.endOffsets(topicPartitions);
            }
            catch (TimeoutException retryableException) {
                if (++attempts > this.retries) {
                    this.log.error("Failed to get end offsets for topic partitions of global store {} after {} retry attempts. You can increase the number of retries via configuration parameter `retries`.", store.name(), this.retries, retryableException);
                    throw new StreamsException(String.format("Failed to get end offsets for topic partitions of global store %s after %d retry attempts. You can increase the number of retries via configuration parameter `retries`.", store.name(), this.retries), retryableException);
                }
                this.log.debug("Failed to get end offsets for partitions {}, backing off for {} ms to retry (attempt {} of {})", topicPartitions, this.retryBackoffMs, attempts, this.retries, retryableException);
                Utils.sleep(this.retryBackoffMs);
            }
        }
        try {
            this.restoreState(stateRestoreCallback, topicPartitions, highWatermarks, store.name(), GlobalStateManagerImpl.converterForStore(store));
            this.globalStores.put(store.name(), Optional.of(store));
        }
        finally {
            this.globalConsumer.unsubscribe();
        }
    }

    private List<TopicPartition> topicPartitionsForStore(StateStore store) {
        List<PartitionInfo> partitionInfos;
        String sourceTopic = this.topology.storeToChangelogTopic().get(store.name());
        int attempts = 0;
        while (true) {
            try {
                partitionInfos = this.globalConsumer.partitionsFor(sourceTopic);
            }
            catch (TimeoutException retryableException) {
                if (++attempts > this.retries) {
                    this.log.error("Failed to get partitions for topic {} after {} retry attempts due to timeout. The broker may be transiently unavailable at the moment. You can increase the number of retries via configuration parameter `retries`.", sourceTopic, this.retries, retryableException);
                    throw new StreamsException(String.format("Failed to get partitions for topic %s after %d retry attempts due to timeout. The broker may be transiently unavailable at the moment. You can increase the number of retries via configuration parameter `retries`.", sourceTopic, this.retries), retryableException);
                }
                this.log.debug("Failed to get partitions for topic {} due to timeout. The broker may be transiently unavailable at the moment. Backing off for {} ms to retry (attempt {} of {})", sourceTopic, this.retryBackoffMs, attempts, this.retries, retryableException);
                Utils.sleep(this.retryBackoffMs);
                continue;
            }
            break;
        }
        if (partitionInfos == null || partitionInfos.isEmpty()) {
            throw new StreamsException(String.format("There are no partitions available for topic %s when initializing global store %s", sourceTopic, store.name()));
        }
        ArrayList<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
        for (PartitionInfo partition : partitionInfos) {
            topicPartitions.add(new TopicPartition(partition.topic(), partition.partition()));
        }
        return topicPartitions;
    }

    private void restoreState(StateRestoreCallback stateRestoreCallback, List<TopicPartition> topicPartitions, Map<TopicPartition, Long> highWatermarks, String storeName, RecordConverter recordConverter) {
        for (TopicPartition topicPartition : topicPartitions) {
            this.globalConsumer.assign(Collections.singletonList(topicPartition));
            Long checkpoint = (Long)this.checkpointableOffsets.get(topicPartition);
            if (checkpoint != null) {
                this.globalConsumer.seek(topicPartition, checkpoint);
            } else {
                this.globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
            }
            long offset = this.globalConsumer.position(topicPartition);
            Long highWatermark = highWatermarks.get(topicPartition);
            RecordBatchingStateRestoreCallback stateRestoreAdapter = StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
            this.stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
            long restoreCount = 0L;
            while (offset < highWatermark) {
                try {
                    ConsumerRecords<byte[], byte[]> records = this.globalConsumer.poll(this.pollTime);
                    ArrayList<ConsumerRecord<byte[], byte[]>> restoreRecords = new ArrayList<ConsumerRecord<byte[], byte[]>>();
                    for (ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) {
                        if (record.key() == null) continue;
                        restoreRecords.add(recordConverter.convert(record));
                    }
                    offset = this.globalConsumer.position(topicPartition);
                    stateRestoreAdapter.restoreBatch(restoreRecords);
                    this.stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());
                    restoreCount += (long)restoreRecords.size();
                }
                catch (InvalidOffsetException recoverableException) {
                    this.log.warn("Restoring GlobalStore {} failed due to: {}. Deleting global store to recreate from scratch.", (Object)storeName, (Object)recoverableException.toString());
                    this.reinitializeStateStoresForPartitions(recoverableException.partitions(), this.processorContext);
                    this.stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
                    restoreCount = 0L;
                }
            }
            this.stateRestoreListener.onRestoreEnd(topicPartition, storeName, restoreCount);
            this.checkpointableOffsets.put(topicPartition, offset);
        }
    }

    @Override
    public void flush() {
        this.log.debug("Flushing all global globalStores registered in the state manager");
        for (Map.Entry entry : this.globalStores.entrySet()) {
            if (((Optional)entry.getValue()).isPresent()) {
                StateStore store = (StateStore)((Optional)entry.getValue()).get();
                try {
                    this.log.trace("Flushing global store={}", (Object)store.name());
                    store.flush();
                    continue;
                }
                catch (Exception e) {
                    throw new ProcessorStateException(String.format("Failed to flush global state store %s", store.name()), e);
                }
            }
            throw new IllegalStateException("Expected " + (String)entry.getKey() + " to have been initialized");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close(boolean clean) throws IOException {
        try {
            if (this.globalStores.isEmpty()) {
                return;
            }
            StringBuilder closeFailed = new StringBuilder();
            for (Map.Entry entry : this.globalStores.entrySet()) {
                if (((Optional)entry.getValue()).isPresent()) {
                    this.log.debug("Closing global storage engine {}", entry.getKey());
                    try {
                        ((StateStore)((Optional)entry.getValue()).get()).close();
                    }
                    catch (Exception e) {
                        this.log.error("Failed to close global state store {}", entry.getKey(), (Object)e);
                        closeFailed.append("Failed to close global state store:").append((String)entry.getKey()).append(". Reason: ").append(e.toString()).append("\n");
                    }
                    this.globalStores.put(entry.getKey(), Optional.empty());
                    continue;
                }
                this.log.info("Skipping to close non-initialized store {}", entry.getKey());
            }
            if (closeFailed.length() > 0) {
                throw new ProcessorStateException("Exceptions caught during close of 1 or more global state globalStores\n" + closeFailed);
            }
        }
        finally {
            this.stateDirectory.unlockGlobalState();
        }
    }

    @Override
    public void checkpoint(Map<TopicPartition, Long> offsets) {
        this.checkpointableOffsets.putAll(offsets);
        HashMap<TopicPartition, Long> filteredOffsets = new HashMap<TopicPartition, Long>();
        for (Map.Entry topicPartitionOffset : this.checkpointableOffsets.entrySet()) {
            String topic = ((TopicPartition)topicPartitionOffset.getKey()).topic();
            if (this.globalNonPersistentStoresTopics.contains(topic)) continue;
            filteredOffsets.put((TopicPartition)topicPartitionOffset.getKey(), (Long)topicPartitionOffset.getValue());
        }
        try {
            this.checkpoint.write(filteredOffsets);
        }
        catch (IOException e) {
            this.log.warn("Failed to write offset checkpoint file to {} for global stores: {}", (Object)this.checkpoint, (Object)e);
        }
    }

    @Override
    public Map<TopicPartition, Long> checkpointed() {
        return Collections.unmodifiableMap(this.checkpointableOffsets);
    }
}

