package org.elasticsearch.river.mongodb;

import com.mongodb.BasicDBObject;
import com.mongodb.CommandResult;
import com.mongodb.DB;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoClient;
import com.mongodb.MongoException;
import com.mongodb.ServerAddress;
import com.mongodb.gridfs.GridFSDBFile;
import com.mongodb.util.JSON;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import org.bson.types.BSONTimestamp;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.river.River;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
import org.elasticsearch.river.mongodb.util.MongoDBHelper;
import org.elasticsearch.river.mongodb.util.MongoDBRiverHelper;
import org.elasticsearch.script.ScriptService;

/* loaded from: input_file:org/elasticsearch/river/mongodb/MongoDBRiver.class */
public class MongoDBRiver extends AbstractRiverComponent implements River {
    // Key of the object that wraps LAST_TIMESTAMP_FIELD in the document read by
    // getLastTimestamp() and written by updateLastTimestamp().
    public static final String TYPE = "mongodb";
    // Not referenced in this class; presumably the plugin/river name used at registration - TODO confirm.
    public static final String NAME = "mongodb-river";
    // Not referenced in this class; the names suggest the id and field of a river status document - TODO confirm.
    public static final String STATUS_ID = "_riverstatus";
    public static final String STATUS_FIELD = "status";
    // Logged together with the river version by start().
    public static final String DESCRIPTION = "MongoDB River Plugin";
    // Field holding the JSON-serialized BSONTimestamp (see getLastTimestamp/updateLastTimestamp).
    public static final String LAST_TIMESTAMP_FIELD = "_last_ts";
    // MongoDB database names; getConfigDb() opens MONGODB_CONFIG_DATABASE.
    public static final String MONGODB_LOCAL_DATABASE = "local";
    public static final String MONGODB_ADMIN_DATABASE = "admin";
    public static final String MONGODB_CONFIG_DATABASE = "config";
    // Document id field; start() reads it from each shard document to name the slurper thread.
    public static final String MONGODB_ID_FIELD = "_id";
    // MongoDB operator names; not referenced in this class.
    public static final String MONGODB_OR_OPERATOR = "$or";
    public static final String MONGODB_AND_OPERATOR = "$and";
    public static final String MONGODB_NATURAL_OPERATOR = "$natural";
    // Constants named after the oplog collection, its fields and its operation codes.
    // Only OPLOG_INSERT_OPERATION is used in this class (default operation of QueueEntry);
    // the rest are presumably consumed by the slurper/indexer - TODO confirm.
    public static final String OPLOG_COLLECTION = "oplog.rs";
    public static final String OPLOG_NAMESPACE = "ns";
    public static final String OPLOG_NAMESPACE_COMMAND = "$cmd";
    public static final String OPLOG_OBJECT = "o";
    public static final String OPLOG_UPDATE = "o2";
    public static final String OPLOG_OPERATION = "op";
    public static final String OPLOG_UPDATE_OPERATION = "u";
    public static final String OPLOG_INSERT_OPERATION = "i";
    public static final String OPLOG_DELETE_OPERATION = "d";
    public static final String OPLOG_COMMAND_OPERATION = "c";
    public static final String OPLOG_DROP_COMMAND_OPERATION = "drop";
    public static final String OPLOG_TIMESTAMP = "ts";
    public static final String OPLOG_FROM_MIGRATE = "fromMigrate";
    // Collection name suffixes, by their names related to GridFS; not referenced in this class.
    public static final String GRIDFS_FILES_SUFFIX = ".files";
    public static final String GRIDFS_CHUNKS_SUFFIX = ".chunks";
    // Logger shared by the instance and static methods of this class.
    static final ESLogger logger = ESLoggerFactory.getLogger(MongoDBRiver.class.getName());
    // River configuration injected through the constructor.
    protected final MongoDBRiverDefinition definition;
    // Elasticsearch client used for index creation, mappings and status lookups.
    protected final Client client;
    // Handed to the Indexer runnable created in start().
    protected final ScriptService scriptService;
    // Created in the constructor from a queue and an initial STOPPED status; passed to the
    // StatusChecker, Slurper and Indexer runnables.
    protected final SharedContext context;
    // Slurper threads created by start() and interrupted by close().
    protected volatile List<Thread> tailerThreads;
    // Indexer thread created by start(); interrupted and reset to null by close().
    protected volatile Thread indexerThread;
    // Status-checker thread created and started in the constructor.
    protected volatile Thread statusThread;
    // Set to true by start() once it has run to completion or found the river STOPPED.
    protected volatile boolean startInvoked;
    // Lazily created by getMongoClient(); closed and reset by closeMongoClient().
    private Mongo mongo;
    // Cached by getAdminDb(); reset by closeMongoClient().
    private DB adminDb;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/elasticsearch/river/mongodb/MongoDBRiver$QueueEntry.class */
    /**
     * Immutable value object pairing a document with an operation code and an
     * optional oplog timestamp.
     */
    public static class QueueEntry {
        private final BSONTimestamp oplogTimestamp;
        private final String operation;
        private final DBObject data;

