package com.amazonaws.samples.connectors.timestream;

import com.amazonaws.samples.connectors.timestream.TimestreamSinkConfig;
import com.amazonaws.samples.connectors.timestream.metrics.CloudWatchEmittedMetricGroupHelper;
import com.amazonaws.samples.connectors.timestream.metrics.MetricsCollector;
import com.amazonaws.samples.connectors.timestream.metrics.TimestreamSinkMetricGroup;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
import software.amazon.awssdk.profiles.ProfileFile;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.timestreamwrite.TimestreamWriteAsyncClient;
import software.amazon.awssdk.services.timestreamwrite.TimestreamWriteAsyncClientBuilder;
import software.amazon.awssdk.services.timestreamwrite.model.Record;
import software.amazon.awssdk.services.timestreamwrite.model.WriteRecordsRequest;

/* loaded from: input_file:com/amazonaws/samples/connectors/timestream/TimestreamSinkWriter.class */
/**
 * Asynchronous sink writer that sends buffered {@link Record}s to Amazon Timestream.
 *
 * <p>Each batch handed over by the base {@link AsyncSinkWriter} is turned into a
 * {@link WriteRecordsRequest} by the configured {@link BatchConverter} and submitted through a
 * {@link TimestreamWriteAsyncClient}. Failed writes are delegated to a
 * {@link WriteRequestFailureHandler}, which decides which records are retried and which are
 * dropped. Metrics are reported through a {@link MetricsCollector} at each step.
 *
 * @param <InputT> type of the elements accepted by the sink before conversion to {@link Record}
 */
public class TimestreamSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record> {
    private static final Logger LOG = LoggerFactory.getLogger(TimestreamSinkWriter.class);

    /**
     * Value passed to the base writer for both of its size-based limits (the sixth and eighth
     * super-constructor arguments); equal to the original literal {@code 2147483647L}.
     */
    private static final long UNBOUNDED_SIZE_IN_BYTES = Integer.MAX_VALUE;

    private final BatchConverter batchConverter;
    private final TimestreamWriteAsyncClient client;
    private final WriteRequestFailureHandler failureHandler;
    private final MetricsCollector metricsCollector;

    /**
     * Creates the writer and opens the Timestream client, failure handler and metrics collector.
     *
     * <p>Note: the three collaborators are created through overridable {@code protected}
     * methods, so a subclass override runs before the subclass constructor body.
     *
     * @param elementConverter converts incoming elements to Timestream {@link Record}s
     * @param batchConverter builds a {@link WriteRecordsRequest} from a list of records
     * @param initContext Flink sink initialisation context (used for metrics and by the base class)
     * @param timestreamSinkConfig batching, client, credential and failure-handler settings
     */
    public TimestreamSinkWriter(ElementConverter<InputT, Record> elementConverter, BatchConverter batchConverter, Sink.InitContext initContext, TimestreamSinkConfig timestreamSinkConfig) {
        super(
                elementConverter,
                initContext,
                timestreamSinkConfig.getMaxBatchSize(),
                timestreamSinkConfig.getMaxInFlightRequests(),
                timestreamSinkConfig.getMaxBufferedRequests(),
                UNBOUNDED_SIZE_IN_BYTES,
                timestreamSinkConfig.getMaxTimeInBufferMS(),
                UNBOUNDED_SIZE_IN_BYTES);
        this.batchConverter = batchConverter;
        this.client = openAsyncClient(timestreamSinkConfig);
        this.failureHandler = createFailureHandler(timestreamSinkConfig);
        this.metricsCollector = openMetricCollector(initContext);
    }

    /**
     * Builds the sink metric group on top of the context's metric group, extended by
     * {@link CloudWatchEmittedMetricGroupHelper#extendMetricGroup}.
     *
     * @param initContext Flink sink initialisation context supplying the base metric group
     * @return the metric group used by this writer's {@link MetricsCollector}
     */
    TimestreamSinkMetricGroup createTimestreamSinkMetricGroup(Sink.InitContext initContext) {
        return new TimestreamSinkMetricGroup(CloudWatchEmittedMetricGroupHelper.extendMetricGroup(initContext.metricGroup()));
    }

