package org.elasticsearch.river.mongodb;

import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoSocketException;
import com.mongodb.MongoTimeoutException;
import com.mongodb.ServerAddress;
import com.mongodb.gridfs.GridFSDBFile;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.river.River;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
import org.elasticsearch.river.mongodb.MongoConfig;
import org.elasticsearch.river.mongodb.util.MongoDBHelper;
import org.elasticsearch.river.mongodb.util.MongoDBRiverHelper;
import org.elasticsearch.script.ScriptService;

/* loaded from: input_file:org/elasticsearch/river/mongodb/MongoDBRiver.class */
public class MongoDBRiver extends AbstractRiverComponent implements River {
    // ---- Plugin identification strings ----
    // TYPE is also used in this class as the key of the nested object that holds the saved
    // timestamp inside the river status document (see getLastTimestamp and source).
    public static final String TYPE = "mongodb";
    public static final String NAME = "mongodb-river";
    // STATUS_ID / STATUS_FIELD are not referenced in this class; presumably consumed by the
    // river status helpers — confirm against MongoDBRiverHelper.
    public static final String STATUS_ID = "_riverstatus";
    public static final String STATUS_FIELD = "status";
    // Logged together with the river version by start().
    public static final String DESCRIPTION = "MongoDB River Plugin";
    public static final String LAST_TIMESTAMP_FIELD = "_last_ts";
    public static final String LAST_GTID_FIELD = "_last_gtid";

    // ---- MongoDB database names, field names and query operators ----
    public static final String MONGODB_LOCAL_DATABASE = "local";
    public static final String MONGODB_ADMIN_DATABASE = "admin";
    public static final String MONGODB_CONFIG_DATABASE = "config";
    public static final String MONGODB_ID_FIELD = "_id";
    public static final String MONGODB_OID_FIELD = "oid";
    public static final String MONGODB_SEQ_FIELD = "seq";
    public static final String MONGODB_IN_OPERATOR = "$in";
    public static final String MONGODB_OR_OPERATOR = "$or";
    public static final String MONGODB_AND_OPERATOR = "$and";
    public static final String MONGODB_NATURAL_OPERATOR = "$natural";

    // ---- Oplog collection names, entry field names and operation codes ----
    // None of these are used inside this class; they are public constants for other classes.
    public static final String OPLOG_COLLECTION = "oplog.rs";
    public static final String OPLOG_REFS_COLLECTION = "oplog.refs";
    public static final String OPLOG_NAMESPACE = "ns";
    public static final String OPLOG_NAMESPACE_COMMAND = "$cmd";
    public static final String OPLOG_ADMIN_COMMAND = "admin.$cmd";
    public static final String OPLOG_OBJECT = "o";
    public static final String OPLOG_UPDATE = "o2";
    public static final String OPLOG_OPERATION = "op";
    public static final String OPLOG_UPDATE_OPERATION = "u";
    public static final String OPLOG_UPDATE_ROW_OPERATION = "ur";
    public static final String OPLOG_INSERT_OPERATION = "i";
    public static final String OPLOG_DELETE_OPERATION = "d";
    public static final String OPLOG_COMMAND_OPERATION = "c";
    public static final String OPLOG_NOOP_OPERATION = "n";
    public static final String OPLOG_DROP_COMMAND_OPERATION = "drop";
    public static final String OPLOG_DROP_DATABASE_COMMAND_OPERATION = "dropDatabase";
    public static final String OPLOG_RENAME_COLLECTION_COMMAND_OPERATION = "renameCollection";
    public static final String OPLOG_TO = "to";
    public static final String OPLOG_TIMESTAMP = "ts";
    public static final String OPLOG_FROM_MIGRATE = "fromMigrate";
    public static final String OPLOG_OPS = "ops";
    public static final String OPLOG_CREATE_COMMAND = "create";
    public static final String OPLOG_REF = "ref";

    // ---- GridFS collection name suffixes ----
    public static final String GRIDFS_FILES_SUFFIX = ".files";
    public static final String GRIDFS_CHUNKS_SUFFIX = ".chunks";