        /**
         * Creates an entry with the insert operation code and no oplog timestamp.
         *
         * @param dBObject the document carried by this entry
         */
        public QueueEntry(DBObject dBObject) {
            this(null, MongoDBRiver.OPLOG_INSERT_OPERATION, dBObject);
        }

        /**
         * Creates an entry from its three components; no validation is performed.
         *
         * @param bSONTimestamp oplog timestamp, or {@code null} when the entry has none
         * @param str           operation code
         * @param dBObject      the document carried by this entry
         */
        public QueueEntry(BSONTimestamp bSONTimestamp, String str, DBObject dBObject) {
            oplogTimestamp = bSONTimestamp;
            operation = str;
            data = dBObject;
        }

        /** @return the oplog timestamp, possibly {@code null} */
        public BSONTimestamp getOplogTimestamp() {
            return oplogTimestamp;
        }

        /** @return the operation code given at construction */
        public String getOperation() {
            return operation;
        }

        /** @return the document given at construction */
        public DBObject getData() {
            return data;
        }

        /** @return {@code true} when an oplog timestamp is present */
        public boolean isOplogEntry() {
            return null != oplogTimestamp;
        }

        /** @return {@code true} when the document is a {@link GridFSDBFile} */
        public boolean isAttachment() {
            return data instanceof GridFSDBFile;
        }
    }

    @Inject
    public MongoDBRiver(RiverName riverName, RiverSettings riverSettings, @RiverIndexName String str, Client client, ScriptService scriptService, MongoDBRiverDefinition mongoDBRiverDefinition) {
        super(riverName, riverSettings);
        this.tailerThreads = new ArrayList();
        this.startInvoked = false;
        if (logger.isDebugEnabled()) {
            logger.debug("Prefix: [{}] - name: [{}]", new Object[]{logger.getPrefix(), logger.getName()});
        }
        this.scriptService = scriptService;
        this.client = client;
        this.definition = mongoDBRiverDefinition;
        this.context = new SharedContext(mongoDBRiverDefinition.getThrottleSize() == -1 ? new LinkedTransferQueue() : new ArrayBlockingQueue(mongoDBRiverDefinition.getThrottleSize()), Status.STOPPED);
        this.statusThread = EsExecutors.daemonThreadFactory(riverSettings.globalSettings(), "mongodb_river_status").newThread(new StatusChecker(this, mongoDBRiverDefinition, this.context));
        this.statusThread.start();
    }

