package org.creekservice.internal.kafka.streams.extension.observation;

import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.creekservice.api.kafka.streams.extension.observation.LifecycleObserver;
import org.creekservice.api.observability.lifecycle.BasicLifecycle;
import org.creekservice.api.observability.lifecycle.LifecycleLogging;
import org.creekservice.api.observability.lifecycle.LoggableLifecycle;
import org.creekservice.api.observability.logging.structured.StructuredLogger;
import org.creekservice.api.observability.logging.structured.StructuredLoggerFactory;

/* loaded from: input_file:org/creekservice/internal/kafka/streams/extension/observation/DefaultLifecycleObserver.class */
public final class DefaultLifecycleObserver implements LifecycleObserver {
    private static final StructuredLogger LOGGER = StructuredLoggerFactory.internalLogger(DefaultLifecycleObserver.class);
    private final StructuredLogger logger;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/creekservice/internal/kafka/streams/extension/observation/DefaultLifecycleObserver$Lifecycle.class */
    public enum Lifecycle implements LoggableLifecycle {
        starting(BasicLifecycle.starting),
        started(BasicLifecycle.started),
        stopping(BasicLifecycle.stopping),
        stopped(BasicLifecycle.stopped),
        rebalancing(null),
        running(null);

        private final Optional<BasicLifecycle> basic;

        Lifecycle(BasicLifecycle basicLifecycle) {
            this.basic = Optional.ofNullable(basicLifecycle);
        }

        public String logMessage(String str) {
            return (String) this.basic.map(basicLifecycle -> {
                return basicLifecycle.logMessage(str);
            }).orElseGet(() -> {
                return LifecycleLogging.lifecycleLogMessage(str, this);
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/creekservice/internal/kafka/streams/extension/observation/DefaultLifecycleObserver$Metric.class */
    public enum Metric {
        lifecycle,
        exitCode,
        closeTimeout
    }

    public DefaultLifecycleObserver() {
        this(LOGGER);
    }

    DefaultLifecycleObserver(StructuredLogger structuredLogger) {
        this.logger = (StructuredLogger) Objects.requireNonNull(structuredLogger, "logger");
    }

    @Override // org.creekservice.api.kafka.streams.extension.observation.LifecycleObserver
    public void starting() {
        log(Lifecycle.starting, Map.of());
    }

    @Override // org.creekservice.api.kafka.streams.extension.observation.LifecycleObserver
    public void rebalancing() {
        log(Lifecycle.rebalancing, Map.of());
    }

    @Override // org.creekservice.api.kafka.streams.extension.observation.LifecycleObserver
    public void running() {
        log(Lifecycle.running, Map.of());
    }

    @Override // org.creekservice.api.kafka.streams.extension.observation.LifecycleObserver
    public void started() {
        log(Lifecycle.started, Map.of());
    }

    @Override // org.creekservice.api.kafka.streams.extension.observation.LifecycleObserver
    public void failed(Throwable th) {
        this.logger.error("Failure of streams app detected.", logEntryCustomizer -> {
            logEntryCustomizer.withThrowable(th);
        });
    }

    @Override // org.creekservice.api.kafka.streams.extension.observation.LifecycleObserver
    public void stopping(LifecycleObserver.ExitCode exitCode) {
        log(Lifecycle.stopping, Map.of(Metric.exitCode, exitCode));
    }

    @Override // org.creekservice.api.kafka.streams.extension.observation.LifecycleObserver
    public void stopTimedOut(Duration duration) {
        this.logger.warn("Failed to stop the Kafka Streams app within the configured timeout, i.e. streams.close() returned false", logEntryCustomizer -> {
            logEntryCustomizer.with(Metric.closeTimeout, duration);
        });
    }

    @Override // org.creekservice.api.kafka.streams.extension.observation.LifecycleObserver
    public void stopFailed(Throwable th) {
        this.logger.error("Failed to stop the Kafka Streams app as streams.close() threw an exception", logEntryCustomizer -> {
            logEntryCustomizer.withThrowable(th);
        });
    }

    @Override // org.creekservice.api.kafka.streams.extension.observation.LifecycleObserver
    public void stopped(LifecycleObserver.ExitCode exitCode) {
        log(Lifecycle.stopped, Map.of(Metric.exitCode, exitCode));
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        return obj != null && getClass() == obj.getClass();
    }

    public int hashCode() {
        return getClass().hashCode();
    }

    public String toString() {
        return getClass().getSimpleName();
    }

    private void log(Lifecycle lifecycle, Map<Metric, ?> map) {
        this.logger.info("Kafka streams app state change", logEntryCustomizer -> {
            logEntryCustomizer.with(Metric.lifecycle, lifecycle.logMessage("service"));
            Objects.requireNonNull(logEntryCustomizer);
            map.forEach((v1, v2) -> {
                r1.with(v1, v2);
            });
        });
    }
}
