package org.elasticsearch.river.mongodb;

import com.google.common.base.Preconditions;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoCursorNotFoundException;
import com.mongodb.MongoInterruptedException;
import com.mongodb.MongoSocketException;
import com.mongodb.MongoTimeoutException;
import com.mongodb.gridfs.GridFS;
import com.mongodb.gridfs.GridFSDBFile;
import com.mongodb.gridfs.GridFSFile;
import java.util.concurrent.atomic.AtomicLong;
import org.bson.BasicBSONObject;
import org.bson.types.ObjectId;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.river.mongodb.MongoDBRiver;
import org.elasticsearch.river.mongodb.util.MongoDBHelper;
import org.elasticsearch.river.mongodb.util.MongoDBRiverHelper;

/* loaded from: input_file:org/elasticsearch/river/mongodb/CollectionSlurper.class */
/**
 * Runnable that performs the initial import of MongoDB documents for a river.
 *
 * <p>Each document read from the configured collection (or from every collection whose name does
 * not start with {@code "system."} when the definition asks for all collections) is wrapped in a
 * {@link MongoDBRiver.QueueEntry} with {@link Operation#INSERT} and put on the stream exposed by
 * the {@link SharedContext}. The last document of each collection carries the timestamp handed
 * to the constructor; all earlier documents carry a {@code null} timestamp.
 */
class CollectionSlurper implements Runnable {
    private static final ESLogger logger = ESLoggerFactory.getLogger(CollectionSlurper.class.getName());

    /** Pause after a transient MongoDB error before continuing, in milliseconds. */
    private static final long RETRY_DELAY_MS = 10000L;

    /** Pause at the end of a successful import, in milliseconds. */
    private static final long POST_IMPORT_PAUSE_MS = 500L;

    /** Collections whose name starts with this prefix are skipped when importing all collections. */
    private static final String SYSTEM_COLLECTION_PREFIX = "system.";

    private final MongoDBRiverDefinition definition;
    private final SharedContext context;
    private final Client esClient;
    private final MongoClient mongoClient;
    private final Timestamp<?> timestamp;
    private final DB slurpedDb;
    private final AtomicLong totalDocuments = new AtomicLong();

    /**
     * Creates a slurper bound to the database named by {@code mongoDBRiverDefinition.getMongoDb()}.
     *
     * @param timestamp timestamp attached to the last document imported from each collection
     * @param mongoClient client used to obtain the source database (and the GridFS bucket)
     * @param mongoDBRiverDefinition river settings (collection, filters, index name, flags)
     * @param sharedContext provides the river status and the stream that receives the documents
     * @param client Elasticsearch client used for status, count and index-settings calls
     */
    public CollectionSlurper(Timestamp<?> timestamp, MongoClient mongoClient, MongoDBRiverDefinition mongoDBRiverDefinition, SharedContext sharedContext, Client client) {
        this.timestamp = timestamp;
        this.definition = mongoDBRiverDefinition;
        this.context = sharedContext;
        this.esClient = client;
        this.mongoClient = mongoClient;
        this.slurpedDb = mongoClient.getDB(mongoDBRiverDefinition.getMongoDb());
    }

