package io.streamthoughts.azkarra.api.streams;

import io.streamthoughts.azkarra.api.StreamsLifecycleContext;
import io.streamthoughts.azkarra.api.StreamsLifecycleInterceptor;
import io.streamthoughts.azkarra.api.config.Conf;
import io.streamthoughts.azkarra.api.errors.AzkarraException;
import io.streamthoughts.azkarra.api.events.EventStream;
import io.streamthoughts.azkarra.api.events.reactive.AsyncMulticastEventStreamPublisher;
import io.streamthoughts.azkarra.api.events.reactive.EventStreamPublisher;
import io.streamthoughts.azkarra.api.model.TimestampedValue;
import io.streamthoughts.azkarra.api.monad.Try;
import io.streamthoughts.azkarra.api.query.LocalStoreAccessor;
import io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer;
import io.streamthoughts.azkarra.api.streams.State;
import io.streamthoughts.azkarra.api.streams.consumer.ConsumerClientOffsets;
import io.streamthoughts.azkarra.api.streams.consumer.ConsumerGroupOffsets;
import io.streamthoughts.azkarra.api.streams.consumer.GlobalConsumerOffsetsRegistry;
import io.streamthoughts.azkarra.api.streams.consumer.LogOffsetsFetcher;
import io.streamthoughts.azkarra.api.streams.internal.InternalStreamsLifeCycleChain;
import io.streamthoughts.azkarra.api.streams.store.LocalStorePartitionLags;
import io.streamthoughts.azkarra.api.streams.store.PartitionLogOffsetsAndLag;
import io.streamthoughts.azkarra.api.streams.topology.TopologyDefinition;
import io.streamthoughts.azkarra.api.streams.topology.TopologyMetadata;
import io.streamthoughts.azkarra.api.time.Time;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/azkarra/api/streams/DefaultKafkaStreamsContainer.class */
public class DefaultKafkaStreamsContainer implements KafkaStreamsContainer {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultKafkaStreamsContainer.class);
    private final KafkaStreamsFactory streamsFactory;
    private KafkaStreams kafkaStreams;
    private final Conf streamsConfig;
    private volatile Throwable lastObservedException;
    private volatile TimestampedValue<State> state;
    private final TopologyDefinition topologyDefinition;
    private final String applicationServer;
    private final List<StreamsLifecycleInterceptor> interceptors;
    private KafkaConsumer<byte[], byte[]> consumer;
    private AdminClient adminClient;
    private volatile ContainerState containerState;
    private Executor executor;
    private final UUID containerId;
    private long started = -1;
    private volatile Set<ThreadMetadata> threadMetadata = Collections.emptySet();
    private final LinkedBlockingQueue<KafkaStreamsContainer.StateChangeWatcher> stateChangeWatchers = new LinkedBlockingQueue<>();
    private final Object containerStateLock = new Object();
    private final List<EventStream> eventStreams = new LinkedList();
    private final Map<String, EventStreamPublisher> publishers = new LinkedHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/streamthoughts/azkarra/api/streams/DefaultKafkaStreamsContainer$ContainerState.class */
    public enum ContainerState {
        CREATED(1, 3),
        STARTING(2),
        STARTED(3),
        PENDING_SHUTDOWN(4),
        STOPPED(1, 3);

        private final Set<Integer> validTransitions = new HashSet();

        ContainerState(Integer... numArr) {
            this.validTransitions.addAll(Arrays.asList(numArr));
        }

        public boolean isValidTransition(ContainerState containerState) {
            return this.validTransitions.contains(Integer.valueOf(containerState.ordinal()));
        }
    }

    public static KafkaStreamsContainerBuilder newBuilder() {
        return new KafkaStreamsContainerBuilder();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DefaultKafkaStreamsContainer(Conf conf, TopologyDefinition topologyDefinition, KafkaStreamsFactory kafkaStreamsFactory, List<StreamsLifecycleInterceptor> list) {
        Objects.requireNonNull(topologyDefinition, "topologyDefinition cannot be null");
        Objects.requireNonNull(kafkaStreamsFactory, "streamsFactory cannot be null");
        this.streamsConfig = (Conf) Objects.requireNonNull(conf, "streamConfigs cannot be null");
        this.containerState = ContainerState.CREATED;
        setState(State.Standards.NOT_CREATED);
        this.interceptors = list;
        this.streamsFactory = kafkaStreamsFactory;
        this.topologyDefinition = topologyDefinition;
        this.containerId = UUID.randomUUID();
        this.applicationServer = streamsConfig().getOptionalString("application.server").orElse(null);
        topologyDefinition.getEventStreams().forEach(this::registerEventStream);
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public synchronized Future<State> start(Executor executor) {
        LOG.info("Starting KafkaStreams container for name='{}', version='{}', id='{}'.", new Object[]{this.topologyDefinition.getName(), this.topologyDefinition.getVersion(), applicationId()});
        LOG.info("StreamsLifecycleInterceptors : {}", this.interceptors.stream().map((v0) -> {
            return v0.name();
        }).collect(Collectors.joining("\n\t", "\n\t", "")));
        setContainerState(ContainerState.STARTING);
        this.started = Time.SYSTEM.milliseconds();
        reset();
        this.executor = executor;
        this.stateChangeWatchers.clear();
        this.kafkaStreams = this.streamsFactory.make(this.topologyDefinition.getTopology(), this.streamsConfig);
        for (EventStream eventStream : this.eventStreams) {
            String type = eventStream.type();
            if (this.publishers.put(type, new AsyncMulticastEventStreamPublisher(eventStream)) != null) {
                throw new AzkarraException("Cannot register two event-streams for type: " + type);
            }
        }
        setState(State.Standards.CREATED);
        return CompletableFuture.supplyAsync(() -> {
            LOG.info("Executing stream-lifecycle interceptor chain (id={})", applicationId());
            new InternalStreamsLifeCycleChain(this.interceptors.iterator(), (streamsLifecycleInterceptor, streamsLifecycleChain) -> {
                streamsLifecycleInterceptor.onStart(newStreamsLifecycleContext(), streamsLifecycleChain);
            }, () -> {
                try {
                    LOG.info("Starting KafkaStreams (id={})", applicationId());
                    this.kafkaStreams.start();
                } catch (StreamsException e) {
                    this.lastObservedException = e;
                    throw e;
                }
            }).execute();
            LOG.info("Completed KafkaStreamsContainer initialization (id={}, state={})", applicationId(), this.kafkaStreams.state());
            setContainerState(ContainerState.STARTED);
            return state().value();
        }, executor);
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public EventStreamPublisher eventStreamPublisherForType(String str) {
        Objects.requireNonNull(str, "eventType cannot be null");
        validateInitialized();
        if (this.publishers.containsKey(str)) {
            return this.publishers.get(str);
        }
        throw new IllegalArgumentException("Cannot found Event-Stream for type: " + str);
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public <K, V> void registerEventStream(EventStream<K, V> eventStream) {
        this.eventStreams.add((EventStream) Objects.requireNonNull(eventStream, "eventStream cannot be null"));
    }

    private void reset() {
        this.lastObservedException = null;
        this.publishers.clear();
    }

    private void setState(State state) {
        this.state = new TimestampedValue<>(state);
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public TimestampedValue<State> state() {
        return this.state;
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public Optional<Serde> defaultKeySerde() {
        return !streamsConfig().hasPath("default.key.serde") ? Optional.empty() : Try.failable(() -> {
            return (Serde) streamsConfig().getClass("default.key.serde", Serde.class);
        }).toOptional();
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public Set<ThreadMetadata> threadMetadata() {
        return this.threadMetadata;
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public long startedSince() {
        return this.started;
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public Conf streamsConfig() {
        return this.streamsConfig;
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public String applicationId() {
        return streamsConfig().getString("application.id");
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public String applicationServer() {
        return this.applicationServer;
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public Optional<Throwable> exception() {
        return Optional.ofNullable(this.lastObservedException);
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public TopologyMetadata topologyMetadata() {
        return new TopologyMetadata(this.topologyDefinition.getName(), this.topologyDefinition.getVersion(), this.topologyDefinition.getDescription());
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public TopologyDescription topologyDescription() {
        return this.topologyDefinition.getTopology().describe();
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public Map<MetricName, ? extends Metric> metrics() {
        return initialized() ? this.kafkaStreams.metrics() : Collections.emptyMap();
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public ConsumerGroupOffsets offsets() {
        if (!isRunning()) {
            return new ConsumerGroupOffsets(applicationId(), Collections.emptySet());
        }
        ConsumerGroupOffsets snapshot = GlobalConsumerOffsetsRegistry.getInstance().offsetsFor(applicationId()).snapshot();
        Set set = (Set) threadMetadata().stream().flatMap(threadMetadata -> {
            return threadMetadata.activeTasks().stream();
        }).flatMap(taskMetadata -> {
            return taskMetadata.topicPartitions().stream();
        }).collect(Collectors.toSet());
        Map<TopicPartition, Long> fetchLogEndOffsetsFor = LogOffsetsFetcher.fetchLogEndOffsetsFor(getConsumer(), set);
        Map<TopicPartition, Long> fetchLogStartOffsetsFor = LogOffsetsFetcher.fetchLogStartOffsetsFor(getConsumer(), set);
        return new ConsumerGroupOffsets(snapshot.group(), (Set) snapshot.consumers().stream().map(consumerClientOffsets -> {
            return new ConsumerClientOffsets(consumerClientOffsets.clientId(), consumerClientOffsets.streamThread(), (Set) consumerClientOffsets.positions().stream().map(consumerLogOffsets -> {
                if (set.contains(consumerLogOffsets.topicPartition())) {
                    return consumerLogOffsets.logEndOffset((Long) fetchLogEndOffsetsFor.get(consumerLogOffsets.topicPartition())).logStartOffset((Long) fetchLogStartOffsetsFor.get(consumerLogOffsets.topicPartition()));
                }
                return null;
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            }).collect(Collectors.toSet()));
        }).collect(Collectors.toSet()));
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public Producer<byte[], byte[]> createNewProducer(Map<String, Object> map) {
        String str = (String) map.get("client.id");
        if (str == null) {
            str = streamsConfig().getOptionalString("client.id").orElse(applicationId()) + "-" + this.containerId + "-producer";
        }
        Map<String, Object> producerConfigs = getProducerConfigs(this.streamsConfig.getConfAsMap());
        producerConfigs.putAll(map);
        producerConfigs.put("bootstrap.servers", this.streamsConfig.getString("bootstrap.servers"));
        producerConfigs.put("client.id", str);
        return new KafkaProducer(producerConfigs, new ByteArraySerializer(), new ByteArraySerializer());
    }

    private synchronized Consumer<byte[], byte[]> getConsumer() {
        if (this.consumer == null) {
            Map<String, Object> consumerConfigs = getConsumerConfigs(this.streamsConfig.getConfAsMap());
            String str = streamsConfig().getOptionalString("client.id").orElse(applicationId()) + "-" + this.containerId + "-consumer";
            consumerConfigs.put("bootstrap.servers", this.streamsConfig.getString("bootstrap.servers"));
            consumerConfigs.put("client.id", str);
            consumerConfigs.remove("group.id");
            this.consumer = new KafkaConsumer<>(consumerConfigs, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        }
        return this.consumer;
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public synchronized AdminClient getAdminClient() {
        if (this.adminClient == null) {
            Map<String, Object> adminClientConfigs = getAdminClientConfigs(this.streamsConfig.getConfAsMap());
            String str = this.streamsConfig.getOptionalString("client.id").orElse(applicationId()) + "-" + this.containerId + "-admin";
            adminClientConfigs.put("bootstrap.servers", this.streamsConfig.getString("bootstrap.servers"));
            adminClientConfigs.put("client.id", str);
            this.adminClient = AdminClient.create(adminClientConfigs);
        }
        return this.adminClient;
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public void close(Duration duration) {
        close(false, duration);
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public void close(boolean z, Duration duration) {
        closeAndOptionallyRestart(z, duration, false);
    }

    private void closeAndOptionallyRestart(boolean z, Duration duration, boolean z2) {
        validateInitialized();
        boolean z3 = true;
        synchronized (this.containerStateLock) {
            if (!setContainerState(ContainerState.PENDING_SHUTDOWN)) {
                LOG.warn("KafkaStreamsContainer is already in the pending shutdown state, wait to complete shutdown (id={})", applicationId());
                z3 = false;
            }
        }
        if (z3) {
            if (z2) {
                this.stateChangeWatchers.add(new KafkaStreamsContainer.StateChangeWatcher() { // from class: io.streamthoughts.azkarra.api.streams.DefaultKafkaStreamsContainer.1
                    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer.StateChangeWatcher
                    public boolean accept(State state) {
                        return state == State.Standards.STOPPED;
                    }

                    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer.StateChangeWatcher
                    public void onChange(StateChangeEvent stateChangeEvent) {
                        DefaultKafkaStreamsContainer.this.restartNow();
                    }
                });
            }
            if (z) {
                reset();
            }
            Thread thread = new Thread(() -> {
                LOG.info("Closing KafkaStreamsContainer (id={})", applicationId());
                new InternalStreamsLifeCycleChain(this.interceptors.iterator(), (streamsLifecycleInterceptor, streamsLifecycleChain) -> {
                    streamsLifecycleInterceptor.onStop(newStreamsLifecycleContext(), streamsLifecycleChain);
                }, () -> {
                    this.kafkaStreams.close();
                    if (z) {
                        LOG.info("Cleanup local states (id={})", applicationId());
                        this.kafkaStreams.cleanUp();
                    }
                    LOG.info("KafkaStreams closed completely (id={})", applicationId());
                }).execute();
                closeInternals();
                LOG.info("KafkaStreamsContainer has been closed (id={})", applicationId());
                setContainerState(ContainerState.STOPPED);
                stateChanges(new StateChangeEvent(State.Standards.STOPPED, State.Standards.valueOf(this.kafkaStreams.state().name())));
                this.eventStreams.forEach((v0) -> {
                    v0.close();
                });
            }, "kafka-streams-container-close-thread");
            thread.setDaemon(true);
            thread.start();
        }
        long millis = duration.toMillis();
        if (millis <= 0 || waitUntilContainerIsStopped(millis)) {
            return;
        }
        LOG.debug("KafkaStreamsContainer cannot transit to {} within {}ms (id={})", new Object[]{ContainerState.STOPPED, Long.valueOf(millis), applicationId()});
    }

    private StreamsLifecycleContext newStreamsLifecycleContext() {
        return new StreamsLifecycleContext() { // from class: io.streamthoughts.azkarra.api.streams.DefaultKafkaStreamsContainer.2
            @Override // io.streamthoughts.azkarra.api.StreamsLifecycleContext
            public void setState(State state) {
                DefaultKafkaStreamsContainer.this.setState(state);
            }

            @Override // io.streamthoughts.azkarra.api.StreamsLifecycleContext
            public KafkaStreamsContainer container() {
                return DefaultKafkaStreamsContainer.this;
            }
        };
    }

    private boolean setContainerState(ContainerState containerState) {
        synchronized (this.containerStateLock) {
            if (this.containerState.isValidTransition(containerState)) {
                this.containerState = containerState;
                this.containerStateLock.notifyAll();
                return true;
            }
            if (this.containerState == ContainerState.PENDING_SHUTDOWN && containerState == ContainerState.PENDING_SHUTDOWN) {
                return false;
            }
            throw new IllegalStateException("KafkaStreamsContainer " + applicationId() + ": Unexpected state transition from " + this.containerState + " to " + containerState);
        }
    }

    private boolean waitUntilContainerIsStopped(long j) {
        long milliseconds = Time.SYSTEM.milliseconds();
        synchronized (this.containerStateLock) {
            long j2 = 0;
            while (this.containerState != ContainerState.STOPPED) {
                if (j <= j2) {
                    return false;
                }
                try {
                    this.containerStateLock.wait(j - j2);
                } catch (InterruptedException e) {
                }
                j2 = Time.SYSTEM.milliseconds() - milliseconds;
            }
            return true;
        }
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public void restart() {
        if (this.containerState.isValidTransition(ContainerState.STARTING)) {
            restartNow();
        } else {
            closeAndOptionallyRestart(false, Duration.ZERO, true);
        }
    }

    private void restartNow() {
        ((CompletableFuture) start(this.executor)).handle((state, th) -> {
            if (th != null) {
                LOG.error("Unexpected error happens while restarting streams", th);
            }
            return state;
        });
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public List<LocalStorePartitionLags> allLocalStorePartitionLags() {
        return (List) this.kafkaStreams.allLocalStorePartitionLags().entrySet().stream().map(entry -> {
            String str = (String) entry.getKey();
            ArrayList arrayList = new ArrayList(((Map) entry.getValue()).size());
            ((Map) entry.getValue()).forEach((num, lagInfo) -> {
                arrayList.add(new PartitionLogOffsetsAndLag(num.intValue(), lagInfo.currentOffsetPosition(), lagInfo.endOffsetPosition(), lagInfo.offsetLag()));
            });
            return new LocalStorePartitionLags(str, arrayList);
        }).collect(Collectors.toList());
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public Optional<ServerMetadata> localServerMetadata() {
        return allMetadata().stream().filter((v0) -> {
            return v0.isLocal();
        }).findFirst();
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public Set<ServerMetadata> allMetadata() {
        return !isRunning() ? Collections.emptySet() : (Set) this.kafkaStreams.allMetadata().stream().map(this::newServerInfoFor).collect(Collectors.toSet());
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public Collection<ServerMetadata> allMetadataForStore(String str) {
        Objects.requireNonNull(str, "storeName cannot be null");
        return !isRunning() ? Collections.emptySet() : (Collection) this.kafkaStreams.allMetadataForStore(str).stream().map(this::newServerInfoFor).collect(Collectors.toList());
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public <K> Optional<KeyQueryMetadata> findMetadataForStoreAndKey(String str, K k, Serializer<K> serializer) {
        Objects.requireNonNull(str, "storeName cannot be null");
        Objects.requireNonNull(k, "key cannot be null");
        Objects.requireNonNull(serializer, "keySerializer cannot be null");
        if (!initialized()) {
            return Optional.empty();
        }
        KeyQueryMetadata queryMetadataForKey = this.kafkaStreams.queryMetadataForKey(str, k, serializer);
        return (queryMetadataForKey == null || queryMetadataForKey.equals(KeyQueryMetadata.NOT_AVAILABLE)) ? Optional.empty() : Optional.of(queryMetadataForKey);
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public <K, V> LocalStoreAccessor<ReadOnlyKeyValueStore<K, V>> localKeyValueStore(String str) {
        return getLocalStoreAccess(str, QueryableStoreTypes.keyValueStore());
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public <K, V> LocalStoreAccessor<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> localTimestampedKeyValueStore(String str) {
        return getLocalStoreAccess(str, QueryableStoreTypes.timestampedKeyValueStore());
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public <K, V> LocalStoreAccessor<ReadOnlyWindowStore<K, V>> localWindowStore(String str) {
        return getLocalStoreAccess(str, QueryableStoreTypes.windowStore());
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public <K, V> LocalStoreAccessor<ReadOnlyWindowStore<K, ValueAndTimestamp<V>>> localTimestampedWindowStore(String str) {
        return getLocalStoreAccess(str, QueryableStoreTypes.timestampedWindowStore());
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public <K, V> LocalStoreAccessor<ReadOnlySessionStore<K, V>> localSessionStore(String str) {
        return getLocalStoreAccess(str, QueryableStoreTypes.sessionStore());
    }

    private <T> LocalStoreAccessor<T> getLocalStoreAccess(String str, QueryableStoreType<T> queryableStoreType) {
        return new LocalStoreAccessor<>(() -> {
            return this.kafkaStreams.store(StoreQueryParameters.fromNameAndType(str, queryableStoreType));
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Logger logger() {
        return LOG;
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public boolean isRunning() {
        if (!initialized()) {
            return false;
        }
        KafkaStreams.State state = this.kafkaStreams.state();
        return state.equals(KafkaStreams.State.RUNNING) || state.equals(KafkaStreams.State.REBALANCING);
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public void addStateChangeWatcher(KafkaStreamsContainer.StateChangeWatcher stateChangeWatcher) {
        this.stateChangeWatchers.add((KafkaStreamsContainer.StateChangeWatcher) Objects.requireNonNull(stateChangeWatcher, "Cannot register null watcher"));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stateChanges(StateChangeEvent stateChangeEvent) {
        this.state = new TimestampedValue<>(stateChangeEvent.timestamp(), stateChangeEvent.newState());
        if (this.state.value() == State.Standards.RUNNING) {
            this.threadMetadata = this.kafkaStreams.localThreadsMetadata();
        } else {
            this.threadMetadata = Collections.emptySet();
        }
        if (this.stateChangeWatchers.isEmpty()) {
            return;
        }
        ArrayList<KafkaStreamsContainer.StateChangeWatcher> arrayList = new ArrayList(this.stateChangeWatchers.size());
        this.stateChangeWatchers.drainTo(arrayList);
        for (KafkaStreamsContainer.StateChangeWatcher stateChangeWatcher : arrayList) {
            if (stateChangeWatcher.accept(stateChangeEvent.newState())) {
                stateChangeWatcher.onChange(stateChangeEvent);
            } else {
                this.stateChangeWatchers.add(stateChangeWatcher);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setException(Throwable th) {
        this.lastObservedException = th;
    }

    private ServerMetadata newServerInfoFor(StreamsMetadata streamsMetadata) {
        return new ServerMetadata(new ServerHostInfo(applicationId(), streamsMetadata.host(), streamsMetadata.port(), isLocal(streamsMetadata)), streamsMetadata.stateStoreNames(), groupByTopicThenGet(streamsMetadata.topicPartitions()), streamsMetadata.standbyStateStoreNames(), groupByTopicThenGet(streamsMetadata.standbyTopicPartitions()));
    }

    private boolean isLocal(StreamsMetadata streamsMetadata) {
        return isSameHost(new HostInfo(streamsMetadata.host(), streamsMetadata.port()));
    }

    private static Set<TopicPartitions> groupByTopicThenGet(Set<TopicPartition> set) {
        return (Set) ((Map) set.stream().collect(Collectors.groupingBy((v0) -> {
            return v0.topic();
        }))).entrySet().stream().map(entry -> {
            return new TopicPartitions((String) entry.getKey(), (Set) ((List) entry.getValue()).stream().map((v0) -> {
                return v0.partition();
            }).collect(Collectors.toSet()));
        }).collect(Collectors.toSet());
    }

    private static Map<String, Object> getAdminClientConfigs(Map<String, Object> map) {
        return getConfigsForKeys(map, AdminClientConfig.configNames());
    }

    private static Map<String, Object> getConsumerConfigs(Map<String, Object> map) {
        return getConfigsForKeys(map, ConsumerConfig.configNames());
    }

    private static Map<String, Object> getProducerConfigs(Map<String, Object> map) {
        return getConfigsForKeys(map, ProducerConfig.configNames());
    }

    private static Map<String, Object> getConfigsForKeys(Map<String, Object> map, Set<String> set) {
        HashMap hashMap = new HashMap();
        for (String str : set) {
            if (map.containsKey(str)) {
                hashMap.put(str, map.get(str));
            }
        }
        return hashMap;
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public KafkaStreams kafkaStreams() {
        validateInitialized();
        return this.kafkaStreams;
    }

    @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
    public boolean isSameHost(HostInfo hostInfo) {
        return this.applicationServer.equals(hostInfo.host() + ":" + hostInfo.port());
    }

    private void validateInitialized() {
        if (!initialized()) {
            throw new IllegalStateException("This container is not started. Cannot get access to KafkaStreams instance.");
        }
    }

    private void closeInternals() {
        LOG.info("Closing internal clients for Kafka Streams container (application.id={})", applicationId());
        try {
            if (this.consumer != null) {
                this.consumer.close();
            }
            if (this.adminClient != null) {
                this.adminClient.close();
            }
        } catch (Exception e) {
            LOG.error("Unexpected error occurred while closing internal resources", e);
        } finally {
            this.consumer = null;
            this.adminClient = null;
        }
    }

    private boolean initialized() {
        return this.kafkaStreams != null;
    }
}
