package org.creekservice.internal.kafka.streams.extension;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.streams.KafkaStreams;
import org.creekservice.api.kafka.streams.extension.KafkaStreamsExtensionOptions;
import org.creekservice.api.kafka.streams.extension.observation.KafkaMetricsPublisherOptions;
import org.creekservice.api.kafka.streams.extension.observation.LifecycleObserver;
import org.creekservice.internal.kafka.streams.extension.observation.DefaultMetricsPublisher;
import org.creekservice.internal.kafka.streams.extension.observation.MetricsPublisher;

/* loaded from: input_file:org/creekservice/internal/kafka/streams/extension/KafkaStreamsExecutor.class */
/**
 * Runs a {@link KafkaStreams} instance from start to shutdown.
 *
 * <p>{@link #execute(KafkaStreams)} wires listeners and the shutdown hook into the streams
 * instance, starts it, blocks until the shared shutdown future completes, closes the streams
 * instance and finally passes the resulting exit code to the configured {@link ShutdownMethod}.
 * Progress is reported to the {@link LifecycleObserver} obtained from the extension options.
 */
public final class KafkaStreamsExecutor {
    /** How long the default close-delay sleeps after a failure, before streams is closed. */
    private static final Duration LOGGING_STREAMS_CLOSE_DELAY = Duration.ofSeconds(1);

    private final KafkaStreamsExtensionOptions options;
    private final MetricsPublisherFactory metricsPublisherFactory;
    private final StreamsShutdownHook shutdownHook;
    private final ShutdownMethod shutdownMethod;
    private final CompletableFuture<Void> shutdownFuture;
    private final Runnable loggingCloseDelay;

    /** Creates the {@link MetricsPublisher} used for the duration of one {@code execute} call. */
    @FunctionalInterface
    interface MetricsPublisherFactory {
        MetricsPublisher create(KafkaMetricsPublisherOptions kafkaMetricsPublisherOptions);
    }

    /** Receives the final exit code once {@code execute} has finished or failed. */
    @FunctionalInterface
    interface ShutdownMethod {
        void shutdownApp(int i);
    }

    /**
     * Creates an executor with the default collaborators: {@link DefaultMetricsPublisher},
     * {@code DefaultStreamsShutdownHook}, {@link System#exit(int)} as the shutdown method, a new
     * shutdown future and a one second sleep as the close delay.
     *
     * @param options the extension options; must not be {@code null}.
     */
    public KafkaStreamsExecutor(final KafkaStreamsExtensionOptions options) {
        this(
                options,
                DefaultMetricsPublisher::new,
                new DefaultStreamsShutdownHook(),
                System::exit,
                new CompletableFuture<>(),
                KafkaStreamsExecutor
                        ::giveStreamsTimeToLogCauseAsLoggingIsDoneAfterStateChangeCallbackIsCalled);
    }

    /**
     * Creates an executor with explicit collaborators. Every argument must be non-null.
     *
     * @param options the extension options.
     * @param metricsPublisherFactory creates the metrics publisher for each execution.
     * @param shutdownHook applied to the streams instance and shutdown future before start.
     * @param shutdownMethod invoked with the exit code at the end of each execution.
     * @param shutdownFuture the future whose completion ends the wait in {@code execute}.
     * @param loggingCloseDelay run after a failure is observed, before streams is closed.
     */
    KafkaStreamsExecutor(
            final KafkaStreamsExtensionOptions options,
            final MetricsPublisherFactory metricsPublisherFactory,
            final StreamsShutdownHook shutdownHook,
            final ShutdownMethod shutdownMethod,
            final CompletableFuture<Void> shutdownFuture,
            final Runnable loggingCloseDelay) {
        this.options = Objects.requireNonNull(options, "options");
        this.metricsPublisherFactory =
                Objects.requireNonNull(metricsPublisherFactory, "metricsPublisherSupplier");
        this.shutdownHook = Objects.requireNonNull(shutdownHook, "shutdownHook");
        this.shutdownMethod = Objects.requireNonNull(shutdownMethod, "shutdownHandler");
        this.shutdownFuture = Objects.requireNonNull(shutdownFuture, "future");
        this.loggingCloseDelay = Objects.requireNonNull(loggingCloseDelay, "loggingCloseDelay");
    }

