package org.elasticsearch.river.mongodb;

import com.mongodb.BasicDBObject;
import com.mongodb.CommandResult;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoClient;
import com.mongodb.MongoException;
import com.mongodb.MongoInterruptedException;
import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress;
import com.mongodb.gridfs.GridFS;
import com.mongodb.gridfs.GridFSDBFile;
import com.mongodb.util.JSON;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.bson.BasicBSONObject;
import org.bson.types.BSONTimestamp;
import org.bson.types.ObjectId;
import org.elasticsearch.ElasticSearchInterruptedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.river.River;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
import org.elasticsearch.river.mongodb.util.GridFSHelper;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;

/* loaded from: input_file:org/elasticsearch/river/mongodb/MongoDBRiver.class */
public class MongoDBRiver extends AbstractRiverComponent implements River {
    public static final String IS_MONGODB_ATTACHMENT = "is_mongodb_attachment";
    public static final String MONGODB_ATTACHMENT = "mongodb_attachment";
    public static final String RIVER_TYPE = "mongodb";
    public static final String ROOT_NAME = "mongodb";
    public static final String DB_FIELD = "db";
    public static final String SERVERS_FIELD = "servers";
    public static final String HOST_FIELD = "host";
    public static final String PORT_FIELD = "port";
    public static final String OPTIONS_FIELD = "options";
    public static final String SECONDARY_READ_PREFERENCE_FIELD = "secondary_read_preference";
    public static final String FILTER_FIELD = "filter";
    public static final String CREDENTIALS_FIELD = "credentials";
    public static final String USER_FIELD = "user";
    public static final String PASSWORD_FIELD = "password";
    public static final String SCRIPT_FIELD = "script";
    public static final String SCRIPT_TYPE_FIELD = "scriptType";
    public static final String COLLECTION_FIELD = "collection";
    public static final String GRIDFS_FIELD = "gridfs";
    public static final String INDEX_OBJECT = "index";
    public static final String NAME_FIELD = "name";
    public static final String TYPE_FIELD = "type";
    public static final String LOCAL_DB_FIELD = "local";
    public static final String ADMIN_DB_FIELD = "admin";
    public static final String DB_LOCAL = "local";
    public static final String DB_ADMIN = "admin";
    public static final String DB_CONFIG = "config";
    public static final String DEFAULT_DB_HOST = "localhost";
    public static final String THROTTLE_SIZE_FIELD = "throttle_size";
    public static final int DEFAULT_DB_PORT = 27017;
    public static final String BULK_SIZE_FIELD = "bulk_size";
    public static final String BULK_TIMEOUT_FIELD = "bulk_timeout";
    public static final String LAST_TIMESTAMP_FIELD = "_last_ts";
    public static final String MONGODB_LOCAL = "local";
    public static final String MONGODB_ADMIN = "admin";
    public static final String MONGODB_ID_FIELD = "_id";
    public static final String OPLOG_COLLECTION = "oplog.rs";
    public static final String OPLOG_NAMESPACE = "ns";
    public static final String OPLOG_OBJECT = "o";
    public static final String OPLOG_UPDATE = "o2";
    public static final String OPLOG_OPERATION = "op";
    public static final String OPLOG_UPDATE_OPERATION = "u";
    public static final String OPLOG_INSERT_OPERATION = "i";
    public static final String OPLOG_DELETE_OPERATION = "d";
    public static final String OPLOG_TIMESTAMP = "ts";
    public static final String GRIDFS_FILES_SUFFIX = ".files";
    public static final String GRIDFS_CHUNKS_SUFFIX = ".chunks";
    protected final Client client;
    protected final String riverIndexName;
    protected final List<ServerAddress> mongoServers;
    protected final String mongoDb;
    protected final String mongoCollection;
    protected final boolean mongoGridFS;
    protected final String mongoFilter;
    protected final String mongoAdminUser;
    protected final String mongoAdminPassword;
    protected final String mongoLocalUser;
    protected final String mongoLocalPassword;
    protected final String mongoOplogNamespace;
    protected final boolean mongoSecondaryReadPreference;
    protected final String indexName;
    protected final String typeName;
    protected final int bulkSize;
    protected final TimeValue bulkTimeout;
    protected final int throttleSize;
    private final ExecutableScript script;
    protected volatile List<Thread> tailerThreads;
    protected volatile Thread indexerThread;
    protected volatile boolean active;
    private final BlockingQueue<Map<String, Object>> stream;
    private Mongo mongo;
    private DB adminDb;

    /* loaded from: input_file:org/elasticsearch/river/mongodb/MongoDBRiver$Indexer.class */
    private class Indexer implements Runnable {
        private final ESLogger logger;
        private int deletedDocuments;
        private int insertedDocuments;
        private int updatedDocuments;
        private StopWatch sw;

