package net.pincette.mongo.streams;

import java.util.Optional;
import java.util.function.Function;
import javax.json.JsonObject;
import javax.json.JsonValue;
import net.pincette.jes.util.Kafka;
import net.pincette.json.JsonUtil;
import net.pincette.mongo.Expression;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;

/* loaded from: input_file:net/pincette/mongo/streams/Send.class */
/**
 * Pipeline stage that forwards messages to a Kafka topic selected per message.
 *
 * <p>This is a static utility class and cannot be instantiated.
 */
class Send {
    /** Name of the field in the stage specification that holds the topic expression. */
    private static final String TOPIC = "topic";

    private Send() {
    }

    /**
     * Appends the stage to a stream.
     *
     * <p>The expression found under the <code>topic</code> field of the specification is
     * evaluated against every message. When it yields a JSON string, the message is published
     * with its original key to the topic of that name through <code>context.producer</code> and
     * is removed from the returned stream. When it yields anything else, the message stays in
     * the returned stream untouched.
     *
     * <p>Publishing blocks the calling thread until the send has completed. The value the send
     * completes with is ignored; a send that completes exceptionally surfaces as the unchecked
     * exception thrown by {@link java.util.concurrent.CompletableFuture#join()}.
     *
     * @param kStream the stream to extend.
     * @param jsonValue the stage specification. It must be a JSON object with a
     *     <code>topic</code> field; this is only verified when assertions are enabled.
     * @param context the context that supplies the expression features and the Kafka producer.
     * @return the stream with only the messages that were not published.
     */
    public static KStream<String, JsonObject> stage(KStream<String, JsonObject> kStream, JsonValue jsonValue, Context context) {
        assert JsonUtil.isObject(jsonValue);

        final JsonObject specification = jsonValue.asJsonObject();

        assert specification.containsKey(TOPIC);

        // The expression is compiled once; only its evaluation happens per message.
        final Function<JsonObject, JsonValue> topicOf =
                Expression.function(specification.getValue("/" + TOPIC), context.features);

        return kStream
                .map((key, message) ->
                        // ofNullable: a null result is handled like any other non-string result
                        // instead of failing with a NullPointerException.
                        Optional.ofNullable(topicOf.apply(message))
                                .filter(JsonUtil::isString)
                                .map(name -> JsonUtil.asString(name).getString())
                                .map(topic ->
                                        Kafka.send(context.producer, new ProducerRecord<>(topic, key, message))
                                                // A null value marks the message as published, so
                                                // the filter below can drop it.
                                                .thenApply(sent -> new KeyValue<>(key, (JsonObject) null))
                                                .toCompletableFuture()
                                                .join())
                                .orElseGet(() -> new KeyValue<>(key, message)))
                .filter((key, message) -> message != null);
    }
}
