package io.quarkus.qe.kafka.consumer;

import io.quarkus.qe.kafka.StockPrice;
import io.quarkus.qe.kafka.status;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import io.smallrye.reactive.messaging.annotations.Broadcast;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.function.BiConsumer;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;

@ApplicationScoped
/* loaded from: input_file:io/quarkus/qe/kafka/consumer/KStockPriceConsumer.class */
public class KStockPriceConsumer extends AbstractVerticle {
    private static final Logger LOG = Logger.getLogger(KStockPriceConsumer.class);

    @Inject
    @Channel("sink-stock-price")
    @OnOverflow(OnOverflow.Strategy.DROP)
    Emitter<StockPrice> emitter;

    public Uni<Void> asyncStart() {
        LOG.info("Verticle KStockPriceConsumer Up! (does nothing)");
        return Uni.createFrom().voidItem();
    }

    @Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
    @Outgoing("stock-monitor")
    @Broadcast
    @Incoming("channel-stock-price")
    public String process(StockPrice stockPrice) {
        stockPrice.setStatus(status.COMPLETED);
        LOG.debugv("CONSUMER -> ID: {0}, PRICE: {1}", stockPrice.getId(), Double.valueOf(stockPrice.getPrice()));
        this.emitter.send(stockPrice).whenComplete(handlerEmitterResponse(KStockPriceConsumer.class.getName()));
        String id = stockPrice.getId();
        double price = stockPrice.getPrice();
        stockPrice.getStatus();
        return id + "-" + price + "-" + id;
    }

    private BiConsumer<Void, Throwable> handlerEmitterResponse(String str) {
        return (r7, th) -> {
            if (th != null) {
                LOG.info(String.format("D'oh! %s", th.getMessage()));
            }
        };
    }
}
