/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.indexer.fieldtypes;

import com.google.common.collect.ImmutableSet;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.primitives.Ints;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Named;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.MongoIndexSet;
import org.graylog2.indexer.cluster.Cluster;
import org.graylog2.indexer.fieldtypes.IndexFieldTypePoller;
import org.graylog2.indexer.fieldtypes.IndexFieldTypesDTO;
import org.graylog2.indexer.fieldtypes.IndexFieldTypesService;
import org.graylog2.indexer.indexset.IndexSetConfig;
import org.graylog2.indexer.indexset.IndexSetService;
import org.graylog2.indexer.indexset.events.IndexSetCreatedEvent;
import org.graylog2.indexer.indexset.events.IndexSetDeletedEvent;
import org.graylog2.indexer.indices.Indices;
import org.graylog2.indexer.indices.TooManyAliasesException;
import org.graylog2.indexer.indices.events.IndicesDeletedEvent;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.lifecycles.Lifecycle;
import org.graylog2.plugin.periodical.Periodical;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodical that keeps the stored index field type information in sync with the indices.
 *
 * <p>Each run of {@link #doRun()} polls the field types of every index set returned by the
 * {@link IndexSetService}, upserts the results through the {@link IndexFieldTypesService} and
 * removes stored entries whose index no longer exists. In addition, a separate job is scheduled
 * per index set that refreshes the field types of the active write index at the refresh interval
 * configured on the index set.
 *
 * <p>The instance registers itself on the {@link EventBus} so that index set creation, index set
 * deletion and index deletion events adjust the scheduled jobs and the stored data.
 */
public class IndexFieldTypePollerPeriodical extends Periodical {
    private static final Logger LOG = LoggerFactory.getLogger(IndexFieldTypePollerPeriodical.class);

    /** Server lifecycles in which neither the periodical run nor the scheduled jobs do any work. */
    private static final Set<Lifecycle> SKIPPED_LIFECYCLES = ImmutableSet.of(
            Lifecycle.STARTING,
            Lifecycle.HALTING,
            Lifecycle.PAUSED,
            Lifecycle.FAILED,
            Lifecycle.UNINITIALIZED);

    private final IndexFieldTypePoller poller;
    private final IndexFieldTypesService dbService;
    private final IndexSetService indexSetService;
    private final Indices indices;
    private final MongoIndexSet.Factory mongoIndexSetFactory;
    private final Cluster cluster;
    private final ServerStatus serverStatus;
    private final com.github.joschi.jadconfig.util.Duration periodicalInterval;
    private final ScheduledExecutorService scheduler;

    /** Scheduled active-write-index refresh jobs, keyed by index set ID. */
    private final ConcurrentMap<String, ScheduledFuture<?>> futures = new ConcurrentHashMap<>();

    /**
     * Creates the periodical and registers it as a subscriber on the given event bus.
     *
     * @param poller               polls field types from the indices
     * @param dbService            persistence service for the polled field types
     * @param indexSetService      source of the index set configurations
     * @param indices              used to check whether an index still exists
     * @param mongoIndexSetFactory creates index sets from their configuration
     * @param cluster              used to check and wait for cluster connectivity
     * @param eventBus             event bus this instance subscribes to
     * @param serverStatus         provides the current server lifecycle
     * @param periodicalInterval   interval between two runs of {@link #doRun()}
     * @param scheduler            executor running the per-index-set refresh jobs
     */
    @Inject
    public IndexFieldTypePollerPeriodical(IndexFieldTypePoller poller,
                                          IndexFieldTypesService dbService,
                                          IndexSetService indexSetService,
                                          Indices indices,
                                          MongoIndexSet.Factory mongoIndexSetFactory,
                                          Cluster cluster,
                                          EventBus eventBus,
                                          ServerStatus serverStatus,
                                          @Named(value="index_field_type_periodical_interval") com.github.joschi.jadconfig.util.Duration periodicalInterval,
                                          @Named(value="daemonScheduler") ScheduledExecutorService scheduler) {
        this.poller = poller;
        this.dbService = dbService;
        this.indexSetService = indexSetService;
        this.indices = indices;
        this.mongoIndexSetFactory = mongoIndexSetFactory;
        this.cluster = cluster;
        this.serverStatus = serverStatus;
        this.periodicalInterval = periodicalInterval;
        this.scheduler = scheduler;
        eventBus.register(this);
    }

    /**
     * Refreshes the stored field types of all index sets.
     *
     * <p>The run is skipped while the server is in one of the {@link #SKIPPED_LIFECYCLES}. If the
     * cluster is not connected, the method blocks until it is; if the thread is interrupted while
     * waiting, the run is aborted.
     */
    @Override
    public void doRun() {
        if (serverIsNotRunning()) {
            LOG.debug("Server is not running, skipping run.");
            return;
        }
        if (!cluster.isConnected()) {
            LOG.info("Cluster not connected yet, delaying index field type initialization until it is reachable.");
            if (!waitForCluster()) {
                return;
            }
        }
        indexSetService.findAll().forEach(this::refreshIndexSet);
    }

    /**
     * Blocks until the cluster reports being connected with a healthy deflector, retrying on timeouts.
     *
     * @return {@code true} once the cluster is ready, {@code false} if the thread was interrupted
     */
    private boolean waitForCluster() {
        while (true) {
            try {
                cluster.waitForConnectedAndDeflectorHealthy();
                return true;
            } catch (TimeoutException e) {
                LOG.warn("Timed out waiting for Elasticsearch cluster, checking again.");
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up instead of retrying forever, so that an
                // interrupt (e.g. during shutdown) can actually end this run.
                Thread.currentThread().interrupt();
                LOG.warn("Interrupted while waiting for Elasticsearch cluster, skipping index field type update.");
                return false;
            }
        }
    }

    /**
     * Polls and stores the field types for one index set, schedules its refresh job if none is
     * registered yet, and deletes stored field types of indices that no longer exist.
     */
    private void refreshIndexSet(IndexSetConfig indexSetConfig) {
        final String indexSetId = indexSetConfig.id();
        final String indexSetTitle = indexSetConfig.title();
        final Set<IndexFieldTypesDTO> existingIndexTypes = ImmutableSet.copyOf(dbService.findForIndexSet(indexSetId));
        final MongoIndexSet indexSet = mongoIndexSetFactory.create(indexSetConfig);

        LOG.debug("Updating index field types for index set <{}/{}>", indexSetTitle, indexSetId);
        poller.poll(indexSet, existingIndexTypes).forEach(dbService::upsert);

        if (!futures.containsKey(indexSetId)) {
            schedule(indexSet);
        }

        // Re-read the stored entries after the upserts and drop those whose index is gone.
        dbService.findForIndexSet(indexSetId).stream()
                .filter(types -> !indices.exists(types.indexName()))
                .forEach(types -> dbService.delete(types.id()));
    }

    /**
     * @return {@code true} if the current server lifecycle is one of the {@link #SKIPPED_LIFECYCLES}
     */
    private boolean serverIsNotRunning() {
        final Lifecycle currentLifecycle = serverStatus.getLifecycle();
        return SKIPPED_LIFECYCLES.contains(currentLifecycle);
    }

    /**
     * Schedules the refresh job for a newly created index set.
     *
     * <p>The index set configuration is looked up again through the {@link IndexSetService};
     * a warning is logged if it cannot be found.
     *
     * @param event the index set creation event
     */
    @Subscribe
    public void handleIndexSetCreation(IndexSetCreatedEvent event) {
        final String indexSetId = event.indexSet().id();
        final Optional<IndexSetConfig> optionalIndexSet = indexSetService.get(indexSetId);
        if (optionalIndexSet.isPresent()) {
            schedule(mongoIndexSetFactory.create(optionalIndexSet.get()));
        } else {
            LOG.warn("Couldn't find newly created index set <{}>", indexSetId);
        }
    }

    /**
     * Removes and cancels the refresh job of a deleted index set, if one is registered.
     *
     * @param event the index set deletion event
     */
    @Subscribe
    public void handleIndexSetDeletion(IndexSetDeletedEvent event) {
        final String indexSetId = event.id();
        LOG.debug("Disable field type updating for index set <{}>", indexSetId);
        cancel(futures.remove(indexSetId));
    }

    /**
     * Deletes the stored field type information for every index named in the event.
     *
     * @param event the indices deletion event
     */
    @Subscribe
    public void handleIndexDeletion(IndicesDeletedEvent event) {
        event.indices().forEach(indexName -> {
            LOG.debug("Removing field type information for deleted index <{}>", indexName);
            dbService.delete(indexName);
        });
    }

    /**
     * Schedules the fixed-rate job that refreshes the field types of the index set's active
     * write index, replacing any job already registered for that index set.
     *
     * <p>Nothing is scheduled (and an existing job is left untouched) when the configured refresh
     * interval is zero or the index set is not writable.
     *
     * @param indexSet the index set to schedule the refresh job for
     */
    private void schedule(IndexSet indexSet) {
        final String indexSetId = indexSet.getConfig().id();
        final String indexSetTitle = indexSet.getConfig().title();
        final Duration refreshInterval = indexSet.getConfig().fieldTypeRefreshInterval();

        if (Duration.ZERO.equals(refreshInterval)) {
            LOG.debug("Skipping index set with ZERO refresh interval <{}/{}>", indexSetTitle, indexSetId);
            return;
        }
        if (!indexSet.getConfig().isWritable()) {
            LOG.debug("Skipping non-writable index set <{}/{}>", indexSetTitle, indexSetId);
            return;
        }

        // Stop the previous job before starting its replacement.
        cancel(futures.get(indexSetId));

        LOG.debug("Schedule index field type updating for index set <{}/{}> every {} ms",
                indexSetTitle, indexSetId, refreshInterval.getMillis());
        final ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(
                () -> refreshActiveWriteIndex(indexSet, indexSetId, indexSetTitle),
                0L, refreshInterval.getMillis(), TimeUnit.MILLISECONDS);

        // put() returns the job it displaces. Usually that is the one cancelled above (cancel() is
        // then a no-op), but if another thread registered a job in between it must not keep running
        // unreferenced.
        cancel(futures.put(indexSetId, future));
    }

    /**
     * Body of the scheduled job: polls the field types of the active write index and upserts them.
     * All exceptions are logged and swallowed so that the task never terminates by throwing.
     */
    private void refreshActiveWriteIndex(IndexSet indexSet, String indexSetId, String indexSetTitle) {
        if (serverIsNotRunning()) {
            return;
        }
        try {
            final String activeWriteIndex = indexSet.getActiveWriteIndex();
            if (activeWriteIndex != null) {
                LOG.debug("Updating index field types for active write index <{}> in index set <{}/{}>",
                        activeWriteIndex, indexSetTitle, indexSetId);
                poller.pollIndex(activeWriteIndex, indexSetId).ifPresent(dbService::upsert);
            } else {
                LOG.warn("Active write index for index set \"{}\" ({}) doesn't exist yet", indexSetTitle, indexSetId);
            }
        } catch (TooManyAliasesException e) {
            LOG.error("Couldn't get active write index", e);
        } catch (Exception e) {
            LOG.error("Couldn't update field types for index set <{}/{}>", indexSetTitle, indexSetId, e);
        }
    }

    /**
     * Cancels the given job (interrupting it if running) unless it is {@code null} or already
     * cancelled, and logs a warning if cancellation fails.
     */
    private void cancel(@Nullable ScheduledFuture<?> future) {
        if (future != null && !future.isCancelled() && !future.cancel(true)) {
            LOG.warn("Couldn't cancel field type update job");
        }
    }

    @Override
    public boolean runsForever() {
        return false;
    }

    @Override
    public boolean stopOnGracefulShutdown() {
        return true;
    }

    @Override
    public boolean masterOnly() {
        return true;
    }

    @Override
    public boolean startOnThisNode() {
        return true;
    }

    @Override
    public boolean isDaemon() {
        return true;
    }

    @Override
    public int getInitialDelaySeconds() {
        return 0;
    }

    /**
     * @return the configured periodical interval in seconds, clamped to the {@code int} range
     */
    @Override
    public int getPeriodSeconds() {
        return Ints.saturatedCast(periodicalInterval.toSeconds());
    }

    @Override
    protected Logger getLogger() {
        return LOG;
    }
}

