package dev.responsive.kafka.api;

import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistration;
import dev.responsive.kafka.api.async.internals.AsyncUtils;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.config.StorageBackend;
import dev.responsive.kafka.internal.clients.TTDCassandraClient;
import dev.responsive.kafka.internal.clients.TTDMockAdmin;
import dev.responsive.kafka.internal.config.InternalSessionConfigs;
import dev.responsive.kafka.internal.metrics.ClientVersionMetadata;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.stores.TTDRestoreListener;
import dev.responsive.kafka.internal.utils.SessionClients;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.TTDUtils;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.test.TestRecord;

/* loaded from: input_file:dev/responsive/kafka/api/ResponsiveTopologyTestDriver.class */
public class ResponsiveTopologyTestDriver extends TTDUtils.TopologyTestDriverAccessor {
    public static final String RESPONSIVE_TTD_ORG = "Responsive";
    public static final String RESPONSIVE_TTD_ENV = "TopologyTestDriver";
    private final TTDCassandraClient client;
    private final Optional<AsyncThreadPoolRegistration> asyncThreadPool;

    public ResponsiveTopologyTestDriver(Topology topology) {
        this(topology, new Properties());
    }

    public ResponsiveTopologyTestDriver(Topology topology, Properties properties) {
        this(topology, properties, null);
    }

    public ResponsiveTopologyTestDriver(Topology topology, Instant instant) {
        this(topology, new Properties(), instant);
    }

    public ResponsiveTopologyTestDriver(Topology topology, Properties properties, Instant instant) {
        this(topology, baseProps(properties), instant, new TTDCassandraClient(new TTDMockAdmin(baseProps(properties), topology), mockTime(instant)));
    }

    public void advanceWallClockTime(Duration duration) {
        this.client.advanceWallClockTime(duration);
        this.client.flush();
        super.advanceWallClockTime(duration);
    }

    public void flush() {
        this.asyncThreadPool.ifPresent((v0) -> {
            v0.flushAllAsyncEvents();
        });
        this.client.flush();
        super.advanceWallClockTime(Duration.ZERO);
    }

    private ResponsiveTopologyTestDriver(Topology topology, Properties properties, Instant instant, TTDCassandraClient tTDCassandraClient) {
        super(topology, testDriverProps(properties, topology.describe(), tTDCassandraClient), instant);
        this.client = tTDCassandraClient;
        this.asyncThreadPool = getAsyncThreadPoolRegistration(super.props());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.kafka.streams.TTDUtils.TopologyTestDriverAccessor
    public <K, V> void pipeRecord(String str, TestRecord<K, V> testRecord, Serializer<K> serializer, Serializer<V> serializer2, Instant instant) {
        super.pipeRecord(str, testRecord, serializer, serializer2, instant);
        flush();
    }

    private static Properties testDriverProps(Properties properties, TopologyDescription topologyDescription, TTDCassandraClient tTDCassandraClient) {
        SessionClients sessionClients = new SessionClients(Optional.empty(), Optional.of(tTDCassandraClient), Optional.empty(), StorageBackend.CASSANDRA, tTDCassandraClient.mockAdmin());
        TTDRestoreListener mockRestoreListener = TTDRestoreListener.mockRestoreListener(properties);
        sessionClients.initialize(mockRestoreListener.metrics(), mockRestoreListener);
        ResponsiveMetrics responsiveMetrics = new ResponsiveMetrics(new Metrics());
        String property = properties.getProperty("application.id");
        responsiveMetrics.initializeTags(property, property + "-client", ClientVersionMetadata.loadVersionMetadata(), Collections.emptyMap());
        InternalSessionConfigs.Builder withTopologyDescription = new InternalSessionConfigs.Builder().withSessionClients(sessionClients).withStoreRegistry(tTDCassandraClient.storeRegistry()).withMetrics(responsiveMetrics).withTopologyDescription(topologyDescription);
        AsyncUtils.configuredAsyncThreadPool(ResponsiveConfig.responsiveConfig(properties), 1, responsiveMetrics).ifPresent(asyncThreadPoolRegistry -> {
            asyncThreadPoolRegistry.startNewAsyncThreadPool(Thread.currentThread().getName());
            withTopologyDescription.withAsyncThreadPoolRegistry(asyncThreadPoolRegistry);
        });
        properties.putAll(withTopologyDescription.build());
        return properties;
    }

    private static Properties baseProps(Properties properties) {
        Properties properties2 = new Properties();
        properties2.putAll(properties);
        properties2.put("responsive.org", RESPONSIVE_TTD_ORG);
        properties2.put("responsive.env", RESPONSIVE_TTD_ENV);
        properties2.put("responsive.store.flush.trigger.local.interval.ms", 0);
        properties2.putIfAbsent("cache.max.bytes.buffering", 0);
        properties2.putIfAbsent("application.id", "Responsive-TopologyTestDriver");
        return properties2;
    }

    private static MockTime mockTime(Instant instant) {
        MockTime mockTime = new MockTime(0L, 0L, 0L);
        mockTime.setCurrentTimeMs(instant == null ? System.currentTimeMillis() : instant.toEpochMilli());
        return mockTime;
    }

    private static Optional<AsyncThreadPoolRegistration> getAsyncThreadPoolRegistration(Properties properties) {
        if (ResponsiveConfig.responsiveConfig(properties).getInt("responsive.async.thread.pool.size").intValue() <= 0) {
            return Optional.empty();
        }
        HashMap hashMap = new HashMap();
        properties.forEach((obj, obj2) -> {
            hashMap.put(obj.toString(), obj2);
        });
        return Optional.of(AsyncUtils.getAsyncThreadPool(hashMap, Thread.currentThread().getName()));
    }
}
