package org.apache.samza.util;

import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.table.CloudTable;
import com.microsoft.azure.storage.table.TableOperation;
import com.microsoft.azure.storage.table.TableQuery;
import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.samza.AzureClient;
import org.apache.samza.AzureException;
import org.apache.samza.coordinator.data.ProcessorEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/util/TableUtils.class */
/**
 * Helper around a single Azure Storage table that holds {@link ProcessorEntity} rows.
 *
 * <p>Rows are addressed by job model version (used as the partition key) and processor id
 * (used as the row key). A processor is treated as active when the timestamp on its row is
 * at most {@link #LIVENESS_DEBOUNCE_TIME_SEC} seconds old.
 *
 * <p>Unless stated otherwise, a {@link StorageException} raised by the table client is logged
 * and rethrown wrapped in an unchecked {@link AzureException}.
 */
public class TableUtils {
    private static final Logger LOG = LoggerFactory.getLogger(TableUtils.class);
    private static final String PARTITION_KEY = "PartitionKey";
    private static final long LIVENESS_DEBOUNCE_TIME_SEC = 30;
    private static final long LIVENESS_DEBOUNCE_TIME_MS = LIVENESS_DEBOUNCE_TIME_SEC * 1000;
    private final String initialState;
    private final CloudTable table;

    /**
     * Obtains a reference to the named table and creates the table if it does not exist yet.
     *
     * @param azureClient client used to obtain the table client
     * @param tableName name of the table to reference or create
     * @param initialState partition key that is always scanned, in addition to the current
     *     job model version, by {@link #getActiveProcessorsList(AtomicReference)}
     * @throws AzureException if the table URI is invalid or the storage call fails
     */
    public TableUtils(AzureClient azureClient, String tableName, String initialState) {
        this.initialState = initialState;
        try {
            this.table = azureClient.getTableClient().getTableReference(tableName);
            this.table.createIfNotExists();
        } catch (URISyntaxException e) {
            LOG.error("\nConnection string specifies an invalid URI.", e);
            throw new AzureException(e);
        } catch (StorageException e) {
            LOG.error("Azure storage exception.", e);
            throw new AzureException(e);
        }
    }

    /**
     * Inserts a new processor row with the given leader flag and a refreshed liveness value.
     *
     * @param jobModelVersion job model version; used as the partition key
     * @param processorId processor id; used as the row key
     * @param isLeader leader flag stored on the new row
     * @throws AzureException if the insert fails
     */
    public void addProcessorEntity(String jobModelVersion, String processorId, boolean isLeader) {
        ProcessorEntity entity = new ProcessorEntity(jobModelVersion, processorId);
        entity.setIsLeader(isLeader);
        entity.updateLiveness();
        try {
            this.table.execute(TableOperation.insert(entity));
        } catch (StorageException e) {
            LOG.error("Azure storage exception while adding processor entity with job model version: {} and pid: {}",
                    jobModelVersion, processorId, e);
            throw new AzureException(e);
        }
    }

    /**
     * Retrieves the processor row for the given keys.
     *
     * @param jobModelVersion job model version; used as the partition key
     * @param processorId processor id; used as the row key
     * @return the retrieved entity, or {@code null} when the retrieve operation yields no result
     * @throws AzureException if the retrieve fails
     */
    public ProcessorEntity getEntity(String jobModelVersion, String processorId) {
        try {
            return retrieve(jobModelVersion, processorId);
        } catch (StorageException e) {
            LOG.error("Azure storage exception while retrieving processor entity with job model version: {} and pid: {}",
                    jobModelVersion, processorId, e);
            throw new AzureException(e);
        }
    }

    /**
     * Refreshes the liveness value of an existing processor row and writes the row back.
     *
     * <p>This method is best-effort: storage failures and a missing row are logged and
     * swallowed rather than propagated to the caller.
     *
     * @param jobModelVersion job model version; used as the partition key
     * @param processorId processor id; used as the row key
     */
    public void updateHeartbeat(String jobModelVersion, String processorId) {
        try {
            ProcessorEntity entity = retrieve(jobModelVersion, processorId);
            if (entity == null) {
                // Nothing to refresh; stay best-effort instead of failing with a NullPointerException.
                LOG.warn("No processor entity found while updating heartbeat for job model version: {} and pid: {}",
                        jobModelVersion, processorId);
                return;
            }
            entity.updateLiveness();
            this.table.execute(TableOperation.replace(entity));
        } catch (StorageException e) {
            LOG.error("Azure storage exception while updating heartbeat for job model version: {} and pid: {}",
                    jobModelVersion, processorId, e);
        }
    }

