package fr.pilato.elasticsearch.river.fs.river;

import fr.pilato.elasticsearch.river.fs.util.FsRiverUtil;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.tika.metadata.Metadata;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Base64;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.format.ISODateTimeFormat;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.river.River;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;

/* loaded from: input_file:fr/pilato/elasticsearch/river/fs/river/FsRiver.class */
public class FsRiver extends AbstractRiverComponent implements River {
    private final Client client;
    private final String indexName;
    private final String typeName;
    private final int bulkSize;
    private final int maxConcurrentBulk;
    private final TimeValue bulkFlushInterval;
    private volatile BulkProcessor bulkProcessor;
    private volatile Thread feedThread;
    private volatile boolean closed;
    private final FsRiverFeedDefinition fsDefinition;

    /* loaded from: input_file:fr/pilato/elasticsearch/river/fs/river/FsRiver$FSParser.class */
    private class FSParser implements Runnable {
        private static final String field_filename = "file.filename";
        private FsRiverFeedDefinition fsdef;
        private ScanStatistic stats;

        public FSParser(FsRiverFeedDefinition fsRiverFeedDefinition) {
            this.fsdef = fsRiverFeedDefinition;
            if (FsRiver.this.logger.isInfoEnabled()) {
                FsRiver.this.logger.info("creating fs river [{}] for [{}] every [{}]", new Object[]{this.fsdef.getRivername(), this.fsdef.getUrl(), this.fsdef.getUpdateRate()});
            }
        }

