package io.quarkuscoffeeshop.barista.infrastructure;

import io.quarkus.runtime.annotations.RegisterForReflection;
import io.quarkuscoffeeshop.barista.domain.Barista;
import io.quarkuscoffeeshop.domain.Event;
import io.quarkuscoffeeshop.domain.EventType;
import io.quarkuscoffeeshop.domain.OrderInEvent;
import java.io.StringReader;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.bind.Jsonb;
import javax.json.bind.JsonbBuilder;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
@RegisterForReflection
/* loaded from: input_file:io/quarkuscoffeeshop/barista/infrastructure/KafkaService.class */
public class KafkaService {

    @Inject
    Barista barista;

    @Inject
    @Channel("orders-out")
    Emitter<String> orderUpEmitter;
    Logger logger = LoggerFactory.getLogger(KafkaService.class);
    private Jsonb jsonb = JsonbBuilder.create();

    @Incoming("orders-in")
    public CompletionStage<Void> handleOrderIn(Message message) {
        this.logger.debug("message received: {}", message.getPayload());
        JsonObject readObject = Json.createReader(new StringReader((String) message.getPayload())).readObject();
        this.logger.debug("unmarshalled {}", readObject);
        if (!readObject.containsKey("eventType")) {
            return message.ack();
        }
        OrderInEvent orderInEvent = (OrderInEvent) this.jsonb.fromJson((String) message.getPayload(), OrderInEvent.class);
        return orderInEvent.eventType.equals(EventType.BEVERAGE_ORDER_IN) ? this.barista.make(orderInEvent).thenApply(collection -> {
            return sendEvents(collection);
        }).thenRun(() -> {
            message.ack();
        }) : message.ack();
    }

    CompletableFuture<Void> sendEvents(Collection<Event> collection) {
        this.logger.debug("{} events returned", Integer.valueOf(collection.size()));
        return collection.size() == 1 ? sendEvent((Event) collection.toArray()[0]) : CompletableFuture.allOf((CompletableFuture[]) ((List) collection.stream().map(event -> {
            return sendEvent(event);
        }).collect(Collectors.toList())).toArray(i -> {
            return new CompletableFuture[i];
        })).exceptionally(th -> {
            this.logger.error(th.getMessage());
            return null;
        });
    }

    CompletableFuture<Void> sendEvent(Event event) {
        this.logger.debug("sending: {}", event.toString());
        return this.orderUpEmitter.send(this.jsonb.toJson(event)).toCompletableFuture();
    }
}
