package org.apache.seatunnel.connectors.seatunnel.http.source;

import com.google.common.base.Strings;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider;
import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.class */
/**
 * Source reader that issues one HTTP request per {@link #pollNext(Collector)} call, using the
 * URL, method, headers and params of the configured {@code HttpParameter}, and forwards a
 * non-empty response body to a {@code DeserializationCollector}.
 *
 * <p>After every poll, whether it succeeded or failed, the reader either signals the end of
 * input (when the context reports {@code Boundedness.BOUNDED}) or sleeps for the configured
 * poll interval (when that interval is positive).
 */
public class HttpSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
    private static final Logger log = LoggerFactory.getLogger(HttpSourceReader.class);

    /** The only response status code whose body is passed on for deserialization. */
    private static final int HTTP_STATUS_OK = 200;

    protected final SingleSplitReaderContext context;
    protected final HttpParameter httpParameter;
    /** Created in {@link #open()}; remains {@code null} until then. */
    protected HttpClientProvider httpClient;
    private final DeserializationCollector deserializationCollector;

    /**
     * Creates a reader; no HTTP client is created until {@link #open()} is called.
     *
     * @param httpParameter request settings (URL, method, headers, params, poll interval)
     * @param singleSplitReaderContext context queried for boundedness and notified at end of input
     * @param deserializationSchema schema wrapped in the collector that receives response bodies
     */
    public HttpSourceReader(HttpParameter httpParameter, SingleSplitReaderContext singleSplitReaderContext, DeserializationSchema<SeaTunnelRow> deserializationSchema) {
        this.context = singleSplitReaderContext;
        this.httpParameter = httpParameter;
        this.deserializationCollector = new DeserializationCollector(deserializationSchema);
    }

    /** Creates the HTTP client from the configured parameters. */
    public void open() {
        this.httpClient = new HttpClientProvider(this.httpParameter);
    }

    /**
     * Closes the HTTP client if {@link #open()} created one.
     *
     * @throws IOException if closing the client fails
     */
    public void close() throws IOException {
        if (Objects.nonNull(this.httpClient)) {
            this.httpClient.close();
        }
    }

    /**
     * Executes a single HTTP request and, when the response status is 200 and the body is
     * non-empty, passes the UTF-8 encoded body to the deserialization collector.
     *
     * <p>Non-200 responses and any {@link Exception} raised while requesting or collecting are
     * logged and not rethrown. The end-of-poll step (signal end of input or sleep) runs exactly
     * once per call, from a single {@code finally} block.
     *
     * @param collector receives the rows produced from the response body
     * @throws Exception if the end-of-poll step fails, e.g. {@link InterruptedException} when
     *     the poll-interval sleep is interrupted
     */
    public void pollNext(Collector<SeaTunnelRow> collector) throws Exception {
        try {
            HttpResponse response = this.httpClient.execute(
                    this.httpParameter.getUrl(),
                    this.httpParameter.getMethod(),
                    this.httpParameter.getHeaders(),
                    this.httpParameter.getParams());
            if (HTTP_STATUS_OK != response.getCode()) {
                log.error("http client execute exception, http response status code:[{}], content:[{}]", response.getCode(), response.getContent());
                return;
            }
            String content = response.getContent();
            if (!Strings.isNullOrEmpty(content)) {
                // Explicit charset: the no-arg getBytes() depends on the JVM's default encoding.
                this.deserializationCollector.collect(content.getBytes(StandardCharsets.UTF_8), collector);
            }
        } catch (Exception e) {
            // Best effort: a failed poll is logged and the reader carries on.
            log.error(e.getMessage(), e);
        } finally {
            // Kept outside the try/catch above so a failure here is not logged-and-retried.
            finishPoll();
        }
    }

    /**
     * Ends one poll cycle: signals no more elements for a bounded source, otherwise sleeps for
     * the poll interval when it is greater than zero.
     */
    private void finishPoll() throws InterruptedException {
        if (Boundedness.BOUNDED.equals(this.context.getBoundedness())) {
            log.info("Closed the bounded http source");
            this.context.signalNoMoreElement();
            return;
        }
        long pollIntervalMillis = this.httpParameter.getPollIntervalMillis();
        if (pollIntervalMillis > 0) {
            Thread.sleep(pollIntervalMillis);
        }
    }
}