    /**
     * Instantiates the configured failure handler class via the thread context class loader and
     * opens it with this writer's fatal-exception consumer.
     *
     * @param timestreamSinkConfig configuration holding the failure-handler settings
     * @return the opened failure handler
     */
    protected WriteRequestFailureHandler createFailureHandler(TimestreamSinkConfig timestreamSinkConfig) {
        WriteRequestFailureHandler handler = (WriteRequestFailureHandler) InstantiationUtil.instantiate(
                timestreamSinkConfig.getFailureHandlerConfig().getFailureHandlerClass(),
                WriteRequestFailureHandler.class,
                Thread.currentThread().getContextClassLoader());
        handler.open(getFatalExceptionCons(), timestreamSinkConfig.getFailureHandlerConfig());
        return handler;
    }

    /**
     * Creates the metrics collector backed by {@link #createTimestreamSinkMetricGroup}.
     *
     * @param initContext Flink sink initialisation context
     * @return a new metrics collector
     */
    @VisibleForTesting
    protected MetricsCollector openMetricCollector(Sink.InitContext initContext) {
        return new MetricsCollector(createTimestreamSinkMetricGroup(initContext));
    }

    /**
     * Builds the asynchronous Timestream write client from the write-client configuration:
     * per-attempt timeout, retry count, CRT HTTP client concurrency, region, credentials and an
     * optional endpoint override.
     *
     * @param timestreamSinkConfig sink configuration
     * @return the constructed client
     * @throws RuntimeException if the configured endpoint override is not a valid URI
     * @throws IllegalArgumentException if the credential provider type is not supported
     */
    @VisibleForTesting
    protected TimestreamWriteAsyncClient openAsyncClient(TimestreamSinkConfig timestreamSinkConfig) {
        RetryPolicy retryPolicy = RetryPolicy.builder()
                .numRetries(timestreamSinkConfig.getWriteClientConfig().getMaxErrorRetry())
                .build();
        ClientOverrideConfiguration overrideConfiguration = ClientOverrideConfiguration.builder()
                .apiCallAttemptTimeout(timestreamSinkConfig.getWriteClientConfig().getRequestTimeout())
                .retryPolicy(retryPolicy)
                .build();

        TimestreamWriteAsyncClientBuilder clientBuilder = TimestreamWriteAsyncClient.builder()
                .overrideConfiguration(overrideConfiguration)
                .httpClient(AwsCrtAsyncHttpClient.builder()
                        .maxConcurrency(timestreamSinkConfig.getWriteClientConfig().getMaxConcurrency())
                        .build())
                .region(Region.of(timestreamSinkConfig.getWriteClientConfig().getRegion()))
                .credentialsProvider(getCredentialProvider(
                        timestreamSinkConfig.getCredentialsProviderType(),
                        timestreamSinkConfig.getCredentialConfig()));

        String endpointOverride = timestreamSinkConfig.getWriteClientConfig().getEndpointOverride();
        if (endpointOverride != null) {
            clientBuilder = clientBuilder.endpointOverride(parseEndpointOverride(endpointOverride));
        }
        LOG.debug("AmazonTimestreamWriteAsync client constructed.");
        return clientBuilder.build();
    }

    /**
     * Maps the configured credential provider type to an AWS SDK credentials provider.
     *
     * @param credentialProviderType which provider to create
     * @param credentialConfig profile name / profile file path, read only for {@code PROFILE}
     * @return the matching credentials provider
     * @throws IllegalArgumentException if the type is not one of the handled constants
     */
    private AwsCredentialsProvider getCredentialProvider(TimestreamSinkConfig.CredentialProviderType credentialProviderType, TimestreamSinkConfig.CredentialConfig credentialConfig) {
        switch (credentialProviderType) {
            case ENV_VAR:
                return EnvironmentVariableCredentialsProvider.create();
            case SYS_PROP:
                return SystemPropertyCredentialsProvider.create();
            case PROFILE: {
                String profileName = credentialConfig.getProfileName();
                String profileConfigPath = credentialConfig.getProfileConfigPath();
                if (profileConfigPath == null) {
                    return ProfileCredentialsProvider.create(profileName);
                }
                // ProfileFile.Builder requires a file type; building without one fails.
                // NOTE(review): CONFIGURATION is chosen because the setting is named
                // "profile config path" - confirm whether callers supply a credentials-format
                // file instead, in which case Type.CREDENTIALS is the right value.
                ProfileFile profileFile = ProfileFile.builder()
                        .content(Path.of(profileConfigPath))
                        .type(ProfileFile.Type.CONFIGURATION)
                        .build();
                return ProfileCredentialsProvider.builder()
                        .profileName(profileName)
                        .profileFile(profileFile)
                        .build();
            }
            case AUTO:
                return DefaultCredentialsProvider.create();
            default:
                throw new IllegalArgumentException("Credential provider not supported: " + credentialProviderType);
        }
    }

