package io.bigdime.core.adaptor;

import io.bigdime.adaptor.metadata.MetadataStore;
import io.bigdime.alert.Logger;
import io.bigdime.alert.LoggerFactory;
import io.bigdime.core.AdaptorConfigurationException;
import io.bigdime.core.AdaptorContext;
import io.bigdime.core.AdaptorPhase;
import io.bigdime.core.DataAdaptorException;
import io.bigdime.core.DataChannel;
import io.bigdime.core.Sink;
import io.bigdime.core.Source;
import io.bigdime.core.channel.ChannelFactory;
import io.bigdime.core.commons.AdaptorLogger;
import io.bigdime.core.config.ADAPTOR_TYPE;
import io.bigdime.core.config.AdaptorConfig;
import io.bigdime.core.config.AdaptorConfigReader;
import io.bigdime.core.config.ChannelConfig;
import io.bigdime.core.config.SinkConfig;
import io.bigdime.core.sink.DataSink;
import io.bigdime.core.sink.DataSinkFactory;
import io.bigdime.core.source.DataSourceFactory;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import javax.annotation.PostConstruct;
import org.apache.flume.lifecycle.LifecycleState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

/**
 * Spring-managed adaptor that wires a set of {@link Source}s, {@link DataChannel}s and
 * {@link Sink}s together from an {@link AdaptorConfig} and drives their lifecycle.
 *
 * <p>After dependency injection, {@link #init()} reads the configuration, builds the
 * channels, sources and sinks, and (when auto-start is configured) schedules the adaptor
 * through {@link DataAdaptorJob}. {@link #start()} and {@link #stop()} are synchronized on
 * this instance; a background heartbeat thread periodically polls the source and sink
 * lifecycle states while the adaptor is started.
 *
 * <p>Fields that are read by the heartbeat thread or by unsynchronized accessors are
 * declared {@code volatile} so that updates made under the {@code start()}/{@code stop()}
 * lock are visible to them.
 */
@Component
public final class DataAdaptor implements Adaptor {

    private static final AdaptorLogger logger = new AdaptorLogger(LoggerFactory.getLogger(DataAdaptor.class));

    /** Current lifecycle phase; static, hence shared by every instance of this class. */
    private static volatile AdaptorPhase adaptorCurrentPhase = AdaptorPhase.INIT;

    /**
     * Default pause between two heartbeat checks. The value is handed to
     * {@link Thread#sleep(long)} unchanged, so despite the name it is in milliseconds.
     */
    private static long DEFAULT_SLEEP_DURATION_SECONDS = 3000;

    @Autowired
    private AdaptorConfigReader adaptorConfigReader;

    @Autowired
    private ChannelFactory channelFactory;

    @Autowired
    private DataSinkFactory dataSinkFactory;

    @Autowired
    private DataSourceFactory dataSourceFactory;

    @Autowired
    private DataAdaptorJob dataAdaptorJob;

    @Autowired
    private MetadataStore metadataStore;

    /** Result of the most recent source check; also set to {@code true} when sources are started. */
    private volatile boolean sourceRunning;

    /** Result of the most recent sink check. */
    private volatile boolean sinkRunning;

    /** Set by {@link #stop()} and cleared by {@link #start()}; polled by the heartbeat thread. */
    private volatile boolean adaptorStopped = false;

    /** Pause between heartbeat checks, in milliseconds (see {@link #DEFAULT_SLEEP_DURATION_SECONDS}). */
    private long heartbeatSleepDurationSecs = DEFAULT_SLEEP_DURATION_SECONDS;

    /** The heartbeat thread created by the latest full start, or {@code null} before the first one. */
    private volatile Thread heartbeatThread = null;

    private AdaptorConfig config = AdaptorConfig.getInstance();

    private DataAdaptor() throws AdaptorConfigurationException {
    }

