package com.expedia.www.haystack.agent.blobs.dispatcher.s3;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.auth.profile.internal.securitytoken.RoleInfo;
import com.amazonaws.auth.profile.internal.securitytoken.STSProfileCredentialsServiceProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.expedia.blobs.core.BlobReadWriteException;
import com.expedia.blobs.core.io.BlobInputStream;
import com.expedia.blobs.core.support.CompressDecompressService;
import com.expedia.www.blobs.model.Blob;
import com.expedia.www.haystack.agent.blobs.dispatcher.core.BlobDispatcher;
import com.expedia.www.haystack.agent.blobs.dispatcher.core.RateLimitException;
import com.expedia.www.haystack.agent.core.metrics.SharedMetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.typesafe.config.Config;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link BlobDispatcher} that writes blobs to, and reads them back from, an S3 bucket
 * through the AWS SDK {@link TransferManager}.
 *
 * <p>The number of uploads in flight is capped by a semaphore sized from
 * {@code max.outstanding.requests}; {@link #dispatch(Blob)} throws a
 * {@link RateLimitException} instead of blocking when the cap is reached.
 *
 * <p>An instance created with the no-arg constructor must be configured through
 * {@link #initialize(Config)} before any other method is used.
 */
public class S3Dispatcher implements BlobDispatcher, AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(S3Dispatcher.class);

    // Configuration keys read by initialize(Config) and its helpers.
    private static final String BUCKET_NAME_PROPERTY = "bucket.name";
    private static final String REGION_PROPERTY = "region";
    private static final String RETRY_COUNT = "retry.count";
    private static final String AWS_ACCESS_KEY = "aws.access.key";
    private static final String AWS_SECRET_KEY = "aws.secret.key";
    private static final String MAX_CONNECTIONS = "max.connections";
    private static final String KEEP_ALIVE = "keep.alive";
    private static final String AWS_SERVICE_ENDPOINT = "service.endpoint";
    private static final String AWS_PATH_STYLE_ACCESS_ENABLED = "path.style.access.enabled";
    private static final String AWS_DISABLE_CHUNKED_ENCODING = "disable.chunked.encoding";
    private static final String AWS_USE_STS_ARN = "use.sts.arn";
    private static final String AWS_STS_ARN_ROLE = "sts.arn.role";
    private static final String SHOULD_WAIT_FOR_UPLOAD = "should.wait.for.upload";
    private static final String MAX_OUTSTANDING_REQUESTS = "max.outstanding.requests";
    private static final String COMPRESSION_TYPE_CONFIG_KEY = "compression.type";

    /** User-metadata key under which the compression type is stored on every uploaded object. */
    private static final String COMPRESSION_TYPE_META_KEY = "compressionType";

    /** Compression type name used when neither config nor object metadata specifies one. */
    private static final String DEFAULT_COMPRESSION_TYPE = "NONE";

    private static final String DISPATCH_FAILURE_METER_NAME = "s3.dispatch.failure";
    private static final String DISPATCH_TIMER_NAME = "s3.dispatch.timer";

    /** Connection pool size used when {@code max.connections} is not configured. */
    private static final int DEFAULT_MAX_CONNECTIONS = 50;

    /** Multipart upload threshold handed to the transfer manager: 5 MiB (5242880 bytes). */
    private static final Long MULTIPART_UPLOAD_THRESHOLD = 5L * 1024L * 1024L;

    private TransferManager transferManager;
    private String bucketName;
    private Boolean shouldWaitForUpload;
    private int maxOutstandingRequests;
    private Semaphore parallelUploadSemaphore;
    private Executor executor;
    private CompressDecompressService compressDecompressService;
    private Meter dispatchFailureMeter;
    private Timer dispatchTimer;

    /**
     * Creates a fully wired dispatcher without going through {@link #initialize(Config)}.
     *
     * @param transferManager           transfer manager used for uploads and reads
     * @param bucketName                target S3 bucket
     * @param shouldWaitForUpload       whether {@link #dispatchInternal(Blob)} waits for the upload result
     * @param maxOutstandingRequests    number of permits of the upload semaphore
     * @param compressDecompressService service used to compress blob content before upload
     */
    @VisibleForTesting
    S3Dispatcher(TransferManager transferManager, String bucketName, Boolean shouldWaitForUpload,
                 int maxOutstandingRequests, CompressDecompressService compressDecompressService) {
        this.transferManager = transferManager;
        this.bucketName = bucketName;
        this.shouldWaitForUpload = shouldWaitForUpload;
        this.maxOutstandingRequests = maxOutstandingRequests;
        this.parallelUploadSemaphore = new Semaphore(maxOutstandingRequests);
        this.executor = MoreExecutors.directExecutor();
        this.dispatchFailureMeter = SharedMetricRegistry.newMeter(DISPATCH_FAILURE_METER_NAME);
        this.dispatchTimer = SharedMetricRegistry.newTimer(DISPATCH_TIMER_NAME);
        this.compressDecompressService = compressDecompressService;
    }

    /** Creates an unconfigured dispatcher; call {@link #initialize(Config)} before use. */
    @VisibleForTesting
    public S3Dispatcher() {
    }

    /** @return the fixed dispatcher name, {@code "s3"} */
    @Override
    public String getName() {
        return "s3";
    }

    /**
     * Uploads the blob, rejecting the call when the maximum number of outstanding
     * requests is already in flight.
     *
     * <p>Upload failures are logged, not propagated to the caller. The semaphore permit
     * is always returned, including when the task cannot be handed to the executor.
     *
     * @param blob blob to upload
     * @throws RateLimitException if no upload permit is available
     */
    @Override
    public void dispatch(Blob blob) {
        if (!this.parallelUploadSemaphore.tryAcquire()) {
            throw new RateLimitException("RateLimit is hit with outstanding(pending) requests=" + this.maxOutstandingRequests);
        }

        final CompletableFuture<Void> uploadTask;
        try {
            uploadTask = CompletableFuture.runAsync(() -> dispatchInternal(blob), this.executor);
        } catch (RuntimeException e) {
            // The task was never scheduled, so the completion callback below will not run:
            // give the permit back here or it is lost for good.
            this.parallelUploadSemaphore.release();
            throw e;
        }

        uploadTask.whenComplete((ignored, failure) -> {
            try {
                if (failure != null) {
                    LOGGER.error(getClass().getSimpleName() + " failed to store blob ", failure);
                }
            } finally {
                this.parallelUploadSemaphore.release();
            }
        });
    }

    /**
     * Compresses the blob content and hands it to the transfer manager.
     *
     * <p>The blob's metadata map is copied into the object's user metadata together with the
     * compression type. When {@code shouldWaitForUpload} is true the call blocks until the
     * upload result is available.
     *
     * @param blob blob to upload
     * @throws BlobReadWriteException if preparing, starting or waiting for the upload fails
     */
    void dispatchInternal(Blob blob) {
        try {
            final ObjectMetadata objectMetadata = new ObjectMetadata();
            final Map<String, String> blobMetadata = blob.getMetadataMap();
            blobMetadata.forEach(objectMetadata::addUserMetadata);

            final BlobInputStream compressed =
                    this.compressDecompressService.compressData(blob.getContent().toByteArray());
            objectMetadata.setContentLength(compressed.getLength());
            objectMetadata.addUserMetadata(COMPRESSION_TYPE_META_KEY, this.compressDecompressService.getCompressionType());

            // NOTE(review): the listener receives the failure meter and a started timer context;
            // presumably it records them from the upload progress events - confirm in UploadProgressListener.
            final PutObjectRequest request =
                    new PutObjectRequest(this.bucketName, blob.getKey(), compressed.getStream(), objectMetadata)
                            .withCannedAcl(CannedAccessControlList.BucketOwnerFullControl)
                            .withGeneralProgressListener(new UploadProgressListener(
                                    LOGGER, blob.getKey(), this.dispatchFailureMeter, this.dispatchTimer.time()));

            final Upload upload = this.transferManager.upload(request);
            // Boolean.TRUE.equals treats an unset (null) flag as "do not wait" instead of failing on unboxing.
            if (Boolean.TRUE.equals(this.shouldWaitForUpload)) {
                upload.waitForUploadResult();
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt so the owning thread can still observe it.
                Thread.currentThread().interrupt();
            }
            throw new BlobReadWriteException(
                    String.format("Unable to upload blob to S3 for  key %s : %s", blob.getKey(), e.getMessage()), e);
        }
    }

    /**
     * Reads the object stored under the given key and rebuilds the blob from it.
     *
     * <p>The content is decompressed according to the {@code compressionType} user metadata
     * of the object. Both the S3 object and the decompressing stream are closed on every path.
     *
     * @param key S3 object key of the blob
     * @return the blob, or {@link Optional#empty()} if anything fails while reading (the failure is logged)
     */
    @Override
    public Optional<Blob> read(String key) {
        try (S3Object s3Object = this.transferManager.getAmazonS3Client().getObject(this.bucketName, key)) {
            final Map<String, String> userMetadata = s3Object.getObjectMetadata().getUserMetadata();
            final Map<String, String> metadata = userMetadata == null ? Collections.emptyMap() : userMetadata;

            try (InputStream content = CompressDecompressService.uncompressData(
                    getCompressionType(metadata), s3Object.getObjectContent())) {
                final Blob blob = Blob.newBuilder()
                        .setKey(key)
                        .putAllMetadata(metadata)
                        .setContent(ByteString.copyFrom(readInputStream(content)))
                        .build();
                return Optional.of(blob);
            }
        } catch (Exception e) {
            LOGGER.error("Failed to read the blob from name={} for key={}", getName(), key, e);
            return Optional.empty();
        }
    }

    /**
     * Reads the stream fully into memory.
     *
     * @param inputStream stream to drain
     * @return all bytes of the stream
     * @throws IOException if reading fails
     */
    protected byte[] readInputStream(InputStream inputStream) throws IOException {
        return IOUtils.toByteArray(inputStream);
    }

    /**
     * Configures the dispatcher from the given config.
     *
     * <p>{@code bucket.name}, {@code max.outstanding.requests} and {@code region} are mandatory;
     * {@code should.wait.for.upload} defaults to false and {@code compression.type} to {@code NONE}.
     *
     * @param config dispatcher configuration
     * @throws IllegalArgumentException if a mandatory setting is missing or invalid
     */
    @Override
    public void initialize(Config config) {
        Validate.isTrue(config.hasPath(BUCKET_NAME_PROPERTY), "s3 bucket name should be present");
        this.bucketName = config.getString(BUCKET_NAME_PROPERTY);
        Validate.notEmpty(this.bucketName, "s3 bucket name can't be empty");

        Validate.isTrue(config.hasPath(MAX_OUTSTANDING_REQUESTS), "number of max parallel uploads should be present");
        this.maxOutstandingRequests = config.getInt(MAX_OUTSTANDING_REQUESTS);
        Validate.isTrue(this.maxOutstandingRequests > 0, "max parallel uploads has to be greater than 0");

        this.shouldWaitForUpload = getBooleanOrFalse(config, SHOULD_WAIT_FOR_UPLOAD);
        this.parallelUploadSemaphore = new Semaphore(this.maxOutstandingRequests);
        this.transferManager = createTransferManager(config);
        this.executor = MoreExecutors.directExecutor();
        this.compressDecompressService = new CompressDecompressService(findCompressionType(config));
        this.dispatchFailureMeter = SharedMetricRegistry.newMeter(DISPATCH_FAILURE_METER_NAME);
        this.dispatchTimer = SharedMetricRegistry.newTimer(DISPATCH_TIMER_NAME);
        LOGGER.info("Successfully initialized the S3 dispatcher...");
    }

    /** Returns the boolean at {@code path}, or false when the path is absent. */
    private static boolean getBooleanOrFalse(Config config, String path) {
        return config.hasPath(path) && config.getBoolean(path);
    }

    /**
     * Builds the transfer manager from the connection-related settings.
     *
     * <p>{@code max.connections} defaults to 50 and {@code keep.alive} to false. The SDK's
     * retry count is overridden only when {@code retry.count} is configured with a value above zero.
     */
    private static TransferManager createTransferManager(Config config) {
        Validate.isTrue(config.hasPath(REGION_PROPERTY), "s3 bucket region can't be empty");

        final int maxConnections =
                config.hasPath(MAX_CONNECTIONS) ? config.getInt(MAX_CONNECTIONS) : DEFAULT_MAX_CONNECTIONS;
        final boolean keepAlive = getBooleanOrFalse(config, KEEP_ALIVE);
        final int retryCount = config.hasPath(RETRY_COUNT) ? config.getInt(RETRY_COUNT) : -1;

        final ClientConfiguration clientConfiguration = new ClientConfiguration()
                .withMaxConnections(maxConnections)
                .withTcpKeepAlive(keepAlive);
        if (retryCount > 0) {
            clientConfiguration.setMaxErrorRetry(retryCount);
        }

        return TransferManagerBuilder.standard()
                .withS3Client(getS3Client(config, clientConfiguration))
                .withMultipartUploadThreshold(MULTIPART_UPLOAD_THRESHOLD)
                .build();
    }

    /**
     * Builds the S3 client. A configured {@code service.endpoint} takes precedence over the
     * plain region; path-style access and chunked-encoding suppression are opt-in.
     */
    private static AmazonS3 getS3Client(Config config, ClientConfiguration clientConfiguration) {
        final String region = config.getString(REGION_PROPERTY);
        final boolean pathStyleAccess = getBooleanOrFalse(config, AWS_PATH_STYLE_ACCESS_ENABLED);
        final boolean disableChunkedEncoding = getBooleanOrFalse(config, AWS_DISABLE_CHUNKED_ENCODING);

        final AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard()
                .withCredentials(buildCredentialProvider(config))
                .withClientConfiguration(clientConfiguration)
                .withPathStyleAccessEnabled(pathStyleAccess);

        if (config.hasPath(AWS_SERVICE_ENDPOINT)) {
            builder.withEndpointConfiguration(
                    new AwsClientBuilder.EndpointConfiguration(config.getString(AWS_SERVICE_ENDPOINT), region));
        } else {
            builder.withRegion(region);
        }
        if (disableChunkedEncoding) {
            builder.disableChunkedEncoding();
        }
        return builder.build();
    }

    /**
     * Selects the credentials provider, in order of precedence: STS assume-role when
     * {@code use.sts.arn} is true, static access/secret key when both are configured,
     * otherwise the default AWS credentials provider chain.
     *
     * @param config dispatcher configuration
     * @return the credentials provider to use for the S3 client
     * @throws IllegalArgumentException if STS is enabled but {@code sts.arn.role} is missing
     */
    @VisibleForTesting
    static AWSCredentialsProvider buildCredentialProvider(Config config) {
        if (getBooleanOrFalse(config, AWS_USE_STS_ARN)) {
            LOGGER.info("using STS profile credential service provider");
            Validate.isTrue(config.hasPath(AWS_STS_ARN_ROLE), "AWS STS Assume-Role should be present when enabled");
            final RoleInfo roleInfo = new RoleInfo()
                    .withRoleArn(config.getString(AWS_STS_ARN_ROLE))
                    .withRoleSessionName("haystack-monitoring-blobs-agent");
            return new STSProfileCredentialsServiceProvider(roleInfo);
        }
        if (config.hasPath(AWS_ACCESS_KEY) && config.hasPath(AWS_SECRET_KEY)) {
            LOGGER.info("using static aws credential provider with access and secret key for s3 dispatcher");
            return new AWSStaticCredentialsProvider(
                    new BasicAWSCredentials(config.getString(AWS_ACCESS_KEY), config.getString(AWS_SECRET_KEY)));
        }
        LOGGER.info("using default credential provider chain for s3 dispatcher");
        return DefaultAWSCredentialsProviderChain.getInstance();
    }

    /**
     * Resolves the compression type from {@code compression.type}, defaulting to {@code NONE}.
     * The value is upper-cased with {@link Locale#ROOT} so the lookup does not depend on the
     * JVM's default locale.
     *
     * @throws IllegalArgumentException if the configured value is not a known compression type
     */
    CompressDecompressService.CompressionType findCompressionType(Config config) {
        final String configured = config.hasPath(COMPRESSION_TYPE_CONFIG_KEY)
                ? config.getString(COMPRESSION_TYPE_CONFIG_KEY)
                : DEFAULT_COMPRESSION_TYPE;
        return CompressDecompressService.CompressionType.valueOf(configured.toUpperCase(Locale.ROOT));
    }

    /**
     * Resolves the compression type from an object's user metadata, defaulting to {@code NONE}
     * when the {@code compressionType} entry is missing or empty.
     *
     * @throws IllegalArgumentException if the stored value is not a known compression type
     */
    CompressDecompressService.CompressionType getCompressionType(Map<String, String> map) {
        final String stored = map.get(COMPRESSION_TYPE_META_KEY);
        final String typeName = StringUtils.isEmpty(stored) ? DEFAULT_COMPRESSION_TYPE : stored;
        return CompressDecompressService.CompressionType.valueOf(typeName.toUpperCase(Locale.ROOT));
    }

    /** Shuts the transfer manager down immediately, if one was created. */
    @Override
    public void close() {
        if (this.transferManager != null) {
            LOGGER.info("shutting down the s3 dispatcher now..");
            this.transferManager.shutdownNow();
        }
    }
}
