package org.wso2.extension.siddhi.io.http.sink;

import io.netty.buffer.Unpooled;
import io.netty.handler.codec.base64.Base64;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.SystemParameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.query.api.annotation.Annotation;
import io.siddhi.query.api.annotation.Element;
import io.siddhi.query.api.definition.StreamDefinition;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.wso2.carbon.messaging.Header;
import org.wso2.extension.siddhi.io.http.sink.exception.HttpSinkAdaptorRuntimeException;
import org.wso2.extension.siddhi.io.http.sink.updatetoken.AccessTokenCache;
import org.wso2.extension.siddhi.io.http.sink.updatetoken.DefaultListener;
import org.wso2.extension.siddhi.io.http.sink.updatetoken.HttpsClient;
import org.wso2.extension.siddhi.io.http.sink.util.HttpSinkUtil;
import org.wso2.extension.siddhi.io.http.util.HttpConstants;
import org.wso2.extension.siddhi.io.http.util.HttpIoUtil;
import org.wso2.transport.http.netty.contract.HttpClientConnector;
import org.wso2.transport.http.netty.contract.config.ChunkConfig;
import org.wso2.transport.http.netty.contract.config.ProxyServerConfiguration;
import org.wso2.transport.http.netty.contract.config.SenderConfiguration;
import org.wso2.transport.http.netty.contractimpl.DefaultHttpWsConnectorFactory;
import org.wso2.transport.http.netty.message.HttpCarbonMessage;