    /**
     * Starts the supplied streams instance and blocks until it has been shut down.
     *
     * <p>The metrics publisher is closed and the {@link ShutdownMethod} is invoked exactly once
     * on every path out of this method. If anything throws before the streams instance has been
     * closed, the exit code passed is {@code EXCEPTION_THROWN_STARTING} and the exception is
     * rethrown after the shutdown method returns. With the default shutdown method,
     * {@link System#exit(int)}, this method does not return normally.
     *
     * @param kafkaStreams the streams instance to run; must not be {@code null}.
     */
    @SuppressFBWarnings(value = {"RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE"}, justification = "false-positive")
    public void execute(final KafkaStreams kafkaStreams) {
        final LifecycleObserver observer = options.lifecycleObserver();
        // Stays at this value unless the streams instance gets as far as being closed.
        LifecycleObserver.ExitCode exitCode = LifecycleObserver.ExitCode.EXCEPTION_THROWN_STARTING;

        // try-with-resources closes the publisher on failure as well as on success.
        // NOTE(review): relies on MetricsPublisher being AutoCloseable - confirm.
        try (MetricsPublisher metricsPublisher =
                metricsPublisherFactory.create(options.metricsPublishing())) {
            observer.starting();
            // The bound method reference throws NullPointerException if kafkaStreams is null.
            metricsPublisher.schedule(kafkaStreams::metrics);
            kafkaStreams.setStateListener(new StreamsStateListener(observer, shutdownFuture));
            kafkaStreams.setGlobalStateRestoreListener(
                    new RestoreListener(options.restoreObserver()));
            shutdownHook.apply(kafkaStreams, shutdownFuture);
            kafkaStreams.start();

            final LifecycleObserver.ExitCode runResult = waitForShutdown(observer);
            exitCode = shutdownStreams(kafkaStreams, runResult, observer);
        } finally {
            // Runs after the publisher has been closed, once per execution.
            shutdownMethod.shutdownApp(exitCode.asInt());
        }
    }

    /**
     * Blocks until the shutdown future completes.
     *
     * @param lifecycleObserver notified via {@code failed} if waiting on the future throws.
     * @return {@code OK} if the future completed normally, otherwise {@code STREAM_APP_FAILED}.
     */
    private LifecycleObserver.ExitCode waitForShutdown(final LifecycleObserver lifecycleObserver) {
        try {
            shutdownFuture.get();
            return LifecycleObserver.ExitCode.OK;
        } catch (final Exception e) {
            // Report the underlying failure rather than the ExecutionException wrapper.
            final Throwable cause = e instanceof ExecutionException ? e.getCause() : e;
            lifecycleObserver.failed(cause);
            loggingCloseDelay.run();
            return LifecycleObserver.ExitCode.STREAM_APP_FAILED;
        }
    }

    /**
     * Default close delay: sleeps for {@link #LOGGING_STREAMS_CLOSE_DELAY}.
     *
     * <p>Best effort only: an interrupt simply cuts the delay short.
     */
    private static void giveStreamsTimeToLogCauseAsLoggingIsDoneAfterStateChangeCallbackIsCalled() {
        try {
            Thread.sleep(LOGGING_STREAMS_CLOSE_DELAY.toMillis());
        } catch (final InterruptedException ignored) {
            // Deliberately ignored, and the interrupt flag is not restored: the observer
            // callbacks and KafkaStreams.close run next on this thread.
        }
    }

    /**
     * Closes the streams instance, reporting each step to the observer.
     *
     * @param kafkaStreams the streams instance to close.
     * @param exitCode the exit code determined so far; passed to the observer and returned
     *     unchanged if the close completes within the timeout.
     * @param lifecycleObserver the observer to notify.
     * @return {@code exitCode} on a clean close, {@code STREAMS_TIMED_OUT_CLOSING} if the close
     *     did not complete within the configured timeout, or {@code EXCEPTION_THROWN_STOPPING}
     *     if an exception was thrown while stopping.
     */
    private LifecycleObserver.ExitCode shutdownStreams(
            final KafkaStreams kafkaStreams,
            final LifecycleObserver.ExitCode exitCode,
            final LifecycleObserver lifecycleObserver) {
        try {
            final Duration closeTimeout = options.streamsCloseTimeout();
            lifecycleObserver.stopping(exitCode);

            if (!kafkaStreams.close(closeTimeout)) {
                lifecycleObserver.stopTimedOut(closeTimeout);
                return LifecycleObserver.ExitCode.STREAMS_TIMED_OUT_CLOSING;
            }

            lifecycleObserver.stopped(exitCode);
            return exitCode;
        } catch (final Exception e) {
            lifecycleObserver.stopFailed(e);
            return LifecycleObserver.ExitCode.EXCEPTION_THROWN_STOPPING;
        }
    }
}