    /**
     * Sets the leader flag on an existing processor row and writes the row back.
     *
     * @param jobModelVersion job model version; used as the partition key
     * @param processorId processor id; used as the row key
     * @param isLeader new value of the leader flag
     * @throws AzureException if the row does not exist or a storage call fails
     */
    public void updateIsLeader(String jobModelVersion, String processorId, boolean isLeader) {
        try {
            ProcessorEntity entity = retrieve(jobModelVersion, processorId);
            if (entity == null) {
                throw missingEntity(jobModelVersion, processorId);
            }
            entity.setIsLeader(isLeader);
            this.table.execute(TableOperation.replace(entity));
        } catch (StorageException e) {
            LOG.error("Azure storage exception while updating isLeader value for job model version: {} and pid: {}",
                    jobModelVersion, processorId, e);
            throw new AzureException(e);
        }
    }

    /**
     * Retrieves the processor row for the given keys and deletes it.
     *
     * @param jobModelVersion job model version; used as the partition key
     * @param processorId processor id; used as the row key
     * @param disableEtagCheck when {@code true}, the entity's ETag is set to {@code "*"} before
     *     the delete is issued (the Azure Storage wildcard ETag, which makes the delete
     *     unconditional with respect to concurrent modifications)
     * @throws AzureException if the row does not exist or a storage call fails
     */
    public void deleteProcessorEntity(String jobModelVersion, String processorId, boolean disableEtagCheck) {
        try {
            ProcessorEntity entity = retrieve(jobModelVersion, processorId);
            if (entity == null) {
                throw missingEntity(jobModelVersion, processorId);
            }
            if (disableEtagCheck) {
                entity.setEtag("*");
            }
            this.table.execute(TableOperation.delete(entity));
        } catch (StorageException e) {
            LOG.error("Azure storage exception while deleting processor entity with job model version: {} and pid: {}",
                    jobModelVersion, processorId, e);
            throw new AzureException(e);
        }
    }

    /**
     * Deletes the given processor row as-is (its ETag is left untouched).
     *
     * @param processorEntity entity to delete
     * @throws AzureException if the delete fails
     */
    public void deleteProcessorEntity(ProcessorEntity processorEntity) {
        try {
            this.table.execute(TableOperation.delete(processorEntity));
        } catch (StorageException e) {
            LOG.error("Azure storage exception while deleting processor entity with job model version: {} and pid: {}",
                    processorEntity.getJobModelVersion(), processorEntity.getProcessorId(), e);
            throw new AzureException(e);
        }
    }

    /**
     * Queries all processor rows whose partition key equals the given value.
     *
     * @param partitionKey partition key to match
     * @return the result of executing the query against the table
     */
    public Iterable<ProcessorEntity> getEntitiesWithPartition(String partitionKey) {
        String filter = TableQuery.generateFilterCondition(PARTITION_KEY, "eq", partitionKey);
        return this.table.execute(TableQuery.from(ProcessorEntity.class).where(filter));
    }

    /**
     * Collects the row keys of all processors whose row timestamp is at most
     * {@link #LIVENESS_DEBOUNCE_TIME_SEC} seconds old.
     *
     * <p>Two partitions are scanned: the one named by the supplied reference and the one named
     * by the initial state given at construction time.
     *
     * @param currentJobModelVersion reference holding the partition key to scan first
     * @return a new mutable set with the row keys of the active processors
     */
    public Set<String> getActiveProcessorsList(AtomicReference<String> currentJobModelVersion) {
        Set<String> activeProcessors = new HashSet<>();
        for (ProcessorEntity entity : getEntitiesWithPartition(currentJobModelVersion.get())) {
            if (millisSinceLastHeartbeat(entity) <= LIVENESS_DEBOUNCE_TIME_MS) {
                activeProcessors.add(entity.getRowKey());
            }
        }
        for (ProcessorEntity entity : getEntitiesWithPartition(this.initialState)) {
            long elapsedMs = millisSinceLastHeartbeat(entity);
            LOG.info("Time elapsed since last heartbeat: {}", elapsedMs);
            if (elapsedMs <= LIVENESS_DEBOUNCE_TIME_MS) {
                activeProcessors.add(entity.getRowKey());
            }
        }
        LOG.info("Active processors list: {}", activeProcessors);
        return activeProcessors;
    }

    /**
     * Returns the underlying table reference.
     *
     * @return the table this helper operates on
     */
    public CloudTable getTable() {
        return this.table;
    }

    /** Executes a point retrieve for the given keys; yields {@code null} when there is no result. */
    private ProcessorEntity retrieve(String jobModelVersion, String processorId) throws StorageException {
        TableOperation retrieveOperation = TableOperation.retrieve(jobModelVersion, processorId, ProcessorEntity.class);
        return this.table.execute(retrieveOperation).getResultAsType();
    }

    /** Builds the exception thrown when a row that must exist could not be retrieved. */
    private static AzureException missingEntity(String jobModelVersion, String processorId) {
        return new AzureException(new IllegalStateException(
                "No processor entity found for job model version: " + jobModelVersion + " and pid: " + processorId));
    }

    /** Returns the wall-clock milliseconds elapsed since the timestamp recorded on the entity. */
    private static long millisSinceLastHeartbeat(ProcessorEntity entity) {
        return System.currentTimeMillis() - entity.getTimestamp().getTime();
    }
}
