package io.quarkus.qe.kafka.producers;

import io.quarkus.qe.kafka.StockPrice;
import io.quarkus.qe.kafka.config.VertxKProducerConfig;
import io.quarkus.qe.kafka.status;
import io.smallrye.mutiny.Uni;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.Random;
import java.util.function.BiConsumer;
import java.util.stream.IntStream;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import org.jboss.logging.Logger;

@ApplicationScoped
/* loaded from: input_file:io/quarkus/qe/kafka/producers/StockPriceProducer.class */
public class StockPriceProducer {
    private static final Logger LOG = Logger.getLogger(StockPriceProducer.class);

    @Inject
    VertxKProducerConfig config;

    @Inject
    @Channel("source-stock-price")
    @OnOverflow(OnOverflow.Strategy.DROP)
    Emitter<StockPrice> emitter;
    private Random random = new Random();

    public Uni<Void> generate() {
        IntStream.range(0, this.config.batchSize()).forEach(i -> {
            StockPrice m1build = StockPrice.newBuilder().setId("IBM").setPrice(this.random.nextDouble()).setStatus(status.PENDING).m1build();
            LOG.debugv("PRODUCER -> ID: {0}, PRICE: {1}", m1build.getId(), Double.valueOf(m1build.getPrice()));
            this.emitter.send(m1build).whenComplete(handlerEmitterResponse(StockPriceProducer.class.getName()));
        });
        return Uni.createFrom().voidItem();
    }

    private BiConsumer<Void, Throwable> handlerEmitterResponse(String str) {
        return (r8, th) -> {
            if (th != null) {
                LOG.warn(String.format("D'oh! %s", th.getMessage()));
            } else {
                LOG.debug(String.format("Message sent successfully. Owner %s", str));
            }
        };
    }
}
