package org.wso2.extension.siddhi.io.http.source;

import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.wso2.carbon.messaging.Header;
import org.wso2.extension.siddhi.io.http.source.exception.HttpSourceAdaptorRuntimeException;
import org.wso2.extension.siddhi.io.http.source.util.HttpSourceUtil;
import org.wso2.extension.siddhi.io.http.util.HTTPSourceRegistry;
import org.wso2.extension.siddhi.io.http.util.HttpConstants;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.SystemParameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.stream.input.source.Source;
import org.wso2.siddhi.core.stream.input.source.SourceEventListener;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.transport.OptionHolder;
import org.wso2.transport.http.netty.contract.ServerConnectorException;
import org.wso2.transport.http.netty.message.HTTPCarbonMessage;

@Extension(name = "http-request", namespace = "source", description = "The HTTP source receives POST requests via HTTP or HTTPS in format such as `text`, `XML` and `JSON`. If required, you can enable basic authentication to ensure that events are received only from users who are authorized to access the service.", parameters = {@Parameter(name = HttpConstants.RECEIVER_URL, description = "The URL to which the events should be received. User can provide any valid url and if the url is not provided the system will use the following format `http://0.0.0.0:9763/<appNAme>/<streamName>`If the user want to use SSL the url should be given in following format `https://localhost:8080/<streamName>`", type = {DataType.STRING}, optional = true, defaultValue = "http://0.0.0.0:9763/<appNAme>/<streamName>"), @Parameter(name = HttpConstants.SOURCE_ID, description = "Identifier need to map the source to sink.", type = {DataType.STRING}), @Parameter(name = HttpConstants.CONNECTION_TIMEOUT, description = "Connection timeout in milliseconds. If the mapped http-response sink does not get a correlated message, after this timeout value, a timeout response is sent", type = {DataType.INT}, optional = true, defaultValue = "120000"), @Parameter(name = HttpConstants.IS_AUTH, description = "If this is set to `true`, basic authentication is enabled for incoming events, and the credentials with which each event is sent are verified to ensure that the user is authorized to access the service. If basic authentication fails, the event is not authenticated and an authentication error is logged in the CLI. By default this values 'false' ", type = {DataType.STRING}, optional = true, defaultValue = "false"), @Parameter(name = HttpConstants.WORKER_COUNT, description = "The number of active worker threads to serve the incoming events. The value is 1 by default. This will ensure that the events are directed to the event stream in the same order in which they arrive. By increasing this value the performance might increase at the cost of loosing event ordering.", type = {DataType.STRING}, optional = true, defaultValue = HttpConstants.DEFAULT_WORKER_COUNT), @Parameter(name = HttpConstants.SOCKET_IDEAL_TIMEOUT, description = "Idle timeout for HTTP connection.", type = {DataType.INT}, optional = true, defaultValue = "120000"), @Parameter(name = HttpConstants.SSL_VERIFY_CLIENT, description = "The type of client certificate verification.", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = HttpConstants.SSL_PROTOCOL, description = "ssl/tls related options", type = {DataType.STRING}, optional = true, defaultValue = "TLS"), @Parameter(name = HttpConstants.TLS_STORE_TYPE, description = "TLS store type.", type = {DataType.STRING}, optional = true, defaultValue = "JKS"), @Parameter(name = "parameters", description = "Parameters other than basics such as ciphers,sslEnabledProtocols,client.enable.session.creation. Expected format of these parameters is as follows: \"'ciphers:xxx','sslEnabledProtocols,client.enable:xxx'\"", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = "ciphers", description = "List of ciphers to be used. This parameter should include under parameters Ex: 'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256'", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = "ssl.enabled.protocols", description = "SSL/TLS protocols to be enabled. This parameter should be in camel case format(sslEnabledProtocols) under parameters. Ex 'sslEnabledProtocols:true'", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = "server.enable.session.creation", description = "Enable HTTP session creation.This parameter should include under parameters Ex: 'client.enable.session.creation:true'", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = "server.supported.snimatchers", description = "Http SNIMatcher to be added. This parameter should include under parameters Ex: 'server.supported.snimatchers:SNIMatcher'", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = "server.suported.server.names", description = "Http supported servers. This parameter should include under parameters Ex: 'server.suported.server.names:server'", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = "request.size.validation.configuration", description = "Parameters that responsible for validating the http request and request headers. Expected format of these parameters is as follows: \"'request.size.validation:xxx','request.size.validation.maximum.value:xxx'\"", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = HttpConstants.REQUEST_SIZE_VALIDATION, description = "To enable the request size validation.", type = {DataType.STRING}, optional = true, defaultValue = "false"), @Parameter(name = HttpConstants.REQUEST_SIZE_VALIDATION_MAXIMUM_VALUE, description = "If request size is validated then maximum size.", type = {DataType.STRING}, optional = true, defaultValue = "Integer.MAX_VALUE"), @Parameter(name = HttpConstants.REQUEST_SIZE_VALIDATION_REJECT_STATUS_CODE, description = "If request is exceed maximum size and request.size.validation is enabled then status code to be send as response.", type = {DataType.STRING}, optional = true, defaultValue = "401"), @Parameter(name = HttpConstants.REQUEST_SIZE_VALIDATION_REJECT_MESSAGE, description = "If request is exceed maximum size and request.size.validation is enabled then status message to be send as response.", type = {DataType.STRING}, optional = true, defaultValue = "Message is bigger than the valid size"), @Parameter(name = HttpConstants.REQUEST_SIZE_VALIDATION_REJECT_MESSAGE_CONTENT_TYPE, description = "If request is exceed maximum size and request.size.validation is enabled then content type to be send as response.", type = {DataType.STRING}, optional = true, defaultValue = "plain/text"), @Parameter(name = HttpConstants.HEADER_SIZE_VALIDATION, description = "To enable the header size validation.", type = {DataType.STRING}, optional = true, defaultValue = "false"), @Parameter(name = HttpConstants.HEADER_VALIDATION_MAXIMUM_REQUEST_LINE, description = "If header header validation is enabled then the maximum request line.", type = {DataType.STRING}, optional = true, defaultValue = "4096"), @Parameter(name = HttpConstants.HEADER_VALIDATION_MAXIMUM_SIZE, description = "If header header validation is enabled then the maximum expected header size.", type = {DataType.STRING}, optional = true, defaultValue = "8192"), @Parameter(name = HttpConstants.HEADER_VALIDATION_MAXIMUM_CHUNK_SIZE, description = "If header header validation is enabled then the maximum expected chunk size.", type = {DataType.STRING}, optional = true, defaultValue = "8192"), @Parameter(name = HttpConstants.HEADER_VALIDATION_REJECT_STATUS_CODE, description = "401", type = {DataType.STRING}, optional = true, defaultValue = "If header is exceed maximum size and header.size.validation is enabled then status code to be send as response."), @Parameter(name = HttpConstants.HEADER_VALIDATION_REJECT_MESSAGE, description = "If header is exceed maximum size and header.size.validation is enabled then message to be send as response.", type = {DataType.STRING}, optional = true, defaultValue = "Message header is bigger than the valid size"), @Parameter(name = HttpConstants.HEADER_VALIDATION_REJECT_MESSAGE_CONTENT_TYPE, description = "If header is exceed maximum size and header.size.validation is enabled then content type to be send as response.", type = {DataType.STRING}, optional = true, defaultValue = "plain/text"), @Parameter(name = "server.bootstrap.configuration", description = "Parameters that for bootstrap configurations of the server. Expected format of these parameters is as follows: \"'ciphers:xxx','sslEnabledProtocols,client.enable:xxx'\"", type = {DataType.OBJECT}, optional = true, defaultValue = "null"), @Parameter(name = HttpConstants.SERVER_BOOTSTRAP_NODELAY_PARAM, description = "Http server no delay.", type = {DataType.BOOL}, optional = true, defaultValue = "true"), @Parameter(name = HttpConstants.SERVER_BOOTSTRAP_KEEPALIVE_PARAM, description = "Http server keep alive.", type = {DataType.BOOL}, optional = true, defaultValue = "true"), @Parameter(name = HttpConstants.SERVER_BOOTSTRAP_SENDBUFFERSIZE_PARAM, description = "Http server send buffer size.", type = {DataType.INT}, optional = true, defaultValue = "1048576"), @Parameter(name = HttpConstants.SERVER_BOOTSTRAP_RECIEVEBUFFERSIZE_PARAM, description = "Http server receive buffer size.", type = {DataType.INT}, optional = true, defaultValue = "1048576"), @Parameter(name = HttpConstants.SERVER_BOOTSTRAP_CONNECT_TIMEOUT_PARAM, description = "Http server connection timeout.", type = {DataType.INT}, optional = true, defaultValue = "15000"), @Parameter(name = HttpConstants.SERVER_BOOTSTRAP_SOCKET_REUSE_PARAM, description = "To enable http socket reuse.", type = {DataType.BOOL}, optional = true, defaultValue = "false"), @Parameter(name = HttpConstants.SERVER_BOOTSTRAP_SOCKET_TIMEOUT_PARAM, description = "Http server socket timeout.", type = {DataType.BOOL}, optional = true, defaultValue = "15"), @Parameter(name = HttpConstants.SERVER_BOOTSTRAP_SOCKET_BACKLOG_PARAM, description = "THttp server socket backlog.", type = {DataType.BOOL}, optional = true, defaultValue = "100"), @Parameter(name = HttpConstants.TRACE_LOG_ENABLED, description = "Http traffic monitoring.", defaultValue = "false", optional = true, type = {DataType.BOOL})}, examples = {@Example(syntax = "@source(type='http-sync', source.id='samplesourceid, receiver.url='http://localhost:9055/endpoints/RecPro', socketIdleTimeout='150000', parameters=\"'ciphers : TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256', 'sslEnabledProtocols:TLSv1.1,TLSv1.2'\",request.size.validation.configuration=\"request.size.validation:true\",server.bootstrap.configuration=\"server.bootstrap.socket.timeout:25\" @map(type='json @attributes(messageId='trp:messageId',symbol='$.events.event.symbol',price='$.events.event.price',volume='$.events.event.volume')))\ndefine stream FooStream (messageId string, symbol string, price float, volume long);\n", description = "The expected input is as follows:{\"events\":\n    {\"event\":\n        \"symbol\":WSO2,\n        \"price\":55.6,\n        \"volume\":100,\n    }\n}\nIf basic authentication is enabled via the `basic.auth.enabled='true` setting, each input event is also expected to contain the `Authorization:'Basic encodeBase64(username:Password)'` header.")}, systemParameter = {@SystemParameter(name = HttpConstants.SERVER_BOOTSTRAP_BOSS_GROUP_SIZE, description = "property to configure number of boss threads, which accepts incoming connections until the ports are unbound. Once connection accepts successfully, boss thread passes the accepted channel to one of the worker threads.", defaultValue = "Number of available processors", possibleParameters = {"Any integer"}), @SystemParameter(name = HttpConstants.SERVER_BOOTSTRAP_WORKER_GROUP_SIZE, description = "property to configure number of worker threads, which performs non blocking read and write for one or more channels in non-blocking mode.", defaultValue = "(Number of available processors)*2", possibleParameters = {"Any integer"}), @SystemParameter(name = HttpConstants.SERVER_BOOTSTRAP_CLIENT_GROUP_SIZE, description = "property to configure number of client threads, which performs non blocking read and write for one or more channels in non-blocking mode.", defaultValue = "(Number of available processors)*2", possibleParameters = {"Any integer"}), @SystemParameter(name = HttpConstants.DEFAULT_HOST, description = "The default host of the transport.", defaultValue = HttpConstants.DEFAULT_HOST_VALUE, possibleParameters = {"Any valid host"}), @SystemParameter(name = HttpConstants.HTTP_PORT, description = "The default port if the default scheme is 'http'.", defaultValue = HttpConstants.HTTP_PORT_VALUE, possibleParameters = {"Any valid port"}), @SystemParameter(name = HttpConstants.HTTPS_PORT, description = "The default port if the default scheme is 'https'.", defaultValue = HttpConstants.HTTPS_PORT_VALUE, possibleParameters = {"Any valid port"}), @SystemParameter(name = HttpConstants.DEFAULT_SOURCE_SCHEME, description = "The default protocol.", defaultValue = "http", possibleParameters = {"http", HttpConstants.SCHEME_HTTPS}), @SystemParameter(name = HttpConstants.KEYSTORE_FILE, description = "The default keystore file path.", defaultValue = HttpConstants.KEYSTORE_FILE_VALUE, possibleParameters = {"Path to wso2carbon.jks file"}), @SystemParameter(name = HttpConstants.KEYSTORE_PASSWORD, description = "The default keystore password.", defaultValue = "wso2carbon", possibleParameters = {"String of keystore password"}), @SystemParameter(name = HttpConstants.CERT_PASSWORD, description = "The default cert password.", defaultValue = "wso2carbon", possibleParameters = {"String of cert password"})})
/* loaded from: input_file:org/wso2/extension/siddhi/io/http/source/HttpRequestSource.class */
public class HttpRequestSource extends HttpSource {
    private static final Logger log = Logger.getLogger(HttpRequestSource.class);
    private HttpSyncConnectorRegistry httpConnectorRegistry;
    private String sourceId;
    private long connectionTimeout;
    private HashedWheelTimer timer;
    private Map<String, HTTPCarbonMessage> requestContainerMap = new ConcurrentHashMap();
    private WeakHashMap<String, Timeout> schedularMap = new WeakHashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/wso2/extension/siddhi/io/http/source/HttpRequestSource$TimerTaskImpl.class */
    public class TimerTaskImpl implements TimerTask {
        String messageId;

