package io.mantisrx.server.master.persistence;

import com.netflix.spectator.api.Tag;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.master.events.LifecycleEventPublisher;
import io.mantisrx.master.jobcluster.IJobClusterMetadata;
import io.mantisrx.master.jobcluster.job.IMantisJobMetadata;
import io.mantisrx.master.jobcluster.job.IMantisStageMetadata;
import io.mantisrx.master.jobcluster.job.worker.IMantisWorkerMetadata;
import io.mantisrx.master.resourcecluster.DisableTaskExecutorsRequest;
import io.mantisrx.server.core.domain.JobArtifact;
import io.mantisrx.server.master.domain.DataFormatAdapter;
import io.mantisrx.server.master.domain.JobClusterDefinitionImpl;
import io.mantisrx.server.master.resourcecluster.ClusterID;
import io.mantisrx.server.master.resourcecluster.TaskExecutorID;
import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration;
import io.mantisrx.server.master.store.KeyValueStore;
import io.mantisrx.server.master.store.MantisJobMetadataWritable;
import io.mantisrx.server.master.store.MantisStageMetadata;
import io.mantisrx.server.master.store.MantisStageMetadataWritable;
import io.mantisrx.server.master.store.MantisWorkerMetadata;
import io.mantisrx.server.master.store.MantisWorkerMetadataWritable;
import io.mantisrx.server.master.store.NamedJob;
import io.mantisrx.shaded.com.fasterxml.jackson.core.JsonProcessingException;
import io.mantisrx.shaded.com.fasterxml.jackson.core.type.TypeReference;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.DeserializationFeature;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import io.mantisrx.shaded.com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.mantisrx.shaded.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.mantisrx.shaded.com.google.common.base.Preconditions;
import io.mantisrx.shaded.com.google.common.collect.ImmutableList;
import io.mantisrx.shaded.com.google.common.collect.Lists;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

/* loaded from: input_file:io/mantisrx/server/master/persistence/KeyValueBasedPersistenceProvider.class */
public class KeyValueBasedPersistenceProvider implements IMantisPersistenceProvider {
    // Names passed as the first argument of KeyValueStore calls (the "_NS" locations).
    // Active and archived job/stage metadata, keyed by job id.
    private static final String JOB_STAGEDATA_NS = "MantisJobStageData";
    private static final String ARCHIVED_JOB_STAGEDATA_NS = "MantisArchivedJobStageData";
    // Active and archived worker rows, keyed by "<jobId>-<bucket>" (see makeBucketizedPartitionKey).
    private static final String WORKERS_NS = "MantisWorkers";
    private static final String ARCHIVED_WORKERS_NS = "MantisArchivedWorkers";
    // Job cluster definitions, keyed by cluster name.
    private static final String NAMED_JOBS_NS = "MantisNamedJobs";
    // Completed-job records, keyed by "<clusterName>-<bucket>".
    private static final String NAMED_COMPLETEDJOBS_NS = "MantisNamedJobCompletedJobs";
    // Holds the single "activeASGs"/"thelist" entry read and written by the active-VM methods.
    private static final String ACTIVE_ASGS_NS = "MantisActiveASGs";
    // Prefix of the partition key used for task executor registrations in CONTROLPLANE_NS.
    private static final String TASK_EXECUTOR_REGISTRATION = "TaskExecutorRegistration";
    private static final String DISABLE_TASK_EXECUTOR_REQUESTS = "MantisDisableTaskExecutorRequests";
    private static final String CONTROLPLANE_NS = "mantis_controlplane";
    private static final String JOB_ARTIFACTS_NS = "mantis_global_job_artifacts";
    // Same value as the literal returned by getJobMetadataFieldName(); not referenced elsewhere in this file.
    private static final String JOB_METADATA_SECONDARY_KEY = "jobMetadata";
    // Stage columns are named "<prefix>-<stageNum>" (see getJobStageFieldName).
    private static final String JOB_STAGE_METADATA_SECONDARY_KEY_PREFIX = "stageMetadata";
    private static final String NAMED_JOB_SECONDARY_KEY = "jobNameInfo";
    // Partition key of the artifact-name index inside JOB_ARTIFACTS_NS.
    private static final String JOB_ARTIFACTS_BY_NAME_PARTITION_KEY = "JobArtifactsByName";
    // Width of one worker/completed-job bucket; bucket keys are multiples of this value.
    private static final int WORKER_BATCH_SIZE = 1000;
    // Upper bound returned by workerMaxPartitionKey when the real next worker number cannot be read.
    private static final int WORKER_MAX_INDEX = 30000;
    // Store that every read and write in this class is delegated to.
    private final KeyValueStore kvStore;
    // Passed to DataFormatAdapter when converting stored jobs/workers back to runtime objects.
    private final LifecycleEventPublisher eventPublisher;
    // Incremented by loadAllJobs for each job skipped because it has no worker rows.
    private final Counter noWorkersFoundCounter;
    // Incremented by loadAllJobs for each job that has at least one worker row.
    private final Counter workersFoundCounter;
    private static final Logger logger = LoggerFactory.getLogger(KeyValueBasedPersistenceProvider.class);
    // Shared JSON mapper; configured once in the static initializer at the bottom of the class.
    private static final ObjectMapper mapper = new ObjectMapper();
    // Seven days in milliseconds; exposed as a Duration by getArchiveDataTtlInMs().
    private static final long TTL_IN_MS = TimeUnit.DAYS.toMillis(7);

    /**
     * Creates a provider on top of the given store and registers the "storage" metrics group
     * with its two counters.
     *
     * @param keyValueStore store that all reads and writes are delegated to
     * @param lifecycleEventPublisher publisher handed to {@link DataFormatAdapter} when loaded
     *     jobs and workers are converted
     */
    public KeyValueBasedPersistenceProvider(KeyValueStore keyValueStore, LifecycleEventPublisher lifecycleEventPublisher) {
        final String noWorkersName = "noWorkersFound";
        final String workersName = "workersFound";
        final Metrics definition = new Metrics.Builder()
            .id("storage", new Tag[0])
            .addCounter(noWorkersName)
            .addCounter(workersName)
            .build();
        final Metrics registered = MetricsRegistry.getInstance().registerAndGet(definition);

        this.kvStore = keyValueStore;
        this.eventPublisher = lifecycleEventPublisher;
        this.noWorkersFoundCounter = registered.getCounter(noWorkersName);
        this.workersFoundCounter = registered.getCounter(workersName);
    }