    /**
     * Reads the adaptor configuration, builds channels, sources and sinks, schedules the
     * adaptor job when auto-start is enabled, and registers the data source in the
     * metadata store.
     *
     * @throws AdaptorConfigurationException if the configuration is invalid; an alert is
     *         raised before the exception is rethrown
     * @throws IllegalArgumentException if any other exception occurs during startup; the
     *         original exception is attached as the cause
     */
    @PostConstruct
    private void init() throws AdaptorConfigurationException {
        try {
            adaptorCurrentPhase = AdaptorPhase.INIT;
            logger.info(adaptorCurrentPhase.getValue(), "initializing DataAdaptor");
            this.adaptorConfigReader.readConfig(this.config);
            logger.info(adaptorCurrentPhase.getValue(), this.config.toString());
            initializeComponents();
            if (this.config.isAutoStart()) {
                if (this.config.getType() == ADAPTOR_TYPE.BATCH) {
                    this.dataAdaptorJob.scheduleBatchJob(this);
                } else {
                    this.dataAdaptorJob.scheduleStreamingJob(this);
                }
            }
            logger.info(adaptorCurrentPhase.getValue(), "source=\"{}\" channels=\"{}\" sinks=\"{}\"",
                    getSources().size(), getChannels().size(), getSinks().size());
            this.metadataStore.createDatasourceIfNotExist(AdaptorConfig.getInstance().getName(),
                    this.config.getSourceConfig().getSourceType());
        } catch (AdaptorConfigurationException e) {
            logger.alert(Logger.ALERT_TYPE.ADAPTOR_FAILED_TO_START, Logger.ALERT_CAUSE.INVALID_ADAPTOR_CONFIGURATION,
                    Logger.ALERT_SEVERITY.BLOCKER, e.getMessage(), e);
            throw e;
        } catch (Exception e) {
            logger.alert(Logger.ALERT_TYPE.ADAPTOR_FAILED_TO_START, Logger.ALERT_CAUSE.INVALID_ADAPTOR_CONFIGURATION,
                    Logger.ALERT_SEVERITY.BLOCKER, e.getMessage(), e);
            // Same exception type the former Assert.isNull(e, ...) call produced, but thrown
            // explicitly and with the root cause preserved.
            throw new IllegalArgumentException("Exception during DataAdaptor startup, can not continue", e);
        }
    }

    /**
     * Starts the adaptor.
     *
     * <p>If the adaptor is already in the STARTING or STARTED phase, only the sources are
     * (re)started, and only when none of them is still running. Otherwise channels, sinks
     * and sources are started in that order and a heartbeat thread is launched.
     *
     * @return {@code false} if the adaptor was already started and at least one source is
     *         still running; {@code true} otherwise
     * @throws DataAdaptorException declared by the {@link Adaptor} contract
     */
    @Override
    public synchronized boolean start() throws DataAdaptorException {
        logger.info("starting adaptor", "command received to start adaptor");
        if (adaptorCurrentPhase == AdaptorPhase.STARTING || adaptorCurrentPhase == AdaptorPhase.STARTED) {
            if (isSourceRunning()) {
                logger.warn("starting adaptor", "adaptor already running, at least one source is still running. can't start the sources");
                return false;
            }
            startSources();
            return true;
        }
        setAdaptorCurrentPhase(AdaptorPhase.STARTING);
        // A previous stop() leaves this flag set; clear it so the heartbeat thread created
        // below does not terminate on its very first loop check.
        this.adaptorStopped = false;
        startChannels();
        startSink();
        startSources();
        setAdaptorCurrentPhase(AdaptorPhase.STARTED);
        checkHeartbeat();
        return true;
    }

    /**
     * Stops all sources and then all sinks, and signals the heartbeat thread to finish.
     *
     * @throws DataAdaptorException if the adaptor is neither in the STARTING nor in the
     *         STARTED phase
     */
    @Override
    public synchronized void stop() throws DataAdaptorException {
        if (adaptorCurrentPhase != AdaptorPhase.STARTING && adaptorCurrentPhase != AdaptorPhase.STARTED) {
            throw new DataAdaptorException("adaptor not running");
        }
        setAdaptorCurrentPhase(AdaptorPhase.STOPPING);
        stopSource();
        stopSink();
        setAdaptorCurrentPhase(AdaptorPhase.STOPPED);
        this.adaptorStopped = true;
    }

    private static void setAdaptorCurrentPhase(AdaptorPhase adaptorPhase) {
        adaptorCurrentPhase = adaptorPhase;
    }