        private Indexer() {
            this.logger = ESLoggerFactory.getLogger(getClass().getName());
            this.deletedDocuments = 0;
            this.insertedDocuments = 0;
            this.updatedDocuments = 0;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (MongoDBRiver.this.active) {
                this.sw = new StopWatch().start();
                this.deletedDocuments = 0;
                this.insertedDocuments = 0;
                this.updatedDocuments = 0;
                try {
                    BulkRequestBuilder prepareBulk = MongoDBRiver.this.client.prepareBulk();
                    BSONTimestamp updateBulkRequest = updateBulkRequest(prepareBulk, (Map) MongoDBRiver.this.stream.take());
                    do {
                        Map<String, Object> map = (Map) MongoDBRiver.this.stream.poll(MongoDBRiver.this.bulkTimeout.millis(), TimeUnit.MILLISECONDS);
                        if (map == null) {
                            break;
                        } else {
                            updateBulkRequest = updateBulkRequest(prepareBulk, map);
                        }
                    } while (prepareBulk.numberOfActions() < MongoDBRiver.this.bulkSize);
                    if (updateBulkRequest != null) {
                        MongoDBRiver.this.updateLastTimestamp(MongoDBRiver.this.mongoOplogNamespace, updateBulkRequest, prepareBulk);
                    }
                    try {
                        BulkResponse bulkResponse = (BulkResponse) prepareBulk.execute().actionGet();
                        if (bulkResponse.hasFailures()) {
                            this.logger.warn("failed to execute" + bulkResponse.buildFailureMessage(), new Object[0]);
                        }
                    } catch (ElasticSearchInterruptedException e) {
                        this.logger.warn("river-mongodb indexer bas been interrupted", e, new Object[0]);
                        Thread.currentThread().interrupt();
                    } catch (Exception e2) {
                        this.logger.warn("failed to execute bulk", e2, new Object[0]);
                    }
                } catch (InterruptedException e3) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("river-mongodb indexer interrupted", new Object[0]);
                    }
                    Thread.currentThread().interrupt();
                }
                logStatistics();
            }
        }

        private BSONTimestamp updateBulkRequest(BulkRequestBuilder bulkRequestBuilder, Map<String, Object> map) {
            if (map.get(MongoDBRiver.MONGODB_ID_FIELD) == null) {
                this.logger.warn("Cannot get object id. Skip the current item: [{}]", new Object[]{map});
                return null;
            }
            BSONTimestamp bSONTimestamp = (BSONTimestamp) map.get(MongoDBRiver.OPLOG_TIMESTAMP);
            String obj = map.get(MongoDBRiver.OPLOG_OPERATION).toString();
            String obj2 = map.get(MongoDBRiver.MONGODB_ID_FIELD).toString();
            map.remove(MongoDBRiver.OPLOG_TIMESTAMP);
            map.remove(MongoDBRiver.OPLOG_OPERATION);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("updateBulkRequest for id: [{}], operation: [{}]", new Object[]{obj2, obj});
            }
            Map<String, Object> map2 = null;
            try {
                map2 = XContentFactory.xContent(XContentType.JSON).createParser("{}").mapAndClose();
            } catch (IOException e) {
                this.logger.warn("failed to parse {}", e, new Object[0]);
            }
            if (MongoDBRiver.this.script != null && map2 != null) {
                map2.put("document", map);
                map2.put("operation", obj);
                map2.put("id", obj2);
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Context before script executed: {}", new Object[]{map2});
                }
                MongoDBRiver.this.script.setNextVar("ctx", map2);
                try {
                    MongoDBRiver.this.script.run();
                    map2 = (Map) MongoDBRiver.this.script.unwrap(map2);
                } catch (Exception e2) {
                    this.logger.warn("failed to script process {}, ignoring", e2, new Object[]{map2});
                }
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Context after script executed: {}", new Object[]{map2});
                }
                if (map2.containsKey("ignore") && map2.get("ignore").equals(Boolean.TRUE)) {
                    this.logger.debug("From script ignore document id: {}", new Object[]{obj2});
                    return bSONTimestamp;
                }
                if (map2.containsKey("deleted") && map2.get("deleted").equals(Boolean.TRUE)) {
                    map2.put("operation", MongoDBRiver.OPLOG_DELETE_OPERATION);
                }
                if (map2.containsKey("document")) {
                    map = (Map) map2.get("document");
                    this.logger.debug("From script document: {}", new Object[]{map});
                }
                if (map2.containsKey("operation")) {
                    obj = map2.get("operation").toString();
                    this.logger.debug("From script operation: {}", new Object[]{obj});
                }
            }
            try {
                String extractIndex = extractIndex(map2);
                String extractType = extractType(map2);
                String extractParent = extractParent(map2);
                String extractRouting = extractRouting(map2);
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Operation: {} - index: {} - type: {} - routing: {} - parent: {}", new Object[]{obj, extractIndex, extractType, extractRouting, extractParent});
                }
                if (MongoDBRiver.OPLOG_INSERT_OPERATION.equals(obj)) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Insert operation - id: {} - contains attachment: {}", new Object[]{obj, obj2, Boolean.valueOf(map.containsKey(MongoDBRiver.IS_MONGODB_ATTACHMENT))});
                    }
                    bulkRequestBuilder.add(Requests.indexRequest(extractIndex).type(extractType).id(obj2).source(build(map, obj2)).routing(extractRouting).parent(extractParent));
                    this.insertedDocuments++;
                }
                if (MongoDBRiver.OPLOG_UPDATE_OPERATION.equals(obj)) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Update operation - id: {} - contains attachment: {}", new Object[]{obj2, Boolean.valueOf(map.containsKey(MongoDBRiver.IS_MONGODB_ATTACHMENT))});
                    }
                    bulkRequestBuilder.add(new DeleteRequest(extractIndex, extractType, obj2).routing(extractRouting).parent(extractParent));
                    bulkRequestBuilder.add(Requests.indexRequest(extractIndex).type(extractType).id(obj2).source(build(map, obj2)).routing(extractRouting).parent(extractParent));
                    this.updatedDocuments++;
                }
                if (MongoDBRiver.OPLOG_DELETE_OPERATION.equals(obj)) {
                    this.logger.info("Delete request [{}], [{}], [{}]", new Object[]{extractIndex, extractType, obj2});
                    bulkRequestBuilder.add(new DeleteRequest(extractIndex, extractType, obj2).routing(extractRouting).parent(extractParent));
                    this.deletedDocuments++;
                }
            } catch (IOException e3) {
                this.logger.warn("failed to parse {}", e3, new Object[]{map});
            }
            return bSONTimestamp;
        }

        private XContentBuilder build(Map<String, Object> map, String str) throws IOException {
            if (!map.containsKey(MongoDBRiver.IS_MONGODB_ATTACHMENT)) {
                return XContentFactory.jsonBuilder().map(map);
            }
            this.logger.info("Add Attachment: {} to index {} / type {}", new Object[]{str, MongoDBRiver.this.indexName, MongoDBRiver.this.typeName});
            return GridFSHelper.serialize((GridFSDBFile) map.get(MongoDBRiver.MONGODB_ATTACHMENT));
        }

        private String extractParent(Map<String, Object> map) {
            return (String) map.get("_parent");
        }

        private String extractRouting(Map<String, Object> map) {
            return (String) map.get("_routing");
        }

        private String extractType(Map<String, Object> map) {
            String str = (String) map.get("_type");
            if (str == null) {
                str = MongoDBRiver.this.typeName;
            }
            return str;
        }

        private String extractIndex(Map<String, Object> map) {
            String str = (String) map.get("_index");
            if (str == null) {
                str = MongoDBRiver.this.indexName;
            }
            return str;
        }

        private void logStatistics() {
            long j = this.deletedDocuments + this.insertedDocuments;
            long seconds = this.sw.stop().totalTime().seconds();
            this.logger.info("Indexed {} documents, {} insertions {}, updates, {} deletions, {} documents per second", new Object[]{Long.valueOf(j), Integer.valueOf(this.insertedDocuments), Integer.valueOf(this.updatedDocuments), Integer.valueOf(this.deletedDocuments), Long.valueOf(seconds == 0 ? j : j / seconds)});
        }
    }

    /* loaded from: input_file:org/elasticsearch/river/mongodb/MongoDBRiver$Slurper.class */
    private class Slurper implements Runnable {
        private Mongo mongo;
        private DB slurpedDb;
        private DBCollection slurpedCollection;
        private DB oplogDb;
        private DBCollection oplogCollection;
        private final List<ServerAddress> mongoServers;

        public Slurper(List<ServerAddress> list) {
            this.mongoServers = list;
        }

        private boolean assignCollections() {
            DB db = this.mongo.getDB("admin");
            this.oplogDb = this.mongo.getDB("local");
            if (!MongoDBRiver.this.mongoAdminUser.isEmpty() && !MongoDBRiver.this.mongoAdminPassword.isEmpty()) {
                MongoDBRiver.this.logger.info("Authenticate {} with {}", new Object[]{"admin", MongoDBRiver.this.mongoAdminUser});
                CommandResult authenticateCommand = db.authenticateCommand(MongoDBRiver.this.mongoAdminUser, MongoDBRiver.this.mongoAdminPassword.toCharArray());
                if (!authenticateCommand.ok()) {
                    MongoDBRiver.this.logger.error("Autenticatication failed for {}: {}", new Object[]{"admin", authenticateCommand.getErrorMessage()});
                }
                this.oplogDb = db.getMongo().getDB("local");
            }
            if (!MongoDBRiver.this.mongoLocalUser.isEmpty() && !MongoDBRiver.this.mongoLocalPassword.isEmpty() && !this.oplogDb.isAuthenticated()) {
                MongoDBRiver.this.logger.info("Authenticate {} with {}", new Object[]{"local", MongoDBRiver.this.mongoLocalUser});
                CommandResult authenticateCommand2 = this.oplogDb.authenticateCommand(MongoDBRiver.this.mongoLocalUser, MongoDBRiver.this.mongoLocalPassword.toCharArray());
                if (!authenticateCommand2.ok()) {
                    MongoDBRiver.this.logger.error("Autenticatication failed for {}: {}", new Object[]{"local", authenticateCommand2.getErrorMessage()});
                    return false;
                }
            }
            if (!this.oplogDb.getCollectionNames().contains(MongoDBRiver.OPLOG_COLLECTION)) {
                MongoDBRiver.this.logger.error("Cannot find oplog.rs collection. Please use check this link: http://goo.gl/2x5IW", new Object[0]);
                return false;
            }
            this.oplogCollection = this.oplogDb.getCollection(MongoDBRiver.OPLOG_COLLECTION);
            this.slurpedDb = this.mongo.getDB(MongoDBRiver.this.mongoDb);
            if (!MongoDBRiver.this.mongoAdminUser.isEmpty() && !MongoDBRiver.this.mongoAdminUser.isEmpty() && db.isAuthenticated()) {
                this.slurpedDb = db.getMongo().getDB(MongoDBRiver.this.mongoDb);
            }
            this.slurpedCollection = this.slurpedDb.getCollection(MongoDBRiver.this.mongoCollection);
            return true;
        }

        @Override // java.lang.Runnable
        public void run() {
            this.mongo = new MongoClient(this.mongoServers);
            if (MongoDBRiver.this.mongoSecondaryReadPreference) {
                this.mongo.setReadPreference(ReadPreference.secondaryPreferred());
            }
            while (MongoDBRiver.this.active) {
                try {
                } catch (NoSuchElementException e) {
                    MongoDBRiver.this.logger.warn("A mongoDB cursor bug ?", e, new Object[0]);
                } catch (MongoInterruptedException e2) {
                    MongoDBRiver.this.logger.error("Mongo driver has been interrupted", e2, new Object[0]);
                    MongoDBRiver.this.active = false;
                } catch (InterruptedException e3) {
                    if (MongoDBRiver.this.logger.isDebugEnabled()) {
                        MongoDBRiver.this.logger.debug("river-mongodb slurper interrupted", new Object[0]);
                    }
                    Thread.currentThread().interrupt();
                } catch (MongoException e4) {
                    MongoDBRiver.this.logger.error("Mongo gave an exception", e4, new Object[0]);
                }
                if (!assignCollections()) {
                    return;
                }
                DBCursor oplogCursor = oplogCursor(null);
                if (oplogCursor == null) {
                    oplogCursor = processFullCollection();
                }
                while (oplogCursor.hasNext()) {
                    processOplogEntry(oplogCursor.next());
                }
                Thread.sleep(500L);
            }
        }

        private DBCursor processFullCollection() throws InterruptedException {
            BSONTimestamp bSONTimestamp = (BSONTimestamp) this.oplogCollection.find().sort(new BasicDBObject(MongoDBRiver.OPLOG_TIMESTAMP, -1)).limit(1).next().get(MongoDBRiver.OPLOG_TIMESTAMP);
            addQueryToStream(MongoDBRiver.OPLOG_INSERT_OPERATION, bSONTimestamp, null);
            return oplogCursor(bSONTimestamp);
        }

        private void processOplogEntry(DBObject dBObject) throws InterruptedException {
            String obj = dBObject.get(MongoDBRiver.OPLOG_OPERATION).toString();
            String obj2 = dBObject.get(MongoDBRiver.OPLOG_NAMESPACE).toString();
            BSONTimestamp bSONTimestamp = (BSONTimestamp) dBObject.get(MongoDBRiver.OPLOG_TIMESTAMP);
            DBObject dBObject2 = (DBObject) dBObject.get(MongoDBRiver.OPLOG_OBJECT);
            if (dBObject.containsField("fromMigrate") && ((BasicBSONObject) dBObject).getBoolean("fromMigrate")) {
                MongoDBRiver.this.logger.debug("From migration or sharding operation. Can be ignored. {}", new Object[]{dBObject});
                return;
            }
            if (obj2.endsWith(MongoDBRiver.GRIDFS_CHUNKS_SUFFIX)) {
                return;
            }
            if (MongoDBRiver.this.logger.isTraceEnabled()) {
                MongoDBRiver.this.logger.trace("oplog entry - namespace [{}], operation [{}]", new Object[]{obj2, obj});
                MongoDBRiver.this.logger.trace("oplog processing item {}", new Object[]{dBObject});
            }
            if (MongoDBRiver.this.mongoGridFS && obj2.endsWith(MongoDBRiver.GRIDFS_FILES_SUFFIX) && (MongoDBRiver.OPLOG_INSERT_OPERATION.equals(obj) || MongoDBRiver.OPLOG_UPDATE_OPERATION.equals(obj))) {
                String obj3 = dBObject2.get(MongoDBRiver.MONGODB_ID_FIELD).toString();
                DBObject findOne = new GridFS(this.mongo.getDB(MongoDBRiver.this.mongoDb), MongoDBRiver.this.mongoCollection).findOne(new ObjectId(obj3));
                if (findOne != null) {
                    MongoDBRiver.this.logger.info("Caught file: {} - {}", new Object[]{findOne.getId(), findOne.getFilename()});
                    dBObject2 = findOne;
                } else {
                    MongoDBRiver.this.logger.warn("Cannot find file from id: {}", new Object[]{obj3});
                }
            }
            if (dBObject2 instanceof GridFSDBFile) {
                MongoDBRiver.this.logger.info("Add attachment: {}", new Object[]{dBObject2.get(MongoDBRiver.MONGODB_ID_FIELD)});
                HashMap hashMap = new HashMap();
                hashMap.put(MongoDBRiver.IS_MONGODB_ATTACHMENT, true);
                hashMap.put(MongoDBRiver.MONGODB_ATTACHMENT, dBObject2);
                hashMap.put(MongoDBRiver.MONGODB_ID_FIELD, dBObject2.get(MongoDBRiver.MONGODB_ID_FIELD));
                addToStream(obj, bSONTimestamp, hashMap);
                return;
            }
            if (!MongoDBRiver.OPLOG_UPDATE_OPERATION.equals(obj)) {
                addToStream(obj, bSONTimestamp, dBObject2.toMap());
                return;
            }
            DBObject dBObject3 = (DBObject) dBObject.get(MongoDBRiver.OPLOG_UPDATE);
            MongoDBRiver.this.logger.debug("Updated item: {}", new Object[]{dBObject3});
            addQueryToStream(obj, bSONTimestamp, dBObject3);
        }

        private DBObject getIndexFilter(BSONTimestamp bSONTimestamp) {
            BSONTimestamp lastTimestamp = bSONTimestamp == null ? MongoDBRiver.this.getLastTimestamp(MongoDBRiver.this.mongoOplogNamespace) : bSONTimestamp;
            new BasicDBObject();
            ArrayList arrayList = new ArrayList();
            if (MongoDBRiver.this.mongoGridFS) {
                arrayList.add(new BasicDBObject(MongoDBRiver.OPLOG_NAMESPACE, MongoDBRiver.this.mongoOplogNamespace + MongoDBRiver.GRIDFS_FILES_SUFFIX));
            } else {
                arrayList.add(new BasicDBObject(MongoDBRiver.OPLOG_NAMESPACE, MongoDBRiver.this.mongoOplogNamespace));
            }
            if (!MongoDBRiver.this.mongoFilter.isEmpty()) {
                arrayList.add(getMongoFilter());
            }
            if (lastTimestamp == null) {
                MongoDBRiver.this.logger.info("No known previous slurping time for this collection", new Object[0]);
            } else {
                arrayList.add(new BasicDBObject(MongoDBRiver.OPLOG_TIMESTAMP, new BasicDBObject("$gt", lastTimestamp)));
            }
            BasicDBObject basicDBObject = new BasicDBObject("$and", arrayList);
            if (MongoDBRiver.this.logger.isDebugEnabled()) {
                MongoDBRiver.this.logger.debug("Using filter: {}", new Object[]{basicDBObject});
            }
            return basicDBObject;
        }

        private DBObject getMongoFilter() {
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            ArrayList arrayList3 = new ArrayList();
            arrayList.add(new BasicDBObject(MongoDBRiver.OPLOG_OPERATION, MongoDBRiver.OPLOG_DELETE_OPERATION));
            arrayList3.add(new BasicDBObject(MongoDBRiver.OPLOG_OPERATION, MongoDBRiver.OPLOG_UPDATE_OPERATION));
            arrayList3.add(new BasicDBObject(MongoDBRiver.OPLOG_OPERATION, MongoDBRiver.OPLOG_INSERT_OPERATION));
            arrayList2.add(new BasicDBObject("$or", arrayList3));
            arrayList2.add((DBObject) JSON.parse(MongoDBRiver.this.mongoFilter));
            arrayList.add(new BasicDBObject("$and", arrayList2));
            return new BasicDBObject("$or", arrayList);
        }

        private DBCursor oplogCursor(BSONTimestamp bSONTimestamp) {
            DBObject indexFilter = getIndexFilter(bSONTimestamp);
            if (indexFilter == null) {
                return null;
            }
            return this.oplogCollection.find(indexFilter).sort(new BasicDBObject("$natural", 1)).addOption(2).addOption(32);
        }

        private void addQueryToStream(String str, BSONTimestamp bSONTimestamp, DBObject dBObject) throws InterruptedException {
            if (MongoDBRiver.this.logger.isDebugEnabled()) {
                MongoDBRiver.this.logger.debug("addQueryToStream - operation [{}], currentTimestamp [{}], update [{}]", new Object[]{str, bSONTimestamp, dBObject});
            }
            Iterator it = this.slurpedCollection.find(dBObject).iterator();
            while (it.hasNext()) {
                addToStream(str, bSONTimestamp, ((DBObject) it.next()).toMap());
            }
        }

        private void addToStream(String str, BSONTimestamp bSONTimestamp, Map<String, Object> map) throws InterruptedException {
            if (MongoDBRiver.this.logger.isDebugEnabled()) {
                MongoDBRiver.this.logger.debug("addToStream - operation [{}], currentTimestamp [{}], data [{}]", new Object[]{str, bSONTimestamp, map});
            }
            map.put(MongoDBRiver.OPLOG_TIMESTAMP, bSONTimestamp);
            map.put(MongoDBRiver.OPLOG_OPERATION, str);
            MongoDBRiver.this.stream.put(map);
        }
    }

    @Inject
    public MongoDBRiver(RiverName riverName, RiverSettings riverSettings, @RiverIndexName String str, Client client, ScriptService scriptService) {
        super(riverName, riverSettings);
        this.mongoServers = new ArrayList();
        this.tailerThreads = new ArrayList();
        this.active = true;
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Prefix: [{}] - name: [{}]", new Object[]{this.logger.getPrefix(), this.logger.getName()});
            this.logger.debug("River settings: ", new Object[]{riverSettings.settings()});
        }
        this.riverIndexName = str;
        this.client = client;
        if (riverSettings.settings().containsKey("mongodb")) {
            Map map = (Map) riverSettings.settings().get("mongodb");
            if (map.containsKey(SERVERS_FIELD)) {
                Object obj = map.get(SERVERS_FIELD);
                this.logger.info("mongoServersSettings: " + obj, new Object[0]);
                if (XContentMapValues.isArray(obj)) {
                    Iterator it = ((ArrayList) obj).iterator();
                    while (it.hasNext()) {
                        Map map2 = (Map) it.next();
                        String nodeStringValue = XContentMapValues.nodeStringValue(map2.get(HOST_FIELD), (String) null);
                        int nodeIntegerValue = XContentMapValues.nodeIntegerValue(map2.get(PORT_FIELD), 0);
                        this.logger.info("Server: " + nodeStringValue + " - " + nodeIntegerValue, new Object[0]);
                        try {
                            this.mongoServers.add(new ServerAddress(nodeStringValue, nodeIntegerValue));
                        } catch (UnknownHostException e) {
                            e.printStackTrace();
                        }
                    }
                }
            } else {
                try {
                    this.mongoServers.add(new ServerAddress(XContentMapValues.nodeStringValue(map.get(HOST_FIELD), DEFAULT_DB_HOST), XContentMapValues.nodeIntegerValue(map.get(PORT_FIELD), DEFAULT_DB_PORT)));
                } catch (UnknownHostException e2) {
                    e2.printStackTrace();
                }
            }
            if (map.containsKey(OPTIONS_FIELD)) {
                this.mongoSecondaryReadPreference = XContentMapValues.nodeBooleanValue(((Map) map.get(OPTIONS_FIELD)).get(SECONDARY_READ_PREFERENCE_FIELD), false);
            } else {
                this.mongoSecondaryReadPreference = false;
            }
            if (map.containsKey(CREDENTIALS_FIELD)) {
                String str2 = "";
                String str3 = "";
                String str4 = "";
                String str5 = "";
                Object obj2 = map.get(CREDENTIALS_FIELD);
                if (XContentMapValues.isArray(obj2)) {
                    Iterator it2 = ((ArrayList) obj2).iterator();
                    while (it2.hasNext()) {
                        Map map3 = (Map) it2.next();
                        String nodeStringValue2 = XContentMapValues.nodeStringValue(map3.get(DB_FIELD), (String) null);
                        if ("admin".equals(nodeStringValue2)) {
                            str2 = XContentMapValues.nodeStringValue(map3.get(USER_FIELD), (String) null);
                            str3 = XContentMapValues.nodeStringValue(map3.get(PASSWORD_FIELD), (String) null);
                        } else if ("local".equals(nodeStringValue2)) {
                            str4 = XContentMapValues.nodeStringValue(map3.get(USER_FIELD), (String) null);
                            str5 = XContentMapValues.nodeStringValue(map3.get(PASSWORD_FIELD), (String) null);
                        }
                    }
                }
                this.mongoAdminUser = str2;
                this.mongoAdminPassword = str3;
                this.mongoLocalUser = str4;
                this.mongoLocalPassword = str5;
            } else {
                this.mongoAdminUser = "";
                this.mongoAdminPassword = "";
                this.mongoLocalUser = "";
                this.mongoLocalPassword = "";
            }
            this.mongoDb = XContentMapValues.nodeStringValue(map.get(DB_FIELD), riverName.name());
            this.mongoCollection = XContentMapValues.nodeStringValue(map.get(COLLECTION_FIELD), riverName.name());
            this.mongoGridFS = XContentMapValues.nodeBooleanValue(map.get(GRIDFS_FIELD), false);
            if (map.containsKey(FILTER_FIELD)) {
                this.mongoFilter = XContentMapValues.nodeStringValue(map.get(FILTER_FIELD), "");
            } else {
                this.mongoFilter = "";
            }
            if (map.containsKey(SCRIPT_FIELD)) {
                this.script = scriptService.executable(map.containsKey(SCRIPT_TYPE_FIELD) ? map.get(SCRIPT_TYPE_FIELD).toString() : "js", map.get(SCRIPT_FIELD).toString(), Maps.newHashMap());
            } else {
                this.script = null;
            }
        } else {
            try {
                this.mongoServers.add(new ServerAddress(DEFAULT_DB_HOST, DEFAULT_DB_PORT));
            } catch (UnknownHostException e3) {
                e3.printStackTrace();
            }
            this.mongoSecondaryReadPreference = false;
            this.mongoDb = riverName.name();
            this.mongoCollection = riverName.name();
            this.mongoFilter = "";
            this.mongoGridFS = false;
            this.mongoAdminUser = "";
            this.mongoAdminPassword = "";
            this.mongoLocalUser = "";
            this.mongoLocalPassword = "";
            this.script = null;
        }
        this.mongoOplogNamespace = this.mongoDb + "." + this.mongoCollection;
        if (riverSettings.settings().containsKey(INDEX_OBJECT)) {
            Map map4 = (Map) riverSettings.settings().get(INDEX_OBJECT);
            this.indexName = XContentMapValues.nodeStringValue(map4.get(NAME_FIELD), this.mongoDb);
            this.typeName = XContentMapValues.nodeStringValue(map4.get(TYPE_FIELD), this.mongoDb);
            this.bulkSize = XContentMapValues.nodeIntegerValue(map4.get(BULK_SIZE_FIELD), 100);
            if (map4.containsKey(BULK_TIMEOUT_FIELD)) {
                this.bulkTimeout = TimeValue.parseTimeValue(XContentMapValues.nodeStringValue(map4.get(BULK_TIMEOUT_FIELD), "10ms"), TimeValue.timeValueMillis(10L));
            } else {
                this.bulkTimeout = TimeValue.timeValueMillis(10L);
            }
            this.throttleSize = XContentMapValues.nodeIntegerValue(map4.get(THROTTLE_SIZE_FIELD), this.bulkSize * 5);
        } else {
            this.indexName = this.mongoDb;
            this.typeName = this.mongoDb;
            this.bulkSize = 100;
            this.bulkTimeout = TimeValue.timeValueMillis(10L);
            this.throttleSize = this.bulkSize * 5;
        }
        if (this.throttleSize == -1) {
            this.stream = new LinkedTransferQueue();
        } else {
            this.stream = new ArrayBlockingQueue(this.throttleSize);
        }
    }

    public void start() {
        for (ServerAddress serverAddress : this.mongoServers) {
            this.logger.info("Using mongodb server(s): host [{}], port [{}]", new Object[]{serverAddress.getHost(), Integer.valueOf(serverAddress.getPort())});
        }
        this.logger.info("starting mongodb stream. options: secondaryreadpreference [{}], throttlesize [{}], gridfs [{}], filter [{}], db [{}], collection [{}], script [{}], indexing to [{}]/[{}]", new Object[]{Boolean.valueOf(this.mongoSecondaryReadPreference), Integer.valueOf(this.throttleSize), Boolean.valueOf(this.mongoGridFS), this.mongoFilter, this.mongoDb, this.mongoCollection, this.script, this.indexName, this.typeName});
        try {
            this.client.admin().indices().prepareCreate(this.indexName).execute().actionGet();
        } catch (Exception e) {
            if (!(ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) && !(ExceptionsHelper.unwrapCause(e) instanceof ClusterBlockException)) {
                this.logger.warn("failed to create index [{}], disabling river...", e, new Object[]{this.indexName});
                return;
            }
        }
        if (this.mongoGridFS) {
            try {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Set explicit attachment mapping.", new Object[0]);
                }
                this.client.admin().indices().preparePutMapping(new String[]{this.indexName}).setType(this.typeName).setSource(getGridFSMapping()).execute().actionGet();
            } catch (Exception e2) {
                this.logger.warn("Failed to set explicit mapping (attachment): {}", e2, new Object[0]);
            }
        }
        if (isMongos()) {
            DBCursor find = getConfigDb().getCollection("shards").find();
            while (find.hasNext()) {
                DBObject next = find.next();
                this.logger.info(next.toString(), new Object[0]);
                List<ServerAddress> serverAddressForReplica = getServerAddressForReplica(next);
                if (serverAddressForReplica != null) {
                    this.tailerThreads.add(EsExecutors.daemonThreadFactory(this.settings.globalSettings(), "mongodb_river_slurper-" + next.get(MONGODB_ID_FIELD).toString()).newThread(new Slurper(serverAddressForReplica)));
                }
            }
        } else {
            this.tailerThreads.add(EsExecutors.daemonThreadFactory(this.settings.globalSettings(), "mongodb_river_slurper").newThread(new Slurper(this.mongoServers)));
        }
        Iterator<Thread> it = this.tailerThreads.iterator();
        while (it.hasNext()) {
            it.next().start();
        }
        this.indexerThread = EsExecutors.daemonThreadFactory(this.settings.globalSettings(), "mongodb_river_indexer").newThread(new Indexer());
        this.indexerThread.start();
    }

    private boolean isMongos() {
        DB adminDb = getAdminDb();
        if (adminDb == null) {
            return false;
        }
        CommandResult command = adminDb.command(new BasicDBObject("serverStatus", 1));
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("serverStatus: {}", new Object[]{command});
            this.logger.trace("process: {}", new Object[]{command.get("process")});
        }
        if (command != null && command.get("process") != null) {
            return command.get("process").equals("mongos");
        }
        this.logger.warn("serverStatus return null.", new Object[0]);
        return false;
    }

    private DB getAdminDb() {
        if (this.adminDb == null) {
            this.adminDb = getMongoClient().getDB("admin");
            if (!this.mongoAdminUser.isEmpty() && !this.mongoAdminPassword.isEmpty() && !this.adminDb.isAuthenticated()) {
                this.logger.info("Authenticate {} with {}", new Object[]{"admin", this.mongoAdminUser});
                try {
                    CommandResult authenticateCommand = this.adminDb.authenticateCommand(this.mongoAdminUser, this.mongoAdminPassword.toCharArray());
                    if (!authenticateCommand.ok()) {
                        this.logger.error("Autenticatication failed for {}: {}", new Object[]{"admin", authenticateCommand.getErrorMessage()});
                    }
                } catch (MongoException e) {
                    this.logger.warn("getAdminDb() failed", e, new Object[0]);
                }
            }
        }
        return this.adminDb;
    }

    private DB getConfigDb() {
        DB db = getMongoClient().getDB(DB_CONFIG);
        if (!this.mongoAdminUser.isEmpty() && !this.mongoAdminUser.isEmpty() && getAdminDb().isAuthenticated()) {
            db = getAdminDb().getMongo().getDB(DB_CONFIG);
        }
        return db;
    }

    private Mongo getMongoClient() {
        if (this.mongo == null) {
            this.mongo = new MongoClient(this.mongoServers);
        }
        return this.mongo;
    }

    private List<ServerAddress> getServerAddressForReplica(DBObject dBObject) {
        String obj = dBObject.get(HOST_FIELD).toString();
        if (obj.contains("/")) {
            obj = obj.substring(obj.indexOf("/") + 1);
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("getServerAddressForReplica - definition: {}", new Object[]{obj});
        }
        ArrayList arrayList = new ArrayList();
        for (String str : obj.split(",")) {
            try {
                arrayList.add(new ServerAddress(str));
            } catch (UnknownHostException e) {
                this.logger.warn("failed to execute bulk", e, new Object[0]);
            }
        }
        return arrayList;
    }

    public void close() {
        if (this.active) {
            this.logger.info("closing mongodb stream river", new Object[0]);
            this.active = false;
            Iterator<Thread> it = this.tailerThreads.iterator();
            while (it.hasNext()) {
                it.next().interrupt();
            }
            this.indexerThread.interrupt();
        }
    }

    private XContentBuilder getGridFSMapping() throws IOException {
        XContentBuilder endObject = XContentFactory.jsonBuilder().startObject().startObject(this.typeName).startObject("properties").startObject("content").field(TYPE_FIELD, "attachment").endObject().startObject("filename").field(TYPE_FIELD, "string").endObject().startObject("contentType").field(TYPE_FIELD, "string").endObject().startObject("md5").field(TYPE_FIELD, "string").endObject().startObject("length").field(TYPE_FIELD, "long").endObject().startObject("chunkSize").field(TYPE_FIELD, "long").endObject().endObject().endObject().endObject();
        this.logger.info("Mapping: {}", new Object[]{endObject.string()});
        return endObject;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public BSONTimestamp getLastTimestamp(String str) {
        Map map;
        String obj;
        GetResponse getResponse = (GetResponse) this.client.prepareGet(this.riverIndexName, this.riverName.getName(), str).execute().actionGet();
        if (!getResponse.isExists() || (map = (Map) getResponse.getSourceAsMap().get("mongodb")) == null || (obj = map.get(LAST_TIMESTAMP_FIELD).toString()) == null) {
            return null;
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("{} last timestamp: {}", new Object[]{str, obj});
        }
        return (BSONTimestamp) JSON.parse(obj);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void updateLastTimestamp(String str, BSONTimestamp bSONTimestamp, BulkRequestBuilder bulkRequestBuilder) {
        try {
            bulkRequestBuilder.add(Requests.indexRequest(this.riverIndexName).type(this.riverName.getName()).id(str).source(XContentFactory.jsonBuilder().startObject().startObject("mongodb").field(LAST_TIMESTAMP_FIELD, JSON.serialize(bSONTimestamp)).endObject().endObject()));
        } catch (IOException e) {
            this.logger.error("error updating last timestamp for namespace {}", new Object[]{str});
        }
    }
}