    /**
     * Returns the secondary key ("jobMetadata") under which a job's own metadata is stored.
     * The method is protected and non-final, so a subclass may override it.
     */
    protected String getJobMetadataFieldName() {
        return JOB_METADATA_SECONDARY_KEY;
    }

    /**
     * Returns the prefix ("stageMetadata") shared by all per-stage secondary keys of a job row.
     * Columns whose name starts with this prefix are parsed as stage metadata when a job is read.
     */
    protected String getStageMetadataFieldPrefix() {
        return JOB_STAGE_METADATA_SECONDARY_KEY_PREFIX;
    }

    /**
     * Returns the partition key ("JobArtifactsByName") of the artifact-name index, i.e. the row
     * whose secondary keys are artifact names.
     */
    protected String getJobArtifactsByNamePartitionKey() {
        return JOB_ARTIFACTS_BY_NAME_PARTITION_KEY;
    }

    /**
     * Returns the TTL passed to the store whenever archived data is written.
     * Despite the "InMs" suffix the value is a {@link Duration} (seven days by default).
     */
    protected Duration getArchiveDataTtlInMs() {
        return Duration.ofMillis(TTL_IN_MS);
    }

    /**
     * Builds the secondary key of one stage's metadata: {@code "<stagePrefix>-<stageNum>"}.
     *
     * @param i stage number
     * @return the column name used to store that stage
     */
    protected String getJobStageFieldName(int i) {
        final String prefix = getStageMetadataFieldPrefix();
        return String.format("%s-%d", prefix, i);
    }

    /**
     * Returns the secondary key ("jobNameInfo") under which a job cluster definition is stored.
     */
    protected String getJobClusterFieldName() {
        return NAMED_JOB_SECONDARY_KEY;
    }

    /**
     * Checks that the number of stages attached to a job matches the stage count recorded in
     * its metadata. Logs an error describing the mismatch when it does not.
     *
     * @param mantisJobMetadataWritable job whose stages have already been attached
     * @return {@code true} when stage metadata is present and its size equals {@code getNumStages()}
     */
    private boolean jobIsValid(MantisJobMetadataWritable mantisJobMetadataWritable) {
        final int expectedStages = mantisJobMetadataWritable.getNumStages();
        final Collection<? extends MantisStageMetadata> stages = mantisJobMetadataWritable.getStageMetadata();
        if (stages == null) {
            logger.error("Could not find stage metadata for jobId {}", mantisJobMetadataWritable.getJobId());
            return false;
        }
        final boolean countsMatch = stages.size() == expectedStages;
        if (!countsMatch) {
            logger.error(
                "Invalid stage metadata for job {}: stage count mismatch expected {} vs found {}",
                mantisJobMetadataWritable.getJobId(),
                expectedStages,
                stages.size());
        }
        return countsMatch;
    }

    /**
     * Fetches every column stored for a job and assembles the job with its stages.
     *
     * @param str location (one of the job stage-data "_NS" constants) to read from
     * @param str2 job id, used as the partition key
     * @throws IOException if the store read fails or the stored job is incomplete or invalid
     */
    private MantisJobMetadataWritable readJobStageData(String str, String str2) throws IOException {
        final Map<String, String> columns = this.kvStore.getAll(str, str2);
        return readJobStageData(str2, columns);
    }

    /**
     * Assembles a job from the raw columns of its row: the job-metadata column becomes the job,
     * every column whose name starts with the stage prefix becomes one of its stages.
     *
     * <p>Columns that fail to deserialize are logged and skipped; entries with a null name or
     * value are ignored.
     *
     * @param str job id (used for messages only)
     * @param map column name to JSON value
     * @return the job with its stages attached
     * @throws IOException if no job column or no stage column could be read, or if the stage
     *     count does not match the job's declared number of stages
     */
    private MantisJobMetadataWritable readJobStageData(String str, Map<String, String> map) throws IOException {
        final String jobColumn = getJobMetadataFieldName();
        MantisJobMetadataWritable job = null;
        final List<MantisStageMetadataWritable> stages = new ArrayList<>();

        for (Map.Entry<String, String> column : map.entrySet()) {
            final String name = column.getKey();
            final String json = column.getValue();
            if (name == null || json == null) {
                continue;
            }
            try {
                if (jobColumn.equals(name)) {
                    job = mapper.readValue(json, MantisJobMetadataWritable.class);
                } else if (name.startsWith(getStageMetadataFieldPrefix())) {
                    stages.add(mapper.readValue(json, MantisStageMetadataWritable.class));
                }
            } catch (JsonProcessingException e) {
                logger.warn("failed to deserialize job metadata for jobId {}, column name {}", str, name, e);
            }
        }

        if (job == null) {
            throw new IOException("No " + jobColumn + " column found for key jobId=" + str);
        }
        if (stages.isEmpty()) {
            throw new IOException("No stage metadata columns with prefix " + getStageMetadataFieldPrefix() + " found for jobId=" + str);
        }
        for (MantisStageMetadataWritable stage : stages) {
            job.addJobStageIfAbsent(stage);
        }
        if (!jobIsValid(job)) {
            throw new IOException(String.format("Invalid job for jobId %s", str));
        }
        return job;
    }

    /**
     * Serializes a job's metadata and writes it to the job-metadata column of the job's row.
     *
     * @param iMantisJobMetadata job to persist
     * @throws Exception wrapping any {@link IOException} raised by serialization or the store
     */
    @Override
    public void storeNewJob(IMantisJobMetadata iMantisJobMetadata) throws Exception {
        try {
            final String jobId = iMantisJobMetadata.getJobId().toString();
            final String column = getJobMetadataFieldName();
            final MantisJobMetadataWritable writable =
                DataFormatAdapter.convertMantisJobMetadataToMantisJobMetadataWriteable(iMantisJobMetadata);
            this.kvStore.upsert(JOB_STAGEDATA_NS, jobId, column, mapper.writeValueAsString(writable));
        } catch (IOException ioFailure) {
            throw new Exception(ioFailure);
        }
    }