        TimerTaskImpl(String str) {
            this.messageId = str;
        }

        public void run(Timeout timeout) {
            HttpRequestSource.this.cancelRequest(this.messageId, (HTTPCarbonMessage) HttpRequestSource.this.requestContainerMap.get(this.messageId));
        }
    }

    @Override // org.wso2.extension.siddhi.io.http.source.HttpSource
    public void init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        initSource(sourceEventListener, optionHolder, strArr, configReader, siddhiAppContext);
        initConnectorRegistry(optionHolder, configReader);
        this.timer = new HashedWheelTimer();
    }

    @Override // org.wso2.extension.siddhi.io.http.source.HttpSource
    protected void initSource(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        super.initSource(sourceEventListener, optionHolder, strArr, configReader, siddhiAppContext);
        this.sourceId = optionHolder.validateAndGetStaticValue(HttpConstants.SOURCE_ID);
        this.connectionTimeout = Long.parseLong(optionHolder.validateAndGetStaticValue(HttpConstants.CONNECTION_TIMEOUT, "120000"));
    }

    @Override // org.wso2.extension.siddhi.io.http.source.HttpSource
    protected void initConnectorRegistry(OptionHolder optionHolder, ConfigReader configReader) {
        String validateAndGetStaticValue = optionHolder.validateAndGetStaticValue(HttpConstants.REQUEST_SIZE_VALIDATION_CONFIG, HttpConstants.EMPTY_STRING);
        String validateAndGetStaticValue2 = optionHolder.validateAndGetStaticValue(HttpConstants.SERVER_BOOTSTRAP_CONFIGURATION, HttpConstants.EMPTY_STRING);
        this.httpConnectorRegistry = HttpSyncConnectorRegistry.getInstance();
        this.httpConnectorRegistry.initBootstrapConfigIfFirst(configReader);
        this.httpConnectorRegistry.setTransportConfig(validateAndGetStaticValue2, validateAndGetStaticValue);
    }

    @Override // org.wso2.extension.siddhi.io.http.source.HttpSource
    public Class[] getOutputEventClasses() {
        return new Class[]{String.class};
    }

    @Override // org.wso2.extension.siddhi.io.http.source.HttpSource
    public void connect(Source.ConnectionCallback connectionCallback) throws ConnectionUnavailableException {
        this.httpConnectorRegistry.createHttpServerConnector(this.listenerConfiguration);
        this.httpConnectorRegistry.registerSourceListener(this.sourceEventListener, this.listenerUrl, Integer.parseInt(this.workerThread), this.isAuth, this.requestedTransportPropertyNames, this.sourceId);
        HTTPSourceRegistry.registerSource(this.sourceId, this);
    }

    @Override // org.wso2.extension.siddhi.io.http.source.HttpSource
    public void disconnect() {
        this.httpConnectorRegistry.unregisterSourceListener(this.listenerUrl);
        this.httpConnectorRegistry.unregisterServerConnector(this.listenerUrl);
        HTTPSourceRegistry.removeSource(this.sourceId);
        for (Map.Entry<String, HTTPCarbonMessage> entry : this.requestContainerMap.entrySet()) {
            cancelRequest(entry.getKey(), entry.getValue());
        }
    }

    @Override // org.wso2.extension.siddhi.io.http.source.HttpSource
    public void destroy() {
        this.httpConnectorRegistry.clearBootstrapConfigIfLast();
        HTTPSourceRegistry.removeSource(this.sourceId);
        this.timer.stop();
    }

    @Override // org.wso2.extension.siddhi.io.http.source.HttpSource
    public void pause() {
        HttpSyncSourceListener httpSyncSourceListener = this.httpConnectorRegistry.getSyncSourceListenersMap().get(HttpSourceUtil.getSourceListenerKey(this.listenerUrl));
        if (httpSyncSourceListener == null || !httpSyncSourceListener.isRunning()) {
            return;
        }
        httpSyncSourceListener.pause();
    }

    @Override // org.wso2.extension.siddhi.io.http.source.HttpSource
    public void resume() {
        HttpSyncSourceListener httpSyncSourceListener = this.httpConnectorRegistry.getSyncSourceListenersMap().get(HttpSourceUtil.getSourceListenerKey(this.listenerUrl));
        if (httpSyncSourceListener == null || !httpSyncSourceListener.isPaused()) {
            return;
        }
        httpSyncSourceListener.resume();
    }

    public void registerCallback(HTTPCarbonMessage hTTPCarbonMessage, String str) {
        addTimeout(str);
        this.requestContainerMap.put(str, hTTPCarbonMessage);
    }

    public void handleCallback(String str, String str2, List<Header> list, String str3) {
        HTTPCarbonMessage hTTPCarbonMessage = this.requestContainerMap.get(str);
        if (hTTPCarbonMessage == null) {
            log.warn("No source message found for source: " + this.sourceId + " and message: " + str);
            return;
        }
        this.requestContainerMap.remove(str);
        removeTimeout(str);
        handleResponse(hTTPCarbonMessage, 200, str2, list, str3);
    }

    private void addTimeout(String str) {
        this.schedularMap.put(str, this.timer.newTimeout(new TimerTaskImpl(str), this.connectionTimeout, TimeUnit.MILLISECONDS));
    }

    private void removeTimeout(String str) {
        this.schedularMap.get(str).cancel();
    }

    private void handleResponse(HTTPCarbonMessage hTTPCarbonMessage, HTTPCarbonMessage hTTPCarbonMessage2) {
        try {
            hTTPCarbonMessage.respond(hTTPCarbonMessage2);
        } catch (ServerConnectorException e) {
            throw new HttpSourceAdaptorRuntimeException("Error occurred during response", e);
        }
    }

    private void handleResponse(HTTPCarbonMessage hTTPCarbonMessage, Integer num, String str, List<Header> list, String str2) {
        handleResponse(hTTPCarbonMessage, createResponseMessage(str != null ? str : HttpConstants.EMPTY_STRING, num == null ? 500 : num.intValue(), list, str2));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void cancelRequest(String str, HTTPCarbonMessage hTTPCarbonMessage) {
        this.requestContainerMap.remove(str);
        this.schedularMap.remove(str);
        handleResponse(hTTPCarbonMessage, 504, null, null, null);
    }

    private HTTPCarbonMessage createResponseMessage(String str, int i, List<Header> list, String str2) {
        HTTPCarbonMessage hTTPCarbonMessage = new HTTPCarbonMessage(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK));
        hTTPCarbonMessage.addHttpContent(new DefaultLastHttpContent(Unpooled.wrappedBuffer(str.getBytes(Charset.defaultCharset()))));
        HttpHeaders headers = hTTPCarbonMessage.getHeaders();
        hTTPCarbonMessage.setProperty("HTTP_STATUS_CODE", Integer.valueOf(i));
        hTTPCarbonMessage.setProperty("DIRECTION", "DIRECTION_RESPONSE");
        if (str2 != null) {
            headers.set(HttpConstants.HTTP_CONTENT_TYPE, str2);
        }
        if (list != null) {
            for (Header header : list) {
                headers.set(header.getName(), header.getValue());
            }
        }
        return hTTPCarbonMessage;
    }
}
