/*
 * Decompiled with CFR 0.152.
 */
package dev.responsive.kafka.api;

import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistration;
import dev.responsive.kafka.api.async.internals.AsyncUtils;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.clients.TTDCassandraClient;
import dev.responsive.kafka.internal.clients.TTDMockAdmin;
import dev.responsive.kafka.internal.config.InternalSessionConfigs;
import dev.responsive.kafka.internal.metrics.ClientVersionMetadata;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.metrics.ResponsiveRestoreListener;
import dev.responsive.kafka.internal.stores.TTDRestoreListener;
import dev.responsive.kafka.internal.utils.SessionClients;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.TTDUtils;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.test.TestRecord;

public class ResponsiveTopologyTestDriver
extends TTDUtils.TopologyTestDriverAccessor {
    public static final String RESPONSIVE_TTD_ORG = "Responsive";
    public static final String RESPONSIVE_TTD_ENV = "TopologyTestDriver";
    private final TTDCassandraClient client;
    private final Optional<AsyncThreadPoolRegistration> asyncThreadPool;

    public ResponsiveTopologyTestDriver(Topology topology) {
        this(topology, new Properties());
    }

    public ResponsiveTopologyTestDriver(Topology topology, Properties config) {
        this(topology, config, null);
    }

    public ResponsiveTopologyTestDriver(Topology topology, Instant initialWallClockTimeMs) {
        this(topology, new Properties(), initialWallClockTimeMs);
    }

    public ResponsiveTopologyTestDriver(Topology topology, Properties config, Instant initialWallClockTime) {
        this(topology, ResponsiveTopologyTestDriver.baseProps(config), initialWallClockTime, new TTDCassandraClient(new TTDMockAdmin(ResponsiveTopologyTestDriver.baseProps(config), topology), (Time)ResponsiveTopologyTestDriver.mockTime(initialWallClockTime)));
    }

    public void advanceWallClockTime(Duration advance) {
        this.client.advanceWallClockTime(advance);
        this.client.flush();
        super.advanceWallClockTime(advance);
    }

    public void flush() {
        this.asyncThreadPool.ifPresent(AsyncThreadPoolRegistration::flushAllAsyncEvents);
        this.client.flush();
        super.advanceWallClockTime(Duration.ZERO);
    }

    private ResponsiveTopologyTestDriver(Topology topology, Properties config, Instant initialWallClockTime, TTDCassandraClient cassandraClient) {
        super(topology, ResponsiveTopologyTestDriver.testDriverProps(config, topology.describe(), cassandraClient), initialWallClockTime);
        this.client = cassandraClient;
        this.asyncThreadPool = ResponsiveTopologyTestDriver.getAsyncThreadPoolRegistration(super.props());
    }

    @Override
    protected <K, V> void pipeRecord(String topic, TestRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer, Instant time) {
        super.pipeRecord(topic, record, keySerializer, valueSerializer, time);
        this.flush();
    }

    private static Properties testDriverProps(Properties baseProps, TopologyDescription topologyDescription, TTDCassandraClient client) {
        SessionClients sessionClients = new SessionClients(Optional.empty(), Optional.of(client), Optional.empty(), false, (Admin)client.mockAdmin());
        TTDRestoreListener restoreListener = TTDRestoreListener.mockRestoreListener(baseProps);
        sessionClients.initialize(restoreListener.metrics(), (ResponsiveRestoreListener)restoreListener);
        ResponsiveMetrics metrics = new ResponsiveMetrics(new Metrics());
        String appId = baseProps.getProperty("application.id");
        metrics.initializeTags(appId, appId + "-client", ClientVersionMetadata.loadVersionMetadata(), Collections.emptyMap());
        InternalSessionConfigs.Builder sessionConfig = new InternalSessionConfigs.Builder().withSessionClients(sessionClients).withStoreRegistry(client.storeRegistry()).withMetrics(metrics).withTopologyDescription(topologyDescription);
        AsyncUtils.configuredAsyncThreadPool((ResponsiveConfig)ResponsiveConfig.responsiveConfig((Map)baseProps), (int)1, (ResponsiveMetrics)metrics).ifPresent(threadPoolRegistry -> {
            threadPoolRegistry.startNewAsyncThreadPool(Thread.currentThread().getName());
            sessionConfig.withAsyncThreadPoolRegistry(threadPoolRegistry);
        });
        baseProps.putAll((Map<?, ?>)sessionConfig.build());
        return baseProps;
    }

    private static Properties baseProps(Properties userProps) {
        Properties props = new Properties();
        props.putAll((Map<?, ?>)userProps);
        props.put("responsive.org", RESPONSIVE_TTD_ORG);
        props.put("responsive.env", RESPONSIVE_TTD_ENV);
        props.put("responsive.store.flush.trigger.local.interval.ms", (Object)0);
        props.putIfAbsent("cache.max.bytes.buffering", (Object)0);
        props.putIfAbsent("application.id", "Responsive-TopologyTestDriver");
        return props;
    }

    private static MockTime mockTime(Instant initialWallClockTime) {
        MockTime mockTime = new MockTime(0L, 0L, 0L);
        long initialWallClockTimeMs = initialWallClockTime == null ? System.currentTimeMillis() : initialWallClockTime.toEpochMilli();
        mockTime.setCurrentTimeMs(initialWallClockTimeMs);
        return mockTime;
    }

    private static Optional<AsyncThreadPoolRegistration> getAsyncThreadPoolRegistration(Properties props) {
        int asyncThreadPoolSize = (Integer)props.getOrDefault((Object)"responsive.async.thread.pool.size", (Object)0);
        if (asyncThreadPoolSize > 0) {
            HashMap configMap = new HashMap();
            props.forEach((BiConsumer<? super Object, ? super Object>)((BiConsumer<Object, Object>)(key, value) -> configMap.put(key.toString(), value)));
            return Optional.of(AsyncUtils.getAsyncThreadPool(configMap, (String)Thread.currentThread().getName()));
        }
        return Optional.empty();
    }
}

