package com.github.castorm.kafka.connect.http;

import com.github.castorm.kafka.connect.common.VersionUtils;
import com.github.castorm.kafka.connect.http.client.spi.HttpClient;
import com.github.castorm.kafka.connect.http.model.HttpRequest;
import com.github.castorm.kafka.connect.http.model.HttpResponse;
import com.github.castorm.kafka.connect.http.model.Offset;
import com.github.castorm.kafka.connect.http.record.spi.SourceRecordFilterFactory;
import com.github.castorm.kafka.connect.http.record.spi.SourceRecordSorter;
import com.github.castorm.kafka.connect.http.request.spi.HttpRequestFactory;
import com.github.castorm.kafka.connect.http.response.spi.HttpResponseParser;
import com.github.castorm.kafka.connect.timer.TimerThrottler;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/castorm/kafka/connect/http/HttpSourceTask.class */
public class HttpSourceTask extends SourceTask {
    private static final Logger log = LoggerFactory.getLogger(HttpSourceTask.class);
    private final Function<Map<String, String>, HttpSourceConnectorConfig> configFactory;
    private TimerThrottler throttler;
    private HttpRequestFactory requestFactory;
    private HttpClient requestExecutor;
    private HttpResponseParser responseParser;
    private SourceRecordSorter recordSorter;
    private SourceRecordFilterFactory recordFilterFactory;
    private Offset offset;

    HttpSourceTask(Function<Map<String, String>, HttpSourceConnectorConfig> function) {
        this.configFactory = function;
    }

    public HttpSourceTask() {
        this(HttpSourceConnectorConfig::new);
    }

    public void start(Map<String, String> map) {
        HttpSourceConnectorConfig apply = this.configFactory.apply(map);
        this.throttler = apply.getThrottler();
        this.requestFactory = apply.getRequestFactory();
        this.requestExecutor = apply.getClient();
        this.responseParser = apply.getResponseParser();
        this.recordSorter = apply.getRecordSorter();
        this.recordFilterFactory = apply.getRecordFilterFactory();
        Map<String, String> map2 = (Map) Optional.ofNullable(this.context.offsetStorageReader().offset(Collections.emptyMap())).orElseGet(edu.emory.mathcs.backport.java.util.Collections::emptyMap);
        this.offset = Offset.of(!map2.isEmpty() ? map2 : apply.getInitialOffset());
    }

    public List<SourceRecord> poll() throws InterruptedException {
        this.throttler.throttle(this.offset.getTimestamp().orElseGet(Instant::now));
        List<SourceRecord> parse = this.responseParser.parse(execute(this.requestFactory.createRequest(this.offset)));
        return log(parse, (List) this.recordSorter.sort(parse).stream().filter(this.recordFilterFactory.create(this.offset)).collect(Collectors.toList()));
    }

    private List<SourceRecord> log(List<SourceRecord> list, List<SourceRecord> list2) {
        log.info("Request for offset {} yields {}/{} new records", new Object[]{this.offset.toMap(), Integer.valueOf(list2.size()), Integer.valueOf(list.size())});
        return list2;
    }

    private HttpResponse execute(HttpRequest httpRequest) {
        try {
            return this.requestExecutor.execute(httpRequest);
        } catch (IOException e) {
            throw new RetriableException(e);
        }
    }

    public void commitRecord(SourceRecord sourceRecord) {
        this.offset = Offset.of(sourceRecord.sourceOffset());
    }

    public void stop() {
    }

    public String version() {
        return VersionUtils.getVersion();
    }

    public Offset getOffset() {
        return this.offset;
    }
}