    /**
     * Parses the endpoint override string into a {@link URI}.
     *
     * @param str endpoint override taken from the write-client configuration
     * @return the parsed URI
     * @throws RuntimeException if {@code str} is not a syntactically valid URI; the
     *     {@link URISyntaxException} is attached as the cause
     */
    private URI parseEndpointOverride(String str) {
        try {
            return new URI(str);
        } catch (URISyntaxException e) {
            // Keep the parser's diagnostics (index/reason) available to whoever reads the trace.
            throw new RuntimeException("Invalid EndpointOverride Config: " + str, e);
        }
    }

    /**
     * Converts the batch into a {@link WriteRecordsRequest} and submits it asynchronously.
     *
     * <p>If submitting throws synchronously, the exception is logged and counted, and the whole
     * batch is handed back through {@code consumer} so that it is retried.
     *
     * @param list records of this batch
     * @param consumer callback receiving the records to re-queue (empty list on success)
     */
    @Override
    protected void submitRequestEntries(List<Record> list, Consumer<List<Record>> consumer) {
        WriteRecordsRequest request = this.batchConverter.apply(list);
        LOG.debug("Sending WriteRecordsRequest with {} records to Timestream...", request.records().size());
        this.metricsCollector.collectPreWriteMetrics(request);
        try {
            asyncWriteRecords(list, consumer, request);
        } catch (Exception e) {
            LOG.error("Unexpected exception occurred when sending records to Timestream. Retrying all records.", e);
            this.metricsCollector.collectExceptionMetrics(e);
            consumer.accept(list);
        }
    }

    /**
     * Issues the write and wires the completion callback.
     *
     * <p>On success an empty list is passed to {@code consumer}. On failure a wrapping
     * {@link CompletionException} is unwrapped; a non-{@link Exception} throwable is reported to
     * the fatal-exception consumer, anything else is counted and forwarded to the failure
     * handler together with retry and drop callbacks that update the metrics.
     *
     * @param list records of this batch
     * @param consumer callback receiving the records to re-queue
     * @param writeRecordsRequest request built from {@code list}
     */
    private void asyncWriteRecords(List<Record> list, Consumer<List<Record>> consumer, WriteRecordsRequest writeRecordsRequest) {
        this.client.writeRecords(writeRecordsRequest).whenComplete((writeRecordsResponse, failure) -> {
            if (failure == null) {
                LOG.trace("Timestream writeRecordsAsync onSuccess: {} -> {}", writeRecordsRequest, writeRecordsResponse);
                this.metricsCollector.collectSuccessMetrics(writeRecordsRequest);
                consumer.accept(Collections.emptyList());
                return;
            }
            // Unwrap only when there is something to unwrap; a cause-less CompletionException
            // is itself the most informative failure available and must not become null.
            Throwable cause = (failure instanceof CompletionException && failure.getCause() != null)
                    ? failure.getCause()
                    : failure;
            if (!(cause instanceof Exception)) {
                getFatalExceptionCons().accept(new Exception(cause));
                return;
            }
            Exception exc = (Exception) cause;
            this.metricsCollector.collectExceptionMetrics(exc);
            this.failureHandler.onWriteError(list, writeRecordsRequest, exc, retried -> {
                this.metricsCollector.collectRetries(retried);
                consumer.accept(retried);
            }, dropped -> {
                this.metricsCollector.collectDropped(dropped, writeRecordsRequest);
            });
        });
    }

    /**
     * Returns the size of a record as computed by
     * {@link TimestreamModelUtils#getRecordSizeInBytes}.
     *
     * <p>Decompiler note carried over from the original: access modifier changed from
     * {@code protected} to {@code public}.
     *
     * @param record the record to measure
     * @return the record size in bytes
     */
    @Override
    public long getSizeInBytes(Record record) {
        return TimestreamModelUtils.getRecordSizeInBytes(record);
    }

    /**
     * Flushes the writer with {@code flush(true)} and then delegates to the base snapshot.
     *
     * @param j checkpoint id forwarded to the base implementation
     * @return the state returned by the base implementation after flushing
     * @throws RuntimeException if the thread is interrupted while flushing; the interrupt flag
     *     is restored before the exception is thrown
     */
    @Override
    public List<BufferedRequestState<Record>> snapshotState(long j) {
        try {
            flush(true);
            return super.snapshotState(j);
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while flushing buffer during snapshotState", e);
        }
    }
}
