package io.gridgo.connector.mongodb;

import com.mongodb.async.client.FindIterable;
import com.mongodb.async.client.MongoClient;
import com.mongodb.async.client.MongoCollection;
import com.mongodb.async.client.MongoDatabase;
import com.mongodb.client.model.CountOptions;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.InsertManyOptions;
import com.mongodb.client.model.Projections;
import io.gridgo.bean.BArray;
import io.gridgo.bean.BElement;
import io.gridgo.bean.BObject;
import io.gridgo.bean.BReference;
import io.gridgo.bean.BValue;
import io.gridgo.connector.impl.AbstractProducer;
import io.gridgo.connector.mongodb.support.MongoOperationException;
import io.gridgo.connector.support.config.ConnectorContext;
import io.gridgo.framework.support.Message;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.joo.promise4j.Deferred;
import org.joo.promise4j.Promise;
import org.joo.promise4j.impl.CompletableDeferredObject;
import org.joo.promise4j.impl.SimpleFailurePromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/gridgo/connector/mongodb/MongoDBProducer.class */
public class MongoDBProducer extends AbstractProducer {
    private static final Logger log = LoggerFactory.getLogger(MongoDBProducer.class);
    private Map<String, ProducerHandler> operations;
    private MongoCollection<Document> collection;
    private MongoDatabase database;
    private String generatedName;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/gridgo/connector/mongodb/MongoDBProducer$ProducerHandler.class */
    public interface ProducerHandler {
        void handle(Message message, Deferred<Message, Exception> deferred, boolean z);
    }

    public MongoDBProducer(ConnectorContext connectorContext, String str, String str2, String str3) {
        super(connectorContext);
        this.operations = new HashMap();
        this.database = ((MongoClient) getContext().getRegistry().lookupMandatory(str, MongoClient.class)).getDatabase(str2);
        this.collection = this.database.getCollection(str3);
        this.generatedName = "producer.mongodb." + str + "." + str2 + "." + str3;
        bindHandlers();
    }

    private Promise<Message, Exception> _call(Message message, CompletableDeferredObject<Message, Exception> completableDeferredObject, boolean z) {
        String string = message.getPayload().getHeaders().getString(MongoDBConstants.OPERATION);
        ProducerHandler producerHandler = this.operations.get(string);
        if (producerHandler == null) {
            return new SimpleFailurePromise(new IllegalArgumentException("Operation " + string + " is not supported"));
        }
        try {
            producerHandler.handle(message, completableDeferredObject, z);
            if (completableDeferredObject != null) {
                return completableDeferredObject.promise();
            }
            return null;
        } catch (Exception e) {
            log.error("Error while processing MongoDB request", e);
            return new SimpleFailurePromise(e);
        }
    }

    private void ack(Deferred<Message, Exception> deferred, Object obj, Throwable th) {
        if (deferred == null) {
            return;
        }
        if (th == null) {
            deferred.resolve(convertToMessage(obj));
        } else if (th instanceof Exception) {
            deferred.reject((Exception) th);
        } else {
            deferred.reject(new MongoOperationException(th));
        }
    }

    private void applyProjection(Message message, FindIterable<Document> findIterable) {
        BObject headers = message.getPayload().getHeaders();
        Bson bson = (Bson) getHeaderAs(message, MongoDBConstants.PROJECT, Bson.class);
        BArray array = headers.getArray(MongoDBConstants.PROJECT_INCLUDE, (BArray) null);
        BArray array2 = headers.getArray(MongoDBConstants.PROJECT_EXCLUDE, (BArray) null);
        if (bson == null && array == null && array2 == null) {
            return;
        }
        if (array != null) {
            bson = Projections.include(toStringArray(array));
        } else if (array2 != null) {
            bson = Projections.exclude(toStringArray(array2));
        }
        findIterable.projection(bson);
    }

    public void bind(String str, ProducerHandler producerHandler) {
        this.operations.put(str, producerHandler);
    }

    private void bindHandlers() {
        bind(MongoDBConstants.OPERATION_INSERT, this::insertDocument);
        bind(MongoDBConstants.OPERATION_COUNT, this::countCollection);
        bind(MongoDBConstants.OPERATION_FIND_ALL, this::findAllDocuments);
        bind(MongoDBConstants.OPERATION_FIND_BY_ID, this::findById);
        bind(MongoDBConstants.OPERATION_UPDATE_ONE, this::updateDocument);
        bind(MongoDBConstants.OPERATION_UPDATE_MANY, this::updateManyDocuments);
        bind(MongoDBConstants.OPERATION_DELETE_ONE, this::deleteDocument);
        bind(MongoDBConstants.OPERATION_DELETE_MANY, this::deleteManyDocuments);
    }

    public Promise<Message, Exception> call(Message message) {
        return _call(message, new CompletableDeferredObject<>(), true);
    }

    private Document convertToDocument(BReference bReference) {
        return (Document) bReference.getReference();
    }

    private List<Document> convertToDocuments(BArray bArray) {
        return (List) StreamSupport.stream(bArray.spliterator(), false).map(bElement -> {
            return convertToDocument(bElement.asReference());
        }).collect(Collectors.toList());
    }

    private Message convertToMessage(Object obj) {
        if (obj == null) {
            return null;
        }
        if (obj instanceof Long) {
            return createMessage(BObject.ofEmpty(), BValue.of(obj));
        }
        if (obj instanceof Document) {
            return createMessage(BObject.ofEmpty(), toReference((Document) obj));
        }
        if (!(obj instanceof List)) {
            return null;
        }
        return createMessage(BObject.ofEmpty(), BArray.of((List) StreamSupport.stream(((List) obj).spliterator(), false).map(this::toReference).collect(Collectors.toList())));
    }