@Extension(name = "http", namespace = "sink", description = "This extension publish the HTTP events in any HTTP method  POST, GET, PUT, DELETE  via HTTP or https protocols. As the additional features this component can provide basic authentication as well as user can publish events using custom client truststore files when publishing events via https protocol. And also user can add any number of headers including HTTP_METHOD header for each event dynamically.\nFollowing content types will be set by default according to the type of sink mapper used.\nYou can override them by setting the new content types in headers.\n     - TEXT : text/plain\n     - XML : application/xml\n     - JSON : application/json\n     - KEYVALUE : application/x-www-form-urlencoded", parameters = {@Parameter(name = HttpConstants.PUBLISHER_URL, description = "The URL to which the outgoing events should be published via HTTP. This is a mandatory parameter and if this is not specified, an error is logged in the CLI. If user wants to enable SSL for the events, use `https` instead of `http` in the publisher.url.e.g., `http://localhost:8080/endpoint`, `https://localhost:8080/endpoint`", type = {DataType.STRING}), @Parameter(name = HttpConstants.RECEIVER_USERNAME, description = "The username to be included in the authentication header of the basic authentication enabled events. It is required to specify both username and password to enable basic authentication. If one of the parameter is not given by user then an error is logged in the CLI.", type = {DataType.STRING}, optional = true, defaultValue = " "), @Parameter(name = HttpConstants.RECEIVER_PASSWORD, description = "The password to include in the authentication header of the basic authentication enabled events. It is required to specify both username and password to enable basic authentication. 
If one of the parameter is not given by user then an error is logged in the CLI.", type = {DataType.STRING}, optional = true, defaultValue = " "), @Parameter(name = HttpConstants.CLIENT_TRUSTSTORE_PATH_PARAM, description = "The file path to the location of the truststore of the client that sends the HTTP events through 'https' protocol. A custom client-truststore can be specified if required.", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.CLIENT_TRUSTSTORE_PATH_VALUE), @Parameter(name = HttpConstants.CLIENT_TRUSTSTORE_PASSWORD_PARAM, description = "The password for the client-truststore. A custom password can be specified if required. If no custom password is specified and the protocol of URL is 'https' then, the system uses default password.", type = {DataType.STRING}, optional = true, defaultValue = "wso2carbon"), @Parameter(name = HttpConstants.HEADERS, description = "The headers that should be included as HTTP request headers. \nThere can be any number of headers concatenated in following format. \"'header1:value1','header2:value2'\". User can include Content-Type header if he needs to use a specific content-type for the payload. Or else, system decides the Content-Type by considering the type of sink mapper, in following way.\n - @map(xml):application/xml\n - @map(json):application/json\n - @map(text):plain/text )\n - if user does not include any mapping type then the system gets 'plain/text' as default Content-Type header.\nNote that providing content-length as a header is not supported. The size of the payload will be automatically calculated and included in the content-length header.", type = {DataType.STRING}, optional = true, defaultValue = " "), @Parameter(name = HttpConstants.METHOD, description = "For HTTP events, HTTP_METHOD header should be included as a request header. 
If the parameter is null then system uses 'POST' as a default header.", type = {DataType.STRING}, optional = true, defaultValue = "POST"), @Parameter(name = HttpConstants.SOCKET_IDEAL_TIMEOUT, description = "Socket timeout value in millisecond", type = {DataType.INT}, optional = true, defaultValue = "6000"), @Parameter(name = HttpConstants.CLIENT_CHUNK_ENABLED, description = "This parameter is used to disable/enable chunked transfer encoding", type = {DataType.BOOL}, optional = true, defaultValue = "false"), @Parameter(name = HttpConstants.SSL_PROTOCOL, description = "The SSL protocol version", type = {DataType.STRING}, optional = true, defaultValue = "TLS"), @Parameter(name = "parameters", description = "Parameters other than basics such as ciphers,sslEnabledProtocols,client.enable.session.creation. Expected format of these parameters is as follows: \"'ciphers:xxx','sslEnabledProtocols,client.enable:xxx'\"", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = "ciphers", description = "List of ciphers to be used. This parameter should include under parameters Ex: 'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256'", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = "ssl.enabled.protocols", description = "SSL/TLS protocols to be enabled. This parameter should be in camel case format(sslEnabledProtocols) under parameters. 
Ex 'sslEnabledProtocols:true'", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = "client.enable.session.creation", description = "Enable HTTP session creation.This parameter should include under parameters Ex: 'client.enable.session.creation:true'", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = HttpConstants.CLIENT_FOLLOW_REDIRECT, description = "Redirect related enabled.", type = {DataType.BOOL}, optional = true, defaultValue = HttpConstants.TRUE), @Parameter(name = HttpConstants.CLIENT_MAX_REDIRECT_COUNT, description = "Maximum redirect count.", type = {DataType.INT}, optional = true, defaultValue = "5"), @Parameter(name = HttpConstants.TLS_STORE_TYPE, description = "TLS store type to be used.", type = {DataType.STRING}, optional = true, defaultValue = "JKS"), @Parameter(name = HttpConstants.PROXY_HOST, description = "Proxy server host", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = HttpConstants.PROXY_PORT, description = "Proxy server port", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = HttpConstants.PROXY_USERNAME, description = "Proxy server username", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = HttpConstants.PROXY_PASSWORD, description = "Proxy server password", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = "client.bootstrap.configuration", description = "Client bootsrap configurations. 
Expected format of these parameters is as follows: \"'client.bootstrap.nodelay:xxx','client.bootstrap.keepalive:xxx'\"", type = {DataType.STRING}, optional = true, defaultValue = "TODO"), @Parameter(name = HttpConstants.CLIENT_BOOTSTRAP_NODELAY, description = "Http client no delay.", type = {DataType.BOOL}, optional = true, defaultValue = HttpConstants.TRUE), @Parameter(name = HttpConstants.CLIENT_BOOTSTRAP_KEEPALIVE, description = "Http client keep alive.", type = {DataType.BOOL}, optional = true, defaultValue = HttpConstants.TRUE), @Parameter(name = HttpConstants.CLIENT_BOOTSTRAP_SENDBUFFERSIZE, description = "Http client send buffer size.", type = {DataType.INT}, optional = true, defaultValue = "1048576"), @Parameter(name = HttpConstants.CLIENT_BOOTSTRAP_RECIEVEBUFFERSIZE, description = "Http client receive buffer size.", type = {DataType.INT}, optional = true, defaultValue = "1048576"), @Parameter(name = HttpConstants.CLIENT_BOOTSTRAP_CONNECT_TIMEOUT, description = "Http client connection timeout.", type = {DataType.INT}, optional = true, defaultValue = "15000"), @Parameter(name = HttpConstants.CLIENT_BOOTSTRAP_SOCKET_REUSE, description = "To enable http socket reuse.", type = {DataType.BOOL}, optional = true, defaultValue = "false"), @Parameter(name = HttpConstants.CLIENT_BOOTSTRAP_SOCKET_TIMEOUT, description = "Http client socket timeout.", type = {DataType.STRING}, optional = true, defaultValue = "15"), @Parameter(name = HttpConstants.CLIENT_POOL_CONFIGURATION, description = "Thread pool configuration. 
Expected format of these parameters is as follows: \"'client.connection.pool.count:xxx','client.max.active.connections.per.pool:xxx'\"", type = {DataType.STRING}, optional = true, defaultValue = "TODO"), @Parameter(name = HttpConstants.CLIENT_CONNECTION_POOL_COUNT, description = "Connection pool count.", type = {DataType.INT}, optional = true, defaultValue = "0"), @Parameter(name = HttpConstants.CLIENT_MAX_ACTIVE_CONNECTIONS_PER_POOL, description = "Active connections per pool.", type = {DataType.INT}, optional = true, defaultValue = HttpConstants.SOCKET_IDEAL_TIMEOUT_VALUE), @Parameter(name = HttpConstants.CLIENT_MIN_IDLE_CONNECTIONS_PER_POOL, description = "Minimum ideal connection per pool.", type = {DataType.INT}, optional = true, defaultValue = "0"), @Parameter(name = HttpConstants.CLIENT_MAX_IDLE_CONNECTIONS_PER_POOL, description = "Maximum ideal connection per pool.", type = {DataType.INT}, optional = true, defaultValue = "100"), @Parameter(name = HttpConstants.CLIENT_MIN_EVICTION_IDLE_TIME, description = "Minimum eviction idle time.", type = {DataType.STRING}, optional = true, defaultValue = "5 * 60 * 1000"), @Parameter(name = HttpConstants.SENDER_THREAD_COUNT, description = "Http sender thread count.", type = {DataType.STRING}, optional = true, defaultValue = "20"), @Parameter(name = HttpConstants.EVENT_GROUP_EXECUTOR_THREAD_SIZE, description = "Event group executor thread size.", type = {DataType.STRING}, optional = true, defaultValue = "15"), @Parameter(name = "max.wait.for.client.connection.pool", description = "Maximum wait for client connection pool.", type = {DataType.STRING}, optional = true, defaultValue = "60000"), @Parameter(name = HttpConstants.RECEIVER_OAUTH_USERNAME, description = "The username to be included in the authentication header of the oauth authentication enabled events. It is required to specify both username and password to enable oauth authentication. If one of the parameter is not given by user then an error is logged in the CLI. 
It is only applicable for for Oauth requests ", type = {DataType.STRING}, optional = true, defaultValue = " "), @Parameter(name = HttpConstants.RECEIVER_OAUTH_PASSWORD, description = "The password to be included in the authentication header of the oauth authentication enabled events. It is required to specify both username and password to enable oauth authentication. If one of the parameter is not given by user then an error is logged in the CLI. It is only applicable for for Oauth requests ", type = {DataType.STRING}, optional = true, defaultValue = " "), @Parameter(name = HttpConstants.CONSUMER_KEY, description = "consumer key for the Http request. It is only applicable for for Oauth requests", type = {DataType.STRING}, optional = true, defaultValue = " "), @Parameter(name = HttpConstants.CONSUMER_SECRET, description = "consumer secret for the Http request. It is only applicable for for Oauth requests", type = {DataType.STRING}, optional = true, defaultValue = " "), @Parameter(name = HttpConstants.RECEIVER_REFRESH_TOKEN, description = "refresh token for the Http request. It is only applicable for for Oauth requests", type = {DataType.STRING}, optional = true, defaultValue = " "), @Parameter(name = HttpConstants.TOKEN_URL, description = "token url for generate a new access token. 
It is only applicable for for Oauth requests", type = {DataType.STRING}, optional = true, defaultValue = " ")}, examples = {@Example(syntax = "@sink(type='http',publisher.url='http://localhost:8009/foo', method='{{method}}',headers=\"'content-type:xml','content-length:94'\", client.bootstrap.configuration=\"'client.bootstrap.socket.timeout:20', 'client.bootstrap.worker.group.size:10'\", client.pool.configuration=\"'client.connection.pool.count:10','client.max.active.connections.per.pool:1'\", @map(type='xml', @payload('{{payloadBody}}')))\ndefine stream FooStream (payloadBody String, method string, headers string);\n", description = "If it is xml mapping expected input should be in following format for FooStream:\n{\n<events>\n    <event>\n        <symbol>WSO2</symbol>\n        <price>55.6</price>\n        <volume>100</volume>\n    </event>\n</events>,\nPOST,\nContent-Length:24#Content-Location:USA#Retry-After:120\n}\n\nAbove event will generate output as below.\n~Output http event payload\n<events>\n    <event>\n        <symbol>WSO2</symbol>\n        <price>55.6</price>\n        <volume>100</volume>\n    </event>\n</events>\n\n~Output http event headers\nContent-Length:24,\nContent-Location:'USA',\nRetry-After:120,\nContent-Type:'application/xml',\nHTTP_METHOD:'POST',\n\n~Output http event properties\nHTTP_METHOD:'POST',\nHOST:'localhost',\nPORT:8009,\nPROTOCOL:'http',\nTO:'/foo'")}, systemParameter = {@SystemParameter(name = HttpConstants.CLIENT_BOOTSTRAP_BOSS_GROUP_SIZE, description = "property to configure number of boss threads, which accepts incoming connections until the ports are unbound. 
Once connection accepts successfully, boss thread passes the accepted channel to one of the worker threads.", defaultValue = "Number of available processors", possibleParameters = {"Any integer"}), @SystemParameter(name = HttpConstants.CLIENT_BOOTSTRAP_WORKER_GROUP_SIZE, description = "property to configure number of worker threads, which performs non blocking read and write for one or more channels in non-blocking mode.", defaultValue = "(Number of available processors)*2", possibleParameters = {"Any integer"}), @SystemParameter(name = HttpConstants.CLIENT_BOOTSTRAP_CLIENT_GROUP_SIZE, description = "property to configure number of client threads, which performs non blocking read and write for one or more channels in non-blocking mode.", defaultValue = "(Number of available processors)*2", possibleParameters = {"Any integer"}), @SystemParameter(name = HttpConstants.CLIENT_TRUSTSTORE_PATH, description = "The default truststore file path.", defaultValue = HttpConstants.CLIENT_TRUSTSTORE_PATH_VALUE, possibleParameters = {"Path to client-truststore.jks"}), @SystemParameter(name = HttpConstants.CLIENT_TRUSTSTORE_PASSWORD, description = "The default truststore password.", defaultValue = "wso2carbon", possibleParameters = {"Truststore password"})})
/* loaded from: input_file:org/wso2/extension/siddhi/io/http/sink/HttpSink.class */
public class HttpSink extends Sink {
    // Class-wide log4j logger.
    private static final Logger log = Logger.getLogger(HttpSink.class);

    // --- Package-visible state ---
    // NOTE(review): clientConnector is not assigned in the part of the class reviewed here;
    // presumably created by initClientConnector(...) — confirm.
    HttpClientConnector clientConnector;
    // Value of the first element of the stream's first nested annotation (read in init()).
    String mapType;
    // Possibly-dynamic options for the request headers and the HTTP method (created in init()).
    Option httpHeaderOption;
    Option httpMethodOption;

    // "<siddhi app name>:<stream definition>" as built in init().
    private String streamID;
    private Map<String, String> httpURLProperties;

    // OAuth client credentials; sendOauthRequest() Base64-encodes "consumerKey:consumerSecret".
    private String consumerKey;
    private String consumerSecret;
    private String authorizationHeader;

    // Basic-auth credentials; init() selects BASIC_AUTH only when both are non-empty.
    private String userName;
    private String userPassword;

    // Target endpoint. publisherURL is used in the log/exception messages of the OAuth methods;
    // publisherURLOption is the (static or dynamic) option validated in init().
    private String publisherURL;
    private Option publisherURLOption;

    // Trust store location and password; also handed to HttpsClient when requesting tokens.
    private String clientStoreFile;
    private String clientStorePass;

    // Transport settings read in init(). Apart from socketIdleTimeout (parsed to int) they are
    // kept as the raw option strings, defaulting to HttpConstants.EMPTY_STRING.
    private int socketIdleTimeout;
    private String sslProtocol;
    private String tlsStoreType;
    private String chunkDisabled;
    private String followRedirect;
    private String maxRedirectCount;
    private String parametersList;

    // Proxy settings read in init() (raw option strings).
    private String proxyHost;
    private String proxyPort;
    private String proxyUsername;
    private String proxyPassword;

    // Bootstrap / pool configuration strings (sink options) and thread-group sizes
    // (read from the ConfigReader in init()).
    private String clientBootstrapConfiguration;
    private String clientPoolConfiguration;
    private String bootstrapWorker;
    private String bootstrapBoss;
    private String bootstrapClient;

    // Collaborators handed to init().
    private ConfigReader configReader;
    private SiddhiAppContext siddhiAppContext;

    // OAuth resource-owner credentials; when both are non-empty getAccessToken() uses the
    // password grant.
    private String oauthUsername;
    private String oauthUserPassword;
    // Possibly-dynamic refresh token option (created in init()).
    private Option refreshToken;
    // One of HttpConstants.BASIC_AUTH, NO_AUTH or OAUTH; decided in init(), consulted in publish().
    private String authType;
    // Token cache obtained from AccessTokenCache.getInstance(); keyed by the encoded
    // client-credential string in the OAuth methods.
    private AccessTokenCache accessTokenCache = AccessTokenCache.getInstance();
    // Token endpoint; read in init() and overwritten by getAccessToken() with its argument.
    private String tokenURL;
    // NOTE(review): not assigned in the part of the class reviewed here; presumably set up by
    // initConnectorFactory() — confirm.
    private DefaultHttpWsConnectorFactory httpConnectorFactory;

    /**
     * Returns the event classes this sink declares as supported input.
     *
     * @return a new two-element array holding {@code String.class} followed by {@code Map.class}
     */
    public Class[] getSupportedInputEventClasses() {
        Class[] supportedClasses = new Class[2];
        supportedClasses[0] = String.class;
        supportedClasses[1] = Map.class;
        return supportedClasses;
    }

    /**
     * Returns the option keys this sink declares as dynamic (resolvable per event).
     *
     * @return a new array with the headers, method, publisher URL and refresh token keys,
     *         in that order
     */
    public String[] getSupportedDynamicOptions() {
        String[] dynamicOptionKeys = {
            HttpConstants.HEADERS,
            HttpConstants.METHOD,
            HttpConstants.PUBLISHER_URL,
            HttpConstants.RECEIVER_REFRESH_TOKEN
        };
        return dynamicOptionKeys;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reads the sink configuration, decides the authentication mode and prepares the connector.
     *
     * <p>Static options are copied into fields, header/method/refresh-token options are kept as
     * {@link Option}s so they can be resolved per event, and thread-group sizes are read from the
     * {@link ConfigReader}. The client connector is created here only when the publisher URL is a
     * static option.
     *
     * @param streamDefinition definition of the stream this sink is attached to
     * @param optionHolder     holder of the sink annotation options
     * @param configReader     reader for deployment-level configuration
     * @param siddhiAppContext context of the owning Siddhi application
     * @return always {@code null}
     */
    public StateFactory init(StreamDefinition streamDefinition, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.configReader = configReader;
        this.siddhiAppContext = siddhiAppContext;
        streamID = siddhiAppContext.getName() + ":" + streamDefinition.toString();

        // Map type = value of the first element of the first annotation nested in the
        // stream definition's first annotation.
        Annotation firstAnnotation = (Annotation) streamDefinition.getAnnotations().get(0);
        Annotation nestedAnnotation = (Annotation) firstAnnotation.getAnnotations().get(0);
        Element firstElement = (Element) nestedAnnotation.getElements().get(0);
        mapType = firstElement.getValue();

        // Endpoint, headers and method (may be dynamic).
        publisherURLOption = optionHolder.validateAndGetOption(HttpConstants.PUBLISHER_URL);
        httpHeaderOption = optionHolder.getOrCreateOption(HttpConstants.HEADERS, HttpConstants.DEFAULT_HEADER);
        httpMethodOption = optionHolder.getOrCreateOption(HttpConstants.METHOD, "POST");

        // Credentials for the supported authentication modes.
        consumerKey = optionHolder.validateAndGetStaticValue(
                HttpConstants.CONSUMER_KEY, HttpConstants.EMPTY_STRING);
        consumerSecret = optionHolder.validateAndGetStaticValue(
                HttpConstants.CONSUMER_SECRET, HttpConstants.EMPTY_STRING);
        userName = optionHolder.validateAndGetStaticValue(
                HttpConstants.RECEIVER_USERNAME, HttpConstants.EMPTY_STRING);
        userPassword = optionHolder.validateAndGetStaticValue(
                HttpConstants.RECEIVER_PASSWORD, HttpConstants.EMPTY_STRING);
        oauthUsername = optionHolder.validateAndGetStaticValue(
                HttpConstants.RECEIVER_OAUTH_USERNAME, HttpConstants.EMPTY_STRING);
        oauthUserPassword = optionHolder.validateAndGetStaticValue(
                HttpConstants.RECEIVER_OAUTH_PASSWORD, HttpConstants.EMPTY_STRING);
        refreshToken = optionHolder.getOrCreateOption(
                HttpConstants.RECEIVER_REFRESH_TOKEN, HttpConstants.EMPTY_STRING);
        tokenURL = optionHolder.validateAndGetStaticValue(
                HttpConstants.TOKEN_URL, HttpConstants.EMPTY_STRING);

        // Trust store and transport settings.
        clientStoreFile = optionHolder.validateAndGetStaticValue(
                HttpConstants.CLIENT_TRUSTSTORE_PATH_PARAM, HttpSinkUtil.trustStorePath(configReader));
        clientStorePass = optionHolder.validateAndGetStaticValue(
                HttpConstants.CLIENT_TRUSTSTORE_PASSWORD_PARAM, HttpSinkUtil.trustStorePassword(configReader));
        socketIdleTimeout = Integer.parseInt(optionHolder.validateAndGetStaticValue(
                HttpConstants.SOCKET_IDEAL_TIMEOUT, HttpConstants.SOCKET_IDEAL_TIMEOUT_VALUE));
        sslProtocol = optionHolder.validateAndGetStaticValue(
                HttpConstants.SSL_PROTOCOL, HttpConstants.EMPTY_STRING);
        tlsStoreType = optionHolder.validateAndGetStaticValue(
                HttpConstants.TLS_STORE_TYPE, HttpConstants.EMPTY_STRING);
        chunkDisabled = optionHolder.validateAndGetStaticValue(
                HttpConstants.CLIENT_CHUNK_ENABLED, HttpConstants.EMPTY_STRING);
        followRedirect = optionHolder.validateAndGetStaticValue(
                HttpConstants.CLIENT_FOLLOW_REDIRECT, HttpConstants.EMPTY_STRING);
        maxRedirectCount = optionHolder.validateAndGetStaticValue(
                HttpConstants.CLIENT_MAX_REDIRECT_COUNT, HttpConstants.EMPTY_STRING);
        parametersList = optionHolder.validateAndGetStaticValue("parameters", HttpConstants.EMPTY_STRING);

        // Proxy settings.
        proxyHost = optionHolder.validateAndGetStaticValue(
                HttpConstants.PROXY_HOST, HttpConstants.EMPTY_STRING);
        proxyPort = optionHolder.validateAndGetStaticValue(
                HttpConstants.PROXY_PORT, HttpConstants.EMPTY_STRING);
        proxyUsername = optionHolder.validateAndGetStaticValue(
                HttpConstants.PROXY_USERNAME, HttpConstants.EMPTY_STRING);
        proxyPassword = optionHolder.validateAndGetStaticValue(
                HttpConstants.PROXY_PASSWORD, HttpConstants.EMPTY_STRING);

        // Bootstrap and pool configuration.
        clientBootstrapConfiguration = optionHolder.validateAndGetStaticValue(
                HttpConstants.CLIENT_BOOTSTRAP_CONFIGURATION, HttpConstants.EMPTY_STRING);
        clientPoolConfiguration = optionHolder.validateAndGetStaticValue(
                HttpConstants.CLIENT_POOL_CONFIGURATION, HttpConstants.EMPTY_STRING);
        bootstrapWorker = configReader.readConfig(
                HttpConstants.CLIENT_BOOTSTRAP_WORKER_GROUP_SIZE, HttpConstants.EMPTY_STRING);
        bootstrapBoss = configReader.readConfig(
                HttpConstants.CLIENT_BOOTSTRAP_BOSS_GROUP_SIZE, HttpConstants.EMPTY_STRING);
        bootstrapClient = configReader.readConfig(
                HttpConstants.CLIENT_BOOTSTRAP_CLIENT_GROUP_SIZE, HttpConstants.EMPTY_STRING);

        // Basic auth wins when both of its credentials are given; otherwise OAuth is used when
        // either the consumer key/secret pair or the OAuth username/password pair is complete.
        boolean hasBasicCredentials = !HttpConstants.EMPTY_STRING.equals(userName)
                && !HttpConstants.EMPTY_STRING.equals(userPassword);
        boolean hasConsumerCredentials = !HttpConstants.EMPTY_STRING.equals(consumerKey)
                && !HttpConstants.EMPTY_STRING.equals(consumerSecret);
        boolean hasOauthUserCredentials = !HttpConstants.EMPTY_STRING.equals(oauthUsername)
                && !HttpConstants.EMPTY_STRING.equals(oauthUserPassword);
        if (hasBasicCredentials) {
            authType = HttpConstants.BASIC_AUTH;
        } else if (hasConsumerCredentials || hasOauthUserCredentials) {
            authType = HttpConstants.OAUTH;
        } else {
            authType = HttpConstants.NO_AUTH;
        }

        initConnectorFactory();
        // With a dynamic publisher URL the connector cannot be created up front.
        if (publisherURLOption.isStatic()) {
            initClientConnector(null);
        }
        return null;
    }

    /**
     * Reports service deployment details for this sink.
     *
     * @return always {@code null}
     */
    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        ServiceDeploymentInfo noDeploymentInfo = null;
        return noDeploymentInfo;
    }

    /**
     * Publishes one event, choosing the plain or the OAuth send path from the configured
     * authentication type.
     *
     * @param obj            the event payload
     * @param dynamicOptions per-event option values used to resolve dynamic options
     * @param state          sink state (not used by this method)
     * @throws ConnectionUnavailableException declared by the sink contract
     */
    public void publish(Object obj, DynamicOptions dynamicOptions, State state) throws ConnectionUnavailableException {
        List<Header> requestHeaders = HttpSinkUtil.getHeaders(this.httpHeaderOption.getValue(dynamicOptions));

        boolean plainSend = this.authType.equals(HttpConstants.BASIC_AUTH)
                || this.authType.equals(HttpConstants.NO_AUTH);
        if (!plainSend) {
            sendOauthRequest(obj, dynamicOptions, requestHeaders);
            return;
        }
        sendRequest(obj, dynamicOptions, requestHeaders);
    }

    /**
     * Sends an event over the OAuth path.
     *
     * <p>Builds the encoded client-credential string from the consumer key and secret, lets
     * {@code setAccessToken} prepare the headers, sends the request and reacts to the response
     * code: 401 starts the token recovery flow, 200 is logged as success, anything else is
     * logged and rethrown.
     *
     * @param obj            the event payload
     * @param dynamicOptions per-event option values
     * @param list           request headers for this event
     * @throws HttpSinkAdaptorRuntimeException when the response code is neither 200 nor 401
     */
    private void sendOauthRequest(Object obj, DynamicOptions dynamicOptions, List<Header> list) {
        String clientCredentials = this.consumerKey + ":" + this.consumerSecret;
        String encodedAuth = HttpConstants.AUTHORIZATION_METHOD
                + encodeBase64(clientCredentials).replaceAll(HttpConstants.NEW_LINE, HttpConstants.EMPTY_STRING);
        setAccessToken(encodedAuth, dynamicOptions, list);

        int responseCode = sendRequest(obj, dynamicOptions, list);
        switch (responseCode) {
            case 401:
                handleOAuthFailure(obj, dynamicOptions, list, encodedAuth);
                break;
            case 200:
                log.info("Request sent successfully to " + this.publisherURL);
                break;
            case 500:
                log.error("Error at sending oauth request to API endpoint " + this.publisherURL
                        + "', with response code: " + responseCode
                        + "- Internal server error. Message dropped");
                throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint, "
                        + this.publisherURL + "', with response code: " + responseCode
                        + "- Internal server error. Message dropped.");
            default:
                log.error("Error at sending oauth request to API endpoint " + this.publisherURL
                        + "', with response code: " + responseCode + ". Message dropped.");
                throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint "
                        + this.publisherURL + "', and response code: " + responseCode + ". Message dropped.");
        }
    }

    /**
     * Recovers from a 401 on the OAuth path: retries with the cached access token when the
     * cache has an entry for the key, otherwise requests a new token.
     *
     * @param obj            the event payload
     * @param dynamicOptions per-event option values
     * @param list           request headers for this event
     * @param str            encoded client-credential string used as the cache key
     */
    private void handleOAuthFailure(Object obj, DynamicOptions dynamicOptions, List<Header> list, String str) {
        boolean tokenIsCached = this.accessTokenCache.checkAvailableKey(str);
        if (!tokenIsCached) {
            requestForNewAccessToken(obj, dynamicOptions, list, str);
            return;
        }
        getNewAccessTokenWithCache(obj, dynamicOptions, list, str);
    }

    /**
     * Retries the request using the access token held in the cache.
     *
     * <p>The first header named {@code HttpConstants.AUTHORIZATION_HEADER} gets the cached
     * token as its value. A 200 response is logged as success, a 401 falls through to
     * requesting a new token, and any other code is logged and rethrown.
     *
     * @param obj            the event payload
     * @param dynamicOptions per-event option values
     * @param list           request headers for this event (modified in place)
     * @param str            encoded client-credential string used as the cache key
     * @throws HttpSinkAdaptorRuntimeException when the response code is neither 200 nor 401
     */
    private void getNewAccessTokenWithCache(Object obj, DynamicOptions dynamicOptions, List<Header> list, String str) {
        String cachedToken = this.accessTokenCache.getAccessToken(str);
        for (Header header : list) {
            if (header.getName().equals(HttpConstants.AUTHORIZATION_HEADER)) {
                header.setValue(cachedToken);
                break; // only the first authorization header is rewritten
            }
        }

        int responseCode = sendRequest(obj, dynamicOptions, list);
        switch (responseCode) {
            case 200:
                log.info("Request sent successfully to " + this.publisherURL);
                return;
            case 401:
                requestForNewAccessToken(obj, dynamicOptions, list, str);
                return;
            case 500: {
                String serverError = "Error at sending oauth request to API endpoint, " + this.publisherURL
                        + "', with response code: " + responseCode
                        + "- Internal server error. Message dropped";
                log.error(serverError);
                throw new HttpSinkAdaptorRuntimeException(serverError);
            }
            default: {
                String otherError = "Error at sending oauth request to API endpoint " + this.publisherURL
                        + "', with response code: " + responseCode + ". Message dropped.";
                log.error(otherError);
                throw new HttpSinkAdaptorRuntimeException(otherError);
            }
        }
    }

    /**
     * Obtains a fresh access token from the token endpoint and resends the request with it.
     *
     * <p>Steps:
     * <ol>
     *   <li>If the cache reports a refresh token entry for the key, every header named
     *       {@code HttpConstants.RECEIVER_REFRESH_TOKEN} is updated with the cached refresh
     *       token (when it is non-null).</li>
     *   <li>{@code getAccessToken} is invoked; its outcome is read back from the cache.</li>
     *   <li>On a 200 token response, the first {@code HttpConstants.AUTHORIZATION_HEADER}
     *       header gets the new token and the request is sent again.</li>
     * </ol>
     *
     * @param obj            the event payload
     * @param dynamicOptions per-event option values
     * @param list           request headers for this event (modified in place)
     * @param str            encoded client-credential string used as the cache key
     * @throws HttpSinkAdaptorRuntimeException when the token request does not answer 200, or
     *                                         when the resent request does not answer 200
     */
    private void requestForNewAccessToken(Object obj, DynamicOptions dynamicOptions, List<Header> list, String str) {
        if (this.accessTokenCache.checkRefreshAvailableKey(str)) {
            String cachedRefreshToken = this.accessTokenCache.getRefreshtoken(str);
            if (cachedRefreshToken != null) {
                for (Header header : list) {
                    if (header.getName().equals(HttpConstants.RECEIVER_REFRESH_TOKEN)) {
                        header.setValue(cachedRefreshToken);
                    }
                }
            }
        }

        getAccessToken(dynamicOptions, str, this.tokenURL);

        // Read the token endpoint's response code once so the check and the messages agree.
        int tokenResponseCode = this.accessTokenCache.getResponseCode(str);
        if (tokenResponseCode != 200) {
            String tokenFailure;
            if (tokenResponseCode == 401) {
                // The log line used to read "Authentication Failure.cPlease ..."; it now carries
                // the same text as the exception.
                tokenFailure = "Failed to generate new access token for the expired access token to "
                        + this.publisherURL + "', " + tokenResponseCode
                        + ": Authentication Failure.Please provide a valid Consumer key, Consumer secret"
                        + " and token endpoint URL . Message dropped";
            } else {
                tokenFailure = "Failed to generate new access token for the expired access token. Error code: "
                        + tokenResponseCode + ". Message dropped.";
            }
            log.error(tokenFailure);
            throw new HttpSinkAdaptorRuntimeException(tokenFailure);
        }

        // NOTE(review): the next two writes store back the values just read from the cache.
        // They look redundant but are kept because the cache's write semantics are not
        // visible from this class — confirm before removing.
        String newAccessToken = this.accessTokenCache.getAccessToken(str);
        this.accessTokenCache.setAccessToken(str, newAccessToken);
        String newRefreshToken = this.accessTokenCache.getRefreshtoken(str);
        if (newRefreshToken != null) {
            this.accessTokenCache.setRefreshtoken(str, newRefreshToken);
        }

        for (Header header : list) {
            if (header.getName().equals(HttpConstants.AUTHORIZATION_HEADER)) {
                header.setValue(newAccessToken);
                break; // only the first authorization header is rewritten
            }
        }

        int responseCode = sendRequest(obj, dynamicOptions, list);
        if (responseCode == 200) {
            log.info("Request sent successfully to " + this.publisherURL);
            return;
        }

        String errorPrefix = "Error at sending oauth request to API endpoint " + this.publisherURL
                + "', with response code: " + responseCode;
        if (responseCode == 401) {
            log.error(errorPrefix + "- Authentication Failure. Please provide a valid Consumer key,"
                    + " Consumer secret and token endpoint URL . Message dropped");
            throw new HttpSinkAdaptorRuntimeException(errorPrefix
                    + "- Authentication Failure. Please provide a valid Consumer key,"
                    + " Consumer secret and token endpoint URL. Message dropped");
        }
        String failure = responseCode == 500
                ? errorPrefix + "- Internal server error. Message dropped"
                : errorPrefix + ". Message dropped.";
        log.error(failure);
        throw new HttpSinkAdaptorRuntimeException(failure);
    }

    /**
     * Requests an access token from the given token endpoint and records that endpoint in
     * {@code tokenURL}.
     *
     * <p>The token client method is picked as follows: the password-grant call when both the
     * OAuth username and password are non-empty; otherwise the refresh-grant call when the
     * dynamic refresh-token option is non-empty or the token cache holds a refresh token for
     * {@code str}; otherwise the client-grant call.
     *
     * @param dynamicOptions dynamic options used to resolve the refresh-token option
     * @param str key used to look up the cached refresh token; also forwarded to the token client
     * @param str2 token endpoint URL
     */
    public void getAccessToken(DynamicOptions dynamicOptions, String str, String str2) {
        this.tokenURL = str2;
        HttpsClient tokenClient = new HttpsClient();

        boolean hasUserCredentials = !HttpConstants.EMPTY_STRING.equals(this.oauthUsername)
                && !HttpConstants.EMPTY_STRING.equals(this.oauthUserPassword);
        if (hasUserCredentials) {
            tokenClient.getPasswordGrantAccessToken(str2, this.clientStoreFile, this.clientStorePass,
                    this.oauthUsername, this.oauthUserPassword, str);
            return;
        }

        // The cache is consulted only when the dynamic option carries no refresh token.
        boolean refreshTokenKnown = !HttpConstants.EMPTY_STRING.equals(this.refreshToken.getValue(dynamicOptions))
                || this.accessTokenCache.getRefreshtoken(str) != null;
        if (refreshTokenKnown) {
            tokenClient.getRefreshGrantAccessToken(str2, this.clientStoreFile, this.clientStorePass, str,
                    this.refreshToken.getValue(dynamicOptions));
        } else {
            tokenClient.getClientGrantAccessToken(str2, this.clientStoreFile, this.clientStorePass, str);
        }
    }

    /**
     * Makes sure the outgoing header list carries an access token.
     *
     * <p>If the list already contains an authorization header, its value is replaced with the
     * cached access token for {@code str} when the cache has an entry for that key; nothing else
     * happens. If there is no authorization header, a new token is requested through
     * {@link #getAccessToken}; on response code 200 the access token (and the refresh token, when
     * one is cached) is appended to the list as headers.
     *
     * @param str key under which tokens and the token response code are cached
     * @param dynamicOptions dynamic options forwarded to the token request
     * @param list mutable list of headers for the outgoing request
     * @throws HttpSinkAdaptorRuntimeException if the token request ends with a response code other than 200
     */
    public void setAccessToken(String str, DynamicOptions dynamicOptions, List<Header> list) {
        // Locate the first authorization header, if any.
        Header authorization = null;
        for (Header candidate : list) {
            if (candidate.getName().equals(HttpConstants.AUTHORIZATION_HEADER)) {
                authorization = candidate;
                break;
            }
        }

        if (authorization != null) {
            if (this.accessTokenCache.checkAvailableKey(str)) {
                authorization.setValue(this.accessTokenCache.getAccessToken(str));
            }
            return;
        }

        getAccessToken(dynamicOptions, str, this.tokenURL);
        int responseCode = this.accessTokenCache.getResponseCode(str);
        String failure;
        switch (responseCode) {
            case 200:
                list.add(new Header(HttpConstants.AUTHORIZATION_HEADER, this.accessTokenCache.getAccessToken(str)));
                if (this.accessTokenCache.getRefreshtoken(str) != null) {
                    list.add(new Header(HttpConstants.RECEIVER_REFRESH_TOKEN, this.accessTokenCache.getRefreshtoken(str)));
                }
                return;
            case 401:
                failure = "Failed to generate new access token for the expired access token to " + this.publisherURL + "', with response code: " + responseCode + "- Authentication Failure.Please provide a valid Consumer key, Consumer secret and token endpoint URL . Message dropped";
                break;
            case 500:
                failure = "Failed to generate new access token for the expired access token to " + this.publisherURL + "', with response code: " + responseCode + "- Internal server error. Message dropped";
                break;
            default:
                failure = "Failed to generate new access token for the expired access token. Error code: " + responseCode + ". Message dropped.";
                break;
        }
        log.error(failure);
        throw new HttpSinkAdaptorRuntimeException(failure);
    }

    /**
     * Builds the HTTP message for one payload and sends it through the client connector.
     *
     * <p>When the publisher URL option is not static, the client connector is re-initialised from
     * {@code dynamicOptions} first. The HTTP method comes from the method option and falls back to
     * {@code POST} when that option is empty; a body is attached for every method except
     * {@code GET}, encoded with the JVM default charset.
     *
     * <p>For any auth type other than OAuth the message is sent without waiting for a response
     * and {@code HttpConstants.SUCCESS_CODE} is returned. For OAuth the call waits up to 30
     * seconds on the latch handed to the response listener and returns the HTTP status code of
     * the response.
     *
     * @param obj payload handed to {@link #getMessageBody(Object)}
     * @param dynamicOptions dynamic options used to resolve the URL and HTTP method
     * @param list headers to copy onto the request
     * @return {@code HttpConstants.SUCCESS_CODE} for non-OAuth sends, otherwise the response status code
     * @throws HttpSinkAdaptorRuntimeException if no response arrives within 30 seconds or the
     *         waiting thread is interrupted
     */
    private int sendRequest(Object obj, DynamicOptions dynamicOptions, List<Header> list) {
        if (!this.publisherURLOption.isStatic()) {
            initClientConnector(dynamicOptions);
        }
        String configuredMethod = this.httpMethodOption.getValue(dynamicOptions);
        String value = HttpConstants.EMPTY_STRING.equals(configuredMethod) ? "POST" : configuredMethod;
        String contentType = HttpSinkUtil.getContentType(this.mapType, list);
        String messageBody = getMessageBody(obj);
        HttpCarbonMessage generateCarbonMessage = generateCarbonMessage(list, contentType, value, new HttpCarbonMessage(new DefaultHttpRequest(HttpVersion.HTTP_1_1, new HttpMethod(value), HttpConstants.EMPTY_STRING)));
        if (!"GET".equals(value)) {
            generateCarbonMessage.addHttpContent(new DefaultLastHttpContent(Unpooled.wrappedBuffer(messageBody.getBytes(Charset.defaultCharset()))));
        }
        generateCarbonMessage.completeMessage();
        if (!HttpConstants.OAUTH.equals(this.authType)) {
            this.clientConnector.send(generateCarbonMessage);
            return HttpConstants.SUCCESS_CODE;
        }

        // OAuth: the status code decides whether the caller must refresh the token, so wait for it.
        CountDownLatch countDownLatch = new CountDownLatch(1);
        DefaultListener defaultListener = new DefaultListener(countDownLatch, this.authType);
        this.clientConnector.send(generateCarbonMessage).setHttpConnectorListener(defaultListener);
        boolean responded;
        try {
            responded = countDownLatch.await(30L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            log.debug("Failed to get a response from " + this.publisherURL + "," + e + ". Message dropped.");
            throw new HttpSinkAdaptorRuntimeException("Failed to get a response from " + this.publisherURL + ", " + e + ". Message dropped.");
        }
        if (!responded) {
            log.debug("Time out due to getting getting response from " + this.publisherURL + ". Message dropped.");
            throw new HttpSinkAdaptorRuntimeException("Time out due to getting getting response from " + this.publisherURL + ". Message dropped.");
        }
        return defaultListener.getHttpResponseMessage().getNettyHttpResponse().status().code();
    }

    /**
     * Logs that the stream is connected to the publisher URL. Nothing is logged when the
     * publisher URL option is dynamic.
     */
    public void connect() {
        if (!this.publisherURLOption.isStatic()) {
            return;
        }
        log.info(this.streamID + " has successfully connected to " + this.publisherURL);
    }

    /**
     * Drops the reference to the client connector (logging the fact) and then shuts down and
     * drops the connector factory. Each step is skipped when the corresponding field is already
     * {@code null}.
     */
    public void disconnect() {
        boolean connectorPresent = this.clientConnector != null;
        if (connectorPresent) {
            this.clientConnector = null;
            log.info("Server connector for url " + this.publisherURL + " disconnected.");
        }

        if (this.httpConnectorFactory == null) {
            return;
        }
        this.httpConnectorFactory.shutdownNow();
        this.httpConnectorFactory = null;
    }

    /**
     * Drops the reference to the client connector, if one is still held, and logs the
     * disconnection.
     */
    public void destroy() {
        if (this.clientConnector == null) {
            return;
        }
        this.clientConnector = null;
        log.info("Server connector for url " + this.publisherURL + " disconnected.");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Populates the given carbon message with the routing properties taken from
     * {@code httpURLProperties} and with the request headers, then returns the same instance.
     *
     * <p>Headers are written in this order, later writes replacing earlier ones of the same name:
     * the basic-auth authorization header (only when both user name and password are non-empty),
     * the host header, every header from {@code list}, the content type (only when {@code str}
     * contains the configured map type) and finally the HTTP method.
     *
     * @param list headers to copy onto the message; may be {@code null}
     * @param str content type of the request
     * @param str2 HTTP method name
     * @param httpCarbonMessage message to populate
     * @return {@code httpCarbonMessage}, populated
     */
    public HttpCarbonMessage generateCarbonMessage(List<Header> list, String str, String str2, HttpCarbonMessage httpCarbonMessage) {
        // Routing properties taken from the parsed publisher URL.
        httpCarbonMessage.setProperty(HttpConstants.PROTOCOL, this.httpURLProperties.get(HttpConstants.PROTOCOL));
        httpCarbonMessage.setProperty(HttpConstants.TO, this.httpURLProperties.get(HttpConstants.TO));
        httpCarbonMessage.setProperty("host", this.httpURLProperties.get("host"));
        httpCarbonMessage.setProperty("port", Integer.valueOf(this.httpURLProperties.get("port")));
        httpCarbonMessage.setProperty(HttpConstants.HTTP_METHOD, str2);
        httpCarbonMessage.setProperty("REQUEST_URL", this.httpURLProperties.get("REQUEST_URL"));

        HttpHeaders requestHeaders = httpCarbonMessage.getHeaders();

        boolean userNameGiven = !this.userName.equals(HttpConstants.EMPTY_STRING);
        boolean passwordGiven = !this.userPassword.equals(HttpConstants.EMPTY_STRING);
        if (userNameGiven && passwordGiven) {
            requestHeaders.set(HttpConstants.AUTHORIZATION_HEADER, this.authorizationHeader);
        } else if (userNameGiven || passwordGiven) {
            // Exactly one of the two credentials is present.
            log.error("One of the basic authentication username or password missing. Hence basic authentication not supported.");
        }

        requestHeaders.set("host", httpCarbonMessage.getProperty("host"));
        if (list != null) {
            for (Header custom : list) {
                requestHeaders.set(custom.getName(), custom.getValue());
            }
        }
        if (str.contains(this.mapType)) {
            requestHeaders.set(HttpConstants.HTTP_CONTENT_TYPE, str);
        }
        requestHeaders.set(HttpConstants.HTTP_METHOD, str2);
        return httpCarbonMessage;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Converts the mapped payload into the request body text.
     *
     * <p>For the key-value map type the payload is treated as a map and rendered as
     * {@code key=value} pairs, each side passed through {@link #encodeMessage(Object)} and the
     * pairs joined with {@code &}. The separator is placed only between pairs, so the body never
     * starts with {@code &}; an empty map yields an empty string. For every other map type the
     * payload is returned as-is, cast to {@code String}.
     *
     * @param obj payload produced by the sink mapper
     * @return the body to send
     * @throws ClassCastException if the payload is not a {@code Map} (key-value map type) or not
     *         a {@code String} (any other map type)
     */
    public String getMessageBody(Object obj) {
        if (!HttpConstants.MAP_KEYVALUE.equals(this.mapType)) {
            return (String) obj;
        }
        StringBuilder body = new StringBuilder();
        for (Map.Entry<?, ?> entry : ((Map<?, ?>) obj).entrySet()) {
            if (body.length() > 0) {
                body.append('&');
            }
            body.append(encodeMessage(entry.getKey()))
                    .append('=')
                    .append(encodeMessage(entry.getValue()));
        }
        return body.toString();
    }

    /**
     * Creates the HTTP connector factory from the bootstrap settings.
     *
     * <p>When the boss or the worker setting is empty the factory's no-argument constructor is
     * used. Otherwise the boss, worker and client settings are parsed as integers and passed to
     * the three-argument constructor, with the worker value reused for the client argument when
     * the client setting is empty.
     *
     * @throws NumberFormatException if a setting that is used is not a valid integer
     */
    void initConnectorFactory() {
        boolean bossOrWorkerUnset = HttpConstants.EMPTY_STRING.equals(this.bootstrapBoss)
                || HttpConstants.EMPTY_STRING.equals(this.bootstrapWorker);
        if (bossOrWorkerUnset) {
            this.httpConnectorFactory = new DefaultHttpWsConnectorFactory();
            return;
        }
        int boss = Integer.parseInt(this.bootstrapBoss);
        int worker = Integer.parseInt(this.bootstrapWorker);
        int client = HttpConstants.EMPTY_STRING.equals(this.bootstrapClient)
                ? worker
                : Integer.parseInt(this.bootstrapClient);
        this.httpConnectorFactory = new DefaultHttpWsConnectorFactory(boss, worker, client);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Resolves the publisher URL, validates the sink configuration and creates the client
     * connector used to send requests.
     *
     * <p>Steps, in order: resolve the URL (static value or from {@code dynamicOptions}); for OAuth
     * require consumer key, consumer secret and token URL; reject an empty URL; parse the URL and
     * build the sender configuration; for the https scheme require the client trust store path
     * and password; require user name and password to be given together and, when both are
     * present, build the basic-auth header value; apply the optional proxy, socket idle timeout,
     * SSL protocol, TLS store type, chunking and parameter settings; finally create the connector
     * from the connector factory.
     *
     * @param dynamicOptions dynamic options used to resolve the URL when it is not static; not
     *        read when the URL option is static
     * @throws SiddhiAppCreationException if a mandatory setting is empty or only one of user
     *         name / password is given
     * @throws ExceptionInInitializerError if the scheme is https and the trust store path or
     *         password is {@code null}
     */
    public void initClientConnector(DynamicOptions dynamicOptions) {
        if (this.publisherURLOption.isStatic()) {
            this.publisherURL = this.publisherURLOption.getValue();
        } else {
            this.publisherURL = this.publisherURLOption.getValue(dynamicOptions);
        }
        if (this.authType.equals(HttpConstants.OAUTH)) {
            if (HttpConstants.EMPTY_STRING.equals(this.consumerSecret) || HttpConstants.EMPTY_STRING.equals(this.consumerKey)) {
                throw new SiddhiAppCreationException("consumer.key and consumer.secret found empty but it is Mandatory field in http sink in " + this.streamID);
            }
            if (HttpConstants.EMPTY_STRING.equals(this.tokenURL)) {
                throw new SiddhiAppCreationException("token.url found empty but it is Mandatory field in http sink in " + this.streamID);
            }
        }
        // Checked before the URL is handed to the parsing helpers, so an empty URL is always
        // reported with this message instead of whatever those helpers do with an empty string.
        if (HttpConstants.EMPTY_STRING.equals(this.publisherURL)) {
            throw new SiddhiAppCreationException("Receiver URL found empty but it is Mandatory field in http sink in " + this.streamID);
        }
        String scheme = HttpSinkUtil.getScheme(this.publisherURL);
        this.httpURLProperties = HttpSinkUtil.getURLProperties(this.publisherURL);
        SenderConfiguration senderConfigurations = HttpSinkUtil.getSenderConfigurations(this.httpURLProperties, this.clientStoreFile, this.clientStorePass, this.configReader);
        if (HttpConstants.SCHEME_HTTPS.equals(scheme) && (this.clientStoreFile == null || this.clientStorePass == null)) {
            throw new ExceptionInInitializerError("Client trustStore file path or password are empty while default scheme is 'https'. Please provide client trustStore file path and password in " + this.streamID);
        }
        // Exactly one of user name / password being empty is a configuration error.
        if (HttpConstants.EMPTY_STRING.equals(this.userName) ^ HttpConstants.EMPTY_STRING.equals(this.userPassword)) {
            throw new SiddhiAppCreationException("Please provide user name and password in http sink with the stream " + this.streamID + " in Siddhi app " + this.siddhiAppContext.getName());
        }
        if (!HttpConstants.EMPTY_STRING.equals(this.userName) && !HttpConstants.EMPTY_STRING.equals(this.userPassword)) {
            // The encoded buffer must be decoded to text; concatenating the buffer object itself
            // would put its toString() description into the header instead of the credentials.
            this.authorizationHeader = HttpConstants.AUTHORIZATION_METHOD + encodeBase64(this.userName + ":" + this.userPassword);
        }
        if (!HttpConstants.EMPTY_STRING.equals(this.proxyHost) && !HttpConstants.EMPTY_STRING.equals(this.proxyPort)) {
            try {
                ProxyServerConfiguration proxyServerConfiguration = new ProxyServerConfiguration(this.proxyHost, Integer.parseInt(this.proxyPort));
                // Proxy credentials are applied only when both are present.
                if (!HttpConstants.EMPTY_STRING.equals(this.proxyPassword) && !HttpConstants.EMPTY_STRING.equals(this.proxyUsername)) {
                    proxyServerConfiguration.setProxyPassword(this.proxyPassword);
                    proxyServerConfiguration.setProxyUsername(this.proxyUsername);
                }
                senderConfigurations.setProxyServerConfiguration(proxyServerConfiguration);
            } catch (UnknownHostException e) {
                // Best effort: the sink continues without a proxy when the proxy host cannot be resolved.
                log.error("Proxy url and password is invalid in sink " + this.streamID + " Siddhi app " + this.siddhiAppContext.getName(), e);
            }
        }
        // -1 marks the socket idle timeout as not configured.
        if (this.socketIdleTimeout != -1) {
            senderConfigurations.setSocketIdleTimeout(this.socketIdleTimeout);
        }
        if (!HttpConstants.EMPTY_STRING.equals(this.sslProtocol)) {
            senderConfigurations.setSSLProtocol(this.sslProtocol);
        }
        if (!HttpConstants.EMPTY_STRING.equals(this.tlsStoreType)) {
            senderConfigurations.setTLSStoreType(this.tlsStoreType);
        }
        if (!HttpConstants.EMPTY_STRING.equals(this.chunkDisabled) && this.chunkDisabled != null) {
            if (Boolean.parseBoolean(this.chunkDisabled)) {
                senderConfigurations.setChunkingConfig(ChunkConfig.NEVER);
            } else {
                senderConfigurations.setChunkingConfig(ChunkConfig.ALWAYS);
            }
        }
        if (!HttpConstants.EMPTY_STRING.equals(this.parametersList)) {
            senderConfigurations.setParameters(HttpIoUtil.populateParameters(this.parametersList));
        }
        this.clientConnector = this.httpConnectorFactory.createHttpClientConnector(HttpSinkUtil.populateTransportConfiguration(this.clientBootstrapConfiguration, this.clientPoolConfiguration), senderConfigurations);
    }

    /**
     * URL-encodes one key or value of a key-value payload using
     * {@code HttpConstants.DEFAULT_ENCODING}.
     *
     * <p>The argument is converted with {@code toString()} rather than cast, so non-String values
     * (numbers, booleans, ...) are encoded by their text form instead of failing with a
     * {@code ClassCastException}. A {@code null} argument is encoded as the empty string.
     *
     * @param obj key or value to encode; may be {@code null}
     * @return the URL-encoded text
     * @throws SiddhiAppRuntimeException if the configured encoding is not supported
     */
    private String encodeMessage(Object obj) {
        String text = obj == null ? "" : obj.toString();
        try {
            return URLEncoder.encode(text, HttpConstants.DEFAULT_ENCODING);
        } catch (UnsupportedEncodingException e) {
            throw new SiddhiAppRuntimeException("Execution of Siddhi app " + this.siddhiAppContext.getName() + " failed due to " + e.getMessage(), e);
        }
    }

    /**
     * Base64-encodes the UTF-8 bytes of the given text and returns the result as a string.
     *
     * <p>Line breaking is switched off explicitly: the one-argument Netty {@code Base64.encode}
     * uses the standard dialect's default, which inserts line breaks into long output and would
     * make the result unusable as a single-line header value.
     *
     * @param str text to encode
     * @return Base64 text without line breaks
     */
    private String encodeBase64(String str) {
        byte[] rawBytes = str.getBytes(StandardCharsets.UTF_8);
        return Base64.encode(Unpooled.wrappedBuffer(rawBytes), false).toString(StandardCharsets.UTF_8);
    }
}
