package net.pincette.mongo.streams;

import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.Updates;
import com.mongodb.reactivestreams.client.MongoCollection;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;
import javax.json.JsonObject;
import javax.json.JsonValue;
import net.pincette.json.JsonUtil;
import net.pincette.mongo.BsonUtil;
import net.pincette.mongo.Collection;
import net.pincette.mongo.Expression;
import net.pincette.rs.Async;
import net.pincette.rs.Commit;
import net.pincette.rs.Filter;
import net.pincette.rs.Mapper;
import net.pincette.rs.Pipe;
import net.pincette.rs.streams.Message;
import net.pincette.util.Pair;
import org.bson.BsonDocument;
import org.bson.BsonInt64;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.conversions.Bson;

/* loaded from: input_file:net/pincette/mongo/streams/Deduplicate.class */
class Deduplicate {
    private static final String CACHE_WINDOW = "cacheWindow";
    private static final String COLLECTION = "collection";
    private static final String EXPRESSION = "expression";
    private static final String TIMESTAMP = "_timestamp";

    private Deduplicate() {
    }

    private static CompletionStage<Boolean> exists(MongoCollection<Document> mongoCollection, Bson bson, Context context) {
        return Util.tryForever(() -> {
            return Collection.findOne(mongoCollection, bson, BsonDocument.class, (UnaryOperator) null).thenApply((v0) -> {
                return v0.isPresent();
            });
        }, "$deduplicate", () -> {
            return "Collection " + mongoCollection + ", findOne with filter: " + JsonUtil.string(BsonUtil.fromBson(BsonUtil.toBsonDocument(bson)));
        }, context);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static CompletionStage<Boolean> save(MongoCollection<Document> mongoCollection, List<BsonValue> list, Context context) {
        return Util.tryForever(() -> {
            return Collection.bulkWrite(mongoCollection, list.stream().map(Deduplicate::updateObject).toList()).thenApply(bulkWriteResult -> {
                return (BulkWriteResult) net.pincette.util.Util.must(bulkWriteResult, (v0) -> {
                    return v0.wasAcknowledged();
                });
            }).thenApply(bulkWriteResult2 -> {
                return true;
            });
        }, "$deduplicate", () -> {
            return "Collection " + mongoCollection + ", save: " + JsonUtil.string(JsonUtil.from(list.stream().map(BsonUtil::fromBson)));
        }, context);
    }

    private static <T, U> Stream<U> second(List<Pair<T, U>> list) {
        return (Stream<U>) list.stream().map(pair -> {
            return pair.second;
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> stage(JsonValue jsonValue, Context context) {
        net.pincette.util.Util.must(JsonUtil.isObject(jsonValue));
        JsonObject asJsonObject = jsonValue.asJsonObject();
        net.pincette.util.Util.must(asJsonObject.containsKey(COLLECTION));
        Duration ofMillis = Duration.ofMillis(asJsonObject.getInt(CACHE_WINDOW, 3000));
        MongoCollection collection = context.database.getCollection(asJsonObject.getString(COLLECTION));
        Function function = Expression.function((JsonValue) asJsonObject.get(EXPRESSION), context.features);
        return Pipe.pipe(net.pincette.rs.Util.duplicateFilter(message -> {
            return (JsonValue) function.apply((JsonObject) message.value);
        }, ofMillis)).then(Mapper.map(message2 -> {
            return Pair.pair(message2, BsonUtil.fromJson((JsonValue) function.apply((JsonObject) message2.value)));
        })).then(Async.mapAsyncSequential(pair -> {
            return exists(collection, Filters.eq("_id", (BsonValue) pair.second), context).thenApply(bool -> {
                return Pair.pair(pair, bool);
            });
        })).then(Filter.filter(pair2 -> {
            return !((Boolean) pair2.second).booleanValue();
        })).then(Mapper.map(pair3 -> {
            return (Pair) pair3.first;
        })).then(Commit.commit(list -> {
            return save(collection, second(list).toList(), context);
        })).then(Mapper.map(pair4 -> {
            return (Message) pair4.first;
        }));
    }

    private static UpdateOneModel<Document> updateObject(BsonValue bsonValue) {
        return new UpdateOneModel<>(Filters.eq("_id", bsonValue), Updates.combine(new Bson[]{Updates.set("_id", bsonValue), Updates.set(TIMESTAMP, new BsonInt64(Instant.now().toEpochMilli()))}), new UpdateOptions().upsert(true));
    }
}
