package net.pincette.mongo.streams;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.function.Function;
import javax.json.JsonObject;
import javax.json.JsonValue;
import net.pincette.json.JsonUtil;
import net.pincette.mongo.Expression;
import net.pincette.rs.Async;
import net.pincette.rs.Box;
import net.pincette.rs.Filter;
import net.pincette.rs.streams.Message;

/* loaded from: input_file:net/pincette/mongo/streams/Send.class */
/**
 * Builds the reactive processor for the {@code $send} stage.
 *
 * <p>The stage specification is a JSON object with a {@code topic} field holding an expression.
 * For every incoming message the expression is evaluated against the message value. When the
 * result is a JSON string, the message is handed to {@code context.producer} with that string as
 * the topic and is then dropped from the downstream flow. Otherwise the message is passed on
 * unchanged.
 */
class Send {
    private static final String TOPIC = "topic";

    private Send() {
    }

    /**
     * Creates the processor for one {@code $send} stage.
     *
     * @param jsonValue the stage specification; it is checked with {@code Util.must} to be a JSON
     *     object that contains the {@code topic} key (presumably {@code must} throws when the
     *     check fails; confirm against {@code net.pincette.util.Util}).
     * @param context supplies the expression features and the producer used for sending.
     * @return a processor that emits only the messages whose value is not {@code null} after the
     *     send step.
     */
    public static Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> stage(JsonValue jsonValue, Context context) {
        net.pincette.util.Util.must(JsonUtil.isObject(jsonValue));

        final JsonObject specification = jsonValue.asJsonObject();

        net.pincette.util.Util.must(specification.containsKey(TOPIC));

        // The pointer is derived from TOPIC so the key check and the lookup cannot drift apart.
        final Function<JsonObject, JsonValue> topic =
            Expression.function(specification.getValue("/" + TOPIC), context.features);

        return Box.box(
            Async.mapAsync((Message<String, JsonObject> message) -> send(message, topic, context)),
            // Messages that were sent carry a null value (see send) and are removed here.
            Filter.filter((Message<String, JsonObject> message) -> message.value != null));
    }

    /**
     * Sends one message to the topic computed from its value, or passes it through when the topic
     * expression does not yield a JSON string.
     */
    private static CompletionStage<Message<String, JsonObject>> send(
            Message<String, JsonObject> message,
            Function<JsonObject, JsonValue> topic,
            Context context) {
        return Optional.of(topic.apply(message.value))
            .filter(JsonUtil::isString)
            .<CompletionStage<Message<String, JsonObject>>>map(name ->
                Util.tryForever(
                    () -> context.producer
                        .apply(JsonUtil.asString(name).getString(), message)
                        // Replace the value with null to mark the message as sent.
                        .thenApply(ignored -> message.withValue((JsonObject) null)),
                    "$send",
                    () -> "Topic " + name + ", send: " + JsonUtil.string(message.value),
                    context))
            .orElseGet(() -> CompletableFuture.completedFuture(message));
    }
}