    /**
     * Overwrites the stored metadata of a job. Performs the same write as
     * {@link #storeNewJob} but lets exceptions propagate unwrapped.
     *
     * @param iMantisJobMetadata job to persist
     */
    @Override
    public void updateJob(IMantisJobMetadata iMantisJobMetadata) throws Exception {
        final String jobId = iMantisJobMetadata.getJobId().toString();
        final String column = getJobMetadataFieldName();
        final MantisJobMetadataWritable writable =
            DataFormatAdapter.convertMantisJobMetadataToMantisJobMetadataWriteable(iMantisJobMetadata);
        this.kvStore.upsert(JOB_STAGEDATA_NS, jobId, column, mapper.writeValueAsString(writable));
    }

    /**
     * Moves a job and all of its worker rows from the active locations to the archived ones,
     * writing the archived copies with the archive TTL.
     *
     * <p>Worker rows live in buckets keyed {@code "<jobId>-<1000|2000|...>"}. The buckets are
     * walked from the first one up to the bucket that contains the job's next worker number, so
     * the last, partially filled bucket is included. (Stepping from 0 in increments of the batch
     * size visits bucket 1000 twice and stops one bucket short.)
     *
     * @param str job id
     * @throws IOException if the job cannot be read or a store operation fails
     */
    @Override
    public void archiveJob(String str) throws IOException {
        final Map<String, String> jobColumns = this.kvStore.getAll(JOB_STAGEDATA_NS, str);
        final int nextWorkerNumber = workerMaxPartitionKey(readJobStageData(str, jobColumns));

        // Write the archive copy before removing the active one.
        this.kvStore.upsertAll(ARCHIVED_JOB_STAGEDATA_NS, str, jobColumns, getArchiveDataTtlInMs());
        this.kvStore.deleteAll(JOB_STAGEDATA_NS, str);

        final int lastBucket = bucketizePartitionKey(nextWorkerNumber);
        for (int bucket = WORKER_BATCH_SIZE; bucket <= lastBucket; bucket += WORKER_BATCH_SIZE) {
            final String partitionKey = makeBucketizedPartitionKey(str, bucket);
            final Map<String, String> workerColumns = this.kvStore.getAll(WORKERS_NS, partitionKey);
            this.kvStore.upsertAll(ARCHIVED_WORKERS_NS, partitionKey, workerColumns, getArchiveDataTtlInMs());
            this.kvStore.deleteAll(WORKERS_NS, partitionKey);
        }
    }

    /**
     * Deletes a job's metadata and worker rows from both the active and the archived locations.
     *
     * <p>The job metadata is needed to know how many worker buckets exist. It is read from the
     * active location first and, if that fails, from the archived one, so that a job which has
     * already been archived can still be deleted. Failures while deleting individual worker
     * buckets are logged and do not abort the operation.
     *
     * @param str job id
     * @throws Exception the failure of the active read (with the archived failure attached as
     *     suppressed) when the job can be found in neither location, or any store failure while
     *     deleting the job rows
     */
    @Override
    public void deleteJob(String str) throws Exception {
        MantisJobMetadataWritable jobMetadata;
        try {
            jobMetadata = readJobStageData(JOB_STAGEDATA_NS, str);
        } catch (IOException activeReadFailure) {
            try {
                jobMetadata = readJobStageData(ARCHIVED_JOB_STAGEDATA_NS, str);
            } catch (IOException archivedReadFailure) {
                // Keep reporting the same exception as before when the job exists nowhere.
                activeReadFailure.addSuppressed(archivedReadFailure);
                throw activeReadFailure;
            }
        }
        final int nextWorkerNumber = workerMaxPartitionKey(jobMetadata);

        this.kvStore.deleteAll(JOB_STAGEDATA_NS, str);
        rangeOperation(nextWorkerNumber, bucket -> {
            try {
                this.kvStore.deleteAll(WORKERS_NS, makeBucketizedPartitionKey(str, bucket));
            } catch (IOException e) {
                logger.warn("failed to delete worker for jobId {} with index {}", str, bucket, e);
            }
        });

        this.kvStore.deleteAll(ARCHIVED_JOB_STAGEDATA_NS, str);
        rangeOperation(nextWorkerNumber, bucket -> {
            try {
                this.kvStore.deleteAll(ARCHIVED_WORKERS_NS, makeBucketizedPartitionKey(str, bucket));
            } catch (IOException e) {
                logger.warn("failed to delete worker for jobId {} with index {}", str, bucket, e);
            }
        });
    }

    /**
     * Serializes one stage and writes it to its own column ({@code "<stagePrefix>-<stageNum>"})
     * in the owning job's row.
     *
     * @param iMantisStageMetadata stage to persist
     * @throws IOException if serialization or the store write fails
     */
    @Override
    public void storeMantisStage(IMantisStageMetadata iMantisStageMetadata) throws IOException {
        final String jobId = iMantisStageMetadata.getJobId().toString();
        final String column = getJobStageFieldName(iMantisStageMetadata.getStageNum());
        final MantisStageMetadataWritable writable =
            DataFormatAdapter.convertMantisStageMetadataToMantisStageMetadataWriteable(iMantisStageMetadata);
        this.kvStore.upsert(JOB_STAGEDATA_NS, jobId, column, mapper.writeValueAsString(writable));
    }

    /**
     * Updates a stored stage. Identical to {@link #storeMantisStage}: the write is an upsert,
     * so creating and updating are the same operation.
     */
    @Override // io.mantisrx.server.master.persistence.IMantisPersistenceProvider
    public void updateMantisStage(IMantisStageMetadata iMantisStageMetadata) throws IOException {
        storeMantisStage(iMantisStageMetadata);
    }

    /**
     * Returns the upper bound of worker numbers that may have been used by a job, which
     * determines how many worker buckets have to be visited.
     *
     * @param mantisJobMetadataWritable job to inspect
     * @return the job's next worker number, or {@code WORKER_MAX_INDEX} if it cannot be read
     */
    private int workerMaxPartitionKey(MantisJobMetadataWritable mantisJobMetadataWritable) {
        int upperBound = WORKER_MAX_INDEX;
        try {
            upperBound = mantisJobMetadataWritable.getNextWorkerNumberToUse();
        } catch (Exception ignored) {
            // Any failure (including a null job) falls back to the maximum index.
        }
        return upperBound;
    }

