package com.ibm.fhir.bulkdata.jbatch.export.fast;

import com.ibm.cloud.objectstorage.services.s3.AmazonS3;
import com.ibm.cloud.objectstorage.services.s3.model.PartETag;
import com.ibm.fhir.bulkdata.audit.BulkAuditLogger;
import com.ibm.fhir.bulkdata.common.BulkDataUtils;
import com.ibm.fhir.bulkdata.jbatch.context.BatchContextAdapter;
import com.ibm.fhir.bulkdata.jbatch.export.fast.data.CheckpointUserData;
import com.ibm.fhir.bulkdata.jbatch.export.fast.data.TransientUserData;
import com.ibm.fhir.bulkdata.provider.impl.S3Provider;
import com.ibm.fhir.model.resource.Resource;
import com.ibm.fhir.model.util.ModelSupport;
import com.ibm.fhir.operation.bulkdata.config.ConfigurationAdapter;
import com.ibm.fhir.operation.bulkdata.config.ConfigurationFactory;
import com.ibm.fhir.operation.bulkdata.model.type.BulkDataContext;
import com.ibm.fhir.persistence.FHIRPersistence;
import com.ibm.fhir.persistence.ResourcePayload;
import com.ibm.fhir.persistence.helper.FHIRPersistenceHelper;
import com.ibm.fhir.persistence.helper.FHIRTransactionHelper;
import com.ibm.fhir.persistence.util.InputOutputByteStream;
import com.ibm.fhir.search.date.DateTimeHandler;
import java.io.OutputStream;
import java.io.Serializable;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.batch.api.BatchProperty;
import javax.batch.api.chunk.AbstractItemReader;
import javax.batch.runtime.BatchRuntime;
import javax.batch.runtime.context.JobContext;
import javax.batch.runtime.context.StepContext;
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Any;
import javax.inject.Inject;
import javax.ws.rs.core.Response;

@Dependent
/* loaded from: input_file:com/ibm/fhir/bulkdata/jbatch/export/fast/ResourcePayloadReader.class */
public class ResourcePayloadReader extends AbstractItemReader {
    private static final Logger logger = Logger.getLogger(ResourcePayloadReader.class.getName());
    private static final String CLASS = ResourcePayloadReader.class.getName();
    FHIRPersistence fhirPersistence;
    Class<? extends Resource> resourceType;
    String fhirResourceType;
    String cosBucketName;
    String cosBucketPathPrefix;

    @Inject
    StepContext stepCtx;

    @Inject
    JobContext jobContext;
    private Instant lastTimestamp;
    private Instant fromLastModified;
    private Instant toLastModified;
    private static final char NDJSON_LINE_SEPARATOR = '\n';
    private long txEndTime;
    private long currentObjectSize;
    private String uploadId;
    private String currentObjectName;
    private int currentObjectResourceCount;
    private Exception processingException;
    private int resourcesProcessed;

    @Inject
    @BatchProperty(name = "partition.resourcetype")
    @Any
    private String resourceTypeStr;
    private BulkAuditLogger auditLogger = new BulkAuditLogger();
    private S3Provider wrapper = null;
    private AmazonS3 cosClient = null;
    private BulkDataContext ctx = null;
    private boolean isExportPublic = true;
    long resourcesPerObject = ConfigurationFactory.getInstance().getCoreCosObjectResourceCountThreshold();
    private Set<Long> resourcesForLastTimestamp = new HashSet();
    private int partUploadTriggerSize = ConfigurationFactory.getInstance().getCoreCosPartUploadTriggerSize() * NDJSON_LINE_SEPARATOR;
    private long maxObjectSize = ConfigurationFactory.getInstance().getCoreCosObjectSizeThreshold();
    private final int initialBufferSize = this.partUploadTriggerSize + 131072;
    private long txTimeoutMillis = ConfigurationFactory.getInstance().getCoreFastMaxReadTimeout();
    private int currentUploadNumber = 1;
    private List<PartETag> uploadedParts = new ArrayList();
    private List<Integer> resourceCounts = new ArrayList();
    private InputOutputByteStream ioBuffer = new InputOutputByteStream(this.initialBufferSize);

    public ResourcePayloadReader() {
        if (logger.isLoggable(Level.FINE)) {
            logger.fine("Max resources Per Object: " + this.resourcesPerObject);
            logger.fine("Part Upload Trigger Size: " + this.partUploadTriggerSize);
            logger.fine("Max Object Size (threshold): " + this.maxObjectSize);
        }
    }