        /**
         * Feed loop: runs until the river is closed or the thread is interrupted.
         *
         * <p>Each pass reads the {@code _fsstatus} document of this river from the
         * {@code _river} index. When the document is missing it is created with status
         * {@code STARTED}; when its {@code fs.status} is {@code STOPPED} the scan is skipped.
         * Otherwise the configured url is scanned, and the scan start time is stored in the
         * {@code _lastupdated} document. Any exception raised by a pass is logged and the
         * loop goes on after sleeping for the configured update rate.
         */
        @Override // java.lang.Runnable
        public void run() {
            while (!FsRiver.this.closed) {
                try {
                    GetResponse statusResponse = (GetResponse) FsRiver.this.client.prepareGet("_river", this.fsdef.getRivername(), "_fsstatus").execute().actionGet();
                    boolean active = true;
                    if (!statusResponse.isExists()) {
                        FsRiver.this.client.prepareIndex("_river", this.fsdef.getRivername(), "_fsstatus").setSource(XContentFactory.jsonBuilder().startObject().startObject("fs").field("status", "STARTED").endObject().endObject()).execute().actionGet();
                    } else if ("STOPPED".equals(XContentMapValues.extractValue("fs.status", statusResponse.getSourceAsMap()))) {
                        // Constant-first equals: a status document without fs.status no longer
                        // raises a NullPointerException, it is simply treated as "not stopped".
                        active = false;
                    }
                    if (active) {
                        this.stats = new ScanStatistic(this.fsdef.getUrl());
                        File root = new File(this.fsdef.getUrl());
                        if (!root.exists()) {
                            throw new RuntimeException(this.fsdef.getUrl() + " doesn't exists.");
                        }
                        this.stats.setRootPathId(SignTool.sign(root.getAbsolutePath()));
                        // Taken before scanning so that changes made during the scan are
                        // picked up by the next pass.
                        Date scanStart = new Date();
                        Date lastScan = getLastDateFromRiver("_lastupdated");
                        if (lastScan == null) {
                            // No previous scan date known: index the root folder itself first.
                            indexRootDirectory(root);
                        }
                        addFilesRecursively(this.fsdef.getUrl(), lastScan);
                        updateFsRiver("_lastupdated", scanStart);
                    } else if (FsRiver.this.logger.isDebugEnabled()) {
                        FsRiver.this.logger.debug("FSRiver is disabled for {}", new Object[]{this.fsdef.getRivername()});
                    }
                } catch (Exception e) {
                    FsRiver.this.logger.warn("Error while indexing content from {}", new Object[]{this.fsdef.getUrl()});
                    if (FsRiver.this.logger.isDebugEnabled()) {
                        FsRiver.this.logger.debug("Exception for {} is {}", new Object[]{this.fsdef.getUrl(), e});
                    }
                }
                try {
                    if (FsRiver.this.logger.isDebugEnabled()) {
                        FsRiver.this.logger.debug("Fs river is going to sleep for {}", new Object[]{this.fsdef.getUpdateRate()});
                    }
                    Thread.sleep(this.fsdef.getUpdateRate().getMillis());
                } catch (InterruptedException interrupted) {
                    // close() interrupts this thread to wake it up: keep the interrupt flag
                    // set for whoever owns the thread and stop feeding.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /**
         * Reads the {@code fs.lastdate} value stored in the given document of this river
         * in the {@code _river} index.
         *
         * <p>The {@code _river} index is refreshed before the document is fetched. Any failure
         * while refreshing, fetching or parsing is logged as a warning and reported as
         * {@code null} instead of being propagated.
         *
         * @param str id of the status document to read (the caller passes {@code _lastupdated})
         * @return the stored date, or {@code null} when the river is closed, the document or
         *         the field is missing, or the lookup failed
         */
        private Date getLastDateFromRiver(String str) {
            Date lastDate = null;
            try {
                if (FsRiver.this.closed) {
                    return null;
                }
                FsRiver.this.client.admin().indices().prepareRefresh(new String[]{"_river"}).execute().actionGet();
                // The refresh is a blocking call: check again before issuing the next request.
                if (FsRiver.this.closed) {
                    return null;
                }
                GetResponse getResponse = (GetResponse) FsRiver.this.client.prepareGet("_river", FsRiver.this.riverName().name(), str).execute().actionGet();
                if (getResponse.isExists()) {
                    Map<?, ?> fsNode = (Map<?, ?>) getResponse.getSourceAsMap().get("fs");
                    Object rawDate = fsNode == null ? null : fsNode.get("lastdate");
                    if (rawDate != null) {
                        lastDate = ISODateTimeFormat.dateOptionalTimeParser().parseDateTime(rawDate.toString()).toDate();
                    }
                } else if (FsRiver.this.logger.isDebugEnabled()) {
                    FsRiver.this.logger.debug("{} doesn't exist", new Object[]{str});
                }
            } catch (Exception e) {
                FsRiver.this.logger.warn("failed to get _lastupdate, throttling....", e, new Object[0]);
            }
            return lastDate;
        }

        /**
         * Queues an index request that writes the scan summary document of this river into
         * the {@code _river} index.
         *
         * <p>The stored {@code lastdate} is the given date floored to the second, minus two
         * seconds. The document also carries the feed name and the scanned/deleted counters
         * of the current statistics.
         *
         * @param str  id of the document to write
         * @param date date to derive {@code lastdate} from
         * @throws Exception if the JSON document cannot be built
         */
        private void updateFsRiver(String str, Date date) throws Exception {
            XContentBuilder summary = XContentFactory.jsonBuilder();
            summary.startObject();
            summary.startObject("fs");
            summary.field("feedname", this.fsdef.getRivername());
            summary.field("lastdate", new DateTime(date).secondOfDay().roundFloorCopy().minusSeconds(2).toDate());
            summary.field("docadded", this.stats.getNbDocScan());
            summary.field("docdeleted", this.stats.getNbDocDeleted());
            summary.endObject();
            summary.endObject();
            esIndex("_river", FsRiver.this.riverName.name(), str, summary);
        }

        /**
         * Builds the file abstractor matching the protocol of the feed definition.
         *
         * @return a {@code FileAbstractorFile} for the {@code local} protocol, a
         *         {@code FileAbstractorSSH} for the {@code ssh} protocol
         * @throws RuntimeException if the protocol is neither {@code local} nor {@code ssh}
         */
        private FileAbstractor buildFileAbstractor() throws Exception {
            boolean local = PROTOCOL.LOCAL.equals(this.fsdef.getProtocol());
            if (!local && !PROTOCOL.SSH.equals(this.fsdef.getProtocol())) {
                throw new RuntimeException(this.fsdef.getProtocol() + " is not supported yet. Please use " + PROTOCOL.LOCAL + " or " + PROTOCOL.SSH);
            }
            if (local) {
                return new FileAbstractorFile(this.fsdef);
            }
            return new FileAbstractorSSH(this.fsdef);
        }

        /**
         * Scans one directory: indexes new or modified files, indexes sub-folders and
         * recurses into them, then (when {@code remove_deleted} is enabled) removes from the
         * index the files and folders that were not seen during this scan.
         *
         * <p>Entries whose name contains {@code ~} are ignored. A file is (re)indexed when no
         * previous scan date is given, or when its last-modified or creation date is later
         * than that date.
         *
         * @param str  path of the directory to scan
         * @param date date of the previous scan, or {@code null} to index every file
         * @throws Exception if listing, reading or indexing fails
         */
        private void addFilesRecursively(String str, Date date) throws Exception {
            if (FsRiver.this.logger.isDebugEnabled()) {
                FsRiver.this.logger.debug("Indexing [{}] content", new Object[]{str});
            }
            FileAbstractor abstractor = buildFileAbstractor();
            Collection<FileAbstractModel> children = abstractor.getFiles(str);
            // Hash sets: these are only used for membership tests against every name found
            // in the index, which was quadratic with lists.
            Set<String> seenFiles = new HashSet<String>();
            Set<String> seenFolders = new HashSet<String>();
            if (children != null) {
                for (FileAbstractModel child : children) {
                    String name = child.name;
                    if (name.contains("~")) {
                        continue;
                    }
                    if (child.file) {
                        FsRiver.this.logger.debug("  - file: {}", new Object[]{name});
                        if (FsRiverUtil.isIndexable(name, this.fsdef.getIncludes(), this.fsdef.getExcludes())) {
                            seenFiles.add(name);
                            if (date == null || child.lastModifiedDate > date.getTime() || (child.creationDate > 0 && child.creationDate > date.getTime())) {
                                indexFile(this.stats, child.name, str, abstractor.getInputStream(child), child.lastModifiedDate);
                                this.stats.addFile();
                            } else if (FsRiver.this.logger.isDebugEnabled()) {
                                FsRiver.this.logger.debug("    - not modified: creation date {} , file date {}, last scan date {}", new Object[]{Long.valueOf(child.creationDate), Long.valueOf(child.lastModifiedDate), Long.valueOf(date.getTime())});
                            }
                        }
                    } else if (child.directory) {
                        FsRiver.this.logger.debug("  - folder: {}", new Object[]{name});
                        seenFolders.add(name);
                        indexDirectory(this.stats, name, child.fullpath.concat(File.separator));
                        addFilesRecursively(child.fullpath.concat(File.separator), date);
                    } else {
                        FsRiver.this.logger.debug("  - other: {}", new Object[]{name});
                        if (FsRiver.this.logger.isDebugEnabled()) {
                            FsRiver.this.logger.debug("Not a file nor a dir. Skipping {}", new Object[]{child.fullpath});
                        }
                    }
                }
            }
            if (!this.fsdef.isRemoveDeleted()) {
                return;
            }
            for (String indexedFile : getFileDirectory(str)) {
                // A name that could not be resolved from the index cannot be mapped to a
                // document id, so it is skipped instead of failing the whole scan.
                if (indexedFile == null) {
                    continue;
                }
                if (FsRiverUtil.isIndexable(indexedFile, this.fsdef.getIncludes(), this.fsdef.getExcludes()) && !seenFiles.contains(indexedFile)) {
                    esDelete(FsRiver.this.indexName, FsRiver.this.typeName, SignTool.sign(new File(str, indexedFile).getAbsolutePath()));
                    this.stats.removeFile();
                }
            }
            for (String indexedFolder : getFolderDirectory(str)) {
                if (indexedFolder != null && !seenFolders.contains(indexedFolder)) {
                    removeEsDirectoryRecursively(str, indexedFolder);
                }
            }
        }

        /**
         * Lists the file names already indexed for a directory.
         *
         * <p>Searches the river index and type for documents whose {@code encoded} term equals
         * the signature of the given path (at most 50000 hits). The name is read from
         * {@code _source} when available, otherwise from the stored {@code file.filename}
         * field. Hits for which neither is available are reported with a warning and left out
         * of the result, so the returned collection never contains {@code null}.
         *
         * @param str path of the directory
         * @return the indexed file names; empty when the river is closed
         * @throws Exception if the search fails
         */
        private Collection<String> getFileDirectory(String str) throws Exception {
            Collection<String> names = new ArrayList<String>();
            if (FsRiver.this.closed) {
                return names;
            }
            SearchResponse searchResponse = (SearchResponse) FsRiver.this.client.prepareSearch(new String[]{FsRiver.this.indexName}).setSearchType(SearchType.QUERY_AND_FETCH).setTypes(new String[]{FsRiver.this.typeName}).setQuery(QueryBuilders.termQuery("encoded", SignTool.sign(str))).setFrom(0).setSize(50000).addField(field_filename).execute().actionGet();
            if (searchResponse.getHits() == null || searchResponse.getHits().getHits() == null) {
                return names;
            }
            for (SearchHit searchHit : searchResponse.getHits().getHits()) {
                String name = null;
                if (searchHit.getSource() != null && searchHit.getSource().get(FsRiverUtil.Doc.File.FILENAME) != null) {
                    name = searchHit.getSource().get(FsRiverUtil.Doc.File.FILENAME).toString();
                } else if (searchHit.getFields() != null && searchHit.getFields().get(field_filename) != null) {
                    name = ((SearchHitField) searchHit.getFields().get(field_filename)).getValue().toString();
                } else {
                    FsRiver.this.logger.warn("Can't find in _source nor fields the existing filenames in path [{}]. Please enable _source or store field [{}]", new Object[]{str, field_filename});
                }
                // Callers build paths and document ids from these names: never hand out null.
                if (name != null) {
                    names.add(name);
                }
            }
            return names;
        }

        /**
         * Lists the folder names already indexed for a directory.
         *
         * <p>Searches the folder type of the river index for documents whose {@code encoded}
         * term equals the signature of the given path (at most 50000 hits) and reads each
         * name from {@code _source}. Hits without a usable {@code _source} entry are reported
         * with a warning and skipped.
         *
         * @param str path of the directory
         * @return the indexed folder names; empty when the river is closed
         * @throws Exception if the search fails
         */
        private Collection<String> getFolderDirectory(String str) throws Exception {
            Collection<String> names = new ArrayList<String>();
            if (FsRiver.this.closed) {
                return names;
            }
            SearchResponse searchResponse = (SearchResponse) FsRiver.this.client.prepareSearch(new String[]{FsRiver.this.indexName}).setSearchType(SearchType.QUERY_AND_FETCH).setTypes(new String[]{FsRiverUtil.INDEX_TYPE_FOLDER}).setQuery(QueryBuilders.termQuery("encoded", SignTool.sign(str))).setFrom(0).setSize(50000).execute().actionGet();
            if (searchResponse.getHits() == null || searchResponse.getHits().getHits() == null) {
                return names;
            }
            for (SearchHit searchHit : searchResponse.getHits().getHits()) {
                Object name = searchHit.getSource() == null ? null : searchHit.getSource().get(FsRiverUtil.Doc.File.FILENAME);
                if (name == null) {
                    // Same policy as for files: warn and skip rather than fail on a null.
                    FsRiver.this.logger.warn("Can't find in _source the existing folder names in path [{}]. Please enable _source", new Object[]{str});
                } else {
                    names.add(name.toString());
                }
            }
            return names;
        }

        /**
         * Reads a file completely and queues it for indexing.
         *
         * <p>With {@code json_support} enabled the raw bytes are indexed as the document, using
         * either the file name without its extension ({@code filename_as_id}) or the signature
         * of the full path as id. Otherwise the text and metadata are extracted with Tika and
         * a document with {@code file}, {@code path} and {@code meta} sections plus the
         * extracted content is built; extraction failures are logged at debug level and
         * result in an empty content.
         *
         * <p>The stream is always closed, also when reading fails.
         *
         * @param scanStatistic statistics of the running scan (root path id, virtual path)
         * @param str           file name
         * @param str2          path of the directory holding the file
         * @param inputStream   content of the file; closed by this method
         * @param j             last-modified value stored with the document
         * @throws Exception if reading the stream or building the document fails
         */
        private void indexFile(ScanStatistic scanStatistic, String str, String str2, InputStream inputStream, long j) throws Exception {
            if (FsRiver.this.logger.isDebugEnabled()) {
                FsRiver.this.logger.debug("fetching content from [{}],[{}]", new Object[]{str2, str});
            }
            byte[] byteArray;
            try {
                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                byte[] chunk = new byte[1024];
                int read;
                while ((read = inputStream.read(chunk)) != -1) {
                    buffer.write(chunk, 0, read);
                }
                byteArray = buffer.toByteArray();
            } finally {
                // Release the underlying file/SSH stream even when read() throws.
                inputStream.close();
            }
            String fullPath = new File(str2, str).toString();
            if (FsRiver.this.fsDefinition.isJsonSupport()) {
                String id;
                if (FsRiver.this.fsDefinition.isFilenameAsId()) {
                    int dot = str.lastIndexOf(".");
                    // dot > 0 keeps names such as ".hidden" untouched.
                    id = dot > 0 ? str.substring(0, dot) : str;
                } else {
                    id = SignTool.sign(fullPath);
                }
                esIndex(FsRiver.this.indexName, FsRiver.this.typeName, id, byteArray);
                return;
            }
            // Extraction limit: 100000 characters unless indexed_chars is set, in which case
            // it is that ratio of the file size.
            int maxChars = 100000;
            if (FsRiver.this.fsDefinition.getIndexedChars() > 0.0d) {
                maxChars = (int) Math.round(byteArray.length * FsRiver.this.fsDefinition.getIndexedChars());
            }
            Metadata metadata = new Metadata();
            String content;
            try {
                content = TikaInstance.tika().parseToString(new BytesStreamInput(byteArray, false), metadata, maxChars);
            } catch (Throwable th) {
                FsRiver.this.logger.debug("Failed to extract [" + maxChars + "] characters of text for [" + str + "]", th, new Object[0]);
                content = "";
            }
            XContentBuilder doc = XContentFactory.jsonBuilder().startObject();
            if (FsRiver.this.logger.isTraceEnabled()) {
                doc.prettyPrint();
            }
            doc.startObject(FsRiverUtil.Doc.FILE).field(FsRiverUtil.Doc.File.FILENAME, str).field(FsRiverUtil.Doc.File.LAST_MODIFIED, j).field(FsRiverUtil.Doc.File.INDEXING_DATE, new Date()).field(FsRiverUtil.Doc.File.CONTENT_TYPE, metadata.get("Content-Type")).field(FsRiverUtil.Doc.File.URL, "file://" + fullPath);
            if (FsRiver.this.fsDefinition.getIndexedChars() > 0.0d) {
                doc.field(FsRiverUtil.Doc.File.INDEXED_CHARS, maxChars);
            }
            if (FsRiver.this.fsDefinition.isAddFilesize()) {
                String contentLength = metadata.get("Content-Length");
                if (contentLength != null) {
                    doc.field(FsRiverUtil.Doc.File.FILESIZE, contentLength);
                } else {
                    doc.field(FsRiverUtil.Doc.File.FILESIZE, byteArray.length);
                }
            }
            doc.endObject();
            doc.startObject(FsRiverUtil.Doc.PATH).field("encoded", SignTool.sign(str2)).field("root", scanStatistic.getRootPathId()).field("virtual", FsRiverUtil.computeVirtualPathName(scanStatistic, str2)).field("real", fullPath).endObject();
            doc.startObject(FsRiverUtil.Doc.META).field(FsRiverUtil.Doc.Meta.AUTHOR, metadata.get("Author")).field(FsRiverUtil.Doc.Meta.TITLE, metadata.get(FsRiverUtil.Doc.Meta.TITLE)).field(FsRiverUtil.Doc.Meta.DATE, metadata.get(Metadata.DATE)).array(FsRiverUtil.Doc.Meta.KEYWORDS, Strings.commaDelimitedListToStringArray(metadata.get("Keywords"))).endObject();
            doc.field(FsRiverUtil.Doc.CONTENT, content);
            if (FsRiver.this.fsDefinition.isStoreSource()) {
                doc.field(FsRiverUtil.Doc.ATTACHMENT, Base64.encodeBytes(byteArray));
            }
            doc.endObject();
            esIndex(FsRiver.this.indexName, FsRiver.this.typeName, SignTool.sign(fullPath), doc);
        }

        /**
         * Queues a folder document for indexing in the folder type of the river index.
         *
         * @param str  document id
         * @param str2 value of the folder name field
         * @param str3 value of the {@code root} field
         * @param str4 value of the {@code virtual} field
         * @param str5 value of the {@code encoded} field
         * @throws Exception if the JSON document cannot be built
         */
        private void indexDirectory(String str, String str2, String str3, String str4, String str5) throws Exception {
            XContentBuilder folderDoc = XContentFactory.jsonBuilder().startObject();
            folderDoc.field(FsRiverUtil.Dir.NAME, str2);
            folderDoc.field("root", str3);
            folderDoc.field("virtual", str4);
            folderDoc.field("encoded", str5);
            folderDoc.endObject();
            esIndex(FsRiver.this.indexName, FsRiverUtil.INDEX_TYPE_FOLDER, str, folderDoc);
        }

        /**
         * Queues the folder document of a scanned sub-directory.
         *
         * <p>The document id is the signature of the given path. The {@code virtual} and
         * {@code encoded} fields are computed from that path cut at its last file separator.
         *
         * @param scanStatistic statistics of the running scan, source of the root path id
         * @param str           folder name
         * @param str2          folder path (the scan passes it with a trailing separator)
         * @throws Exception if the document cannot be built
         */
        private void indexDirectory(ScanStatistic scanStatistic, String str, String str2) throws Exception {
            String id = SignTool.sign(str2);
            String rootPathId = scanStatistic.getRootPathId();
            String pathBeforeLastSeparator = str2.substring(0, str2.lastIndexOf(File.separator));
            String virtualPath = FsRiverUtil.computeVirtualPathName(scanStatistic, pathBeforeLastSeparator);
            String encodedPath = SignTool.sign(pathBeforeLastSeparator);
            indexDirectory(id, str, rootPathId, virtualPath, encodedPath);
        }

        /**
         * Queues the folder document of the root directory of the scan: id is the signature
         * of its absolute path, {@code virtual} is {@code null} and {@code encoded} is the
         * signature of its parent path.
         *
         * @param file root directory of the scan
         * @throws Exception if the document cannot be built
         */
        private void indexRootDirectory(File file) throws Exception {
            String id = SignTool.sign(file.getAbsolutePath());
            String folderName = file.getName();
            String rootPathId = this.stats.getRootPathId();
            String encodedParent = SignTool.sign(file.getParent());
            indexDirectory(id, folderName, rootPathId, null, encodedParent);
        }

        /**
         * Queues the deletion of an indexed folder: first every file indexed under it, then
         * each of its indexed sub-folders (recursively), and finally the folder document.
         *
         * @param str  path of the parent directory
         * @param str2 name of the folder to remove
         * @throws Exception if looking up the indexed content fails
         */
        private void removeEsDirectoryRecursively(String str, String str2) throws Exception {
            String folderPath = str.concat(File.separator).concat(str2);
            FsRiver.this.logger.debug("Delete folder " + folderPath, new Object[0]);
            for (String fileName : getFileDirectory(folderPath)) {
                String fileId = SignTool.sign(folderPath.concat(File.separator).concat(fileName));
                esDelete(FsRiver.this.indexName, FsRiver.this.typeName, fileId);
            }
            for (String subFolder : getFolderDirectory(folderPath)) {
                removeEsDirectoryRecursively(folderPath, subFolder);
            }
            esDelete(FsRiver.this.indexName, FsRiverUtil.INDEX_TYPE_FOLDER, SignTool.sign(folderPath));
        }

        /**
         * Adds an index request for the given document to the bulk processor, unless the
         * river is closed, in which case the document is dropped with a warning.
         *
         * @param str             index name
         * @param str2            type name
         * @param str3            document id
         * @param xContentBuilder document source
         * @throws Exception if the builder content cannot be rendered for trace logging
         */
        private void esIndex(String str, String str2, String str3, XContentBuilder xContentBuilder) throws Exception {
            if (FsRiver.this.logger.isDebugEnabled()) {
                FsRiver.this.logger.debug("Indexing in ES " + str + ", " + str2 + ", " + str3, new Object[0]);
            }
            if (FsRiver.this.logger.isTraceEnabled()) {
                FsRiver.this.logger.trace("JSon indexed : {}", new Object[]{xContentBuilder.string()});
            }
            if (FsRiver.this.closed) {
                FsRiver.this.logger.warn("trying to add new file while closing river. Document [{}]/[{}]/[{}] has been ignored", new Object[]{str, str2, str3});
                return;
            }
            IndexRequest request = new IndexRequest(str, str2, str3);
            FsRiver.this.bulkProcessor.add(request.source(xContentBuilder));
        }

        /**
         * Adds an index request for a raw JSON document to the bulk processor, unless the
         * river is closed, in which case the document is dropped with a warning.
         *
         * <p>At trace level the document is logged as text (decoded as UTF-8) rather than as
         * a byte array, so the message shows the JSON that is being indexed.
         *
         * @param str  index name
         * @param str2 type name
         * @param str3 document id
         * @param bArr document source as bytes
         * @throws Exception declared for symmetry with the builder based variant
         */
        private void esIndex(String str, String str2, String str3, byte[] bArr) throws Exception {
            if (FsRiver.this.logger.isDebugEnabled()) {
                FsRiver.this.logger.debug("Indexing in ES " + str + ", " + str2 + ", " + str3, new Object[0]);
            }
            if (FsRiver.this.logger.isTraceEnabled()) {
                // NOTE(review): assumes the JSON files are UTF-8 encoded — only affects the trace output.
                FsRiver.this.logger.trace("JSon indexed : {}", new Object[]{new String(bArr, "UTF-8")});
            }
            if (FsRiver.this.closed) {
                FsRiver.this.logger.warn("trying to add new file while closing river. Document [{}]/[{}]/[{}] has been ignored", new Object[]{str, str2, str3});
            } else {
                FsRiver.this.bulkProcessor.add(new IndexRequest(str, str2, str3).source(bArr));
            }
        }

        /**
         * Adds a delete request to the bulk processor, unless the river is closed, in which
         * case the request is dropped with a warning.
         *
         * @param str  index name
         * @param str2 type name
         * @param str3 id of the document to delete
         * @throws Exception declared for symmetry with the indexing helpers
         */
        private void esDelete(String str, String str2, String str3) throws Exception {
            if (FsRiver.this.logger.isDebugEnabled()) {
                FsRiver.this.logger.debug("Deleting from ES " + str + ", " + str2 + ", " + str3, new Object[0]);
            }
            if (FsRiver.this.closed) {
                FsRiver.this.logger.warn("trying to remove a file while closing river. Document [{}]/[{}]/[{}] has been ignored", new Object[]{str, str2, str3});
                return;
            }
            DeleteRequest request = new DeleteRequest(str, str2, str3);
            FsRiver.this.bulkProcessor.add(request);
        }
    }

    /* loaded from: input_file:fr/pilato/elasticsearch/river/fs/river/FsRiver$PROTOCOL.class */
    /**
     * Constants for the protocols a river can read files through.
     */
    public static final class PROTOCOL {
        /** Port number constant for the {@code ssh} protocol. */
        public static final int SSH_PORT = 22;

        /** Protocol name for files read from the local file system. */
        public static final String LOCAL = "local";

        /** Protocol name for files read over SSH. */
        public static final String SSH = "ssh";
    }

    @Inject
    public FsRiver(RiverName riverName, RiverSettings riverSettings, Client client) throws MalformedURLException {
        super(riverName, riverSettings);
        this.closed = false;
        this.client = client;
        if (riverSettings.settings().containsKey("fs")) {
            Map map = (Map) riverSettings.settings().get("fs");
            if (XContentMapValues.nodeStringValue(map.get(FsRiverUtil.Dir.NAME), (String) null) != null) {
                this.logger.warn("`fs.name` attribute is deprecated. Don't use it anymore.", new Object[0]);
            }
            String nodeStringValue = XContentMapValues.nodeStringValue(map.get(FsRiverUtil.Doc.File.URL), (String) null);
            if (nodeStringValue == null) {
                this.logger.warn("`url` is not set. Please define it. Falling back to default: /esdir.", new Object[0]);
                nodeStringValue = "/esdir";
            }
            this.fsDefinition = new FsRiverFeedDefinition(riverName.getName(), nodeStringValue, XContentMapValues.nodeTimeValue(map.get("update_rate"), TimeValue.timeValueMinutes(15L)), Arrays.asList(FsRiverUtil.buildArrayFromSettings(riverSettings.settings(), "fs.includes")), Arrays.asList(FsRiverUtil.buildArrayFromSettings(riverSettings.settings(), "fs.excludes")), XContentMapValues.nodeBooleanValue(map.get("json_support"), false), XContentMapValues.nodeBooleanValue(map.get("filename_as_id"), false), XContentMapValues.nodeBooleanValue(map.get("add_filesize"), true), XContentMapValues.nodeDoubleValue(map.get(FsRiverUtil.Doc.File.INDEXED_CHARS), 0.0d), XContentMapValues.nodeStringValue(map.get("username"), (String) null), XContentMapValues.nodeStringValue(map.get("password"), (String) null), XContentMapValues.nodeStringValue(map.get("server"), (String) null), XContentMapValues.nodeIntegerValue(map.get("port"), 22), XContentMapValues.nodeStringValue(map.get("protocol"), PROTOCOL.LOCAL), XContentMapValues.nodeStringValue(map.get("pem_path"), (String) null), XContentMapValues.nodeBooleanValue(map.get("remove_deleted"), true), XContentMapValues.nodeBooleanValue(map.get("store_source"), false));
        } else {
            this.logger.warn("You didn't define the fs url. Switching to defaults : [{}]", new Object[]{"/esdir"});
            this.fsDefinition = new FsRiverFeedDefinition(riverName.getName(), "/esdir", TimeValue.timeValueMinutes(15L), Arrays.asList("*.txt", "*.pdf"), Arrays.asList("*.exe"), false, false, true, 0.0d, null, null, null, 22, PROTOCOL.LOCAL, null, true, false);
        }
        if (riverSettings.settings().containsKey("index")) {
            Map map2 = (Map) riverSettings.settings().get("index");
            this.indexName = XContentMapValues.nodeStringValue(map2.get("index"), riverName.name());
            this.typeName = XContentMapValues.nodeStringValue(map2.get("type"), FsRiverUtil.INDEX_TYPE_DOC);
            this.bulkSize = XContentMapValues.nodeIntegerValue(map2.get("bulk_size"), 100);
            this.bulkFlushInterval = TimeValue.parseTimeValue(XContentMapValues.nodeStringValue(map2.get("flush_interval"), "5s"), TimeValue.timeValueSeconds(5L));
            this.maxConcurrentBulk = XContentMapValues.nodeIntegerValue(map2.get("max_concurrent_bulk"), 1);
        } else {
            this.indexName = riverName.name();
            this.typeName = FsRiverUtil.INDEX_TYPE_DOC;
            this.bulkSize = 100;
            this.maxConcurrentBulk = 1;
            this.bulkFlushInterval = TimeValue.timeValueSeconds(5L);
        }
        if (!PROTOCOL.LOCAL.equals(this.fsDefinition.getProtocol()) && !PROTOCOL.SSH.equals(this.fsDefinition.getProtocol())) {
            this.logger.error(this.fsDefinition.getProtocol() + " is not supported yet. Please use " + PROTOCOL.LOCAL + " or " + PROTOCOL.SSH + ". Disabling river", new Object[0]);
            this.closed = true;
        } else {
            if (!PROTOCOL.SSH.equals(this.fsDefinition.getProtocol()) || Strings.hasLength(this.fsDefinition.getUsername())) {
                return;
            }
            this.logger.error("When using SSH, you need to set a username and probably a password or a pem file. Disabling river", new Object[0]);
            this.closed = true;
        }
    }

    /**
     * Starts the river: creates the target index when needed, pushes the file mapping
     * (unless {@code json_support} is enabled), builds the bulk processor and launches the
     * daemon feed thread.
     *
     * <p>Does nothing when the river is already closed. An index that already exists or a
     * cluster block on index creation is tolerated; any other index creation failure, and
     * any failure while pushing the mapping or starting the feed, is logged and leaves the
     * river not started.
     */
    public void start() {
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Starting fs river scanning", new Object[0]);
        }
        if (this.closed) {
            this.logger.info("Fs river is closed. Exiting", new Object[0]);
            return;
        }
        try {
            this.client.admin().indices().prepareCreate(this.indexName).execute().actionGet();
        } catch (Exception e) {
            Throwable cause = ExceptionsHelper.unwrapCause(e);
            boolean tolerated = cause instanceof IndexAlreadyExistsException || cause instanceof ClusterBlockException;
            if (!tolerated) {
                this.logger.warn("failed to create index [{}], disabling river...", e, new Object[]{this.indexName});
                return;
            }
        }
        try {
            if (!this.fsDefinition.isJsonSupport()) {
                pushMapping(this.indexName, this.typeName, FsRiverUtil.buildFsFileMapping(this.typeName, true, this.fsDefinition.isStoreSource()));
            }
            this.bulkProcessor = BulkProcessor.builder(this.client, new BulkProcessor.Listener() {
                public void beforeBulk(long j, BulkRequest bulkRequest) {
                    FsRiver.this.logger.debug("Going to execute new bulk composed of {} actions", new Object[]{Integer.valueOf(bulkRequest.numberOfActions())});
                }

                public void afterBulk(long j, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                    FsRiver.this.logger.debug("Executed bulk composed of {} actions", new Object[]{Integer.valueOf(bulkRequest.numberOfActions())});
                    if (!bulkResponse.hasFailures()) {
                        return;
                    }
                    // The placeholder is required: without it the failure message passed as
                    // parameter never reaches the log.
                    FsRiver.this.logger.warn("There was failures while executing bulk: {}", new Object[]{bulkResponse.buildFailureMessage()});
                    if (FsRiver.this.logger.isDebugEnabled()) {
                        for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
                            if (bulkItemResponse.isFailed()) {
                                FsRiver.this.logger.debug("Error for {}/{}/{} for {} operation: {}", new Object[]{bulkItemResponse.getIndex(), bulkItemResponse.getType(), bulkItemResponse.getId(), bulkItemResponse.getOpType(), bulkItemResponse.getFailureMessage()});
                            }
                        }
                    }
                }

                public void afterBulk(long j, BulkRequest bulkRequest, Throwable th) {
                    FsRiver.this.logger.warn("Error executing bulk", th, new Object[0]);
                }
            }).setBulkActions(this.bulkSize).setConcurrentRequests(this.maxConcurrentBulk).setFlushInterval(this.bulkFlushInterval).build();
            this.feedThread = EsExecutors.daemonThreadFactory(this.settings.globalSettings(), "fs_slurper").newThread(new FSParser(this.fsDefinition));
            this.feedThread.start();
        } catch (Exception e2) {
            this.logger.warn("failed to create mapping for [{}/{}], disabling river...", e2, new Object[]{this.indexName, this.typeName});
        }
    }