    /**
     * Rounds a worker/job number up to the bucket it belongs to: values up to 1000 map to 1000,
     * 1001..2000 map to 2000, and so on. Values below 1 are treated as 1.
     *
     * @param i worker or job number
     * @return the smallest multiple of the batch size that is {@code >= max(1, i)}
     */
    private int bucketizePartitionKey(int i) {
        final double batch = WORKER_BATCH_SIZE;
        final double bucketCount = Math.ceil(Math.max(1, i) / batch);
        return (int) (batch * bucketCount);
    }

    /**
     * Builds the partition key of the bucket a numbered item belongs to:
     * {@code "<name>-<bucket>"}.
     *
     * @param str job id or cluster name
     * @param i worker or job number; rounded up to its bucket
     */
    private String makeBucketizedPartitionKey(String str, int i) {
        final int bucket = bucketizePartitionKey(i);
        return String.format("%s-%d", str, bucket);
    }

    /**
     * Builds the secondary key of a worker row from its three numbers, joined by dashes.
     * Callers in this class pass stage number, worker index and worker number, in that order.
     */
    private String makeBucketizedSecondaryKey(int i, int i2, int i3) {
        final Object[] parts = {i, i2, i3};
        return String.format("%d-%d-%d", parts);
    }

    /**
     * Invokes the action once for every bucket from the first one (1000) up to and including
     * the bucket that contains {@code i}.
     *
     * @param i highest worker/job number that has to be covered
     * @param consumer action receiving each bucket value (1000, 2000, ...)
     */
    private void rangeOperation(int i, Consumer<Integer> consumer) {
        final int lastBucket = bucketizePartitionKey(i);
        int bucket = WORKER_BATCH_SIZE;
        while (bucket <= lastBucket) {
            consumer.accept(bucket);
            bucket += WORKER_BATCH_SIZE;
        }
    }

    /**
     * Persists a single worker by delegating to the job-id based {@code storeWorkers} overload.
     * NOTE(review): that two-argument overload is not defined in this file; presumably it is
     * inherited from the interface — confirm.
     *
     * @param iMantisWorkerMetadata worker to persist
     * @throws IOException if the write fails
     */
    @Override
    public void storeWorker(IMantisWorkerMetadata iMantisWorkerMetadata) throws IOException {
        final List<IMantisWorkerMetadata> single = Collections.singletonList(iMantisWorkerMetadata);
        storeWorkers(iMantisWorkerMetadata.getJobId(), single);
    }

    /**
     * Writes each worker to the active worker location, one upsert per worker. The partition
     * key is the job's bucket for the worker number; the secondary key combines stage number,
     * worker index and worker number.
     *
     * @param list workers to persist
     * @throws IOException if serialization or a store write fails; workers written before the
     *     failure stay written
     */
    @Override
    public void storeWorkers(List<IMantisWorkerMetadata> list) throws IOException {
        for (IMantisWorkerMetadata worker : list) {
            final MantisWorkerMetadataWritable writable =
                DataFormatAdapter.convertMantisWorkerMetadataToMantisWorkerMetadataWritable(worker);
            final String partitionKey =
                makeBucketizedPartitionKey(writable.getJobId(), writable.getWorkerNumber());
            final String secondaryKey = makeBucketizedSecondaryKey(
                writable.getStageNum(), writable.getWorkerIndex(), writable.getWorkerNumber());
            this.kvStore.upsert(WORKERS_NS, partitionKey, secondaryKey, mapper.writeValueAsString(writable));
        }
    }

    /**
     * Persists two workers in order, first {@code iMantisWorkerMetadata} and then
     * {@code iMantisWorkerMetadata2}. Neither argument may be null (the list factory rejects
     * null elements).
     *
     * @throws IOException if either write fails
     */
    @Override
    public void storeAndUpdateWorkers(IMantisWorkerMetadata iMantisWorkerMetadata, IMantisWorkerMetadata iMantisWorkerMetadata2) throws IOException {
        final List<IMantisWorkerMetadata> both = ImmutableList.of(iMantisWorkerMetadata, iMantisWorkerMetadata2);
        storeWorkers(both);
    }

    /**
     * Updates a stored worker. Identical to {@link #storeWorker}; this method only delegates.
     */
    @Override // io.mantisrx.server.master.persistence.IMantisPersistenceProvider
    public void updateWorker(IMantisWorkerMetadata iMantisWorkerMetadata) throws IOException {
        storeWorker(iMantisWorkerMetadata);
    }

    /**
     * Reads every worker row in the given location and groups the workers by job id.
     *
     * <p>Rows that cannot be deserialized are logged and skipped. Each worker is grouped under
     * its own job id, so a partition whose values all fail to parse simply contributes nothing
     * (previously the first element of the then-empty list was dereferenced, which aborted the
     * whole load with an {@link IndexOutOfBoundsException}).
     *
     * @param str location to scan (active or archived workers)
     * @return job id to the workers found for it; never null
     * @throws IOException if the store scan fails
     */
    private Map<String, List<MantisWorkerMetadataWritable>> getAllWorkersByJobId(String str) throws IOException {
        final Map<String, List<MantisWorkerMetadataWritable>> workersByJobId = new HashMap<>();
        for (Map.Entry<String, Map<String, String>> partition : this.kvStore.getAllRows(str).entrySet()) {
            for (String json : partition.getValue().values()) {
                MantisWorkerMetadataWritable worker;
                try {
                    worker = mapper.readValue(json, MantisWorkerMetadataWritable.class);
                } catch (JsonProcessingException e) {
                    logger.warn("failed to parse worker against pkey {} json {}", partition.getKey(), json, e);
                    continue;
                }
                if (worker == null) {
                    // A literal JSON "null" deserializes to null; there is nothing to group.
                    continue;
                }
                workersByJobId.computeIfAbsent(worker.getJobId(), ignored -> new ArrayList<>()).add(worker);
            }
        }
        return workersByJobId;
    }

