package org.apache.seatunnel.connectors.seatunnel.http.source;

import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.Predicate;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider;
import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField;
import org.apache.seatunnel.connectors.seatunnel.http.config.PageInfo;
import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
import org.apache.seatunnel.shade.com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.class */
/**
 * Single-split reader that fetches data from an HTTP endpoint and forwards it to a
 * {@link DeserializationCollector}.
 *
 * <p>For every poll the reader executes the request described by {@link HttpParameter}. The
 * response body is optionally narrowed with the {@code contentJson} JsonPath expression, or
 * flattened into records using the per-field JsonPath expressions of {@link JsonField}. When a
 * {@link PageInfo} is supplied, the reader keeps requesting consecutive pages until a stop
 * condition is observed.
 */
public class HttpSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
    private static final Logger log = LoggerFactory.getLogger(HttpSourceReader.class);

    /** JsonPath options applied to every evaluation performed by this reader. */
    private static final Option[] DEFAULT_OPTIONS = {
        Option.SUPPRESS_EXCEPTIONS, Option.ALWAYS_RETURN_LIST, Option.DEFAULT_PATH_LEAF_TO_NULL
    };

    protected final SingleSplitReaderContext context;
    protected final HttpParameter httpParameter;
    protected HttpClientProvider httpClient;
    private final DeserializationCollector deserializationCollector;
    /** Compiled per-field JsonPath expressions; rebuilt from {@link #jsonField} in collect(). */
    private JsonPath[] jsonPaths;
    private final JsonField jsonField;
    /** JsonPath expression selecting the part of the response to emit; may be {@code null}. */
    private final String contentJson;
    private final Configuration jsonConfiguration;
    /** {@code true} once no further page needs to be requested; stays {@code true} without paging. */
    private boolean noMoreElementFlag;
    private Optional<PageInfo> pageInfoOptional;

    /**
     * Creates a reader without pagination.
     *
     * @param httpParameter request description (url, method, headers, params, body, ...)
     * @param singleSplitReaderContext reader context used to query boundedness and signal the end
     * @param deserializationSchema schema handed to the {@link DeserializationCollector}
     * @param jsonField field name to JsonPath mapping, or {@code null}
     * @param str JsonPath expression selecting the content to emit, or {@code null}
     */
    public HttpSourceReader(HttpParameter httpParameter, SingleSplitReaderContext singleSplitReaderContext, DeserializationSchema<SeaTunnelRow> deserializationSchema, JsonField jsonField, String str) {
        this(httpParameter, singleSplitReaderContext, deserializationSchema, jsonField, str, (PageInfo) null);
    }

    /**
     * Creates a reader, optionally with pagination.
     *
     * @param httpParameter request description (url, method, headers, params, body, ...)
     * @param singleSplitReaderContext reader context used to query boundedness and signal the end
     * @param deserializationSchema schema handed to the {@link DeserializationCollector}
     * @param jsonField field name to JsonPath mapping, or {@code null}
     * @param str JsonPath expression selecting the content to emit, or {@code null}
     * @param pageInfo pagination settings, or {@code null} to disable pagination
     */
    public HttpSourceReader(HttpParameter httpParameter, SingleSplitReaderContext singleSplitReaderContext, DeserializationSchema<SeaTunnelRow> deserializationSchema, JsonField jsonField, String str, PageInfo pageInfo) {
        this.jsonConfiguration = Configuration.defaultConfiguration().addOptions(DEFAULT_OPTIONS);
        this.noMoreElementFlag = true;
        this.context = singleSplitReaderContext;
        this.httpParameter = httpParameter;
        this.deserializationCollector = new DeserializationCollector(deserializationSchema);
        this.jsonField = jsonField;
        this.contentJson = str;
        this.pageInfoOptional = Optional.ofNullable(pageInfo);
    }

    /** Creates the HTTP client from the configured {@link HttpParameter}. */
    public void open() {
        this.httpClient = new HttpClientProvider(this.httpParameter);
    }

    /**
     * Closes the HTTP client if one has been created.
     *
     * @throws IOException if closing the client fails
     */
    public void close() throws IOException {
        if (this.httpClient != null) {
            this.httpClient.close();
        }
    }

    /**
     * Executes one HTTP request and forwards its body to the collector.
     *
     * <p>With multi-line mode enabled every line of the body is collected separately, otherwise
     * the body is collected as a whole.
     *
     * @param collector destination of the produced rows
     * @throws HttpConnectorException if the status code is outside the range 200..207
     * @throws Exception if the request or the processing of the body fails
     */
    public void pollAndCollectData(Collector<SeaTunnelRow> collector) throws Exception {
        HttpResponse response = this.httpClient.execute(
                this.httpParameter.getUrl(),
                this.httpParameter.getMethod().getMethod(),
                this.httpParameter.getHeaders(),
                this.httpParameter.getParams(),
                this.httpParameter.getBody());
        int statusCode = response.getCode();
        String content = response.getContent();
        if (statusCode < 200 || statusCode > 207) {
            throw new HttpConnectorException(HttpConnectorErrorCode.REQUEST_FAILED, String.format("http client execute exception, http response status code:[%s], content:[%s]", statusCode, content));
        }

        if (Strings.isNullOrEmpty(content)) {
            // The stop condition of the paging loop is only evaluated in collect(). An empty
            // page never reaches collect(), so without this the loop would request pages forever.
            if (this.pageInfoOptional.isPresent()) {
                this.noMoreElementFlag = true;
            }
        } else if (this.httpParameter.isEnableMultilines()) {
            try (BufferedReader lines = new BufferedReader(new StringReader(content))) {
                String line;
                while ((line = lines.readLine()) != null) {
                    collect(collector, line);
                }
            }
        } else {
            collect(collector, content);
        }
        log.debug("http client execute success request param:[{}], http response status code:[{}], content:[{}]", this.httpParameter.getParams(), statusCode, content);
    }

    /** Writes the current page index of {@code pageInfo} into the request parameters. */
    private void updateRequestParam(PageInfo pageInfo) {
        if (this.httpParameter.getParams() == null) {
            this.httpParameter.setParams(new HashMap<>());
        }
        this.httpParameter.getParams().put(pageInfo.getPageField(), pageInfo.getPageIndex().toString());
    }

    /**
     * Runs one poll cycle while holding the collector's checkpoint lock.
     *
     * @param collector destination of the produced rows
     * @throws Exception if the poll cycle fails
     */
    @Override // org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader
    public void pollNext(Collector<SeaTunnelRow> collector) throws Exception {
        synchronized (collector.getCheckpointLock()) {
            internalPollNext(collector);
        }
    }

    /**
     * Runs one poll cycle: a single request, or a sequence of page requests when pagination is
     * configured.
     *
     * <p>Afterwards, whether or not the cycle failed, the reader either signals the end of a
     * bounded source (when no more elements are expected) or sleeps for the configured poll
     * interval.
     *
     * @param collector destination of the produced rows
     * @throws Exception if a request fails or the thread is interrupted while sleeping
     */
    @Override // org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader
    public void internalPollNext(Collector<SeaTunnelRow> collector) throws Exception {
        try {
            if (this.pageInfoOptional.isPresent()) {
                this.noMoreElementFlag = false;
                PageInfo pageInfo = this.pageInfoOptional.get();
                long pageIndex = pageInfo.getPageIndex();
                while (!this.noMoreElementFlag) {
                    pageInfo.setPageIndex(pageIndex);
                    updateRequestParam(pageInfo);
                    pollAndCollectData(collector);
                    pageIndex++;
                    // Short pause between consecutive page requests.
                    Thread.sleep(10L);
                }
            } else {
                pollAndCollectData(collector);
            }
        } finally {
            // A single finally block guarantees this runs exactly once per cycle, on both the
            // normal and the exceptional path.
            if (Boundedness.BOUNDED.equals(this.context.getBoundedness()) && this.noMoreElementFlag) {
                log.info("Closed the bounded http source");
                this.context.signalNoMoreElement();
            } else if (this.httpParameter.getPollIntervalMillis() > 0) {
                Thread.sleep(this.httpParameter.getPollIntervalMillis());
            }
        }
    }

    /**
     * Transforms one response payload, updates the paging stop flag and emits the payload.
     *
     * <p>{@code contentJson} takes precedence over {@code jsonField}; when neither is set the
     * payload is emitted unchanged.
     */
    private void collect(Collector<SeaTunnelRow> collector, String str) throws IOException {
        String payload = str;
        if (this.contentJson != null) {
            payload = JsonUtils.stringToJsonNode(getPartOfJson(payload)).toString();
        } else if (this.jsonField != null) {
            initJsonPath(this.jsonField);
            payload = JsonUtils.toJsonNode(parseToMap(decodeJSON(payload), this.jsonField)).toString();
        }

        if (this.pageInfoOptional.isPresent()) {
            PageInfo pageInfo = this.pageInfoOptional.get();
            long totalPageSize = pageInfo.getTotalPageSize().longValue();
            if (totalPageSize > 0) {
                // A known page count: stop once the current page index reaches it.
                this.noMoreElementFlag = pageInfo.getPageIndex().longValue() >= totalPageSize;
            } else {
                // Unknown page count: a page smaller than the batch size is treated as the last.
                this.noMoreElementFlag = JsonUtils.stringToJsonNode(payload).size() < pageInfo.getBatchSize().intValue();
            }
        }
        // NOTE(review): getBytes() uses the platform default charset; confirm that the
        // deserialization schema decodes with the same charset before pinning one here.
        this.deserializationCollector.collect(payload.getBytes(), collector);
    }

    /**
     * Converts rows of positional values into maps keyed by the field names of {@code jsonField}.
     * The i-th value of a row is stored under the i-th field name.
     */
    private List<Map<String, String>> parseToMap(List<List<String>> list, JsonField jsonField) {
        String[] fieldNames = (String[]) jsonField.getFields().keySet().toArray(new String[0]);
        List<Map<String, String>> records = new ArrayList<>(list.size());
        for (List<String> row : list) {
            Map<String, String> record = new HashMap<>(jsonField.getFields().size());
            for (int i = 0; i < row.size(); i++) {
                record.put(fieldNames[i], row.get(i));
            }
            records.add(record);
        }
        return records;
    }

    /**
     * Evaluates every compiled JsonPath against {@code str} and returns the matches as rows.
     *
     * @throws HttpConnectorException if the paths do not all yield the same number of matches
     */
    private List<List<String>> decodeJSON(String str) {
        DocumentContext document = JsonPath.using(this.jsonConfiguration).parse(str);
        List<List<String>> columns = new ArrayList<>(this.jsonPaths.length);
        for (JsonPath path : this.jsonPaths) {
            List<String> column = document.read(path);
            columns.add(column);
        }
        // Every path must produce the same number of matches as the first one, otherwise the
        // columns cannot be combined into rows.
        for (int i = 1; i < columns.size(); i++) {
            List<?> first = columns.get(0);
            List<?> current = columns.get(i);
            if (first.size() != current.size()) {
                throw new HttpConnectorException(HttpConnectorErrorCode.FIELD_DATA_IS_INCONSISTENT, String.format("[%s](%d) and [%s](%d) the number of parsing records is inconsistent.", this.jsonPaths[0].getPath(), first.size(), this.jsonPaths[i].getPath(), current.size()));
            }
        }
        return dataFlip(columns);
    }

    /** Extracts the part of {@code str} selected by {@code contentJson} and renders it as JSON. */
    private String getPartOfJson(String str) {
        Object selected = JsonPath.using(this.jsonConfiguration).parse(str).read(JsonPath.compile(this.contentJson, new Predicate[0]));
        return JsonUtils.toJsonString(selected);
    }

    /**
     * Transposes column-oriented JsonPath results into rows of strings.
     *
     * <p>The first column creates one row per element; every further column appends its i-th
     * element to the i-th row. Elements are read as {@code Object} because JsonPath may return
     * non-string values (numbers, booleans, nested structures); {@code null} stays {@code null}.
     */
    private List<List<String>> dataFlip(List<List<String>> list) {
        List<List<String>> rows = new ArrayList<>();
        for (int columnIndex = 0; columnIndex < list.size(); columnIndex++) {
            List<?> column = list.get(columnIndex);
            if (columnIndex == 0) {
                for (Object value : column) {
                    List<String> row = new ArrayList<>(this.jsonPaths.length);
                    row.add(Objects.toString(value, null));
                    rows.add(row);
                }
            } else {
                for (int rowIndex = 0; rowIndex < column.size(); rowIndex++) {
                    rows.get(rowIndex).add(Objects.toString(column.get(rowIndex), null));
                }
            }
        }
        return rows;
    }

    /** Compiles the JsonPath expression of every configured field, in field iteration order. */
    private void initJsonPath(JsonField jsonField) {
        // Materialise the expressions once instead of once per loop iteration.
        String[] expressions = (String[]) jsonField.getFields().values().toArray(new String[0]);
        JsonPath[] compiled = new JsonPath[expressions.length];
        for (int i = 0; i < expressions.length; i++) {
            compiled[i] = JsonPath.compile(expressions[i], new Predicate[0]);
        }
        this.jsonPaths = compiled;
    }

    /** Replaces the HTTP client used for requests. */
    public void setHttpClient(HttpClientProvider httpClientProvider) {
        this.httpClient = httpClientProvider;
    }

    /** Replaces the compiled per-field JsonPath expressions. */
    public void setJsonPaths(JsonPath[] jsonPathArr) {
        this.jsonPaths = jsonPathArr;
    }

    /** Sets the flag that tells the reader whether further pages are expected. */
    public void setNoMoreElementFlag(boolean z) {
        this.noMoreElementFlag = z;
    }

    /**
     * Replaces the pagination settings.
     *
     * @param optional the new settings; {@code null} is treated as {@link Optional#empty()}
     */
    public void setPageInfoOptional(Optional<PageInfo> optional) {
        // The field is dereferenced unconditionally elsewhere, so never store null.
        this.pageInfoOptional = optional == null ? Optional.empty() : optional;
    }
}