    /**
     * @return the string value of the current adaptor phase
     */
    public static String getAdaptorCurrentPhase() {
        return adaptorCurrentPhase.getValue();
    }

    /** Builds channels first, then sources, then sinks (sinks observe the sources). */
    private void initializeComponents() throws AdaptorConfigurationException {
        initializeChannels();
        initializeSource();
        initializeSinks();
    }

    /** Creates the sources from the source configuration and stores them in the adaptor context. */
    private void initializeSource() throws AdaptorConfigurationException {
        setSources(this.dataSourceFactory.getDataSource(this.config.getSourceConfig()));
    }

    /**
     * Creates the channels for every configured channel config and stores them in the
     * adaptor context.
     *
     * @throws AdaptorConfigurationException if no channel was created
     */
    private void initializeChannels() throws AdaptorConfigurationException {
        Collection<DataChannel> channels = new HashSet<DataChannel>();
        for (ChannelConfig channelConfig : this.config.getChannelConfigs()) {
            channels.addAll(this.channelFactory.getChannel(channelConfig));
        }
        if (channels.isEmpty()) {
            throw new AdaptorConfigurationException("adaptor could not be configured, channels is null after building the adaptor");
        }
        setChannels(channels);
    }

    /**
     * Creates the sinks for every configured sink config, registers each source as an
     * observer of each sink, and stores the sinks in the adaptor context.
     *
     * @throws AdaptorConfigurationException if no sink was created
     */
    private void initializeSinks() throws AdaptorConfigurationException {
        Collection<Sink> sinks = new HashSet<Sink>();
        for (SinkConfig sinkConfig : this.config.getSinkConfigs()) {
            sinks.addAll(this.dataSinkFactory.getDataSink(sinkConfig));
        }
        if (sinks.isEmpty()) {
            throw new AdaptorConfigurationException("adaptor could not be configured, sink is null after building the adaptor");
        }
        for (Sink sink : sinks) {
            for (Source source : getSources()) {
                ((DataSink) sink).addObserver(source);
            }
        }
        setSinks(sinks);
    }

    /** Calls {@code start()} on every channel in the adaptor context. */
    private void startChannels() throws AdaptorConfigurationException {
        Collection<DataChannel> channels = getChannels();
        logger.debug("adaptor calling start on each channel", "channels.size=\"{}\"", channels.size());
        for (DataChannel channel : channels) {
            logger.debug("adaptor calling start on channel", "channel_name=\"{}\"", channel.getName());
            channel.start();
        }
    }

    /** Marks the sources as running and calls {@code start()} on every source. */
    private void startSources() throws AdaptorConfigurationException {
        this.sourceRunning = true;
        Collection<Source> sources = getSources();
        logger.debug("adaptor calling start on each source", "sources.size=\"{}\"", sources.size());
        for (Source source : sources) {
            logger.debug("adaptor calling start on source", "source_name=\"{}\"", source.getName());
            source.start();
        }
    }

    /** Calls {@code start()} on every sink in the adaptor context. */
    private void startSink() throws AdaptorConfigurationException {
        Collection<Sink> sinks = getSinks();
        logger.debug("adaptor calling start on each sink", "sinks_size=\"{}\"", sinks.size());
        for (Sink sink : sinks) {
            logger.debug("adaptor calling start on sink", "sink_name=\"{}\"", sink.getName());
            sink.start();
        }
    }