    /**
     * Loads every active job together with its workers.
     *
     * <p>A job is skipped (and the failure logged) when its row cannot be assembled, when it
     * has no worker rows, or when two stored workers claim the same stage/index slot. One bad
     * job never prevents the others from loading.
     *
     * @return the jobs that could be loaded completely
     * @throws IOException if scanning the worker or job locations fails
     */
    @Override
    public List<IMantisJobMetadata> loadAllJobs() throws IOException {
        logger.info("MantisStorageProviderAdapter:Enter loadAllJobs");
        final Map<String, List<MantisWorkerMetadataWritable>> workersByJobId = getAllWorkersByJobId(WORKERS_NS);
        final List<IMantisJobMetadata> jobs = Lists.newArrayList();

        for (Map.Entry<String, Map<String, String>> row : this.kvStore.getAllRows(JOB_STAGEDATA_NS).entrySet()) {
            final String jobId = row.getKey();
            try {
                final MantisJobMetadataWritable job = readJobStageData(jobId, row.getValue());
                final List<MantisWorkerMetadataWritable> workers = workersByJobId.get(jobId);
                if (CollectionUtils.isEmpty(workers)) {
                    logger.warn("No workers found for job {}, skipping", jobId);
                    this.noWorkersFoundCounter.increment();
                    continue;
                }
                this.workersFoundCounter.increment();
                for (MantisWorkerMetadataWritable worker : workers) {
                    final boolean added = job.addWorkerMedata(worker.getStageNum(), worker, null);
                    if (!added) {
                        // Built with String.format: Guava's Preconditions only substitutes %s, so
                        // the %d placeholders used to be left in the message verbatim. Building
                        // the message here also avoids looking up the existing worker on success.
                        throw new IllegalStateException(String.format(
                            "JobID=%s stage=%d workerIdx=%d has existing worker, existing=%s, new=%s",
                            worker.getJobId(),
                            worker.getStageNum(),
                            worker.getWorkerIndex(),
                            job.getWorkerByIndex(worker.getStageNum(), worker.getWorkerIndex()).getWorkerId(),
                            worker.getWorkerId()));
                    }
                }
                jobs.add(DataFormatAdapter.convertMantisJobWriteableToMantisJobMetadata(job, this.eventPublisher));
            } catch (Exception e) {
                logger.warn("Exception loading job {}", jobId, e);
            }
        }

        logger.info("MantisStorageProviderAdapter:Exit loadAllJobs {}", jobs.size());
        return jobs;
    }

    /**
     * Returns an observable that, on subscription, walks every partition key of the archived
     * job location and emits each archived job that can be loaded. Jobs that fail to load are
     * skipped (see {@link #loadArchivedJob}); a failure to list the keys is delivered through
     * {@code onError}.
     */
    @Override
    public Observable<IMantisJobMetadata> loadAllArchivedJobs() {
        return Observable.create(subscriber -> {
            try {
                for (String jobId : this.kvStore.getAllPartitionKeys(ARCHIVED_JOB_STAGEDATA_NS)) {
                    loadArchivedJob(jobId).ifPresent(subscriber::onNext);
                }
                subscriber.onCompleted();
            } catch (IOException e) {
                subscriber.onError(e);
            }
        });
    }

    /**
     * Loads every stored job cluster definition. Clusters that cannot be read or converted are
     * logged and skipped; a summary with the number of successes and failures is logged at the
     * end.
     *
     * @return the clusters that could be loaded
     * @throws IOException if scanning the job cluster location fails
     */
    @Override
    public List<IJobClusterMetadata> loadAllJobClusters() throws IOException {
        int succeeded = 0;
        int failed = 0;
        final List<IJobClusterMetadata> clusters = Lists.newArrayList();

        for (Map.Entry<String, Map<String, String>> row : this.kvStore.getAllRows(NAMED_JOBS_NS).entrySet()) {
            final String clusterName = row.getKey();
            try {
                final String json = row.getValue().get(getJobClusterFieldName());
                final NamedJob namedJob = getJobCluster(NAMED_JOBS_NS, clusterName, json);
                clusters.add(DataFormatAdapter.convertNamedJobToJobClusterMetadata(namedJob));
                succeeded++;
            } catch (Exception e) {
                logger.error("Exception {} getting job cluster for {} ", e.getMessage(), clusterName, e);
                failed++;
            }
        }

        logger.info("Read and converted job clusters. Successful - {}, Failed - {}", succeeded, failed);
        return clusters;
    }

    /**
     * Loads every completed-job record across all clusters and buckets. Records that cannot be
     * deserialized are logged and skipped; a summary with the number of successes and failures
     * is logged at the end.
     *
     * @return the completed jobs that could be read
     * @throws IOException if scanning the completed-jobs location fails
     */
    @Override
    public List<JobClusterDefinitionImpl.CompletedJob> loadAllCompletedJobs() throws IOException {
        int succeeded = 0;
        int failed = 0;
        final List<JobClusterDefinitionImpl.CompletedJob> completedJobs = new ArrayList<>();

        for (Map<String, String> bucket : this.kvStore.getAllRows(NAMED_COMPLETEDJOBS_NS).values()) {
            for (String json : bucket.values()) {
                try {
                    final NamedJob.CompletedJob stored = mapper.readValue(json, NamedJob.CompletedJob.class);
                    final JobClusterDefinitionImpl.CompletedJob converted =
                        DataFormatAdapter.convertNamedJobCompletedJobToCompletedJob(stored);
                    succeeded++;
                    if (converted != null) {
                        completedJobs.add(converted);
                    }
                } catch (JsonProcessingException e) {
                    logger.warn("failed to parse CompletedJob from {}", json, e);
                    failed++;
                }
            }
        }

        // The summary used to say "job clusters", which made it indistinguishable from the
        // job cluster load in the logs.
        logger.info("Read and converted completed jobs. Successful - {}, Failed - {}", succeeded, failed);
        return completedJobs;
    }

    /**
     * Moves one worker from the active worker location to the archived one, keeping its
     * partition and secondary keys and applying the archive TTL.
     *
     * <p>The archived copy is written before the active row is removed, so a failing write
     * leaves the worker in the active location instead of losing it. This is the same order
     * {@link #archiveJob} uses.
     *
     * @param iMantisWorkerMetadata worker to archive
     * @throws IOException if serialization or a store operation fails
     */
    @Override
    public void archiveWorker(IMantisWorkerMetadata iMantisWorkerMetadata) throws IOException {
        final MantisWorkerMetadataWritable writable =
            DataFormatAdapter.convertMantisWorkerMetadataToMantisWorkerMetadataWritable(iMantisWorkerMetadata);
        final String partitionKey =
            makeBucketizedPartitionKey(writable.getJobId(), writable.getWorkerNumber());
        final String secondaryKey = makeBucketizedSecondaryKey(
            writable.getStageNum(), writable.getWorkerIndex(), writable.getWorkerNumber());

        this.kvStore.upsert(
            ARCHIVED_WORKERS_NS, partitionKey, secondaryKey, mapper.writeValueAsString(writable), getArchiveDataTtlInMs());
        this.kvStore.delete(WORKERS_NS, partitionKey, secondaryKey);
    }