    private String logPrefix() {
        return this.jobContext.getJobName() + "[" + this.jobContext.getExecutionId() + "]";
    }

    public void open(Serializable serializable) throws Exception {
        this.ctx = new BatchContextAdapter(BatchRuntime.getJobOperator().getJobExecution(this.jobContext.getExecutionId()).getJobParameters()).getStepContextForFastResourceWriter();
        this.ctx.setPartitionResourceType(this.resourceTypeStr);
        ConfigurationAdapter configurationFactory = ConfigurationFactory.getInstance();
        configurationFactory.registerRequestContext(this.ctx.getTenantId(), this.ctx.getDatastoreId(), this.ctx.getIncomingUrl());
        this.fhirResourceType = this.ctx.getPartitionResourceType();
        String source = this.ctx.getSource();
        this.cosBucketName = configurationFactory.getStorageProviderBucketName(source);
        this.cosBucketPathPrefix = this.ctx.getCosBucketPathPrefix();
        String fhirSearchFromDate = this.ctx.getFhirSearchFromDate();
        if (fhirSearchFromDate != null) {
            this.fromLastModified = DateTimeHandler.generateValue(DateTimeHandler.parse(fhirSearchFromDate));
            logger.fine(logPrefix() + " fromLastModified = " + fhirSearchFromDate + "(" + this.fromLastModified + ")");
        }
        String fhirSearchToDate = this.ctx.getFhirSearchToDate();
        if (fhirSearchToDate != null) {
            this.toLastModified = DateTimeHandler.generateValue(DateTimeHandler.parse(fhirSearchToDate));
            logger.fine(logPrefix() + " toLastModified = " + fhirSearchToDate + "(" + this.toLastModified + ")");
        }
        this.resourcesForLastTimestamp.clear();
        this.lastTimestamp = this.fromLastModified;
        if (serializable != null) {
            loadStateFrom((CheckpointUserData) serializable);
            this.ioBuffer.reset();
        }
        this.stepCtx.setTransientUserData(new TransientUserData());
        this.fhirPersistence = new FHIRPersistenceHelper().getFHIRPersistenceImplementation();
        this.resourceType = ModelSupport.getResourceType(this.fhirResourceType);
        this.isExportPublic = configurationFactory.isStorageProviderExportPublic(source);
        this.wrapper = new S3Provider(source);
        this.wrapper.createSource();
        this.cosClient = this.wrapper.getClient();
    }