    // Same literal value as MONGODB_NATURAL_OPERATOR ("$natural").
    public static final String INSERTION_ORDER_KEY = "$natural";

    // Equals the 10000 ms the startup thread sleeps between attempts to load the MongoDB
    // configuration after a socket/timeout error (see internalStartRiver).
    static final int MONGODB_RETRY_ERROR_DELAY_MS = 10000;

    // ---- Collaborators, all assigned once in the constructor ----
    // River configuration parsed from the river settings.
    protected final MongoDBRiverDefinition definition;
    protected final Client esClient;
    protected final ScriptService scriptService;
    // Holds the work queue and the in-memory river status; created as STOPPED.
    protected final SharedContext context;

    // ---- Worker threads ----
    // One OplogSlurper thread per shard; filled by the startup thread and interrupted/cleared
    // by internalStopRiver.
    protected final List<Thread> tailerThreads;
    // The thread fields are volatile: they are written both by the caller of
    // internalStartRiver/internalStopRiver/close and by the startup thread itself.
    protected volatile Thread startupThread;
    protected volatile Thread indexerThread;
    protected volatile Thread statusThread;

    // Source of the cluster-level and per-shard MongoDB clients used during startup.
    private final MongoClientService mongoClientService;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/elasticsearch/river/mongodb/MongoDBRiver$QueueEntry.class */
    /**
     * Immutable unit of work carrying one MongoDB object together with the operation to apply,
     * the oplog timestamp it was read at (may be {@code null}) and the collection it belongs to.
     */
    public static class QueueEntry {
        private final DBObject data;
        private final Operation operation;
        private final Timestamp<?> oplogTimestamp;
        private final String collection;

        /**
         * Creates an {@link Operation#INSERT} entry that has no oplog timestamp.
         *
         * @param data       the object to insert
         * @param collection name of the collection the object belongs to
         */
        public QueueEntry(DBObject data, String collection) {
            this(null, Operation.INSERT, data, collection);
        }

        /**
         * Creates an entry with every attribute given explicitly.
         *
         * @param oplogTimestamp oplog timestamp of the entry, or {@code null}
         * @param operation      operation to apply
         * @param data           the object the operation applies to
         * @param collection     name of the collection the object belongs to
         */
        public QueueEntry(Timestamp<?> oplogTimestamp, Operation operation, DBObject data, String collection) {
            this.oplogTimestamp = oplogTimestamp;
            this.operation = operation;
            this.collection = collection;
            this.data = data;
        }

        /** @return {@code true} when the payload is a {@link GridFSDBFile} */
        public boolean isAttachment() {
            return data instanceof GridFSDBFile;
        }

        /** @return the payload object */
        public DBObject getData() {
            return data;
        }

        /** @return the operation to apply */
        public Operation getOperation() {
            return operation;
        }

        /** @return the oplog timestamp, or {@code null} if none was supplied */
        public Timestamp<?> getOplogTimestamp() {
            return oplogTimestamp;
        }

        /** @return the collection name */
        public String getCollection() {
            return collection;
        }
    }