    /**
     * Starts the river unless its stored status is {@code STOPPED}.
     *
     * <p>Steps: mark the shared context {@code RUNNING}, log the configuration, create the
     * target index (an already-existing index or a cluster block is tolerated), optionally
     * install the GridFS attachment mapping, create one slurper thread per shard when
     * connected to a mongos (otherwise a single slurper for the configured servers), then
     * start the slurper threads and the indexer thread.
     *
     * <p>If index creation fails for any other reason the method logs a warning and returns
     * early; in that case {@code startInvoked} is left untouched and the context status
     * stays {@code RUNNING}.
     */
    public void start() {
        if (MongoDBRiverHelper.getRiverStatus(this.client, this.riverName.getName()) == Status.STOPPED) {
            logger.debug("Cannot start river {}. It is currently disabled", this.riverName.getName());
            this.startInvoked = true;
            return;
        }
        this.context.setStatus(Status.RUNNING);
        for (ServerAddress serverAddress : this.definition.getMongoServers()) {
            logger.info("Using mongodb server(s): host [{}], port [{}]", serverAddress.getHost(), serverAddress.getPort());
        }
        logger.info("{} version: [{}]", DESCRIPTION, MongoDBHelper.getRiverVersion());
        logger.info("starting mongodb stream. options: secondaryreadpreference [{}], drop_collection [{}], include_collection [{}], throttlesize [{}], gridfs [{}], filter [{}], db [{}], collection [{}], script [{}], indexing to [{}]/[{}]",
                this.definition.isMongoSecondaryReadPreference(),
                this.definition.isDropCollection(),
                this.definition.getIncludeCollection(),
                this.definition.getThrottleSize(),
                this.definition.isMongoGridFS(),
                this.definition.getMongoFilter(),
                this.definition.getMongoDb(),
                this.definition.getMongoCollection(),
                this.definition.getScript(),
                this.definition.getIndexName(),
                this.definition.getTypeName());

        // Create the target index; "already exists" and "cluster blocked" are not fatal.
        try {
            this.client.admin().indices().prepareCreate(this.definition.getIndexName()).execute().actionGet();
        } catch (Exception e) {
            Throwable cause = ExceptionsHelper.unwrapCause(e);
            if (!(cause instanceof IndexAlreadyExistsException) && !(cause instanceof ClusterBlockException)) {
                logger.warn("failed to create index [{}], disabling river...", e, this.definition.getIndexName());
                return;
            }
        }

        if (this.definition.isMongoGridFS()) {
            try {
                if (logger.isDebugEnabled()) {
                    logger.debug("Set explicit attachment mapping.");
                }
                this.client.admin().indices()
                        .preparePutMapping(new String[]{this.definition.getIndexName()})
                        .setType(this.definition.getTypeName())
                        .setSource(getGridFSMapping())
                        .execute().actionGet();
            } catch (Exception e) {
                // Best effort: the river continues without the explicit mapping.
                logger.warn("Failed to set explicit mapping (attachment): {}", e);
            }
        }

        if (isMongos()) {
            DBCursor shards = getConfigDb().getCollection("shards").find();
            // The cursor must stay open for the whole iteration and be closed exactly once
            // afterwards, so the try/finally wraps the loop rather than its body.
            try {
                while (shards.hasNext()) {
                    DBObject shard = shards.next();
                    logger.debug("shards: {}", shard.toString());
                    List<ServerAddress> replicaServers = getServerAddressForReplica(shard);
                    if (replicaServers != null) {
                        String threadName = "mongodb_river_slurper-" + shard.get(MONGODB_ID_FIELD).toString();
                        this.tailerThreads.add(EsExecutors.daemonThreadFactory(this.settings.globalSettings(), threadName)
                                .newThread(new Slurper(replicaServers, this.definition, this.context, this.client)));
                    }
                }
            } finally {
                shards.close();
            }
        } else {
            this.tailerThreads.add(EsExecutors.daemonThreadFactory(this.settings.globalSettings(), "mongodb_river_slurper")
                    .newThread(new Slurper(this.definition.getMongoServers(), this.definition, this.context, this.client)));
        }

        for (Thread tailer : this.tailerThreads) {
            tailer.start();
        }
        this.indexerThread = EsExecutors.daemonThreadFactory(this.settings.globalSettings(), "mongodb_river_indexer")
                .newThread(new Indexer(this.definition, this.context, this.client, this.scriptService));
        this.indexerThread.start();
        this.startInvoked = true;
    }

    /**
     * Runs {@code serverStatus} against the admin database and reports whether the
     * returned {@code process} value contains "mongos" (case-insensitively).
     *
     * @return {@code false} when no admin database is available or the command result
     *         carries no {@code process} value; otherwise the result of the substring test
     */
    private boolean isMongos() {
        DB admin = getAdminDb();
        if (admin == null) {
            return false;
        }
        CommandResult serverStatus = admin.command(new BasicDBObject("serverStatus", 1));
        Object process = serverStatus == null ? null : serverStatus.get("process");
        if (process == null) {
            logger.warn("serverStatus return null.");
            return false;
        }
        String processName = process.toString().toLowerCase();
        if (logger.isTraceEnabled()) {
            logger.trace("serverStatus: {}", serverStatus);
            logger.trace("process: {}", processName);
        }
        return processName.contains("mongos");
    }

    /**
     * Returns the cached admin database handle, creating it on first use.
     *
     * <p>On creation, if both an admin user and password are configured and the handle
     * is not yet authenticated, one authentication attempt is made. A failed attempt is
     * only logged; the handle is cached and returned either way.
     *
     * @return the admin database handle
     */
    private DB getAdminDb() {
        if (this.adminDb != null) {
            return this.adminDb;
        }
        this.adminDb = getMongoClient().getDB(MONGODB_ADMIN_DATABASE);

        boolean credentialsConfigured = !this.definition.getMongoAdminUser().isEmpty()
                && !this.definition.getMongoAdminPassword().isEmpty();
        if (credentialsConfigured && !this.adminDb.isAuthenticated()) {
            logger.info("Authenticate {} with {}", MONGODB_ADMIN_DATABASE, this.definition.getMongoAdminUser());
            try {
                CommandResult outcome = this.adminDb.authenticateCommand(
                        this.definition.getMongoAdminUser(),
                        this.definition.getMongoAdminPassword().toCharArray());
                if (!outcome.ok()) {
                    logger.error("Autenticatication failed for {}: {}", MONGODB_ADMIN_DATABASE, outcome.getErrorMessage());
                }
            } catch (MongoException e) {
                logger.warn("getAdminDb() failed", e);
            }
        }
        return this.adminDb;
    }

