package io.aiven.kafka.connect.s3;

import com.amazonaws.PredefinedClientConfigurations;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.retry.PredefinedBackoffStrategies;
import com.amazonaws.retry.PredefinedRetryPolicies;
import com.amazonaws.retry.RetryPolicy;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import io.aiven.kafka.connect.common.config.FilenameTemplateVariable;
import io.aiven.kafka.connect.common.grouper.RecordGrouper;
import io.aiven.kafka.connect.common.grouper.RecordGrouperFactory;
import io.aiven.kafka.connect.common.output.OutputWriter;
import io.aiven.kafka.connect.common.templating.Template;
import io.aiven.kafka.connect.common.templating.VariableTemplatePart;
import io.aiven.kafka.connect.s3.config.AwsCredentialProviderFactory;
import io.aiven.kafka.connect.s3.config.S3SinkConfig;
import java.io.IOException;
import java.io.OutputStream;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/aiven/kafka/connect/s3/S3SinkTask.class */
public class S3SinkTask extends SinkTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(AivenKafkaConnectS3SinkConnector.class);
    private RecordGrouper recordGrouper;
    private S3SinkConfig config;
    private AmazonS3 s3Client;
    protected AwsCredentialProviderFactory credentialFactory = new AwsCredentialProviderFactory();

    public void start(Map<String, String> map) {
        Objects.requireNonNull(map, "props hasn't been set");
        this.config = new S3SinkConfig(map);
        this.s3Client = createAmazonS3Client(this.config);
        try {
            this.recordGrouper = RecordGrouperFactory.newRecordGrouper(this.config);
            if (Objects.nonNull(this.config.getKafkaRetryBackoffMs())) {
                this.context.timeout(this.config.getKafkaRetryBackoffMs().longValue());
            }
        } catch (Exception e) {
            throw new ConnectException("Unsupported file name template " + this.config.getFilename(), e);
        }
    }

    private AmazonS3 createAmazonS3Client(S3SinkConfig s3SinkConfig) {
        AwsClientBuilder.EndpointConfiguration newEndpointConfiguration = newEndpointConfiguration(this.config);
        AmazonS3ClientBuilder withClientConfiguration = AmazonS3ClientBuilder.standard().withCredentials(this.credentialFactory.getProvider(s3SinkConfig)).withClientConfiguration(PredefinedClientConfigurations.defaultConfig().withRetryPolicy(new RetryPolicy(PredefinedRetryPolicies.DEFAULT_RETRY_CONDITION, new PredefinedBackoffStrategies.FullJitterBackoffStrategy(Math.toIntExact(s3SinkConfig.getS3RetryBackoffDelayMs()), Math.toIntExact(s3SinkConfig.getS3RetryBackoffMaxDelayMs())), s3SinkConfig.getS3RetryBackoffMaxRetries(), false)));
        if (Objects.isNull(newEndpointConfiguration)) {
            withClientConfiguration.withRegion(s3SinkConfig.getAwsS3Region());
        } else {
            withClientConfiguration.withEndpointConfiguration(newEndpointConfiguration).withPathStyleAccessEnabled(true);
        }
        return (AmazonS3) withClientConfiguration.build();
    }

    public void put(Collection<SinkRecord> collection) throws ConnectException {
        Objects.requireNonNull(collection, "records cannot be null");
        LOGGER.info("Processing {} records", Integer.valueOf(collection.size()));
        RecordGrouper recordGrouper = this.recordGrouper;
        Objects.requireNonNull(recordGrouper);
        collection.forEach(recordGrouper::put);
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
        try {
            this.recordGrouper.records().forEach(this::flushFile);
        } finally {
            this.recordGrouper.clear();
        }
    }

    private void flushFile(String str, List<SinkRecord> list) {
        Objects.requireNonNull(list, "records cannot be null");
        if (list.isEmpty()) {
            return;
        }
        try {
            OutputStream newStreamFor = newStreamFor(str, list.get(0));
            try {
                OutputWriter build = OutputWriter.builder().withCompressionType(this.config.getCompressionType()).withExternalProperties(this.config.originalsStrings()).withOutputFields(this.config.getOutputFields()).withEnvelopeEnabled(this.config.envelopeEnabled()).build(newStreamFor, this.config.getFormatType());
                try {
                    build.writeRecords(list);
                    if (build != null) {
                        build.close();
                    }
                    if (newStreamFor != null) {
                        newStreamFor.close();
                    }
                } catch (Throwable th) {
                    if (build != null) {
                        try {
                            build.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } catch (IOException e) {
            throw new ConnectException(e);
        }
    }

    public void stop() {
        this.s3Client.shutdown();
        LOGGER.info("Stop S3 Sink Task");
    }

    public String version() {
        return Version.VERSION;
    }

    private OutputStream newStreamFor(String str, SinkRecord sinkRecord) {
        return new S3OutputStream(this.config.getAwsS3BucketName(), this.config.usesFileNameTemplate().booleanValue() ? str : oldFullKey(sinkRecord), this.config.getAwsS3PartSize(), this.s3Client);
    }

    private AwsClientBuilder.EndpointConfiguration newEndpointConfiguration(S3SinkConfig s3SinkConfig) {
        if (Objects.isNull(s3SinkConfig.getAwsS3EndPoint())) {
            return null;
        }
        return new AwsClientBuilder.EndpointConfiguration(s3SinkConfig.getAwsS3EndPoint(), s3SinkConfig.getAwsS3Region().getName());
    }

    private String oldFullKey(SinkRecord sinkRecord) {
        Template.Instance bindVariable = this.config.getPrefixTemplate().instance().bindVariable(FilenameTemplateVariable.TIMESTAMP.name, parameter -> {
            return OldFullKeyFormatters.timestamp(sinkRecord, this.config.getTimestampSource(), parameter);
        }).bindVariable(FilenameTemplateVariable.PARTITION.name, () -> {
            return sinkRecord.kafkaPartition().toString();
        }).bindVariable(FilenameTemplateVariable.START_OFFSET.name, parameter2 -> {
            return OldFullKeyFormatters.KAFKA_OFFSET.apply(sinkRecord, parameter2);
        });
        String str = FilenameTemplateVariable.TOPIC.name;
        Objects.requireNonNull(sinkRecord);
        return bindVariable.bindVariable(str, sinkRecord::topic).bindVariable("utc_date", () -> {
            return ZonedDateTime.now(ZoneId.of("UTC")).format(DateTimeFormatter.ISO_LOCAL_DATE);
        }).bindVariable("local_date", () -> {
            return LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE);
        }).render() + String.format("%s-%s-%s", sinkRecord.topic(), sinkRecord.kafkaPartition(), OldFullKeyFormatters.KAFKA_OFFSET.apply(sinkRecord, VariableTemplatePart.Parameter.of("padding", "true"))) + this.config.getCompressionType().extension();
    }
}