    public Object readItem() throws Exception {
        logger.entering(CLASS, "readItem");
        long nanoTime = System.nanoTime();
        this.resourcesProcessed = 0;
        this.txEndTime = nanoTime + (this.txTimeoutMillis * 1000000);
        FHIRTransactionHelper fHIRTransactionHelper = new FHIRTransactionHelper(this.fhirPersistence.getTransaction());
        fHIRTransactionHelper.begin();
        boolean z = true;
        while (!isTxTimeExpired() && z) {
            try {
                if (logger.isLoggable(Level.FINE)) {
                    logger.fine("Fetching " + this.resourceType.getSimpleName() + " from: " + this.fromLastModified + " to " + this.toLastModified);
                }
                int i = this.resourcesProcessed;
                ResourcePayload fetchResourcePayloads = this.fhirPersistence.fetchResourcePayloads(this.resourceType, this.fromLastModified, this.toLastModified, resourcePayload -> {
                    return processPayload(resourcePayload);
                });
                int i2 = this.resourcesProcessed - i;
                if (this.processingException != null) {
                    logger.log(Level.SEVERE, logPrefix(), (Throwable) this.processingException);
                    throw this.processingException;
                }
                if (fetchResourcePayloads != null && i2 > 0) {
                    this.fromLastModified = fetchResourcePayloads.getLastUpdated();
                } else if (!isTxTimeExpired()) {
                    logger.fine(logPrefix() + " no more data");
                    z = false;
                }
            } catch (Throwable th) {
                fHIRTransactionHelper.end();
                double nanoTime2 = (System.nanoTime() - nanoTime) / 1.0E9d;
                logger.info(String.format("%s processed %d resources in %.2f seconds (rate=%.1f resources/second)", logPrefix(), Integer.valueOf(this.resourcesProcessed), Double.valueOf(nanoTime2), Double.valueOf(this.resourcesProcessed / nanoTime2)));
                if (this.auditLogger.shouldLog() && this.resourcesProcessed >= 0) {
                    Date date = new Date(System.currentTimeMillis());
                    this.auditLogger.logFastOnExport(this.resourceTypeStr, "_lastUpdated=" + this.lastTimestamp + "&_type=" + this.fhirResourceType, this.resourcesProcessed, date, date, Response.Status.OK, "StorageProvider@" + this.ctx.getSource(), "BulkDataOperator");
                }
                throw th;
            }
        }
        if (this.uploadId != null && !z) {
            completeCurrentUpload();
        }
        fHIRTransactionHelper.end();
        double nanoTime3 = (System.nanoTime() - nanoTime) / 1.0E9d;
        logger.info(String.format("%s processed %d resources in %.2f seconds (rate=%.1f resources/second)", logPrefix(), Integer.valueOf(this.resourcesProcessed), Double.valueOf(nanoTime3), Double.valueOf(this.resourcesProcessed / nanoTime3)));
        if (this.auditLogger.shouldLog() && this.resourcesProcessed >= 0) {
            Date date2 = new Date(System.currentTimeMillis());
            this.auditLogger.logFastOnExport(this.resourceTypeStr, "_lastUpdated=" + this.lastTimestamp + "&_type=" + this.fhirResourceType, this.resourcesProcessed, date2, date2, Response.Status.OK, "StorageProvider@" + this.ctx.getSource(), "BulkDataOperator");
        }
        if (!z) {
            TransientUserData transientUserData = (TransientUserData) this.stepCtx.getTransientUserData();
            transientUserData.setCompleted(true);
            transientUserData.setResourceType(this.fhirResourceType);
            transientUserData.setResourceCounts(this.resourceCounts);
        }
        logger.exiting(CLASS, "readItem");
        if (z) {
            return new Object();
        }
        return null;
    }

    public Boolean processPayload(ResourcePayload resourcePayload) {
        try {
            if (this.lastTimestamp == null || !this.lastTimestamp.equals(resourcePayload.getLastUpdated())) {
                this.resourcesForLastTimestamp.clear();
                this.lastTimestamp = resourcePayload.getLastUpdated();
            }
            if (!this.resourcesForLastTimestamp.contains(Long.valueOf(resourcePayload.getResourceId()))) {
                this.resourcesForLastTimestamp.add(Long.valueOf(resourcePayload.getResourceId()));
                this.resourcesProcessed++;
                if (logger.isLoggable(Level.FINER)) {
                    logger.finer(logPrefix() + " Processing payload for '" + this.resourceType.getSimpleName() + "/" + resourcePayload.getLogicalId() + "'");
                }
                if (this.uploadId != null && (this.currentObjectSize > this.maxObjectSize || this.currentObjectResourceCount >= this.resourcesPerObject)) {
                    if (logger.isLoggable(Level.FINE)) {
                        logger.fine(logPrefix() + " Completing current upload '" + this.uploadId + "', resources = " + this.currentObjectResourceCount + ", currentObjectSize = " + this.currentObjectSize + " bytes");
                    }
                    completeCurrentUpload();
                }
                OutputStream outputStream = this.ioBuffer.outputStream();
                if (this.ioBuffer.size() > 0) {
                    outputStream.write(NDJSON_LINE_SEPARATOR);
                }
                this.currentObjectSize += resourcePayload.transferTo(outputStream);
                this.currentObjectResourceCount++;
                uploadWhenReady();
            }
            boolean isTxTimeExpired = isTxTimeExpired();
            if (isTxTimeExpired && logger.isLoggable(Level.FINE)) {
                logger.fine(logPrefix() + " Stopping to allow tx commit before timeout");
            }
            return Boolean.valueOf(!isTxTimeExpired);
        } catch (Exception e) {
            this.processingException = e;
            return Boolean.FALSE;
        }
    }

    protected boolean isTxTimeExpired() {
        return System.nanoTime() > this.txEndTime;
    }