    /**
     * Returns the archived workers of a job.
     *
     * <p>The job metadata (needed to know how many worker buckets to visit) is looked up in the
     * active location first and in the archived location if that fails. Buckets that cannot be
     * read and workers that cannot be converted are logged and skipped.
     *
     * @param str job id
     * @return the archived workers that could be read
     * @throws IOException if the job cannot be found in the archived location either
     */
    @Override
    public List<IMantisWorkerMetadata> getArchivedWorkers(String str) throws IOException {
        MantisJobMetadataWritable jobMetadata;
        try {
            jobMetadata = readJobStageData(JOB_STAGEDATA_NS, str);
        } catch (Exception activeLookupFailure) {
            jobMetadata = readJobStageData(ARCHIVED_JOB_STAGEDATA_NS, str);
        }
        if (jobMetadata == null) {
            return Collections.emptyList();
        }

        final List<IMantisWorkerMetadata> archivedWorkers = Lists.newArrayList();
        rangeOperation(workerMaxPartitionKey(jobMetadata), bucket -> {
            final String partitionKey = makeBucketizedPartitionKey(str, bucket);
            try {
                final Map<String, String> rows = this.kvStore.getAll(ARCHIVED_WORKERS_NS, partitionKey);
                for (Map.Entry<String, String> row : rows.entrySet()) {
                    try {
                        final MantisWorkerMetadata stored =
                            mapper.readValue(row.getValue(), MantisWorkerMetadataWritable.class);
                        archivedWorkers.add(
                            DataFormatAdapter
                                .convertMantisWorkerMetadataWriteableToMantisWorkerMetadata(stored, this.eventPublisher)
                                .getMetadata());
                    } catch (Exception conversionFailure) {
                        logger.warn("Exception converting worker for jobId {} ({}, {})", str, partitionKey, row.getKey(), conversionFailure);
                    }
                }
            } catch (IOException readFailure) {
                logger.warn("Error reading archive workers for jobId {} for pkey {}", str, partitionKey, readFailure);
            }
        });
        return archivedWorkers;
    }

    /**
     * Creates a job cluster. Identical to {@link #updateJobCluster}: the write is an upsert,
     * so creating and updating are the same operation.
     */
    @Override // io.mantisrx.server.master.persistence.IMantisPersistenceProvider
    public void createJobCluster(IJobClusterMetadata iJobClusterMetadata) throws Exception {
        updateJobCluster(iJobClusterMetadata);
    }

    /**
     * Serializes a job cluster and writes it to the cluster column of the row keyed by the
     * cluster's name.
     *
     * @param iJobClusterMetadata cluster to persist
     */
    @Override
    public void updateJobCluster(IJobClusterMetadata iJobClusterMetadata) throws Exception {
        final String clusterName = iJobClusterMetadata.getJobClusterDefinition().getName();
        final String column = getJobClusterFieldName();
        final NamedJob namedJob = DataFormatAdapter.convertJobClusterMetadataToNamedJob(iJobClusterMetadata);
        this.kvStore.upsert(NAMED_JOBS_NS, clusterName, column, mapper.writeValueAsString(namedJob));
    }

    /**
     * Deletes a job cluster definition and every completed-job bucket stored for it.
     *
     * <p>The cluster is read first because its next job number determines how many
     * completed-job buckets exist. Buckets are removed wholesale; a failure on one bucket is
     * logged and the remaining buckets are still processed.
     *
     * @param str cluster name
     * @throws Exception if the cluster cannot be read or its definition cannot be deleted
     */
    @Override
    public void deleteJobCluster(String str) throws Exception {
        final NamedJob jobCluster = getJobCluster(NAMED_JOBS_NS, str);
        this.kvStore.deleteAll(NAMED_JOBS_NS, str);
        rangeOperation((int) jobCluster.getNextJobNumber(), bucket -> {
            try {
                // Drop the whole bucket directly; removeCompletedJobForCluster removes one job only.
                this.kvStore.deleteAll(NAMED_COMPLETEDJOBS_NS, makeBucketizedPartitionKey(str, bucket));
            } catch (IOException e) {
                logger.warn("failed to delete completed jobs for named job {} with index {}", str, bucket, e);
            }
        });
    }

    /**
     * Reads and deserializes the job cluster stored under the given name.
     *
     * @param str location to read from
     * @param str2 cluster name (partition key)
     */
    private NamedJob getJobCluster(String str, String str2) throws Exception {
        return getJobCluster(str, str2, this.kvStore.get(str, str2, getJobClusterFieldName()));
    }

    /**
     * Deserializes a job cluster from its JSON form. The first two parameters are not used by
     * the deserialization itself.
     *
     * @param str3 JSON of the stored cluster
     */
    private NamedJob getJobCluster(String str, String str2, String str3) throws Exception {
        return mapper.readValue(str3, NamedJob.class);
    }

    /**
     * Extracts the numeric suffix of an id of the form {@code "<name>-<number>"}.
     *
     * @throws NumberFormatException if the text after the last dash is not an integer
     */
    private int parseJobId(String str) {
        return Integer.parseInt(str.substring(str.lastIndexOf("-") + 1));
    }

    /**
     * Stores one completed-job record. The record goes into the cluster's bucket for the job
     * number, under a secondary key equal to the job number.
     *
     * @param str cluster name
     * @param completedJob record to store
     * @throws IOException if serialization or the store write fails
     */
    @Override
    public void storeCompletedJobForCluster(String str, JobClusterDefinitionImpl.CompletedJob completedJob) throws IOException {
        final int jobNumber = parseJobId(completedJob.getJobId());
        this.kvStore.upsert(
            NAMED_COMPLETEDJOBS_NS,
            makeBucketizedPartitionKey(str, jobNumber),
            String.valueOf(jobNumber),
            mapper.writeValueAsString(DataFormatAdapter.convertCompletedJobToNamedJobCompletedJob(completedJob)));
    }