    public void countCollection(Message message, Deferred<Message, Exception> deferred, boolean z) {
        Bson bson = (Bson) getHeaderAs(message, MongoDBConstants.FILTER, Bson.class);
        CountOptions countOptions = (CountOptions) getHeaderAs(message, MongoDBConstants.COUNT_OPTIONS, CountOptions.class);
        if (countOptions != null) {
            this.collection.countDocuments(bson, countOptions, (l, th) -> {
                ack(deferred, z ? l : null, th);
            });
        } else {
            this.collection.countDocuments(bson, (l2, th2) -> {
                ack(deferred, z ? l2 : null, th2);
            });
        }
    }

    public void deleteDocument(Message message, Deferred<Message, Exception> deferred, boolean z) {
        this.collection.deleteOne((Bson) getHeaderAs(message, MongoDBConstants.FILTER, Bson.class), (deleteResult, th) -> {
            ack(deferred, z ? Long.valueOf(deleteResult.getDeletedCount()) : null, th);
        });
    }

    public void deleteManyDocuments(Message message, Deferred<Message, Exception> deferred, boolean z) {
        this.collection.deleteMany((Bson) getHeaderAs(message, MongoDBConstants.FILTER, Bson.class), (deleteResult, th) -> {
            ack(deferred, z ? Long.valueOf(deleteResult.getDeletedCount()) : null, th);
        });
    }

    public void findAllDocuments(Message message, Deferred<Message, Exception> deferred, boolean z) {
        Bson bson = (Bson) getHeaderAs(message, MongoDBConstants.FILTER, Bson.class);
        BObject headers = message.getPayload().getHeaders();
        int intValue = headers.getInteger(MongoDBConstants.BATCH_SIZE, -1).intValue();
        int intValue2 = headers.getInteger(MongoDBConstants.NUM_TO_SKIP, -1).intValue();
        int intValue3 = headers.getInteger(MongoDBConstants.LIMIT, -1).intValue();
        Bson bson2 = (Bson) getHeaderAs(message, MongoDBConstants.SORT_BY, Bson.class);
        FindIterable<Document> find = bson != null ? this.collection.find(bson) : this.collection.find();
        if (intValue != -1) {
            find.batchSize(intValue);
        }
        if (intValue2 != -1) {
            find.skip(intValue2);
        }
        if (intValue3 != -1) {
            find.limit(intValue3);
        }
        if (bson2 != null) {
            find.sort(bson2);
        }
        applyProjection(message, find);
        find.into(new ArrayList(), (arrayList, th) -> {
            ack(deferred, z ? arrayList : null, th);
        });
    }

    public void findById(Message message, Deferred<Message, Exception> deferred, boolean z) {
        FindIterable<Document> find = this.collection.find(Filters.eq(message.getPayload().getHeaders().getString(MongoDBConstants.ID_FIELD), message.getPayload().getBody().asValue().getData()));
        applyProjection(message, find);
        find.first((document, th) -> {
            ack(deferred, z ? document : null, th);
        });
    }

    protected String generateName() {
        return this.generatedName;
    }

    private <T> T getHeaderAs(Message message, String str, Class<T> cls) {
        BElement bElement = (BElement) message.getPayload().getHeaders().get(str);
        if (bElement == null) {
            return null;
        }
        return cls.cast(bElement.asReference().getReference());
    }

    public void insertDocument(Message message, Deferred<Message, Exception> deferred, boolean z) {
        BElement body = message.getPayload().getBody();
        if (body.isReference()) {
            this.collection.insertOne(convertToDocument(body.asReference()), (r7, th) -> {
                ack(deferred, null, th);
            });
            return;
        }
        List<Document> convertToDocuments = convertToDocuments(body.asArray());
        InsertManyOptions insertManyOptions = (InsertManyOptions) getHeaderAs(message, MongoDBConstants.INSERT_MANY_OPTIONS, InsertManyOptions.class);
        if (insertManyOptions != null) {
            this.collection.insertMany(convertToDocuments, insertManyOptions, (r72, th2) -> {
                ack(deferred, null, th2);
            });
        } else {
            this.collection.insertMany(convertToDocuments, (r73, th3) -> {
                ack(deferred, null, th3);
            });
        }
    }

    public boolean isCallSupported() {
        return true;
    }

    protected void onStart() {
    }

    protected void onStop() {
    }

    public void send(Message message) {
        _call(message, null, false);
    }

    public Promise<Message, Exception> sendWithAck(Message message) {
        return _call(message, new CompletableDeferredObject<>(), false);
    }

    private BObject toReference(Document document) {
        return BObject.of(document);
    }

    private String[] toStringArray(BArray bArray) {
        return (String[]) bArray.stream().filter(bElement -> {
            return bElement.isValue();
        }).map(bElement2 -> {
            return bElement2.asValue().getString();
        }).toArray(i -> {
            return new String[i];
        });
    }

    public void updateDocument(Message message, Deferred<Message, Exception> deferred, boolean z) {
        this.collection.updateOne((Bson) getHeaderAs(message, MongoDBConstants.FILTER, Bson.class), convertToDocument(message.getPayload().getBody().asReference()), (updateResult, th) -> {
            ack(deferred, z ? Long.valueOf(updateResult.getModifiedCount()) : null, th);
        });
    }

    public void updateManyDocuments(Message message, Deferred<Message, Exception> deferred, boolean z) {
        this.collection.updateMany((Bson) getHeaderAs(message, MongoDBConstants.FILTER, Bson.class), convertToDocument(message.getPayload().getBody().asReference()), (updateResult, th) -> {
            ack(deferred, z ? Long.valueOf(updateResult.getModifiedCount()) : null, th);
        });
    }
}