    /**
     * Closes the river: marks it closed, interrupts the feed thread if one was started and
     * closes the bulk processor if one was created.
     */
    public void close() {
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Closing fs river", new Object[0]);
        }
        // Set the flag first so the feed loop sees it once it is woken up.
        this.closed = true;
        Thread feeder = this.feedThread;
        if (feeder != null) {
            feeder.interrupt();
        }
        BulkProcessor processor = this.bulkProcessor;
        if (processor != null) {
            processor.close();
        }
    }

    /**
     * Tells whether a mapping is registered for the given type in the given index,
     * according to the cluster state.
     *
     * @param str  index name
     * @param str2 type name
     * @return {@code true} when the index is known and has a mapping for the type
     */
    private boolean isMappingExist(String str, String str2) {
        ClusterStateResponse stateResponse = (ClusterStateResponse) this.client.admin().cluster().prepareState().setIndices(new String[]{str}).execute().actionGet();
        IndexMetaData indexMetaData = stateResponse.getState().getMetaData().index(str);
        if (indexMetaData == null) {
            return false;
        }
        return indexMetaData.mapping(str2) != null;
    }

    /**
     * Creates the mapping of a type when it does not exist yet.
     *
     * <p>Nothing is sent when the mapping already exists or when no mapping definition is
     * given; both cases are only logged at debug level.
     *
     * @param str             index name
     * @param str2            type name
     * @param xContentBuilder mapping definition, may be {@code null}
     * @throws Exception if the put-mapping request is not acknowledged
     */
    private void pushMapping(String str, String str2, XContentBuilder xContentBuilder) throws Exception {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("pushMapping(" + str + "," + str2 + ")", new Object[0]);
        }
        if (isMappingExist(str, str2)) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Mapping [" + str + "]/[" + str2 + "] already exists.", new Object[0]);
            }
        } else {
            this.logger.debug("Mapping [" + str + "]/[" + str2 + "] doesn't exist. Creating it.", new Object[0]);
            if (xContentBuilder == null) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("No mapping definition for [" + str + "]/[" + str2 + "]. Ignoring.", new Object[0]);
                }
            } else {
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Mapping for [" + str + "]/[" + str2 + "]=" + xContentBuilder.string(), new Object[0]);
                }
                PutMappingResponse putResponse = (PutMappingResponse) this.client.admin().indices().preparePutMapping(new String[]{str}).setType(str2).setSource(xContentBuilder).execute().actionGet();
                if (!putResponse.isAcknowledged()) {
                    throw new Exception("Could not define mapping for type [" + str + "]/[" + str2 + "].");
                }
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Mapping definition for [" + str + "]/[" + str2 + "] succesfully created.", new Object[0]);
                }
            }
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("/pushMapping(" + str + "," + str2 + ")", new Object[0]);
        }
    }
}
