package io.debezium.server.http;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.http.jwt.JWTAuthenticatorBuilder;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Named;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Base64;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Named("http")
@Dependent
/* loaded from: input_file:io/debezium/server/http/HttpChangeConsumer.class */
public class HttpChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    public static final String PROP_PREFIX = "debezium.sink.http.";
    public static final String PROP_WEBHOOK_URL = "url";
    public static final String PROP_CLIENT_TIMEOUT = "timeout.ms";
    public static final String PROP_RETRIES = "retries";
    public static final String PROP_RETRY_INTERVAL = "retry.interval.ms";
    public static final String PROP_HEADERS_ENCODE_BASE64 = "headers.encode.base64";
    public static final String PROP_HEADERS_PREFIX = "headers.prefix";
    public static final String PROP_AUTHENTICATION_PREFIX = "debezium.sink.http.authentication.";
    public static final String PROP_AUTHENTICATION_TYPE = "type";
    public static final String JWT_AUTHENTICATION = "jwt";
    private static final int DEFAULT_RETRIES = 5;
    private static final String DEFAULT_HEADERS_PREFIX = "X-DEBEZIUM-";
    private static Duration timeoutDuration;
    private static int retries;
    private static Duration retryInterval;
    private boolean base64EncodeHeaders = true;
    private String headersPrefix = DEFAULT_HEADERS_PREFIX;
    private HttpClient client;
    private HttpRequest.Builder requestBuilder;
    private Authenticator authenticator;
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpChangeConsumer.class);
    private static final Long HTTP_TIMEOUT = Long.valueOf(Integer.toUnsignedLong(60000));
    private static final Long RETRY_INTERVAL = Long.valueOf(Integer.toUnsignedLong(1000));

    @PostConstruct
    void connect() throws URISyntaxException {
        initWithConfig(ConfigProvider.getConfig());
    }

    void initWithConfig(Config config) throws URISyntaxException {
        String str;
        this.client = HttpClient.newHttpClient();
        String str2 = System.getenv("K_SINK");
        timeoutDuration = Duration.ofMillis(HTTP_TIMEOUT.longValue());
        retries = DEFAULT_RETRIES;
        retryInterval = Duration.ofMillis(RETRY_INTERVAL.longValue());
        String str3 = str2 != null ? str2 : (String) config.getValue("debezium.sink.http.url", String.class);
        config.getOptionalValue("debezium.sink.http.timeout.ms", String.class).ifPresent(str4 -> {
            timeoutDuration = Duration.ofMillis(Long.parseLong(str4));
        });
        config.getOptionalValue("debezium.sink.http.retries", String.class).ifPresent(str5 -> {
            retries = Integer.parseInt(str5);
        });
        config.getOptionalValue("debezium.sink.http.retry.interval.ms", String.class).ifPresent(str6 -> {
            retryInterval = Duration.ofMillis(Long.parseLong(str6));
        });
        config.getOptionalValue("debezium.sink.http.headers.prefix", String.class).ifPresent(str7 -> {
            this.headersPrefix = str7;
        });
        config.getOptionalValue("debezium.sink.http.headers.encode.base64", Boolean.class).ifPresent(bool -> {
            this.base64EncodeHeaders = bool.booleanValue();
        });
        String str8 = (String) config.getValue("debezium.format.value", String.class);
        boolean z = -1;
        switch (str8.hashCode()) {
            case -1007058034:
                if (str8.equals("cloudevents")) {
                    z = true;
                    break;
                }
                break;
            case 3006770:
                if (str8.equals("avro")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                str = "avro/bytes";
                break;
            case true:
                str = "application/cloudevents+json";
                break;
            default:
                str = "application/json";
                break;
        }
        Optional optionalValue = config.getOptionalValue("debezium.sink.http.authentication.type", String.class);
        if (optionalValue.isPresent()) {
            String str9 = (String) optionalValue.get();
            if (!str9.equalsIgnoreCase(JWT_AUTHENTICATION)) {
                throw new DebeziumException("Unknown value '" + str9 + "' encountered for property debezium.sink.http.authentication.type");
            }
            this.authenticator = JWTAuthenticatorBuilder.fromConfig(config, PROP_AUTHENTICATION_PREFIX).build();
        }
        LOGGER.info("Using http content-type type {}", str);
        LOGGER.info("Using sink URL: {}", str3);
        this.requestBuilder = HttpRequest.newBuilder(new URI(str3)).timeout(timeoutDuration);
        this.requestBuilder.setHeader("content-type", str);
    }

    public void handleBatch(List<ChangeEvent<Object, Object>> list, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> recordCommitter) throws InterruptedException {
        for (ChangeEvent<Object, Object> changeEvent : list) {
            LOGGER.trace("Received event '{}'", changeEvent);
            if (changeEvent.value() != null) {
                int i = 0;
                while (!recordSent(changeEvent)) {
                    i++;
                    if (i >= retries) {
                        throw new DebeziumException("Exceeded maximum number of attempts to publish event " + changeEvent);
                    }
                    Metronome.sleeper(retryInterval, Clock.SYSTEM).pause();
                }
                recordCommitter.markProcessed(changeEvent);
            }
        }
        recordCommitter.markBatchFinished();
    }

    private boolean recordSent(ChangeEvent<Object, Object> changeEvent) throws InterruptedException {
        boolean z = false;
        HttpRequest.Builder generateRequest = generateRequest(changeEvent);
        try {
            if (this.authenticator != null) {
                if (!this.authenticator.authenticate()) {
                    throw new DebeziumException("Failed to authenticate successfully.  Cannot continue.");
                }
                this.authenticator.setAuthorizationHeader(generateRequest);
            }
            HttpResponse send = this.client.send(generateRequest.build(), HttpResponse.BodyHandlers.ofString());
            if (send.statusCode() == 200 || send.statusCode() == 204 || send.statusCode() == 202) {
                z = true;
            } else {
                LOGGER.info("Failed to publish event: " + ((String) send.body()));
            }
            return z;
        } catch (IOException e) {
            if (!e.getMessage().contains("GOAWAY")) {
                throw new InterruptedException(e.toString());
            }
            LOGGER.info("HTTP/2 GOAWAY received: {}", e.getMessage());
            return false;
        }
    }

    HttpRequest.Builder generateRequest(ChangeEvent<Object, Object> changeEvent) {
        HttpRequest.Builder POST = this.requestBuilder.POST(HttpRequest.BodyPublishers.ofString((String) changeEvent.value()));
        for (Map.Entry entry : convertHeaders(changeEvent).entrySet()) {
            String str = (String) entry.getValue();
            if (this.base64EncodeHeaders) {
                str = Base64.getEncoder().encodeToString(str.getBytes(StandardCharsets.UTF_8));
            }
            POST.header(this.headersPrefix + ((String) entry.getKey()).toUpperCase(Locale.ROOT), str);
        }
        return POST;
    }
}