    /**
     * Returns a handle on the {@code config} database.
     *
     * <p>The handle normally comes from the river's own Mongo client. When admin
     * credentials are configured and the admin database is authenticated, it is obtained
     * through the admin database's Mongo instance instead.
     *
     * @return the config database handle
     */
    private DB getConfigDb() {
        DB configDb = getMongoClient().getDB(MONGODB_CONFIG_DATABASE);
        boolean credentialsConfigured = !this.definition.getMongoAdminUser().isEmpty()
                && !this.definition.getMongoAdminPassword().isEmpty();
        if (!credentialsConfigured) {
            return configDb;
        }
        DB admin = getAdminDb();
        return admin.isAuthenticated() ? admin.getMongo().getDB(MONGODB_CONFIG_DATABASE) : configDb;
    }

    /**
     * Returns the river's Mongo client, creating it from the configured servers and
     * client options the first time it is requested.
     *
     * @return the cached or newly created client
     */
    private Mongo getMongoClient() {
        Mongo existing = this.mongo;
        if (existing != null) {
            return existing;
        }
        Mongo created = new MongoClient(this.definition.getMongoServers(), this.definition.getMongoClientOptions());
        this.mongo = created;
        return created;
    }

    /**
     * Drops the cached admin database handle and, if a Mongo client exists, closes it
     * and clears the reference so the next {@code getMongoClient()} call creates a new one.
     */
    private void closeMongoClient() {
        this.adminDb = null;
        Mongo current = this.mongo;
        if (current == null) {
            return;
        }
        current.close();
        this.mongo = null;
    }

    /**
     * Converts the host definition of a shard document into server addresses.
     *
     * <p>The value of the host field is read as a string. Everything up to and including
     * the first {@code '/'} (if any) is dropped, and the remainder is split on commas.
     * Each piece is turned into a {@link ServerAddress}; pieces whose host cannot be
     * resolved are logged and skipped.
     *
     * @param dBObject shard document; its host field must be present
     * @return the resolvable addresses in definition order; never {@code null}, possibly empty
     */
    private List<ServerAddress> getServerAddressForReplica(DBObject dBObject) {
        String hosts = dBObject.get(MongoDBRiverDefinition.HOST_FIELD).toString();
        int slash = hosts.indexOf('/');
        if (slash >= 0) {
            hosts = hosts.substring(slash + 1);
        }
        if (logger.isDebugEnabled()) {
            logger.debug("getServerAddressForReplica - definition: {}", hosts);
        }
        List<ServerAddress> addresses = new ArrayList<ServerAddress>();
        for (String host : hosts.split(",")) {
            try {
                addresses.add(new ServerAddress(host));
            } catch (UnknownHostException e) {
                // Name the address that failed so the log entry is actionable.
                logger.warn("failed to resolve mongodb server address [{}]", e, host);
            }
        }
        return addresses;
    }

    /**
     * Stops the river: interrupts and forgets all slurper threads, interrupts and clears
     * the indexer thread, and closes the Mongo client. Any failure is logged rather than
     * propagated, and the shared context is set to {@code STOPPED} in every case.
     */
    public void close() {
        logger.info("closing mongodb stream river");
        try {
            for (Thread tailer : this.tailerThreads) {
                tailer.interrupt();
            }
            this.tailerThreads.clear();

            if (this.indexerThread != null) {
                this.indexerThread.interrupt();
                this.indexerThread = null;
            }

            closeMongoClient();
        } catch (Throwable failure) {
            logger.error("Fail to close river {}", failure, this.riverName.getName());
        } finally {
            this.context.setStatus(Status.STOPPED);
        }
    }

