/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.river.wikipedia;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.river.River;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
import org.elasticsearch.river.wikipedia.support.PageCallbackHandler;
import org.elasticsearch.river.wikipedia.support.WikiPage;
import org.elasticsearch.river.wikipedia.support.WikiXMLParser;
import org.elasticsearch.river.wikipedia.support.WikiXMLParserFactory;

public class WikipediaRiver
extends AbstractRiverComponent
implements River {
    private StringBuilder sb = new StringBuilder();
    private final Client client;
    private final URL url;
    private final String indexName;
    private final String typeName;
    private final int bulkSize;
    private final int dropThreshold;
    private final AtomicInteger onGoingBulks = new AtomicInteger();
    private volatile Thread thread;
    private volatile boolean closed = false;
    private volatile BulkRequestBuilder currentRequest;

    @Inject
    public WikipediaRiver(RiverName riverName, RiverSettings settings, Client client) throws MalformedURLException {
        super(riverName, settings);
        this.client = client;
        String url = "http://download.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2";
        if (settings.settings().containsKey("wikipedia")) {
            Map wikipediaSettings = (Map)settings.settings().get("wikipedia");
            url = XContentMapValues.nodeStringValue(wikipediaSettings.get("url"), (String)url);
        }
        this.logger.info("creating wikipedia stream river for [{}]", new Object[]{url});
        this.url = new URL(url);
        if (settings.settings().containsKey("index")) {
            Map indexSettings = (Map)settings.settings().get("index");
            this.indexName = XContentMapValues.nodeStringValue(indexSettings.get("index"), (String)riverName.name());
            this.typeName = XContentMapValues.nodeStringValue(indexSettings.get("type"), (String)"status");
            this.bulkSize = XContentMapValues.nodeIntegerValue(settings.settings().get("bulk_size"), (int)100);
            this.dropThreshold = XContentMapValues.nodeIntegerValue(settings.settings().get("drop_threshold"), (int)10);
        } else {
            this.indexName = riverName.name();
            this.typeName = "page";
            this.bulkSize = 100;
            this.dropThreshold = 10;
        }
    }

    public void start() {
        block4: {
            this.logger.info("starting twitter stream", new Object[0]);
            try {
                this.client.admin().indices().prepareCreate(this.indexName).execute().actionGet();
            }
            catch (Exception e) {
                if (ExceptionsHelper.unwrapCause((Throwable)e) instanceof IndexAlreadyExistsException || ExceptionsHelper.unwrapCause((Throwable)e) instanceof ClusterBlockException) break block4;
                this.logger.warn("failed to create index [{}], disabling river...", (Throwable)e, new Object[]{this.indexName});
                return;
            }
        }
        this.currentRequest = this.client.prepareBulk();
        WikiXMLParser parser = WikiXMLParserFactory.getSAXParser(this.url);
        try {
            parser.setPageCallback(new PageCallback());
        }
        catch (Exception e) {
            this.logger.error("failed to create parser", (Throwable)e, new Object[0]);
            return;
        }
        this.thread = EsExecutors.daemonThreadFactory((Settings)this.settings.globalSettings(), (String)"wikipedia_slurper").newThread(new Parser(parser));
        this.thread.start();
    }

    public void close() {
        this.logger.info("closing wikipedia river", new Object[0]);
        this.closed = true;
        if (this.thread != null) {
            this.thread.interrupt();
        }
    }

    private String stripTitle(String title) {
        this.sb.setLength(0);
        this.sb.append(title);
        while (this.sb.length() > 0 && (this.sb.charAt(this.sb.length() - 1) == '\n' || this.sb.charAt(this.sb.length() - 1) == ' ')) {
            this.sb.deleteCharAt(this.sb.length() - 1);
        }
        return this.sb.toString();
    }

    private class PageCallback
    implements PageCallbackHandler {
        private PageCallback() {
        }

        @Override
        public void process(WikiPage page) {
            if (WikipediaRiver.this.closed) {
                return;
            }
            String title = WikipediaRiver.this.stripTitle(page.getTitle());
            if (WikipediaRiver.this.logger.isTraceEnabled()) {
                WikipediaRiver.this.logger.trace("page {} : {}", new Object[]{page.getID(), page.getTitle()});
            }
            try {
                XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
                builder.field("title", title);
                builder.field("text", page.getText());
                builder.field("redirect", page.isRedirect());
                builder.field("special", page.isSpecialPage());
                builder.field("stub", page.isStub());
                builder.field("disambiguation", page.isDisambiguationPage());
                builder.startArray("category");
                for (String s : page.getCategories()) {
                    builder.value(s);
                }
                builder.endArray();
                builder.startArray("link");
                for (String s : page.getLinks()) {
                    builder.value(s);
                }
                builder.endArray();
                builder.endObject();
                WikipediaRiver.this.currentRequest.add(Requests.indexRequest((String)WikipediaRiver.this.indexName).type(WikipediaRiver.this.typeName).id(page.getID()).create(false).source(builder));
                this.processBulkIfNeeded();
            }
            catch (Exception e) {
                WikipediaRiver.this.logger.warn("failed to construct index request", (Throwable)e, new Object[0]);
            }
        }

        private void processBulkIfNeeded() {
            if (WikipediaRiver.this.currentRequest.numberOfActions() >= WikipediaRiver.this.bulkSize) {
                int currentOnGoingBulks = WikipediaRiver.this.onGoingBulks.incrementAndGet();
                if (currentOnGoingBulks > WikipediaRiver.this.dropThreshold) {
                    WikipediaRiver.this.onGoingBulks.decrementAndGet();
                    WikipediaRiver.this.logger.warn("dropping bulk, [{}] crossed threshold [{}]", new Object[]{WikipediaRiver.this.onGoingBulks, WikipediaRiver.this.dropThreshold});
                } else {
                    try {
                        WikipediaRiver.this.currentRequest.execute((ActionListener)new ActionListener<BulkResponse>(){

                            public void onResponse(BulkResponse bulkResponse) {
                                WikipediaRiver.this.onGoingBulks.decrementAndGet();
                            }

                            public void onFailure(Throwable e) {
                                WikipediaRiver.this.logger.warn("failed to execute bulk", new Object[0]);
                            }
                        });
                    }
                    catch (Exception e) {
                        WikipediaRiver.this.logger.warn("failed to process bulk", (Throwable)e, new Object[0]);
                    }
                }
                WikipediaRiver.this.currentRequest = WikipediaRiver.this.client.prepareBulk();
            }
        }
    }

    private class Parser
    implements Runnable {
        private final WikiXMLParser parser;

        private Parser(WikiXMLParser parser) {
            this.parser = parser;
        }

        @Override
        public void run() {
            try {
                this.parser.parse();
            }
            catch (Exception e) {
                if (WikipediaRiver.this.closed) {
                    return;
                }
                WikipediaRiver.this.logger.error("failed to parse stream", (Throwable)e, new Object[0]);
            }
        }
    }
}

