package org.wso2.extension.siddhi.io.wso2event.sink;

import java.util.Map;
import org.apache.log4j.Logger;
import org.wso2.carbon.databridge.agent.DataPublisher;
import org.wso2.carbon.databridge.agent.exception.DataEndpointAgentConfigurationException;
import org.wso2.carbon.databridge.agent.exception.DataEndpointAuthenticationException;
import org.wso2.carbon.databridge.agent.exception.DataEndpointConfigurationException;
import org.wso2.carbon.databridge.agent.exception.DataEndpointException;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.exception.TransportException;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.stream.output.sink.Sink;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.transport.DynamicOptions;
import org.wso2.siddhi.core.util.transport.OptionHolder;
import org.wso2.siddhi.query.api.definition.StreamDefinition;

@Extension(name = "wso2event", namespace = "sink", description = "The WSO2Event source pushes wso2events via TCP (databridge) in `wso2event` format. You can send wso2events through `Thrift` or `Binary` protocols.", parameters = {@Parameter(name = "wso2.stream.id", description = "Stream Id to use when publishing events. If stream id is not defined, it uses the respective siddhi stream name with version 1.0.0.e.g.,if the stream definition is `org.wso2.stream.bar.stream`, then the default value is `org.wso2.stream.bar.stream:1.0.0`.", defaultValue = "the defined stream ID:1.0.0", type = {DataType.STRING}, optional = true), @Parameter(name = "url", description = "The URL to which the outgoing events published via TCP over Thrift or Binary. e.g., `tcp://localhost:7611`", type = {DataType.STRING}), @Parameter(name = "auth.url", description = "The Thrift/Binary server endpoint url which used fot authentication purposes. It is not mandatory property. If this property is not provided then tcp-port+100 used for port in auth.url. e.g., `ssl://localhost:7711`", type = {DataType.STRING}, optional = true, defaultValue = "ssl://localhost:<tcp-port> + 100"), @Parameter(name = "username", description = "The username is used for authentication flow before publishing eventse.g., `admin`", type = {DataType.STRING}), @Parameter(name = "password", description = "The password is used for authentication flow before publishing eventse.g., `admin`", type = {DataType.STRING}), @Parameter(name = "protocol", description = "There are two protocols that we can use to publish events through data bridge.Either, we can use thrift or binary. Default value is Thrifte.g., `thrift`", type = {DataType.STRING}, optional = true, defaultValue = "thrift"), @Parameter(name = "mode", description = "Property which decides whether to publish events in synchronous or asynchronous mode. Default is non-blocking mode.e.g., `blocking`", type = {DataType.STRING}, optional = true, defaultValue = "non-blocking")}, examples = {@Example(syntax = "@sink(type='wso2event', wso2.stream.id='fooStream:1.0.0', url=\"tcp://localhost:7611\", auth.url=\"ssl://localhost:7711\", protocol=\"thrift\", username=\"admin\", password=\"admin\", mode=\"non-blocking\" , @map(type='wso2event'))\nDefine stream barStream(system string, price float, volume long);", description = "As defined in above query events are pushed to destination that defined.")})
/* loaded from: input_file:org/wso2/extension/siddhi/io/wso2event/sink/WSO2EventSink.class */
public class WSO2EventSink extends Sink {
    private static final Logger LOGGER = Logger.getLogger(WSO2EventSink.class);
    private DataPublisher dataPublisher;
    private String authUrl;
    private String url;
    private String username;
    private String password;
    private String publisherMode;
    private String protocol;
    private String siddhiAppName;
    private int timeout;
    private String streamId;

    protected void init(StreamDefinition streamDefinition, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.authUrl = optionHolder.validateAndGetStaticValue("auth.url", (String) null);
        this.url = optionHolder.validateAndGetStaticValue("url");
        this.username = optionHolder.validateAndGetStaticValue("username");
        this.password = optionHolder.validateAndGetStaticValue("password");
        this.publisherMode = optionHolder.validateAndGetStaticValue("mode", "non-blocking");
        this.protocol = optionHolder.validateAndGetStaticValue("protocol", "thrift");
        this.streamId = optionHolder.validateAndGetStaticValue("wso2.stream.id", (String) null);
        String validateAndGetStaticValue = optionHolder.validateAndGetStaticValue("timeout", (String) null);
        if (validateAndGetStaticValue != null) {
            this.timeout = Integer.parseInt(validateAndGetStaticValue);
        }
        this.siddhiAppName = siddhiAppContext.getName();
    }

    public void connect() throws ConnectionUnavailableException {
        try {
            this.dataPublisher = new DataPublisher(this.protocol, this.url, this.authUrl, this.username, this.password);
        } catch (TransportException e) {
            throw new ConnectionUnavailableException("Transport exception occurred when connecting to databridge endpoint given in " + this.siddhiAppName + " with the url:" + this.url + " authUrl:" + this.authUrl + " protocol:" + this.protocol + " and userName:" + this.username + "," + e.getMessage(), e);
        } catch (DataEndpointAuthenticationException e2) {
            throw new ConnectionUnavailableException("Error while authenticating to databridge endpoint given in " + this.siddhiAppName + " with the url:" + this.url + " authUrl:" + this.authUrl + " protocol:" + this.protocol + " and userName:" + this.username + "," + e2.getMessage(), e2);
        } catch (DataEndpointConfigurationException e3) {
            throw new ConnectionUnavailableException("Error in databridge endpoint configuration given in " + this.siddhiAppName + " with the url:" + this.url + " authUrl:" + this.authUrl + " protocol:" + this.protocol + " and userName:" + this.username + "," + e3.getMessage(), e3);
        } catch (DataEndpointException e4) {
            throw new ConnectionUnavailableException("Error in connecting to databridge endpoint configuration given in " + this.siddhiAppName + " with the url:" + this.url + " authUrl:" + this.authUrl + " protocol:" + this.protocol + " and userName:" + this.username + "," + e4.getMessage(), e4);
        } catch (DataEndpointAgentConfigurationException e5) {
            throw new ConnectionUnavailableException("Error in event sink data-bridge client configuration given in " + this.siddhiAppName + " with the url:" + this.url + " authUrl:" + this.authUrl + " protocol:" + this.protocol + " and userName:" + this.username + "," + e5.getMessage(), e5);
        }
    }

    public void publish(Object obj, DynamicOptions dynamicOptions) throws ConnectionUnavailableException {
        Event event = (Event) obj;
        if (this.streamId != null) {
            event.setStreamId(this.streamId);
        }
        if ("non-blocking".equalsIgnoreCase(this.publisherMode)) {
            this.dataPublisher.publish(event);
        } else {
            if (this.dataPublisher.tryPublish(event, this.timeout)) {
                return;
            }
            LOGGER.error("Event dropped at WSO2Event sink in executionplan '" + this.siddhiAppName + " , dropping event: " + event);
        }
    }

    public void disconnect() {
        if (this.dataPublisher != null) {
            try {
                this.dataPublisher.shutdown();
            } catch (DataEndpointException e) {
                LOGGER.error("Error in shutting down the data publisher created for execution plan " + this.siddhiAppName + " with the url:" + this.url + " authUrl:" + this.authUrl + " protocol:" + this.protocol + " and userName:" + this.username + "," + e.getMessage(), e);
            }
        }
    }

    public void destroy() {
    }

    public Class[] getSupportedInputEventClasses() {
        return new Class[]{Event.class};
    }

    public String[] getSupportedDynamicOptions() {
        return new String[0];
    }

    public Map<String, Object> currentState() {
        return null;
    }

    public void restoreState(Map<String, Object> map) {
    }
}