    /**
     * Launches a new heartbeat thread that, until the adaptor is stopped, refreshes and
     * logs the source/sink running state and then sleeps for
     * {@link #heartbeatSleepDurationSecs} milliseconds.
     *
     * <p>Any exception ends the thread after a warning is logged; it is not propagated.
     */
    private void checkHeartbeat() {
        final Thread monitor = new Thread() {
            @Override
            public void run() {
                try {
                    logger.info("heartbeat thread for DataAdaptor", "heathcheck thread for DataAdaptor");
                    // The identity check retires a monitor left over from an earlier run:
                    // a quick stop()/start() clears adaptorStopped before the old thread
                    // wakes up, and without this check it would keep running next to
                    // the new one.
                    while (!adaptorStopped && heartbeatThread == this) {
                        boolean sourcesUp = isSourceRunning();
                        boolean sinksUp = isSinkRunning();
                        logger.debug("heartbeat thread for DataAdaptor", "source_running=\"{}\" sink_running=\"{}\"", sourcesUp, sinksUp);
                        sleep(heartbeatSleepDurationSecs);
                    }
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to whoever owns this thread's shutdown.
                    Thread.currentThread().interrupt();
                    logger.warn("heartbeat thread for DataAdaptor", "DataAdaptor heartbeat thread received an exception, will duck it. sleep_duration=\"{}\"", heartbeatSleepDurationSecs, e);
                } catch (Exception e) {
                    logger.warn("heartbeat thread for DataAdaptor", "DataAdaptor heartbeat thread received an exception, will duck it. sleep_duration=\"{}\"", heartbeatSleepDurationSecs, e);
                }
            }
        };
        // Publish before starting so the thread's first identity check sees itself.
        this.heartbeatThread = monitor;
        monitor.start();
    }

    /**
     * Polls every source and records the result.
     *
     * @return {@code true} if at least one source reports a lifecycle state other than
     *         {@link LifecycleState#STOP}
     */
    public boolean isSourceRunning() {
        // Accumulate locally and publish once, so concurrent readers of the field never
        // observe a transient "false" while the scan is still in progress.
        boolean running = false;
        for (Source source : getSources()) {
            LifecycleState state = source.getLifecycleState();
            if (state != LifecycleState.STOP) {
                running = true;
            }
            logger.debug("DataAdaptor checking source state", "source_name=\"{}\" state=\"{}\"", source.getName(), state);
        }
        this.sourceRunning = running;
        return running;
    }

    /**
     * Polls every sink and records the result.
     *
     * @return {@code true} if at least one sink reports {@link LifecycleState#START}
     */
    public boolean isSinkRunning() {
        // Same publish-once approach as isSourceRunning().
        boolean running = false;
        for (Sink sink : getSinks()) {
            LifecycleState state = sink.getLifecycleState();
            if (state == LifecycleState.START) {
                running = true;
            }
            logger.debug("DataAdaptor checking sink state", "sink_name=\"{}\" state=\"{}\"", sink.getName(), state);
        }
        this.sinkRunning = running;
        return running;
    }

    /** Calls {@code stop()} on every source in the adaptor context. */
    private void stopSource() throws AdaptorConfigurationException {
        for (Source source : getSources()) {
            logger.debug("adaptor calling stop on source", "source_name=\"{}\"", source.getName());
            source.stop();
        }
    }

    /** Calls {@code stop()} on every sink in the adaptor context. */
    private void stopSink() throws AdaptorConfigurationException {
        for (Sink sink : getSinks()) {
            logger.debug("adaptor calling stop on sink", "sink_name=\"{}\"", sink.getName());
            sink.stop();
        }
    }

    /**
     * @return the adaptor context held by this adaptor's configuration
     */
    @Override
    public AdaptorContext getAdaptorContext() {
        return getAdaptorConfig().getAdaptorContext();
    }

    /**
     * @return the configuration object this adaptor was initialized with
     */
    @Override
    public AdaptorConfig getAdaptorConfig() {
        return this.config;
    }

    /**
     * @return the sources stored in the adaptor context
     */
    public Collection<Source> getSources() {
        return this.config.getAdaptorContext().getSources();
    }

    /**
     * @return the sinks stored in the adaptor context
     */
    public Collection<Sink> getSinks() {
        return this.config.getAdaptorContext().getSinks();
    }

    /**
     * @return the channels stored in the adaptor context
     */
    public Collection<DataChannel> getChannels() {
        return this.config.getAdaptorContext().getChannels();
    }

    /**
     * @param collection the sources to store in the adaptor context
     */
    public void setSources(Collection<Source> collection) {
        this.config.getAdaptorContext().setSources(collection);
    }

    /**
     * @param collection the sinks to store in the adaptor context
     */
    public void setSinks(Collection<Sink> collection) {
        this.config.getAdaptorContext().setSinks(collection);
    }

    /**
     * @param collection the channels to store in the adaptor context
     */
    public void setChannels(Collection<DataChannel> collection) {
        this.config.getAdaptorContext().setChannels(collection);
    }
}
