package com.github.castorm.kafka.connect.http.response;

import com.github.castorm.kafka.connect.http.model.HttpResponse;
import com.github.castorm.kafka.connect.http.response.spi.HttpResponsePolicy;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/castorm/kafka/connect/http/response/StatusCodeHttpResponsePolicy.class */
/**
 * {@link HttpResponsePolicy} that decides the outcome of an {@link HttpResponse} purely from its
 * status code, by looking it up in two configured sets of codes.
 *
 * <p>The code sets are only assigned in {@link #configure(Map)}, so {@link #resolve(HttpResponse)}
 * relies on {@code configure} having been called first.
 */
public class StatusCodeHttpResponsePolicy implements HttpResponsePolicy {

    private static final Logger log = LoggerFactory.getLogger(StatusCodeHttpResponsePolicy.class);

    /** Builds the typed configuration out of the raw property map handed to {@link #configure(Map)}. */
    private final Function<Map<String, ?>, StatusCodeHttpResponsePolicyConfig> configFactory;

    /** Status codes that resolve to {@code PROCESS}; populated by {@link #configure(Map)}. */
    private Set<Integer> processCodes;

    /** Status codes that resolve to {@code SKIP}; populated by {@link #configure(Map)}. */
    private Set<Integer> skipCodes;

    /** Creates a policy that builds its configuration with {@code StatusCodeHttpResponsePolicyConfig::new}. */
    public StatusCodeHttpResponsePolicy() {
        this(StatusCodeHttpResponsePolicyConfig::new);
    }

    /**
     * Creates a policy using the supplied configuration factory.
     *
     * @param function factory applied to the raw properties each time {@link #configure(Map)} is called
     */
    public StatusCodeHttpResponsePolicy(Function<Map<String, ?>, StatusCodeHttpResponsePolicyConfig> function) {
        configFactory = function;
    }

    /**
     * Builds the configuration from the given properties and stores its process and skip code sets.
     *
     * @param map raw configuration properties passed on to the configuration factory
     */
    @Override
    public void configure(Map<String, ?> map) {
        StatusCodeHttpResponsePolicyConfig config = configFactory.apply(map);
        processCodes = config.getProcessCodes();
        skipCodes = config.getSkipCodes();
    }

    /**
     * Maps the status code of the response to an outcome.
     *
     * <p>The process codes are checked first, so a code present in both sets resolves to
     * {@code PROCESS}. A code found only in the skip set is logged at warn level and resolves to
     * {@code SKIP}. Any other code resolves to {@code FAIL}.
     *
     * @param httpResponse response whose status code is inspected
     * @return the outcome selected for the response's status code
     */
    @Override
    public HttpResponsePolicy.HttpResponseOutcome resolve(HttpResponse httpResponse) {
        if (processCodes.contains(httpResponse.getCode())) {
            return HttpResponsePolicy.HttpResponseOutcome.PROCESS;
        }
        if (skipCodes.contains(httpResponse.getCode())) {
            // Skippable codes are tolerated, but surfaced in the log.
            log.warn("Unexpected HttpResponse status code: {}, continuing with no records", httpResponse.getCode());
            return HttpResponsePolicy.HttpResponseOutcome.SKIP;
        }
        return HttpResponsePolicy.HttpResponseOutcome.FAIL;
    }
}