    /**
     * Removes the completed-job record of a single job.
     *
     * <p>Only the row written by {@link #storeCompletedJobForCluster} for this job is deleted.
     * Deleting the whole partition here would also discard the records of every other job that
     * shares the same bucket.
     *
     * @param str cluster name
     * @param str2 job id of the form {@code "<name>-<number>"}
     * @throws IOException if the store delete fails
     */
    @Override
    public void removeCompletedJobForCluster(String str, String str2) throws IOException {
        final int jobNumber = parseJobId(str2);
        this.kvStore.delete(
            NAMED_COMPLETEDJOBS_NS,
            makeBucketizedPartitionKey(str, jobNumber),
            String.valueOf(jobNumber));
    }

    /**
     * Loads one archived job.
     *
     * @param str job id
     * @return the job, or an empty optional if it cannot be read or converted (the failure is
     *     logged rather than thrown)
     */
    @Override
    public Optional<IMantisJobMetadata> loadArchivedJob(String str) throws IOException {
        Optional<IMantisJobMetadata> result;
        try {
            final MantisJobMetadataWritable archived = readJobStageData(ARCHIVED_JOB_STAGEDATA_NS, str);
            result = Optional.of(DataFormatAdapter.convertMantisJobWriteableToMantisJobMetadata(archived, this.eventPublisher));
        } catch (Exception e) {
            logger.error("Exception loading archived job {}", str, e);
            result = Optional.empty();
        }
        return result;
    }

    /**
     * Reads the stored list of active VM attribute values.
     *
     * @return the stored list, or an empty list when nothing (or only blank text) is stored
     * @throws IOException if the store read or the JSON parsing fails
     */
    @Override
    public List<String> initActiveVmAttributeValuesList() throws IOException {
        final String stored = this.kvStore.get(ACTIVE_ASGS_NS, "activeASGs", "thelist");
        logger.info("read active VMs data {} from Cass", stored);
        if (StringUtils.isBlank(stored)) {
            return Collections.emptyList();
        }
        return mapper.readValue(stored, new TypeReference<List<String>>() {
        });
    }

    /**
     * Stores the list of active VM attribute values as JSON, replacing any previous value.
     *
     * @param list values to store
     * @throws IOException if serialization or the store write fails
     */
    @Override
    public void setActiveVmAttributeValuesList(List<String> list) throws IOException {
        logger.info("Setting active ASGs {}", list);
        final String json = mapper.writeValueAsString(list);
        this.kvStore.upsert(ACTIVE_ASGS_NS, "activeASGs", "thelist", json);
    }

    /**
     * Reads the registration stored for a task executor. The row is keyed
     * {@code "TaskExecutorRegistration-<resourceId>"} with the resource id as secondary key,
     * matching what {@link #storeNewTaskExecutor} writes.
     *
     * @param taskExecutorID executor to look up
     * @throws IOException wrapping any failure, including a missing or unparsable registration
     */
    @Override
    public TaskExecutorRegistration getTaskExecutorFor(TaskExecutorID taskExecutorID) throws IOException {
        try {
            final String partitionKey = TASK_EXECUTOR_REGISTRATION + "-" + taskExecutorID.getResourceId();
            final String json = this.kvStore.get(CONTROLPLANE_NS, partitionKey, taskExecutorID.getResourceId());
            return mapper.readValue(json, TaskExecutorRegistration.class);
        } catch (Exception failure) {
            throw new IOException(failure);
        }
    }

    /**
     * Stores a task executor registration under
     * {@code "TaskExecutorRegistration-<resourceId>"} with the resource id as secondary key.
     *
     * @param taskExecutorRegistration registration to persist
     * @throws IOException if serialization or the store write fails
     */
    @Override
    public void storeNewTaskExecutor(TaskExecutorRegistration taskExecutorRegistration) throws IOException {
        final String resourceId = taskExecutorRegistration.getTaskExecutorID().getResourceId();
        final String partitionKey = String.format("%s-%s", TASK_EXECUTOR_REGISTRATION, resourceId);
        final String json = mapper.writeValueAsString(taskExecutorRegistration);
        this.kvStore.upsert(CONTROLPLANE_NS, partitionKey, resourceId, json);
    }

    /**
     * Stores a disable-task-executors request in its cluster's row, keyed by the request hash.
     *
     * @param disableTaskExecutorsRequest request to persist
     * @throws IOException if serialization or the store write fails
     */
    @Override
    public void storeNewDisableTaskExecutorRequest(DisableTaskExecutorsRequest disableTaskExecutorsRequest) throws IOException {
        final String clusterId = disableTaskExecutorsRequest.getClusterID().getResourceID();
        final String requestHash = disableTaskExecutorsRequest.getHash();
        this.kvStore.upsert(
            DISABLE_TASK_EXECUTOR_REQUESTS, clusterId, requestHash, mapper.writeValueAsString(disableTaskExecutorsRequest));
    }

    /**
     * Deletes a stored disable-task-executors request, addressed by cluster id and request hash.
     * The method itself does not check whether the request has expired.
     *
     * @param disableTaskExecutorsRequest request to remove
     * @throws IOException if the store delete fails
     */
    @Override
    public void deleteExpiredDisableTaskExecutorRequest(DisableTaskExecutorsRequest disableTaskExecutorsRequest) throws IOException {
        final String clusterId = disableTaskExecutorsRequest.getClusterID().getResourceID();
        final String requestHash = disableTaskExecutorsRequest.getHash();
        this.kvStore.delete(DISABLE_TASK_EXECUTOR_REQUESTS, clusterId, requestHash);
    }

    /**
     * Loads every disable-task-executors request stored for a cluster.
     *
     * <p>Unlike the other loaders in this class, a record that cannot be deserialized is not
     * skipped: the failure is rethrown as a {@link RuntimeException} and the load is aborted.
     *
     * @param clusterID cluster whose requests are loaded
     * @throws IOException if the store read fails
     */
    @Override
    public List<DisableTaskExecutorsRequest> loadAllDisableTaskExecutorsRequests(ClusterID clusterID) throws IOException {
        final Collection<String> serialized =
            this.kvStore.getAll(DISABLE_TASK_EXECUTOR_REQUESTS, clusterID.getResourceID()).values();
        final List<DisableTaskExecutorsRequest> requests = new ArrayList<>();
        for (String json : serialized) {
            try {
                requests.add(mapper.readValue(json, DisableTaskExecutorsRequest.class));
            } catch (Exception failure) {
                throw new RuntimeException(failure);
            }
        }
        return requests;
    }