    /**
     * Builds the mapping installed by {@code start()} when GridFS is enabled, and logs it.
     *
     * <p>Under the configured type name, {@code properties} declares in this order:
     * {@code content} as "attachment"; {@code filename}, {@code contentType} and
     * {@code md5} as "string"; {@code length} and {@code chunkSize} as "long".
     *
     * @return the completed builder
     * @throws IOException if the JSON builder fails
     */
    private XContentBuilder getGridFSMapping() throws IOException {
        final String typeKey = MongoDBRiverDefinition.TYPE_FIELD;

        XContentBuilder mapping = XContentFactory.jsonBuilder()
                .startObject()
                .startObject(this.definition.getTypeName())
                .startObject("properties");

        mapping = mapping.startObject("content").field(typeKey, "attachment").endObject();
        for (String property : new String[]{"filename", "contentType", "md5"}) {
            mapping = mapping.startObject(property).field(typeKey, "string").endObject();
        }
        for (String property : new String[]{"length", "chunkSize"}) {
            mapping = mapping.startObject(property).field(typeKey, "long").endObject();
        }

        // Close "properties", the type object and the root object.
        mapping = mapping.endObject().endObject().endObject();

        logger.info("Mapping: {}", mapping.string());
        return mapping;
    }

    /**
     * Reads the last stored timestamp for the definition's oplog namespace.
     *
     * <p>The document is fetched from the river index (type = river name, id = oplog
     * namespace). The timestamp is expected under {@code TYPE -> LAST_TIMESTAMP_FIELD} as
     * a JSON string.
     *
     * @param client                 Elasticsearch client used for the lookup
     * @param mongoDBRiverDefinition river configuration supplying index, type, id and the
     *                               fallback initial timestamp
     * @return the parsed timestamp; the definition's initial timestamp (possibly
     *         {@code null}) when no document exists; {@code null} when the document lacks
     *         the {@code TYPE} object or the timestamp field
     */
    public static BSONTimestamp getLastTimestamp(Client client, MongoDBRiverDefinition mongoDBRiverDefinition) {
        GetResponse getResponse = (GetResponse) client.prepareGet(mongoDBRiverDefinition.getRiverIndexName(), mongoDBRiverDefinition.getRiverName(), mongoDBRiverDefinition.getMongoOplogNamespace()).execute().actionGet();
        if (!getResponse.isExists()) {
            return mongoDBRiverDefinition.getInitialTimestamp();
        }
        Map<?, ?> riverSection = (Map<?, ?>) getResponse.getSourceAsMap().get(TYPE);
        if (riverSection == null) {
            return null;
        }
        // Check the raw value before calling toString(): a missing field must yield null,
        // not a NullPointerException.
        Object storedTimestamp = riverSection.get(LAST_TIMESTAMP_FIELD);
        if (storedTimestamp == null) {
            return null;
        }
        String serialized = storedTimestamp.toString();
        if (logger.isDebugEnabled()) {
            logger.debug("{} last timestamp: {}", mongoDBRiverDefinition.getMongoOplogNamespace(), serialized);
        }
        return (BSONTimestamp) JSON.parse(serialized);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Adds to the given bulk request an index operation that stores the timestamp for the
     * definition's oplog namespace (river index, type = river name, id = oplog namespace,
     * body {@code {TYPE: {LAST_TIMESTAMP_FIELD: <JSON-serialized timestamp>}}}).
     *
     * <p>An {@link IOException} while building the document is logged with its cause and
     * not rethrown; in that case nothing is added to the bulk request.
     *
     * @param mongoDBRiverDefinition river configuration supplying index, type and id
     * @param bSONTimestamp          timestamp to store
     * @param bulkRequestBuilder     bulk request that receives the index operation
     */
    public static void updateLastTimestamp(MongoDBRiverDefinition mongoDBRiverDefinition, BSONTimestamp bSONTimestamp, BulkRequestBuilder bulkRequestBuilder) {
        try {
            bulkRequestBuilder.add(Requests.indexRequest(mongoDBRiverDefinition.getRiverIndexName()).type(mongoDBRiverDefinition.getRiverName()).id(mongoDBRiverDefinition.getMongoOplogNamespace()).source(XContentFactory.jsonBuilder().startObject().startObject(TYPE).field(LAST_TIMESTAMP_FIELD, JSON.serialize(bSONTimestamp)).endObject().endObject()));
        } catch (IOException e) {
            // Pass the exception to the logger so the cause and stack trace are not lost.
            logger.error("error updating last timestamp for namespace {}", e, mongoDBRiverDefinition.getMongoOplogNamespace());
        }
    }

    /**
     * Counts the documents in the definition's target index.
     *
     * @param client                 Elasticsearch client used for the count request
     * @param mongoDBRiverDefinition river configuration supplying the index name
     * @return the count reported by the count response
     */
    public static long getIndexCount(Client client, MongoDBRiverDefinition mongoDBRiverDefinition) {
        String[] indices = {mongoDBRiverDefinition.getIndexName()};
        CountResponse response = (CountResponse) client.prepareCount(indices).execute().actionGet();
        return response.getCount();
    }
}