    @Inject
    public MongoDBRiver(RiverName riverName, RiverSettings riverSettings, @RiverIndexName String str, Client client, ScriptService scriptService, MongoClientService mongoClientService) {
        super(riverName, riverSettings);
        this.tailerThreads = Lists.newArrayList();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Initializing", new Object[0]);
        }
        this.esClient = client;
        this.scriptService = scriptService;
        this.mongoClientService = mongoClientService;
        this.definition = MongoDBRiverDefinition.parseSettings(riverName.name(), str, riverSettings, scriptService);
        this.context = new SharedContext(this.definition.getThrottleSize() == -1 ? new LinkedTransferQueue() : new ArrayBlockingQueue(this.definition.getThrottleSize()), Status.STOPPED);
    }

    /**
     * @return the river settings held in the inherited {@code settings} field
     */
    public RiverSettings settings() {
        return this.settings;
    }

    /**
     * Entry point called when the river is started.
     *
     * <p>Reads the persisted river status and then:
     * <ul>
     *   <li>on any of the {@code *_FAILED} statuses: logs an error and returns without starting
     *       anything;</li>
     *   <li>on {@code STOPPED}: marks the context as {@code STOPPED} and leaves the persisted
     *       status alone;</li>
     *   <li>otherwise: marks the context as {@code START_PENDING} and persists {@code RUNNING}.</li>
     * </ul>
     * In the last two cases a daemon thread running a {@link StatusChecker} is started.
     */
    public void start() {
        logger.info("{} - {}", DESCRIPTION, MongoDBHelper.getRiverVersion());

        final String river = riverName.getName();
        final Status persisted = MongoDBRiverHelper.getRiverStatus(esClient, river);

        final boolean previouslyFailed = persisted == Status.IMPORT_FAILED
                || persisted == Status.INITIAL_IMPORT_FAILED
                || persisted == Status.SCRIPT_IMPORT_FAILED
                || persisted == Status.START_FAILED;
        if (previouslyFailed) {
            logger.error("Cannot start. Current status is {}", persisted);
            return;
        }

        if (persisted != Status.STOPPED) {
            context.setStatus(Status.START_PENDING);
            MongoDBRiverHelper.setRiverStatus(esClient, river, Status.RUNNING);
            logger.info("Startup pending");
        } else {
            context.setStatus(Status.STOPPED);
            logger.info("River is currently disabled and will not be started");
        }

        final String threadName = "mongodb_river_status:" + definition.getIndexName();
        statusThread = EsExecutors.daemonThreadFactory(settings.globalSettings(), threadName)
                .newThread(new StatusChecker(this, definition, context));
        statusThread.start();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Starts the river asynchronously on a dedicated daemon "startup" thread.
     *
     * <p>Does nothing when a startup thread is already registered. Otherwise the context status is
     * set to {@code STARTING} and the startup sequence ({@link #doStartRiver()}) is run on a new
     * thread. Any {@link Throwable} escaping the sequence (including an interrupt while waiting to
     * retry the MongoDB configuration lookup) is logged, and {@code START_FAILED} is both persisted
     * and set on the context. {@code startupThread} is cleared once the sequence has finished,
     * whatever the outcome.
     */
    public void internalStartRiver() {
        if (this.startupThread != null) {
            // A start request is already being processed.
            return;
        }
        this.context.setStatus(Status.STARTING);

        final String threadName = "mongodb_river_startup:" + this.definition.getIndexName();
        final Thread thread = EsExecutors.daemonThreadFactory(this.settings.globalSettings(), threadName).newThread(new Runnable() {
            @Override
            public void run() {
                // The guard covers the WHOLE startup sequence so that every failure is reported
                // and startupThread is reset exactly when startup is over.
                try {
                    doStartRiver();
                } catch (Throwable th) {
                    logger.warn("Failed to start", th);
                    MongoDBRiverHelper.setRiverStatus(esClient, definition.getRiverName(), Status.START_FAILED);
                    context.setStatus(Status.START_FAILED);
                } finally {
                    MongoDBRiver.this.startupThread = null;
                }
            }
        });
        // Publish the field before starting, but start through the local reference: the field may
        // be cleared concurrently by internalStopRiver() or by the thread's own finally block.
        this.startupThread = thread;
        thread.start();
    }

    /**
     * The startup sequence executed on the startup thread:
     * <ol>
     *   <li>log the effective options and the configured MongoDB servers;</li>
     *   <li>create the target index if it does not exist;</li>
     *   <li>when GridFS is enabled, put the attachment mapping (failure is only logged);</li>
     *   <li>load the MongoDB configuration, retrying after
     *       {@link #MONGODB_RETRY_ERROR_DELAY_MS} ms on socket/timeout errors;</li>
     *   <li>set the context to {@code RUNNING} and start the indexer thread;</li>
     *   <li>run the initial import unless a timestamp was already saved or the import is
     *       skipped by configuration;</li>
     *   <li>create and start one oplog tailer thread per shard.</li>
     * </ol>
     *
     * @throws Exception anything thrown by the steps above, e.g. {@link InterruptedException}
     *                   when interrupted during the retry sleep; handled by the caller
     */
    private void doStartRiver() throws Exception {
        logger.info("Starting");
        logger.info("MongoDB options: secondaryreadpreference [{}], drop_collection [{}], include_collection [{}], throttlesize [{}], gridfs [{}], filter [{}], db [{}], collection [{}], script [{}], indexing to [{}]/[{}]",
                definition.isMongoSecondaryReadPreference(), definition.isDropCollection(), definition.getIncludeCollection(),
                definition.getThrottleSize(), definition.isMongoGridFS(), definition.getMongoOplogFilter(), definition.getMongoDb(),
                definition.getMongoCollection(), definition.getScript(), definition.getIndexName(), definition.getTypeName());
        for (ServerAddress server : definition.getMongoServers()) {
            logger.debug("Using MongoDB server(s): host [{}], port [{}]", server.getHost(), server.getPort());
        }

        // Create the target index if it is missing.
        final String indexName = definition.getIndexName();
        try {
            if (!esClient.admin().indices().prepareExists(indexName).get().isExists()) {
                esClient.admin().indices().prepareCreate(indexName).get();
            }
        } catch (Exception e) {
            final Throwable cause = ExceptionsHelper.unwrapCause(e);
            // "Already exists" and "cluster blocked" are tolerated; startup carries on.
            if (!(cause instanceof IndexAlreadyExistsException) && !(cause instanceof ClusterBlockException)) {
                logger.error("failed to create index [{}], disabling river...", e, indexName);
                // NOTE(review): as before, no status is changed here, so the context stays STARTING.
                return;
            }
        }

        if (definition.isMongoGridFS()) {
            try {
                if (logger.isDebugEnabled()) {
                    logger.debug("Set explicit attachment mapping.");
                }
                esClient.admin().indices().preparePutMapping(indexName).setType(definition.getTypeName()).setSource(getGridFSMapping()).get();
            } catch (Exception e) {
                // Best effort: startup continues without the explicit mapping.
                logger.warn("Failed to set explicit mapping (attachment): {}", e);
            }
        }

        final MongoClient mongoClusterClient = mongoClientService.getMongoClusterClient(definition);
        final MongoConfigProvider configProvider = new MongoConfigProvider(this, mongoClientService);
        MongoConfig config;
        while (true) {
            try {
                config = configProvider.call();
                break;
            } catch (MongoSocketException | MongoTimeoutException e) {
                // Connection-level failure: wait and try again. An interrupt during the sleep
                // (e.g. from internalStopRiver) aborts startup through the caller's catch block.
                logger.warn("MongoDB connection error while loading configuration, retrying in {} ms", e, MONGODB_RETRY_ERROR_DELAY_MS);
                Thread.sleep(MONGODB_RETRY_ERROR_DELAY_MS);
            }
        }

        context.setStatus(Status.RUNNING);

        final Thread indexer = EsExecutors.daemonThreadFactory(settings.globalSettings(), "mongodb_river_indexer:" + indexName).newThread(new Indexer(this));
        indexerThread = indexer;
        indexer.start();

        // Decide where oplog tailing starts; null means "each shard's latest oplog timestamp".
        Timestamp<?> slurperStart = getLastProcessedTimestamp();
        if (slurperStart != null) {
            logger.trace("Initial import already completed.");
        } else if (definition.isSkipInitialImport() || definition.getInitialTimestamp() != null) {
            logger.info("Skip initial import from collection {}", definition.getMongoCollection());
            slurperStart = definition.getInitialTimestamp();
        } else {
            // The import is stamped with the smallest "latest oplog timestamp" over all shards.
            Timestamp<?> importTimestamp = null;
            for (MongoConfig.Shard shard : config.getShards()) {
                if (importTimestamp == null || shard.getLatestOplogTimestamp().compareTo(importTimestamp) < 1) {
                    importTimestamp = shard.getLatestOplogTimestamp();
                }
            }
            // Runs on this thread, so tailing only begins after the import has returned.
            new CollectionSlurper(this, mongoClusterClient).importInitial(importTimestamp);
            // slurperStart intentionally stays null here.
        }

        for (MongoConfig.Shard shard : config.getShards()) {
            final Timestamp<?> shardStart = slurperStart != null ? slurperStart : shard.getLatestOplogTimestamp();
            final String tailerName = "mongodb_river_slurper_" + shard.getName() + ":" + indexName;
            tailerThreads.add(EsExecutors.daemonThreadFactory(settings.globalSettings(), tailerName).newThread(
                    new OplogSlurper(this, shardStart, mongoClusterClient, mongoClientService.getMongoShardClient(definition, shard.getReplicas()))));
        }
        for (Thread tailer : tailerThreads) {
            tailer.start();
        }
        logger.info("Started");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stops the river by interrupting the startup thread (if any), every tailer thread and the
     * indexer thread (if any), then clearing the corresponding references.
     *
     * <p>Failures while stopping are logged, not propagated. The context status is set to
     * {@code STOPPED} in all cases.
     */
    public void internalStopRiver() {
        logger.info("Stopping");
        try {
            // Work on snapshots of the volatile fields: the startup thread clears startupThread
            // itself when it finishes, so re-reading the field after the null check could hit
            // null and abort the stop before the tailers and the indexer were interrupted.
            final Thread startup = startupThread;
            if (startup != null) {
                startup.interrupt();
                startupThread = null;
            }

            for (Thread tailer : tailerThreads) {
                tailer.interrupt();
            }
            tailerThreads.clear();

            final Thread indexer = indexerThread;
            if (indexer != null) {
                indexer.interrupt();
                indexerThread = null;
            }
            logger.info("Stopped");
        } catch (Throwable th) {
            logger.error("Failed to stop", th);
        } finally {
            context.setStatus(Status.STOPPED);
        }
    }

    /**
     * Closes the river: interrupts and forgets the status thread when one is registered, then
     * delegates to {@link #internalStopRiver()}.
     */
    public void close() {
        logger.info("Closing river");
        if (statusThread != null) {
            statusThread.interrupt();
            statusThread = null;
        }
        internalStopRiver();
    }

    /**
     * Convenience wrapper around {@link #getLastTimestamp(Client, MongoDBRiverDefinition)} using
     * this river's Elasticsearch client and definition.
     *
     * @return the timestamp reported by {@code getLastTimestamp}, possibly {@code null}
     */
    protected Timestamp<?> getLastProcessedTimestamp() {
        return MongoDBRiver.getLastTimestamp(esClient, definition);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the mapping document used for GridFS content: under the configured type name, a
     * {@code properties} object declaring {@code content} as {@code attachment},
     * {@code filename}, {@code contentType} and {@code md5} as {@code string}, and
     * {@code length} and {@code chunkSize} as {@code long}. The result is logged at info level.
     *
     * @return the builder holding the complete mapping
     * @throws IOException if the JSON builder fails
     */
    public XContentBuilder getGridFSMapping() throws IOException {
        // {property name, mapping type}, emitted in this order.
        final String[][] typedProperties = {
            {"content", "attachment"},
            {"filename", "string"},
            {"contentType", "string"},
            {"md5", "string"},
            {"length", "long"},
            {"chunkSize", "long"},
        };

        XContentBuilder mapping = XContentFactory.jsonBuilder()
                .startObject()
                .startObject(definition.getTypeName())
                .startObject("properties");
        for (String[] property : typedProperties) {
            mapping = mapping.startObject(property[0])
                    .field(MongoDBRiverDefinition.TYPE_FIELD, property[1])
                    .endObject();
        }
        // Close "properties", the type object and the root object.
        mapping = mapping.endObject().endObject().endObject();

        logger.info("GridFS Mapping: {}", mapping.string());
        return mapping;
    }

    /**
     * Looks up the timestamp saved for the river's oplog namespace.
     *
     * <p>The river index is refreshed first, then the document with index = river index name,
     * type = river name and id = oplog namespace is fetched.
     *
     * @param client                 Elasticsearch client used for the refresh and the get
     * @param mongoDBRiverDefinition river definition supplying the names and the initial timestamp
     * @return when the document exists: the timestamp built from its {@code "mongodb"} object, or
     *         {@code null} if that object is absent or yields no timestamp; when the document
     *         does not exist: the definition's initial timestamp (which may be {@code null})
     */
    public static Timestamp<?> getLastTimestamp(Client client, MongoDBRiverDefinition mongoDBRiverDefinition) {
        final String riverIndex = mongoDBRiverDefinition.getRiverIndexName();
        client.admin().indices().prepareRefresh(riverIndex).get();

        final GetResponse statusDocument = client
                .prepareGet(riverIndex, mongoDBRiverDefinition.getRiverName(), mongoDBRiverDefinition.getMongoOplogNamespace())
                .get();

        if (statusDocument.isExists()) {
            @SuppressWarnings("unchecked")
            final Map<String, Object> savedState = (Map<String, Object>) statusDocument.getSourceAsMap().get(TYPE);
            return savedState == null ? null : Timestamp.on(savedState);
        }
        // Nothing saved yet: fall back to the configured initial timestamp, if any.
        return mongoDBRiverDefinition.getInitialTimestamp();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Queues an index request that saves {@code timestamp} as the last timestamp of the river's
     * oplog namespace (index = river index name, type = river name, id = oplog namespace).
     *
     * <p>An {@link IOException} raised while building the document is logged together with its
     * cause and not propagated; in that case nothing is added to the bulk processor.
     *
     * @param timestamp     timestamp to save
     * @param bulkProcessor bulk processor the index request is added to
     */
    public void setLastTimestamp(Timestamp<?> timestamp, BulkProcessor bulkProcessor) {
        final String namespace = definition.getMongoOplogNamespace();
        try {
            if (logger.isTraceEnabled()) {
                logger.trace("setLastTimestamp [{}] [{}]", namespace, timestamp);
            }
            bulkProcessor.add(Requests.indexRequest(definition.getRiverIndexName())
                    .type(definition.getRiverName())
                    .id(namespace)
                    .source(source(timestamp)));
        } catch (IOException e) {
            // Keep the cause in the log; previously the exception was dropped.
            logger.error("error updating last timestamp for namespace {}", e, namespace);
        }
    }

    /**
     * Builds the status document body: a root object containing a {@code "mongodb"} object into
     * which the timestamp writes its own fields.
     *
     * @param timestamp timestamp whose fields are saved
     * @return the builder with both objects closed
     * @throws IOException if the JSON builder fails
     */
    private static XContentBuilder source(Timestamp<?> timestamp) throws IOException {
        final XContentBuilder document = XContentFactory.jsonBuilder()
                .startObject()
                .startObject(TYPE);
        timestamp.saveFields(document);
        return document.endObject().endObject();
    }

    /**
     * Counts the documents in the river's target index.
     *
     * @param client                 Elasticsearch client used for the existence checks and the count
     * @param mongoDBRiverDefinition river definition supplying index name, type name and the
     *                               import-all-collections flag
     * @return {@code 0} if the index does not exist; the count over the whole index when all
     *         collections are imported; otherwise the count restricted to the configured type,
     *         or {@code 0} if that type does not exist in the index
     */
    public static long getIndexCount(Client client, MongoDBRiverDefinition mongoDBRiverDefinition) {
        final String index = mongoDBRiverDefinition.getIndexName();

        final boolean indexExists = client.admin().indices().prepareExists(index).get().isExists();
        if (!indexExists) {
            return 0L;
        }

        if (mongoDBRiverDefinition.isImportAllCollections()) {
            return client.prepareCount(index).execute().actionGet().getCount();
        }

        final String type = mongoDBRiverDefinition.getTypeName();
        final boolean typeExists = client.admin().indices().prepareTypesExists(index).setTypes(type).get().isExists();
        return typeExists
                ? client.prepareCount(index).setTypes(type).get().getCount()
                : 0L;
    }
}