    /**
     * Reports whether an artifact is stored under the given id. The id is used as both the
     * partition key and the secondary key, matching the by-id entry written by
     * {@code addNewJobArtifact}.
     *
     * @param str artifact id
     */
    @Override // io.mantisrx.server.master.persistence.IMantisPersistenceProvider
    public boolean isArtifactExists(String str) throws IOException {
        return this.kvStore.isRowExists(JOB_ARTIFACTS_NS, str, str);
    }

    /**
     * Reads and deserializes the artifact stored under the given id (the id serves as both
     * partition key and secondary key).
     *
     * @param str artifact id
     * @throws IOException if the store read or the JSON parsing fails
     */
    @Override
    public JobArtifact getArtifactById(String str) throws IOException {
        final String json = this.kvStore.get(JOB_ARTIFACTS_NS, str, str);
        return mapper.readValue(json, JobArtifact.class);
    }

    /**
     * Lists the stored artifacts with the given name, optionally restricted to one version.
     *
     * <p>Entries that cannot be deserialized are logged and skipped. When a specific version is
     * requested and nothing is stored for it, an empty list is returned (a null lookup result
     * used to be passed to an immutable-list factory, which throws
     * {@link NullPointerException}).
     *
     * @param str artifact name (partition key)
     * @param str2 version (secondary key), or {@code null} for all versions
     * @return the matching artifacts; never null
     * @throws IOException if the store read fails
     */
    @Override
    public List<JobArtifact> listJobArtifacts(String str, String str2) throws IOException {
        final Collection<String> serialized;
        if (str2 == null) {
            serialized = this.kvStore.getAll(JOB_ARTIFACTS_NS, str).values();
        } else {
            final String single = this.kvStore.get(JOB_ARTIFACTS_NS, str, str2);
            serialized = single == null
                ? Collections.<String>emptyList()
                : Collections.singletonList(single);
        }

        final List<JobArtifact> artifacts = new ArrayList<>(serialized.size());
        for (String json : serialized) {
            try {
                final JobArtifact artifact = mapper.readValue(json, JobArtifact.class);
                if (artifact != null) {
                    artifacts.add(artifact);
                }
            } catch (JsonProcessingException e) {
                logger.warn("Failed to deserialize job artifact metadata for {} (data={})", str, json, e);
            }
        }
        return artifacts;
    }

    /**
     * Lists artifact names from the name index, optionally filtered.
     *
     * <p>A null filter is treated like an empty one. The "contains" comparison is
     * case-insensitive and uses {@link Locale#ROOT}, so the result does not depend on the JVM's
     * default locale.
     *
     * @param str name prefix; null or empty lists all names
     * @param str2 text the name must contain, ignoring case; null or empty disables the filter
     * @return the matching artifact names
     * @throws IOException if the store read fails
     */
    @Override
    public List<String> listJobArtifactsByName(String str, String str2) throws IOException {
        final String indexKey = getJobArtifactsByNamePartitionKey();
        final Set<String> names = (str == null || str.isEmpty())
            ? this.kvStore.getAll(JOB_ARTIFACTS_NS, indexKey).keySet()
            : this.kvStore.getAllWithPrefix(JOB_ARTIFACTS_NS, indexKey, str).keySet();

        if (str2 == null || str2.isEmpty()) {
            return new ArrayList<>(names);
        }
        // Lower-case the needle once instead of once per candidate name.
        final String needle = str2.toLowerCase(Locale.ROOT);
        return names.stream()
            .filter(name -> name.toLowerCase(Locale.ROOT).contains(needle))
            .collect(Collectors.toList());
    }

    /**
     * Writes one artifact entry and converts a failed write into an unchecked exception so the
     * method can be used from a {@code Runnable}.
     *
     * @param str partition key
     * @param str2 secondary key
     * @param jobArtifact artifact to serialize and store
     * @throws RuntimeException wrapping the {@link IOException} of a failed write
     */
    private void addNewJobArtifact(String str, String str2, JobArtifact jobArtifact) {
        try {
            final String json = mapper.writeValueAsString(jobArtifact);
            this.kvStore.upsert(JOB_ARTIFACTS_NS, str, str2, json);
        } catch (IOException writeFailure) {
            logger.error("Error while storing keyId {} for artifact {}", str, jobArtifact, writeFailure);
            throw new RuntimeException(writeFailure);
        }
    }

    /**
     * Stores an artifact under its three lookup keys, one after the other: the name index
     * (index key / name), the per-name row (name / version) and the by-id row (id / id). The
     * call blocks until all three writes have finished.
     *
     * @param jobArtifact artifact to store
     * @throws IOException if any write fails or the calling thread is interrupted while
     *     waiting; on interruption the thread's interrupt flag is restored before throwing
     */
    @Override
    public void addNewJobArtifact(JobArtifact jobArtifact) throws IOException {
        try {
            CompletableFuture
                .runAsync(() -> addNewJobArtifact(getJobArtifactsByNamePartitionKey(), jobArtifact.getName(), jobArtifact))
                .thenRunAsync(() -> addNewJobArtifact(jobArtifact.getName(), jobArtifact.getVersion(), jobArtifact))
                .thenRunAsync(() -> addNewJobArtifact(
                    jobArtifact.getArtifactID().getResourceID(), jobArtifact.getArtifactID().getResourceID(), jobArtifact))
                .get();
        } catch (InterruptedException e) {
            // Preserve the interruption for callers further up the stack.
            Thread.currentThread().interrupt();
            logger.error("Error while storing job artifact {} to Cassandra Storage Provider.", jobArtifact, e);
            throw new IOException(e);
        } catch (ExecutionException e) {
            logger.error("Error while storing job artifact {} to Cassandra Storage Provider.", jobArtifact, e);
            throw new IOException(e);
        }
    }

    // One-time configuration of the shared JSON mapper: unknown properties in stored JSON are
    // ignored instead of failing, and the JDK 8 and java.time modules are registered.
    static {
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        mapper.registerModule(new Jdk8Module());
        mapper.registerModule(new JavaTimeModule());
    }
}
