package org.apache.pulsar.io.batchdatagenerator;

import io.codearte.jfairy.Fairy;
import io.codearte.jfairy.producer.person.PersonProperties;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import lombok.Generated;
import org.apache.pulsar.io.core.BatchPushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/io/batchdatagenerator/BatchDataGeneratorPushSource.class */
/**
 * Batch push source that emits randomly generated {@link Person} records.
 *
 * <p>{@link #discover(Consumer)} announces one task per source instance, and every
 * {@link #prepare(byte[])} call schedules this object (it is its own {@link Runnable})
 * on a private single-thread executor. Each run pushes a fixed number of records via
 * {@code consume} and then pushes {@code null}.
 */
public class BatchDataGeneratorPushSource extends BatchPushSource<Person> implements Runnable {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(BatchDataGeneratorPushSource.class);

    /** Number of records pushed by a single {@link #run()} invocation. */
    private static final int MAX_RECORDS_PER_CYCLE = 10;

    /** Pause, in milliseconds, taken before each generated record. */
    private static final long RECORD_INTERVAL_MILLIS = 50L;

    /** Runs the generation cycles; a single thread, so cycles execute one at a time in submission order. */
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Random data generator; created in {@link #open(Map, SourceContext)}. */
    private Fairy fairy;

    /** Context handed over in {@link #open(Map, SourceContext)}; supplies instance count and id. */
    private SourceContext sourceContext;

    /**
     * Stops the executor, interrupting a generation cycle that is currently running.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        this.executor.shutdownNow();
    }

    /**
     * Creates the data generator and remembers the source context.
     *
     * @param map connector configuration; not read by this source
     * @param sourceContext context later queried for the instance count and instance id
     * @throws Exception declared by the overridden method; this implementation relies on
     *     {@code Fairy.create()} only
     */
    @Override // org.apache.pulsar.io.core.BatchSource
    public void open(Map<String, Object> map, SourceContext sourceContext) throws Exception {
        this.fairy = Fairy.create();
        this.sourceContext = sourceContext;
    }

    /**
     * Hands one task to {@code consumer} for every source instance reported by the context.
     * A task is the UTF-8 encoding of {@code "something-<current time millis>"}.
     *
     * @param consumer receiver of the encoded tasks
     * @throws Exception declared by the overridden method
     */
    @Override // org.apache.pulsar.io.core.BatchSource
    public void discover(Consumer<byte[]> consumer) throws Exception {
        log.info("Generating one task for each instance");
        final int instances = this.sourceContext.getNumInstances();
        for (int i = 0; i < instances; i++) {
            String task = String.format("something-%d", System.currentTimeMillis());
            consumer.accept(task.getBytes(StandardCharsets.UTF_8));
        }
    }

    /**
     * Logs the received task and schedules one generation cycle on the executor.
     *
     * @param bArr the task, decoded as UTF-8 for logging only
     * @throws Exception declared by the overridden method
     */
    @Override // org.apache.pulsar.io.core.BatchSource
    public void prepare(byte[] bArr) throws Exception {
        log.info("Instance {} got a new discovered task {}",
                this.sourceContext.getInstanceId(), new String(bArr, StandardCharsets.UTF_8));
        this.executor.execute(this);
    }

    /**
     * Pushes {@link #MAX_RECORDS_PER_CYCLE} generated records, sleeping
     * {@link #RECORD_INTERVAL_MILLIS} ms before each one, then pushes {@code null}.
     *
     * <p>Any failure is reported through {@code notifyError} and ends the cycle early,
     * without the trailing {@code null}.
     */
    @Override // java.lang.Runnable
    public void run() {
        for (int i = 0; i < MAX_RECORDS_PER_CYCLE; i++) {
            try {
                Thread.sleep(RECORD_INTERVAL_MILLIS);
                consume(() -> new Person(this.fairy.person()));
            } catch (InterruptedException e) {
                // Report first, exactly as before; the flag is restored only afterwards so the
                // report itself is not affected by a pending interrupt.
                // NOTE(review): assumes notifyError may block — confirm against BatchPushSource.
                notifyError(e);
                // Thread.sleep cleared the flag; put it back so the executor's worker sees it.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                notifyError(e);
                return;
            }
        }
        // NOTE(review): a null record presumably tells the framework this task is finished —
        // confirm against BatchPushSource.consume.
        consume(null);
    }
}
