package org.wso2.extension.siddhi.io.wso2event.source;

import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.wso2.carbon.databridge.core.DataBridgeReceiverService;
import org.wso2.carbon.databridge.core.DataBridgeStreamStore;
import org.wso2.carbon.databridge.core.DataBridgeSubscriberService;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.stream.input.source.Source;

/**
 * OSGi Declarative Services component that wires the databridge services into the
 * WSO2Event source extension.
 *
 * <p>It tracks three dynamic, mandatory references (subscriber service, stream store and
 * receiver service). Once both the stream store and the receiver service are bound,
 * databridge is flagged as activated in {@link WSO2EventSourceDataHolder} and the sources
 * held there are connected.
 *
 * <p>NOTE(review): the two activity flags are plain instance fields; if the DS runtime
 * invokes bind/unbind methods from different threads they are not synchronized — confirm
 * whether that matters for this deployment.
 */
@Component(name = "org.wso2.extension.siddhi.io.wso2event.source.WSO2EventSourceDS", immediate = true)
public class WSO2EventSourceDS {
    private static final Log LOGGER = LogFactory.getLog(WSO2EventSourceDS.class);

    /** True while a {@link DataBridgeStreamStore} is bound to this component. */
    private boolean isEventStreamServiceActive;

    /** True while a {@link DataBridgeReceiverService} is bound to this component. */
    private boolean isReceiverServiceActive;

    /**
     * Activation callback; only emits a debug log entry.
     *
     * @param bundleContext the bundle context supplied by the DS runtime (unused)
     * @throws Exception declared for compatibility; the current body does not throw
     */
    @Activate
    protected void start(BundleContext bundleContext) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("WSO2EventSource Component is started");
        }
    }

    /**
     * Deactivation callback; only emits a debug log entry.
     *
     * @throws Exception declared for compatibility; the current body does not throw
     */
    @Deactivate
    protected void stop() throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("WSO2EventSource Component is stopped");
        }
    }

    /**
     * Binds the subscriber service. If the registration manager holds no subscriber service
     * yet, the given one is stored, a new {@link AgentCallbackImpl} is created, stored in the
     * registration manager and subscribed to the service. Otherwise the call is a no-op.
     *
     * @param dataBridgeSubscriberService the subscriber service being bound
     */
    @Reference(name = "databridge.subscriber.service", service = DataBridgeSubscriberService.class, cardinality = ReferenceCardinality.MANDATORY, policy = ReferencePolicy.DYNAMIC, unbind = "unsetDataBridgeSubscriberService")
    protected void setDataBridgeSubscriberService(DataBridgeSubscriberService dataBridgeSubscriberService) {
        if (WSO2EventSourceRegistrationManager.getDataBridgeSubscriberService() == null) {
            WSO2EventSourceRegistrationManager.setDataBridgeSubscriberService(dataBridgeSubscriberService);
            AgentCallbackImpl agentCallbackImpl = new AgentCallbackImpl();
            WSO2EventSourceRegistrationManager.setAgentCallbackImpl(agentCallbackImpl);
            dataBridgeSubscriberService.subscribe(agentCallbackImpl);
        }
    }

    /**
     * Unbinds the subscriber service by clearing the reference held by the registration manager.
     *
     * @param dataBridgeSubscriberService the subscriber service being unbound (unused)
     */
    protected void unsetDataBridgeSubscriberService(DataBridgeSubscriberService dataBridgeSubscriberService) {
        WSO2EventSourceRegistrationManager.setDataBridgeSubscriberService(null);
    }

    /**
     * Binds the stream store: stores it in the registration manager and, if the receiver
     * service is already bound, flags databridge as activated and connects the held sources.
     *
     * @param dataBridgeStreamStore the stream store being bound
     */
    @Reference(name = "databridge.stream.store", service = DataBridgeStreamStore.class, cardinality = ReferenceCardinality.MANDATORY, policy = ReferencePolicy.DYNAMIC, unbind = "unsetDataBridgeEventStreamService")
    protected void setDataBridgeEventStreamService(DataBridgeStreamStore dataBridgeStreamStore) {
        WSO2EventSourceRegistrationManager.setDataBridgeStreamStore(dataBridgeStreamStore);
        this.isEventStreamServiceActive = true;
        if (this.isReceiverServiceActive) {
            WSO2EventSourceDataHolder.setDatabridgeActivated(true);
            connectSources();
        }
    }

    /**
     * Unbinds the stream store: clears it from the registration manager and flags databridge
     * as no longer activated.
     *
     * @param dataBridgeStreamStore the stream store being unbound (unused)
     */
    protected void unsetDataBridgeEventStreamService(DataBridgeStreamStore dataBridgeStreamStore) {
        WSO2EventSourceRegistrationManager.setDataBridgeStreamStore(null);
        WSO2EventSourceDataHolder.setDatabridgeActivated(false);
        this.isEventStreamServiceActive = false;
    }

    /**
     * Binds the receiver service. The service instance itself is not stored; only its
     * availability is recorded. If the stream store is already bound, databridge is flagged
     * as activated and the held sources are connected.
     *
     * @param dataBridgeReceiverService the receiver service being bound (unused)
     */
    @Reference(name = "databridge.receiver.service", service = DataBridgeReceiverService.class, cardinality = ReferenceCardinality.MANDATORY, policy = ReferencePolicy.DYNAMIC, unbind = "unsetDataBridgeReceiverService")
    protected void setDataBridgeReceiverService(DataBridgeReceiverService dataBridgeReceiverService) {
        this.isReceiverServiceActive = true;
        if (this.isEventStreamServiceActive) {
            WSO2EventSourceDataHolder.setDatabridgeActivated(true);
            connectSources();
        }
    }

    /**
     * Unbinds the receiver service and flags databridge as no longer activated.
     *
     * <p>The stream store reference is deliberately left untouched: it is owned by
     * {@link #setDataBridgeEventStreamService}/{@link #unsetDataBridgeEventStreamService}.
     * Clearing it here would leave {@code isEventStreamServiceActive} true with no store
     * registered, so a later receiver rebind would re-activate databridge and connect sources
     * without a stream store.
     *
     * @param dataBridgeReceiverService the receiver service being unbound (unused)
     */
    protected void unsetDataBridgeReceiverService(DataBridgeReceiverService dataBridgeReceiverService) {
        WSO2EventSourceDataHolder.setDatabridgeActivated(false);
        this.isReceiverServiceActive = false;
    }

    /**
     * Calls {@code connect()} on every source returned by
     * {@link WSO2EventSourceDataHolder#getSources()} and removes each one from the iterated
     * collection afterwards. A {@link ConnectionUnavailableException} is logged and the source
     * is still removed, so a failed source is not retried by this method.
     */
    private void connectSources() {
        Iterator<Source> it = WSO2EventSourceDataHolder.getSources().iterator();
        while (it.hasNext()) {
            Source next = it.next();
            try {
                // Every held source is expected to be a WSO2EventSource; the cast fails otherwise.
                ((WSO2EventSource) next).connect();
            } catch (ConnectionUnavailableException e) {
                LOGGER.error("Exception when generating the WSO2 stream definition for Source " + next.getElementId(), e);
            }
            // Removed regardless of the connect outcome (original behavior preserved).
            it.remove();
        }
    }
}