    private void uploadWhenReady() throws Exception {
        if (this.uploadId == null) {
            if (this.cosBucketPathPrefix == null || this.cosBucketPathPrefix.trim().length() <= 0) {
                this.currentObjectName = "job" + this.jobContext.getExecutionId() + "/" + this.fhirResourceType + "_" + this.currentUploadNumber + ".ndjson";
            } else {
                this.currentObjectName = this.cosBucketPathPrefix + "/" + this.fhirResourceType + "_" + this.currentUploadNumber + ".ndjson";
            }
            this.uploadId = BulkDataUtils.startPartUpload(this.cosClient, this.cosBucketName, this.currentObjectName, this.isExportPublic);
            if (logger.isLoggable(Level.FINE)) {
                logger.fine(logPrefix() + " Started new multi-part upload: '" + this.uploadId + "'");
            }
        }
        if (this.ioBuffer.size() > this.partUploadTriggerSize) {
            uploadPart();
        }
    }

    private void uploadPart() throws Exception {
        int size = this.uploadedParts.size() + 1;
        if (logger.isLoggable(Level.FINE)) {
            logger.fine(logPrefix() + " Uploading part# " + size + " [" + this.ioBuffer.size() + " bytes] for uploadId '" + this.uploadId + "'");
        }
        this.uploadedParts.add(BulkDataUtils.multiPartUpload(this.cosClient, this.cosBucketName, this.currentObjectName, this.uploadId, this.ioBuffer.inputStream(), this.ioBuffer.size(), size));
        this.ioBuffer.reset();
    }

    private void completeCurrentUpload() throws Exception {
        if (this.uploadId == null) {
            throw new IllegalStateException("Upload is not active");
        }
        if (this.ioBuffer.size() > 0) {
            logger.fine(logPrefix() + " uploading final part for '" + this.uploadId + "'");
            uploadPart();
        }
        try {
            logger.fine(logPrefix() + " finishing multi-part upload '" + this.uploadId + "'");
            BulkDataUtils.finishMultiPartUpload(this.cosClient, this.cosBucketName, this.currentObjectName, this.uploadId, this.uploadedParts);
            this.resourceCounts.add(Integer.valueOf(this.currentObjectResourceCount));
        } finally {
            resetUploadState();
        }
    }

    private void resetUploadState() {
        logger.fine(logPrefix() + " resetting state so we are ready to upload the next object");
        this.uploadedParts.clear();
        this.uploadId = null;
        this.currentObjectName = null;
        this.currentObjectSize = 0L;
        this.currentObjectResourceCount = 0;
        this.currentUploadNumber++;
    }

    private void loadStateFrom(CheckpointUserData checkpointUserData) {
        this.uploadedParts.clear();
        this.resourcesForLastTimestamp.clear();
        this.resourceCounts.clear();
        this.fromLastModified = checkpointUserData.getFromLastModified();
        this.uploadId = checkpointUserData.getUploadId();
        this.uploadedParts.addAll(checkpointUserData.getUploadedParts());
        this.currentObjectName = checkpointUserData.getCurrentObjectName();
        this.currentObjectSize = checkpointUserData.getCurrentObjectSize();
        this.currentObjectResourceCount = checkpointUserData.getCurrentObjectResourceCount();
        this.currentUploadNumber = checkpointUserData.getCurrentUploadNumber();
        this.resourcesForLastTimestamp.addAll(checkpointUserData.getResourcesForLastTimestamp());
        this.resourceCounts.addAll(checkpointUserData.getResourceCounts());
        this.lastTimestamp = this.fromLastModified;
    }

    public Serializable checkpointInfo() throws Exception {
        logger.entering(CLASS, "checkpointInfo");
        CheckpointUserData checkpointUserData = new CheckpointUserData();
        checkpointUserData.setResourceType(this.fhirResourceType);
        checkpointUserData.setFromLastModified(this.fromLastModified);
        checkpointUserData.setUploadId(this.uploadId);
        checkpointUserData.setUploadedParts(this.uploadedParts);
        checkpointUserData.setCurrentObjectName(this.currentObjectName);
        checkpointUserData.setCurrentObjectSize(this.currentObjectSize);
        checkpointUserData.setCurrentObjectResourceCount(this.currentObjectResourceCount);
        checkpointUserData.setCurrentUploadNumber(this.currentUploadNumber);
        checkpointUserData.setResourcesForLastTimestamp(this.resourcesForLastTimestamp);
        checkpointUserData.setResourceCounts(this.resourceCounts);
        logger.exiting(CLASS, "checkpointInfo");
        return checkpointUserData;
    }

    public void close() throws Exception {
    }
}
