/*
 * Decompiled with CFR 0.152.
 */
package org.apache.samza.test.framework;

import com.google.common.base.Preconditions;
import java.io.File;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.application.LegacyTaskApplication;
import org.apache.samza.application.SamzaApplication;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
import org.apache.samza.context.ExternalContext;
import org.apache.samza.execution.JobPlanner;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.metadatastore.InMemoryMetadataStoreFactory;
import org.apache.samza.metadatastore.MetadataStoreFactory;
import org.apache.samza.operators.KV;
import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
import org.apache.samza.system.EndOfStreamMessage;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.descriptors.StreamDescriptor;
import org.apache.samza.system.inmemory.InMemorySystemFactory;
import org.apache.samza.system.inmemory.InMemorySystemProducer;
import org.apache.samza.task.AsyncStreamTask;
import org.apache.samza.task.StreamTask;
import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
import org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
import org.apache.samza.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestRunner {
    private static final Logger LOG = LoggerFactory.getLogger(TestRunner.class);
    private static final String JOB_DEFAULT_SYSTEM = "default-samza-system";
    private static final String APP_NAME = "samza-test";
    private Map<String, String> configs = new HashMap<String, String>();
    private SamzaApplication app;
    private ExternalContext externalContext;
    private String inMemoryScope = RandomStringUtils.random((int)10, (boolean)true, (boolean)true);

    private TestRunner() {
        this.configs.put("app.name", APP_NAME);
        this.configs.put("processor.id", "1");
        this.configs.put("job.coordinator.factory", PassthroughJobCoordinatorFactory.class.getName());
        this.configs.put("startpoint.metadata.store.factory", InMemoryMetadataStoreFactory.class.getCanonicalName());
        this.configs.put("task.name.grouper.factory", SingleContainerGrouperFactory.class.getName());
        this.configs.put("job.non-logged.store.base.dir", new File(System.getProperty("java.io.tmpdir"), this.inMemoryScope + "-non-logged").getAbsolutePath());
        this.configs.put("job.logged.store.base.dir", new File(System.getProperty("java.io.tmpdir"), this.inMemoryScope + "-logged").getAbsolutePath());
        this.addConfig("job.default.system", JOB_DEFAULT_SYSTEM);
        this.addConfig("job.host-affinity.enabled", Boolean.FALSE.toString());
        this.addConfig("inmemory.scope", this.inMemoryScope);
        this.addConfig(new InMemorySystemDescriptor(JOB_DEFAULT_SYSTEM).withInMemoryScope(this.inMemoryScope).toConfig());
    }

    private TestRunner(Class taskClass) {
        this();
        Preconditions.checkNotNull((Object)taskClass);
        this.configs.put("task.class", taskClass.getName());
        this.app = new LegacyTaskApplication(taskClass.getName());
    }

    /**
     * Creates a runner for an already-built application on top of the default
     * configuration.
     *
     * @param app the application to run; must not be null
     */
    private TestRunner(SamzaApplication app) {
        this();
        this.app = Preconditions.checkNotNull(app);
    }

    /**
     * Creates a {@link TestRunner} for a task class.
     *
     * @param taskClass a class assignable to {@link StreamTask} or {@link AsyncStreamTask}
     * @return a new runner configured for {@code taskClass}
     * @throws NullPointerException if {@code taskClass} is null
     * @throws IllegalStateException if {@code taskClass} is neither a {@link StreamTask}
     *         nor an {@link AsyncStreamTask}
     */
    public static TestRunner of(Class taskClass) {
        Preconditions.checkNotNull(taskClass);
        boolean isSyncTask = StreamTask.class.isAssignableFrom(taskClass);
        boolean isAsyncTask = AsyncStreamTask.class.isAssignableFrom(taskClass);
        Preconditions.checkState(isSyncTask || isAsyncTask);
        return new TestRunner(taskClass);
    }

    /**
     * Creates a {@link TestRunner} for a {@link SamzaApplication}.
     *
     * @param app the application to run
     * @return a new runner wrapping {@code app}
     * @throws NullPointerException if {@code app} is null
     */
    public static TestRunner of(SamzaApplication app) {
        Preconditions.checkNotNull(app);
        TestRunner runner = new TestRunner(app);
        return runner;
    }

    /**
     * Adds a single configuration entry, replacing any existing value for the key.
     *
     * @param key the config key; must not be null
     * @param value the config value; must not be null
     * @return this runner, for chaining
     * @throws NullPointerException if {@code key} or {@code value} is null
     */
    public TestRunner addConfig(String key, String value) {
        // Arguments are evaluated left to right, so the key is validated before the value.
        this.configs.put(Preconditions.checkNotNull(key), Preconditions.checkNotNull(value));
        return this;
    }

    /**
     * Adds every entry of the given map to the configuration, replacing existing
     * values for keys that are already present.
     *
     * @param config the entries to add; must not be null
     * @return this runner, for chaining
     * @throws NullPointerException if {@code config} is null
     */
    public TestRunner addConfig(Map<String, String> config) {
        this.configs.putAll(Preconditions.checkNotNull(config));
        return this;
    }

    /**
     * Sets the {@link ExternalContext} that is handed to the application runner
     * when {@link #run(Duration)} is invoked.
     *
     * @param externalContext the context to pass; must not be null
     * @return this runner, for chaining
     * @throws NullPointerException if {@code externalContext} is null
     */
    public TestRunner addExternalContext(ExternalContext externalContext) {
        this.externalContext = Preconditions.checkNotNull(externalContext);
        return this;
    }

    /**
     * Registers an in-memory input stream with a single partition (partition id 0)
     * holding the given messages.
     *
     * @param descriptor describes the in-memory input stream; must not be null
     * @param messages the messages to place in partition 0; must not be null
     * @param <StreamMessageType> type of the messages
     * @return this runner, for chaining
     * @throws NullPointerException if {@code descriptor} or {@code messages} is null
     */
    public <StreamMessageType> TestRunner addInputStream(InMemoryInputDescriptor descriptor, List<StreamMessageType> messages) {
        // Each argument is validated on its own: the two-argument checkNotNull(ref, msg)
        // treats its second argument as the error message and does not null-check it.
        Preconditions.checkNotNull(descriptor, "descriptor must not be null");
        Preconditions.checkNotNull(messages, "messages must not be null");
        Map<Integer, Iterable<StreamMessageType>> partitionData = new HashMap<>();
        partitionData.put(0, messages);
        this.initializeInMemoryInputStream(descriptor, partitionData);
        return this;
    }

    /**
     * Registers an in-memory input stream whose partitions and their messages are
     * given by the supplied map. The map is copied before use, and its size is used
     * as the partition count of the created stream.
     *
     * @param descriptor describes the in-memory input stream; must not be null
     * @param messages partition id to the messages for that partition; must not be null
     * @param <StreamMessageType> type of the messages
     * @return this runner, for chaining
     * @throws NullPointerException if {@code descriptor} or {@code messages} is null
     */
    public <StreamMessageType> TestRunner addInputStream(InMemoryInputDescriptor descriptor, Map<Integer, ? extends Iterable<StreamMessageType>> messages) {
        // Each argument is validated on its own: the two-argument checkNotNull(ref, msg)
        // treats its second argument as the error message and does not null-check it.
        Preconditions.checkNotNull(descriptor, "descriptor must not be null");
        Preconditions.checkNotNull(messages, "messages must not be null");
        Map<Integer, Iterable<StreamMessageType>> partitionData = new HashMap<>();
        partitionData.putAll(messages);
        this.initializeInMemoryInputStream(descriptor, partitionData);
        return this;
    }

    /**
     * Registers an in-memory output stream: applies this runner's in-memory scope
     * to the descriptor's system, creates the stream with the requested number of
     * partitions through the in-memory system admin, and adds the stream, system
     * and serde configs to this runner's configuration.
     *
     * @param streamDescriptor describes the output stream; must not be null
     * @param partitionCount number of partitions to create; must be at least 1
     * @return this runner, for chaining
     * @throws NullPointerException if {@code streamDescriptor} is null
     * @throws IllegalStateException if {@code partitionCount} is less than 1
     */
    public TestRunner addOutputStream(InMemoryOutputDescriptor<?> streamDescriptor, int partitionCount) {
        Preconditions.checkNotNull(streamDescriptor);
        Preconditions.checkState(partitionCount >= 1);

        // The scope must be set before any toConfig() call below so the generated
        // configs carry it.
        InMemorySystemDescriptor systemDescriptor = (InMemorySystemDescriptor)streamDescriptor.getSystemDescriptor();
        systemDescriptor.withInMemoryScope(this.inMemoryScope);

        MapConfig adminConfig = new MapConfig(new Map[]{streamDescriptor.toConfig(), streamDescriptor.getSystemDescriptor().toConfig()});
        InMemorySystemFactory systemFactory = new InMemorySystemFactory();

        String streamId = streamDescriptor.getStreamId();
        String systemName = streamDescriptor.getSystemName();
        // Fall back to the stream id when no physical name was configured.
        String physicalName = streamDescriptor.getPhysicalName().orElse(streamId);
        StreamSpec streamSpec = new StreamSpec(streamId, physicalName, systemName, partitionCount);
        systemFactory.getAdmin(systemName, (Config)adminConfig).createStream(streamSpec);

        this.addConfig(streamDescriptor.toConfig());
        this.addConfig(streamDescriptor.getSystemDescriptor().toConfig());
        this.addSerdeConfigs((StreamDescriptor)streamDescriptor);
        return this;
    }

    /**
     * Runs the configured application with a {@link LocalApplicationRunner} and
     * blocks until it finishes or the timeout elapses.
     *
     * <p>The store directories are deleted before the run and again after the
     * application has finished. They are not deleted when the wait times out.
     *
     * @param timeout maximum time to wait for the application to finish; must be
     *        strictly positive
     * @throws NullPointerException if no application is set or {@code timeout} is null
     * @throws IllegalStateException if {@code timeout} is zero or negative
     * @throws SamzaException if the application does not finish within {@code timeout},
     *         or finishes with status {@code UnsuccessfulFinish}
     */
    public void run(Duration timeout) {
        Preconditions.checkNotNull((Object)this.app);
        Preconditions.checkNotNull(timeout, "timeout must not be null");
        // Both conditions must hold. With '||' the check was always true, because a
        // Duration cannot be zero and negative at the same time.
        Preconditions.checkState(!timeout.isZero() && !timeout.isNegative(), (Object)"Timeouts should be positive");
        // Delete store directories before the run.
        this.deleteStoreDirectories();
        MapConfig config = new MapConfig((Map)JobPlanner.generateSingleJobConfig(this.configs));
        LocalApplicationRunner runner = new LocalApplicationRunner(this.app, (Config)config, (MetadataStoreFactory)new InMemoryMetadataStoreFactory());
        runner.run(this.externalContext);
        if (!runner.waitForFinish(timeout)) {
            throw new SamzaException("Timed out waiting for application to finish");
        }
        ApplicationStatus status = runner.status();
        this.deleteStoreDirectories();
        if (status.getStatusCode() == ApplicationStatus.StatusCode.UnsuccessfulFinish) {
            throw new SamzaException("Application could not finish successfully", status.getThrowable());
        }
    }

    /**
     * Reads the messages of an in-memory stream, grouped by partition id.
     *
     * <p>A consumer is registered on every partition reported by the system admin
     * (starting at offset {@code "0"}) and polled repeatedly. A partition counts as
     * fully read once the number of envelopes collected for it equals its upcoming
     * offset. Polling stops when every partition is fully read or the timeout elapses.
     *
     * @param outputDescriptor describes the stream to read; must not be null
     * @param timeout maximum time to keep polling; must not be null
     * @param <StreamMessageType> type the envelope payloads are cast to (unchecked)
     * @return partition id to the list of message payloads read from that partition
     * @throws NullPointerException if an argument is null or no metadata is returned
     *         for the stream
     * @throws SamzaException if the calling thread is interrupted while polling
     * @throws IllegalStateException if not every partition was fully read before the
     *         timeout elapsed
     */
    public static <StreamMessageType> Map<Integer, List<StreamMessageType>> consumeStream(InMemoryOutputDescriptor outputDescriptor, Duration timeout) throws SamzaException {
        Preconditions.checkNotNull(outputDescriptor);
        Preconditions.checkNotNull(timeout, "timeout must not be null");
        String streamId = outputDescriptor.getStreamId();
        String systemName = outputDescriptor.getSystemName();
        HashSet<String> streamIds = new HashSet<>();
        streamIds.add(streamId);
        InMemorySystemFactory factory = new InMemorySystemFactory();
        MapConfig config = new MapConfig(new Map[]{outputDescriptor.toConfig(), outputDescriptor.getSystemDescriptor().toConfig()});
        Map<String, SystemStreamMetadata> metadata = factory.getAdmin(systemName, (Config)config).getSystemStreamMetadata(streamIds);
        SystemConsumer consumer = factory.getConsumer(systemName, (Config)config, null);
        // The descriptor is a raw type, so the Optional's value comes back as Object.
        String name = (String)outputDescriptor.getPhysicalName().orElse(streamId);
        // Look the stream's metadata up once and reuse it, so partition discovery and
        // the expected-count check below always agree on the same entry.
        SystemStreamMetadata streamMetadata = Preconditions.checkNotNull(metadata.get(name), "No metadata found for stream %s", name);

        HashSet<SystemStreamPartition> ssps = new HashSet<>();
        streamMetadata.getSystemStreamPartitionMetadata().keySet().forEach(partition -> {
            SystemStreamPartition ssp = new SystemStreamPartition(systemName, streamId, partition);
            ssps.add(ssp);
            consumer.register(ssp, "0");
        });

        // Compare elapsed time against the timeout instead of computing an absolute
        // deadline, which could overflow for very large timeouts.
        long startMillis = System.currentTimeMillis();
        long timeoutMillis = timeout.toMillis();
        Map<SystemStreamPartition, List<IncomingMessageEnvelope>> output = new HashMap<>();
        HashSet<SystemStreamPartition> didNotReachEndOfStream = new HashSet<>(ssps);
        while (System.currentTimeMillis() - startMillis < timeoutMillis) {
            Map<SystemStreamPartition, List<IncomingMessageEnvelope>> currentState;
            try {
                currentState = consumer.poll(ssps, 10L);
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag for callers and keep the original cause.
                Thread.currentThread().interrupt();
                throw new SamzaException("Interrupted while consuming stream \n" + e.getMessage(), e);
            }
            for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> entry : currentState.entrySet()) {
                SystemStreamPartition ssp = entry.getKey();
                List<IncomingMessageEnvelope> collected = output.computeIfAbsent(ssp, k -> new LinkedList<IncomingMessageEnvelope>());
                collected.addAll(entry.getValue());
                // The upcoming offset is used as the number of envelopes expected
                // for the partition.
                int totalMessagesToFetch = Integer.parseInt(streamMetadata.getSystemStreamPartitionMetadata().get(ssp.getPartition()).getUpcomingOffset());
                if (collected.size() == totalMessagesToFetch) {
                    didNotReachEndOfStream.remove(ssp);
                    // Stop polling partitions that are already complete.
                    ssps.remove(ssp);
                }
            }
            if (didNotReachEndOfStream.isEmpty()) {
                break;
            }
        }
        if (!didNotReachEndOfStream.isEmpty()) {
            throw new IllegalStateException("Could not poll for all system stream partitions");
        }
        // The payload cast is unchecked: the caller chooses StreamMessageType.
        @SuppressWarnings("unchecked")
        Map<Integer, List<StreamMessageType>> result = output.entrySet().stream().collect(Collectors.toMap(
            entry -> entry.getKey().getPartition().getPartitionId(),
            entry -> entry.getValue().stream().map(envelope -> (StreamMessageType)envelope.getMessage()).collect(Collectors.toList())));
        return result;
    }

    /**
     * Creates the in-memory input stream described by {@code descriptor} and fills
     * it with the supplied messages.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>when the application is a {@link LegacyTaskApplication}, appends
     *       {@code system.stream} to the comma-separated {@code task.inputs} config;</li>
     *   <li>applies this runner's in-memory scope to the descriptor's system and adds
     *       the stream, system and serde configs;</li>
     *   <li>creates the stream with one partition per map entry;</li>
     *   <li>sends every message to its partition, followed by one
     *       {@link EndOfStreamMessage} per partition.</li>
     * </ol>
     *
     * <p>A {@link KV} message is split into key and value; any other message is sent
     * as the value with a null key. A value that is an {@link IncomingMessageEnvelope}
     * is handed to the producer as-is.
     *
     * @param descriptor describes the input stream
     * @param partitionData partition id to the messages for that partition
     * @param <StreamMessageType> type of the messages
     */
    private <StreamMessageType> void initializeInMemoryInputStream(InMemoryInputDescriptor<?> descriptor, Map<Integer, Iterable<StreamMessageType>> partitionData) {
        String systemName = descriptor.getSystemName();
        String streamName = descriptor.getPhysicalName().orElse(descriptor.getStreamId());

        if (this.app instanceof LegacyTaskApplication) {
            String taskInput = systemName + "." + streamName;
            String taskInputs = this.configs.containsKey("task.inputs")
                ? this.configs.get("task.inputs").concat("," + taskInput)
                : taskInput;
            this.configs.put("task.inputs", taskInputs);
        }

        // The scope must be set before any toConfig() call below so the generated
        // configs carry it.
        InMemorySystemDescriptor systemDescriptor = (InMemorySystemDescriptor)descriptor.getSystemDescriptor();
        systemDescriptor.withInMemoryScope(this.inMemoryScope);
        this.addConfig(descriptor.toConfig());
        this.addConfig(descriptor.getSystemDescriptor().toConfig());
        this.addSerdeConfigs((StreamDescriptor)descriptor);

        StreamSpec streamSpec = new StreamSpec(descriptor.getStreamId(), streamName, systemName, partitionData.size());
        InMemorySystemFactory systemFactory = new InMemorySystemFactory();
        MapConfig systemConfig = new MapConfig(new Map[]{descriptor.toConfig(), descriptor.getSystemDescriptor().toConfig()});
        systemFactory.getAdmin(systemName, (Config)systemConfig).createStream(streamSpec);

        InMemorySystemProducer producer = (InMemorySystemProducer)systemFactory.getProducer(systemName, (Config)systemConfig, null);
        SystemStream targetStream = new SystemStream(systemName, streamName);
        for (Map.Entry<Integer, Iterable<StreamMessageType>> partitionEntry : partitionData.entrySet()) {
            Integer partitionId = partitionEntry.getKey();
            for (StreamMessageType message : partitionEntry.getValue()) {
                Object key = null;
                Object value = message;
                if (message instanceof KV) {
                    KV keyValue = (KV)message;
                    key = keyValue.getKey();
                    value = keyValue.getValue();
                }
                if (value instanceof IncomingMessageEnvelope) {
                    producer.send((IncomingMessageEnvelope)value);
                } else {
                    producer.send(systemName, new OutgoingMessageEnvelope(targetStream, partitionId, key, value));
                }
            }
            // Each partition ends with an end-of-stream marker.
            producer.send(systemName, new OutgoingMessageEnvelope(targetStream, partitionId, null, (Object)new EndOfStreamMessage(null)));
        }
    }

    /**
     * Deletes the non-logged and then the logged store base directories configured
     * for this runner.
     *
     * @throws NullPointerException if either base directory is missing from the
     *         configuration; nothing is deleted in that case
     */
    private void deleteStoreDirectories() {
        String loggedStoreDir = this.configs.get("job.logged.store.base.dir");
        String nonLoggedStoreDir = this.configs.get("job.non-logged.store.base.dir");
        // Validate both paths before deleting either one.
        Preconditions.checkNotNull(loggedStoreDir);
        Preconditions.checkNotNull(nonLoggedStoreDir);
        this.deleteDirectory(nonLoggedStoreDir);
        this.deleteDirectory(loggedStoreDir);
    }

    /**
     * Removes the directory at {@code path} using {@link FileUtil} and logs a
     * warning if it still exists afterwards.
     *
     * @param path file-system path of the directory to delete
     */
    private void deleteDirectory(String path) {
        File target = new File(path);
        LOG.info("Deleting the directory " + path);
        FileUtil fileUtil = new FileUtil();
        fileUtil.rm(target);
        if (!target.exists()) {
            return;
        }
        // The failure is only logged, not thrown.
        LOG.warn("Could not delete the directory " + path);
    }

    /**
     * Stores null values for the key serde and message serde config keys of the
     * given stream ({@code streams.<streamId>.samza.key.serde} and
     * {@code streams.<streamId>.samza.msg.serde}).
     *
     * <p>NOTE(review): presumably this prevents a serde from being configured for
     * in-memory streams; confirm against how downstream config handling treats
     * null values.
     *
     * @param descriptor the stream whose serde config keys are set to null
     */
    private void addSerdeConfigs(StreamDescriptor descriptor) {
        String streamIdPrefix = String.format("streams.%s.", descriptor.getStreamId());
        // Key serde first, then message serde.
        for (String serdeSuffix : new String[]{"samza.key.serde", "samza.msg.serde"}) {
            this.configs.put(streamIdPrefix + serdeSuffix, null);
        }
    }
}