    /**
     * Runs the initial import once.
     *
     * <p>Nothing is imported when the definition skips the initial import, when it carries an
     * initial timestamp, or when a last timestamp is already recorded for the river. If the target
     * index is not empty the river status is set to {@link Status#INITIAL_IMPORT_FAILED} instead.
     *
     * <p>The catch clauses are ordered from most to least specific so that interruption and
     * transient MongoDB errors reach their dedicated handlers rather than the generic one.
     */
    @Override
    public void run() {
        if (this.definition.isSkipInitialImport() || this.definition.getInitialTimestamp() != null) {
            logger.info("Skip initial import from collection {}", this.definition.getMongoCollection());
            return;
        }
        if (riverHasIndexedFromOplog()) {
            logger.trace("Initial import already completed.");
            return;
        }
        try {
            if (!isIndexEmpty()) {
                MongoDBRiverHelper.setRiverStatus(this.esClient, this.definition.getRiverName(), Status.INITIAL_IMPORT_FAILED);
                return;
            }
            if (this.definition.isImportAllCollections()) {
                for (String collectionName : this.slurpedDb.getCollectionNames()) {
                    if (!collectionName.startsWith(SYSTEM_COLLECTION_PREFIX)) {
                        importCollection(this.slurpedDb.getCollection(collectionName));
                    }
                }
            } else {
                importCollection(this.slurpedDb.getCollection(this.definition.getMongoCollection()));
            }
            logger.debug("Before waiting for 500 ms");
            Thread.sleep(POST_IMPORT_PAUSE_MS);
        } catch (MongoInterruptedException | InterruptedException e) {
            logger.info("river-mongodb slurper interrupted");
            // Restore the interrupt flag for whoever owns this thread.
            Thread.currentThread().interrupt();
        } catch (MongoSocketException | MongoTimeoutException | MongoCursorNotFoundException e) {
            // NOTE(review): the message says "Oplog tailing ... Will retry." but this method does
            // not loop; the text is kept unchanged so existing log output stays the same.
            logger.info("Oplog tailing - {} - {}. Will retry.", e.getClass().getSimpleName(), e.getMessage());
            logger.debug("Total documents inserted so far by river {}: {}", this.definition.getRiverName(), this.totalDocuments.get());
            try {
                Thread.sleep(RETRY_DELAY_MS);
            } catch (InterruptedException interrupted) {
                logger.info("river-mongodb slurper interrupted");
                Thread.currentThread().interrupt();
            }
        } catch (Exception e) {
            logger.error("Exception while looping in cursor", e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return {@code true} when {@link MongoDBRiver#getLastTimestamp} returns a non-null value
     *     for this river
     */
    protected boolean riverHasIndexedFromOplog() {
        return MongoDBRiver.getLastTimestamp(this.esClient, this.definition) != null;
    }

    /**
     * @return {@code true} when {@link MongoDBRiver#getIndexCount} reports zero for this river
     */
    protected boolean isIndexEmpty() {
        return MongoDBRiver.getIndexCount(this.esClient, this.definition) == 0;
    }

    /**
     * Streams every document of {@code dBCollection} (or every GridFS file of the configured
     * bucket when the definition is in GridFS mode) as an insert.
     *
     * <p>On {@link MongoSocketException}, {@link MongoTimeoutException} or
     * {@link MongoCursorNotFoundException} the method sleeps and then starts another pass. For a
     * regular collection the new query is restricted to ids greater than the last id streamed; the
     * GridFS path re-reads the full file list. Whatever the outcome of a pass, the cursor is closed
     * and, when index refresh was disabled for the import, the refresh interval is set back to
     * one second.
     *
     * @param dBCollection collection to import
     * @throws InterruptedException if the thread is interrupted while queueing a document or
     *     while sleeping before a retry
     */
    public void importCollection(DBCollection dBCollection) throws InterruptedException {
        logger.info("MongoDBRiver is beginning initial import of " + dBCollection.getFullName());
        boolean inProgress = true;
        String lastId = null;
        while (inProgress) {
            DBCursor cursor = null;
            try {
                if (this.definition.isDisableIndexRefresh()) {
                    updateIndexRefresh(this.definition.getIndexName(), -1L);
                }
                if (this.definition.isMongoGridFS()) {
                    GridFS gridFS = new GridFS(this.mongoClient.getDB(this.definition.getMongoDb()), this.definition.getMongoCollection());
                    cursor = gridFS.getFileList();
                    while (cursor.hasNext()) {
                        DBObject entry = cursor.next();
                        if (entry instanceof GridFSDBFile) {
                            GridFSDBFile file = gridFS.findOne(new ObjectId(entry.get(MongoDBRiver.MONGODB_ID_FIELD).toString()));
                            if (cursor.hasNext()) {
                                lastId = addInsertToStream(null, file);
                            } else {
                                logger.debug("Last entry for initial import of {} - add timestamp: {}", dBCollection.getFullName(), this.timestamp);
                                lastId = addInsertToStream(this.timestamp, file);
                            }
                        }
                    }
                    inProgress = false;
                } else {
                    if (logger.isTraceEnabled()) {
                        logger.trace("Collection {} - count: {}", dBCollection.getName(), dBCollection.count());
                    }
                    long imported = 0;
                    // Sorted by _id so that a retry can resume after the last id already streamed.
                    cursor = dBCollection
                            .find(getFilterForInitialImport(this.definition.getMongoCollectionFilter(), lastId))
                            .sort(new BasicDBObject(MongoDBRiver.MONGODB_ID_FIELD, 1));
                    while (cursor.hasNext() && this.context.getStatus() == Status.RUNNING) {
                        DBObject document = cursor.next();
                        imported++;
                        if (cursor.hasNext()) {
                            lastId = addInsertToStream(null, applyFieldFilter(document), dBCollection.getName());
                        } else {
                            logger.debug("Last entry for initial import of {} - add timestamp: {}", dBCollection.getFullName(), this.timestamp);
                            lastId = addInsertToStream(this.timestamp, applyFieldFilter(document), dBCollection.getName());
                        }
                    }
                    inProgress = false;
                    logger.info("Number of documents indexed in initial import of {}: {}", dBCollection.getFullName(), imported);
                }
            } catch (MongoSocketException | MongoTimeoutException | MongoCursorNotFoundException e) {
                logger.info("Initial import - {} - {}. Will retry.", e.getClass().getSimpleName(), e.getMessage());
                logger.debug("Total documents inserted so far by river {}: {}", this.definition.getRiverName(), this.totalDocuments.get());
                Thread.sleep(RETRY_DELAY_MS);
            } finally {
                // Runs on success, on retry and on propagated failures alike, so a cursor opened
                // during this pass is never left open.
                if (cursor != null) {
                    logger.trace("Closing initial import cursor");
                    cursor.close();
                }
                if (this.definition.isDisableIndexRefresh()) {
                    updateIndexRefresh(this.definition.getIndexName(), TimeValue.timeValueSeconds(1L));
                }
            }
        }
    }

    /**
     * Builds the query for one import pass.
     *
     * @param basicDBObject user-configured collection filter; must not be {@code null}
     * @param str string form of the last id already streamed, or {@code null} on the first pass
     * @return the filter itself when {@code str} is {@code null}; otherwise an {@code _id > str}
     *     condition, combined with the filter through the "and" operator unless the filter is empty
     */
    private BasicDBObject getFilterForInitialImport(BasicDBObject basicDBObject, String str) {
        Preconditions.checkNotNull(basicDBObject);
        if (str == null) {
            return basicDBObject;
        }
        // NOTE(review): the id is compared as a String; confirm this matches the stored _id type.
        BasicDBObject afterLastId = new BasicDBObject(MongoDBRiver.MONGODB_ID_FIELD, new BasicBSONObject("$gt", str));
        if (basicDBObject.equals(new BasicDBObject())) {
            return afterLastId;
        }
        return new BasicDBObject(MongoDBRiver.MONGODB_AND_OPERATOR, ImmutableList.of(basicDBObject, afterLastId));
    }

    /**
     * Sets {@code index.refresh_interval} on the given index and waits for the response.
     *
     * @param str index name
     * @param obj new value for the setting
     */
    private void updateIndexRefresh(String str, Object obj) {
        this.esClient.admin().indices().prepareUpdateSettings(str).setSettings(ImmutableMap.of("index.refresh_interval", obj)).get();
    }

    /**
     * Applies the configured exclude and include field lists to a document. For a
     * {@link GridFSFile} only its metadata (when present) is filtered, in place.
     *
     * @param dBObject document to filter
     * @return the filtered document; the same instance for a {@link GridFSFile}
     */
    private DBObject applyFieldFilter(DBObject dBObject) {
        if (dBObject instanceof GridFSFile) {
            GridFSFile gridFSFile = (GridFSFile) dBObject;
            DBObject metaData = gridFSFile.getMetaData();
            if (metaData != null) {
                gridFSFile.setMetaData(applyFieldFilter(metaData));
            }
            return dBObject;
        }
        DBObject withoutExcluded = MongoDBHelper.applyExcludeFields(dBObject, this.definition.getExcludeFields());
        return MongoDBHelper.applyIncludeFields(withoutExcluded, this.definition.getIncludeFields());
    }

    /** Same as the three-argument overload, using the collection name from the river definition. */
    private String addInsertToStream(Timestamp<?> timestamp, DBObject dBObject) throws InterruptedException {
        return addInsertToStream(timestamp, dBObject, this.definition.getMongoCollection());
    }

    /**
     * Counts the document and queues it as an insert.
     *
     * @param timestamp timestamp to attach to the queue entry; may be {@code null}
     * @param dBObject document to queue
     * @param str name of the collection the document belongs to
     * @return string form of the document's id field, or {@code null} when the document is
     *     {@code null} or has no such field
     * @throws InterruptedException if interrupted while putting the entry on the stream
     */
    private String addInsertToStream(Timestamp<?> timestamp, DBObject dBObject, String str) throws InterruptedException {
        this.totalDocuments.incrementAndGet();
        addToStream(Operation.INSERT, timestamp, dBObject, str);
        if (dBObject == null || !dBObject.containsField(MongoDBRiver.MONGODB_ID_FIELD)) {
            return null;
        }
        return dBObject.get(MongoDBRiver.MONGODB_ID_FIELD).toString();
    }

    /**
     * Puts a queue entry on the shared stream. A {@link Operation#DROP_DATABASE} operation is
     * translated into {@link Operation#DROP_COLLECTION} entries: one for {@code str}, or one per
     * collection of the source database when all collections are imported.
     *
     * @param operation operation to queue
     * @param timestamp timestamp to attach to the entry; may be {@code null}
     * @param dBObject payload of the entry
     * @param str collection name
     * @throws InterruptedException if interrupted while putting an entry on the stream
     */
    private void addToStream(Operation operation, Timestamp<?> timestamp, DBObject dBObject, String str) throws InterruptedException {
        if (logger.isTraceEnabled()) {
            // String.valueOf keeps trace logging from throwing when the payload is null.
            String serialized = String.valueOf(dBObject);
            if (serialized.length() > 400) {
                logger.trace("addToStream - operation [{}], currentTimestamp [{}], data (_id:[{}], serialized length:{}), collection [{}]", operation, timestamp, dBObject.get(MongoDBRiver.MONGODB_ID_FIELD), serialized.length(), str);
            } else {
                logger.trace("addToStream - operation [{}], currentTimestamp [{}], data [{}], collection [{}]", operation, timestamp, serialized, str);
            }
        }
        if (operation != Operation.DROP_DATABASE) {
            this.context.getStream().put(new MongoDBRiver.QueueEntry(timestamp, operation, dBObject, str));
            return;
        }
        logger.info("addToStream - Operation.DROP_DATABASE, currentTimestamp [{}], data [{}], collection [{}]", timestamp, dBObject, str);
        if (!this.definition.isImportAllCollections()) {
            this.context.getStream().put(new MongoDBRiver.QueueEntry(timestamp, Operation.DROP_COLLECTION, dBObject, str));
            return;
        }
        for (String collectionName : this.slurpedDb.getCollectionNames()) {
            logger.info("addToStream - isImportAllCollections - Operation.DROP_DATABASE, currentTimestamp [{}], data [{}], collection [{}]", timestamp, dBObject, collectionName);
            this.context.getStream().put(new MongoDBRiver.QueueEntry(timestamp, Operation.DROP_COLLECTION, dBObject, collectionName));
        }
    }
}
