/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.server.master.persistence;

import com.netflix.spectator.api.Tag;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.master.events.LifecycleEventPublisher;
import io.mantisrx.master.jobcluster.IJobClusterMetadata;
import io.mantisrx.master.jobcluster.job.IMantisJobMetadata;
import io.mantisrx.master.jobcluster.job.IMantisStageMetadata;
import io.mantisrx.master.jobcluster.job.worker.IMantisWorkerMetadata;
import io.mantisrx.master.jobcluster.job.worker.JobWorker;
import io.mantisrx.master.resourcecluster.DisableTaskExecutorsRequest;
import io.mantisrx.master.resourcecluster.proto.ResourceClusterScaleSpec;
import io.mantisrx.master.resourcecluster.writable.RegisteredResourceClustersWritable;
import io.mantisrx.master.resourcecluster.writable.ResourceClusterScaleRulesWritable;
import io.mantisrx.master.resourcecluster.writable.ResourceClusterSpecWritable;
import io.mantisrx.server.core.domain.ArtifactID;
import io.mantisrx.server.core.domain.JobArtifact;
import io.mantisrx.server.master.domain.DataFormatAdapter;
import io.mantisrx.server.master.domain.JobClusterDefinitionImpl;
import io.mantisrx.server.master.domain.JobId;
import io.mantisrx.server.master.persistence.IMantisPersistenceProvider;
import io.mantisrx.server.master.resourcecluster.ClusterID;
import io.mantisrx.server.master.resourcecluster.TaskExecutorID;
import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration;
import io.mantisrx.server.master.store.KeyValueStore;
import io.mantisrx.server.master.store.MantisJobMetadataWritable;
import io.mantisrx.server.master.store.MantisStageMetadata;
import io.mantisrx.server.master.store.MantisStageMetadataWritable;
import io.mantisrx.server.master.store.MantisWorkerMetadata;
import io.mantisrx.server.master.store.MantisWorkerMetadataWritable;
import io.mantisrx.server.master.store.NamedJob;
import io.mantisrx.shaded.com.fasterxml.jackson.core.JsonProcessingException;
import io.mantisrx.shaded.com.fasterxml.jackson.core.type.TypeReference;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.DeserializationFeature;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.Module;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import io.mantisrx.shaded.com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.mantisrx.shaded.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.mantisrx.shaded.com.google.common.collect.ImmutableList;
import io.mantisrx.shaded.com.google.common.collect.Lists;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;

public class KeyValueBasedPersistenceProvider
implements IMantisPersistenceProvider {
    private static final Logger logger = LoggerFactory.getLogger(KeyValueBasedPersistenceProvider.class);
    // Shared JSON mapper used for every serialize/deserialize call in this class.
    // NOTE(review): no module/feature configuration is visible in this part of the file — confirm whether it is set up elsewhere.
    private static final ObjectMapper mapper = new ObjectMapper();

    // ---- Namespaces (first argument of the KeyValueStore calls below) ----
    // Active job metadata and per-stage metadata, keyed by job id.
    private static final String JOB_STAGEDATA_NS = "MantisJobStageData";
    // Destination of job/stage rows moved by archiveJob.
    private static final String ARCHIVED_JOB_STAGEDATA_NS = "MantisArchivedJobStageData";
    // Active worker records, keyed by a bucketized "<jobId>-<bucket>" partition key.
    private static final String WORKERS_NS = "MantisWorkers";
    // Destination of worker records moved by archiveJob/archiveWorker.
    private static final String ARCHIVED_WORKERS_NS = "MantisArchivedWorkers";
    // Job cluster definitions, keyed by cluster name.
    private static final String NAMED_JOBS_NS = "MantisNamedJobs";
    // Completed-job records per cluster, stored via the ordered upsert/get API.
    private static final String NAMED_COMPLETEDJOBS_NS = "MantisNamedJobCompletedJobsV2";
    // Holds the single "activeASGs"/"thelist" entry read and written by the active-VM-attribute methods.
    private static final String ACTIVE_ASGS_NS = "MantisActiveASGs";
    // Partition-key prefix for task executor registrations stored under CONTROLPLANE_NS.
    private static final String TASK_EXECUTOR_REGISTRATION = "TaskExecutorRegistration";
    // Namespace for DisableTaskExecutorsRequest entries, partitioned by cluster resource id.
    private static final String DISABLE_TASK_EXECUTOR_REQUESTS = "MantisDisableTaskExecutorRequests";
    // Namespace for task executor registrations and resource cluster specs/registrations.
    private static final String CONTROLPLANE_NS = "mantis_controlplane";
    // Namespace for job artifact metadata (indexed by name, by name/version and by resource id).
    private static final String JOB_ARTIFACTS_NS = "mantis_global_job_artifacts";
    // Namespace for the per-cluster list of artifact ids to cache.
    private static final String JOB_ARTIFACTS_TO_CACHE_PER_CLUSTER_ID_NS = "mantis_global_cached_artifacts";

    // ---- Default secondary keys / partition keys (exposed through the protected getters) ----
    private static final String JOB_METADATA_SECONDARY_KEY = "jobMetadata";
    private static final String JOB_STAGE_METADATA_SECONDARY_KEY_PREFIX = "stageMetadata";
    private static final String NAMED_JOB_SECONDARY_KEY = "jobNameInfo";
    private static final String JOB_ARTIFACTS_BY_NAME_PARTITION_KEY = "JobArtifactsByName";

    // NOTE(review): presumably the worker bucket width; bucketizePartitionKey and rangeOperation use the literal 1000 — confirm they are meant to track this constant.
    private static final int WORKER_BATCH_SIZE = 1000;
    // NOTE(review): matches the literal 30000 returned as a fallback by workerMaxPartitionKey — confirm.
    private static final int WORKER_MAX_INDEX = 30000;
    // 90 days expressed in milliseconds; wrapped into a Duration by getArchiveDataTtlInMs.
    private static final long DEFAULT_TTL_IN_MS = TimeUnit.DAYS.toMillis(90L);

    // Backing store for all reads and writes performed by this provider.
    private final KeyValueStore kvStore;
    // Passed to DataFormatAdapter when converting stored records back into job/worker metadata.
    private final LifecycleEventPublisher eventPublisher;
    // Counters registered under the "storage" metrics id in the constructor.
    private final Counter noWorkersFoundCounter;
    private final Counter staleWorkersFoundCounter;
    private final Counter workersFoundCounter;
    private final Counter failedToLoadJobCounter;

    // NOTE(review): not referenced in the visible part of the file — usage unknown from here.
    private static final String RESOURCE_CLUSTER_REGISTRATION = "ResourceClusterRegistration";
    // Partition key under CONTROLPLANE_NS holding the serialized RegisteredResourceClustersWritable.
    private static final String CLUSTER_REGISTRATION_KEY = "resource_cluster_registrations";
    // NOTE(review): not referenced in the visible part of the file — usage unknown from here.
    private static final String RESOURCE_CLUSTER_RULE_PREFIX = "ResourceClusterRulePrefix";

    /**
     * Creates a provider that persists through the given store.
     *
     * <p>Builds a metrics group with id {@code "storage"} containing four counters, registers it
     * with the {@link MetricsRegistry} and keeps references to the registered counters.
     *
     * @param kvStore        store used for all reads and writes
     * @param eventPublisher publisher handed to {@link DataFormatAdapter} when rebuilding metadata
     */
    public KeyValueBasedPersistenceProvider(KeyValueStore kvStore, LifecycleEventPublisher eventPublisher) {
        this.kvStore = kvStore;
        this.eventPublisher = eventPublisher;

        final Metrics definition = new Metrics.Builder()
            .id("storage", new Tag[0])
            .addCounter("noWorkersFound")
            .addCounter("staleWorkerFound")
            .addCounter("workersFound")
            .addCounter("failedToLoadJobCount")
            .build();
        // Use the instance returned by the registry, not the one just built.
        final Metrics registered = MetricsRegistry.getInstance().registerAndGet(definition);

        this.noWorkersFoundCounter = registered.getCounter("noWorkersFound");
        this.staleWorkersFoundCounter = registered.getCounter("staleWorkerFound");
        this.workersFoundCounter = registered.getCounter("workersFound");
        this.failedToLoadJobCounter = registered.getCounter("failedToLoadJobCount");
    }

    /**
     * Returns the secondary key under which a job's own metadata is stored in a job row.
     * Protected so a subclass can supply a different key.
     *
     * @return {@code "jobMetadata"} by default
     */
    protected String getJobMetadataFieldName() {
        final String fieldName = JOB_METADATA_SECONDARY_KEY;
        return fieldName;
    }

    /**
     * Returns the prefix shared by every stage-metadata secondary key in a job row.
     * Protected so a subclass can supply a different prefix.
     *
     * @return {@code "stageMetadata"} by default
     */
    protected String getStageMetadataFieldPrefix() {
        final String prefix = JOB_STAGE_METADATA_SECONDARY_KEY_PREFIX;
        return prefix;
    }

    /**
     * Returns the partition key of the artifact-name index inside the job artifacts namespace.
     * Protected so a subclass can supply a different key.
     *
     * @return {@code "JobArtifactsByName"} by default
     */
    protected String getJobArtifactsByNamePartitionKey() {
        final String partitionKey = JOB_ARTIFACTS_BY_NAME_PARTITION_KEY;
        return partitionKey;
    }

    /**
     * Returns the time-to-live passed to the store when writing archived data.
     *
     * @return a {@link Duration} of {@code DEFAULT_TTL_IN_MS} milliseconds (90 days) by default
     */
    protected Duration getArchiveDataTtlInMs() {
        final long ttlMillis = DEFAULT_TTL_IN_MS;
        return Duration.ofMillis(ttlMillis);
    }

    /**
     * Builds the secondary key for one stage of a job: {@code <stage prefix>-<stageNum>}.
     *
     * @param stageNum stage number to encode in the key
     * @return the secondary key for that stage
     */
    protected String getJobStageFieldName(int stageNum) {
        final String prefix = this.getStageMetadataFieldPrefix();
        return String.format("%s-%d", prefix, stageNum);
    }

    /**
     * Returns the secondary key under which a job cluster definition is stored.
     * Protected so a subclass can supply a different key.
     *
     * @return {@code "jobNameInfo"} by default
     */
    protected String getJobClusterFieldName() {
        final String fieldName = NAMED_JOB_SECONDARY_KEY;
        return fieldName;
    }

    /**
     * Checks that a loaded job carries stage metadata and that the number of stage entries
     * equals the stage count the job declares. Logs an error describing the first failed check.
     *
     * @param job job to check
     * @return {@code true} when stage metadata is present and its size equals {@code getNumStages()}
     */
    private boolean jobIsValid(MantisJobMetadataWritable job) {
        final int expectedStages = job.getNumStages();
        final Collection<? extends MantisStageMetadata> stages = job.getStageMetadata();

        if (stages == null) {
            logger.error("Could not find stage metadata for jobId {}", (Object) job.getJobId());
            return false;
        }

        final int actualStages = stages.size();
        final boolean countsMatch = actualStages == expectedStages;
        if (!countsMatch) {
            logger.error("Invalid stage metadata for job {}: stage count mismatch expected {} vs found {}",
                new Object[]{job.getJobId(), expectedStages, actualStages});
        }
        return countsMatch;
    }

    /**
     * Reads the row for {@code jobId} from {@code namespace} and assembles it into a job.
     *
     * @param namespace namespace to read from
     * @param jobId     partition key of the job row
     * @return the assembled job, or {@code null} when the store returns no columns for the key
     * @throws IOException if the store read fails or the row cannot be assembled into a valid job
     */
    private MantisJobMetadataWritable readJobStageData(String namespace, String jobId) throws IOException {
        final Map<String, String> columns = this.kvStore.getAll(namespace, jobId);
        final boolean hasColumns = columns != null && !columns.isEmpty();
        return hasColumns ? this.readJobStageData(jobId, columns) : null;
    }

    /**
     * Assembles a job from the columns of one job row.
     *
     * <p>The column named by {@link #getJobMetadataFieldName()} is parsed as the job itself; every
     * column whose name starts with {@link #getStageMetadataFieldPrefix()} is parsed as a stage.
     * Columns with a null name or value are ignored; columns that fail to deserialize are logged
     * and skipped. Parsed stages are then attached to the job and the result is validated.
     *
     * @param jobId job id, used for log and error messages
     * @param items column name to JSON value for the job row
     * @return the job with its stages attached
     * @throws IOException if no job column or no stage column was parsed, or the job fails validation
     */
    private MantisJobMetadataWritable readJobStageData(String jobId, Map<String, String> items) throws IOException {
        final String jobColumn = this.getJobMetadataFieldName();
        MantisJobMetadataWritable parsedJob = null;
        final List<MantisStageMetadataWritable> parsedStages = new LinkedList<>();

        for (Map.Entry<String, String> column : items.entrySet()) {
            final String columnName = column.getKey();
            final String json = column.getValue();
            if (columnName == null || json == null) {
                continue;
            }
            try {
                if (jobColumn.equals(columnName)) {
                    parsedJob = mapper.readValue(json, MantisJobMetadataWritable.class);
                } else if (columnName.startsWith(this.getStageMetadataFieldPrefix())) {
                    parsedStages.add(mapper.readValue(json, MantisStageMetadataWritable.class));
                }
            } catch (JsonProcessingException e) {
                // A bad column is skipped here; the missing-data checks below decide whether the job is usable.
                logger.warn("failed to deserialize job metadata for jobId {}, column name {}", new Object[]{jobId, columnName, e});
            }
        }

        if (parsedJob == null) {
            throw new IOException("No " + jobColumn + " column found for key jobId=" + jobId);
        }
        if (parsedStages.isEmpty()) {
            throw new IOException("No stage metadata columns with prefix " + this.getStageMetadataFieldPrefix() + " found for jobId=" + jobId);
        }

        for (MantisStageMetadataWritable stage : parsedStages) {
            parsedJob.addJobStageIfAbsent(stage);
        }
        if (!this.jobIsValid(parsedJob)) {
            throw new IOException(String.format("Invalid job for jobId %s", jobId));
        }
        return parsedJob;
    }

    /**
     * Serializes the job and upserts it into the active job namespace under the job-metadata column.
     *
     * @param jobMetadata job to store
     * @throws Exception wrapping any {@link IOException} raised while serializing or writing
     */
    @Override
    public void storeNewJob(IMantisJobMetadata jobMetadata) throws Exception {
        final MantisJobMetadataWritable writable =
            DataFormatAdapter.convertMantisJobMetadataToMantisJobMetadataWriteable(jobMetadata);
        try {
            final String partitionKey = jobMetadata.getJobId().toString();
            final String column = this.getJobMetadataFieldName();
            final String json = mapper.writeValueAsString(writable);
            this.kvStore.upsert(JOB_STAGEDATA_NS, partitionKey, column, json);
        } catch (IOException e) {
            throw new Exception(e);
        }
    }

    /**
     * Overwrites the stored job-metadata column of an active job with the job's current state.
     *
     * @param jobMetadata job to store
     * @throws Exception if serialization or the store write fails
     */
    @Override
    public void updateJob(IMantisJobMetadata jobMetadata) throws Exception {
        final MantisJobMetadataWritable writable =
            DataFormatAdapter.convertMantisJobMetadataToMantisJobMetadataWriteable(jobMetadata);
        final String partitionKey = jobMetadata.getJobId().toString();
        final String column = this.getJobMetadataFieldName();
        final String json = mapper.writeValueAsString(writable);
        this.kvStore.upsert(JOB_STAGEDATA_NS, partitionKey, column, json);
    }

    /**
     * Moves a job and its workers from the active namespaces to the archive namespaces.
     *
     * <p>The job row is copied to the archived job namespace with the archive TTL and then removed
     * from the active namespace. Worker partitions are then moved bucket by bucket. Buckets are
     * walked from the first bucket key (1000) up to the bucket containing the job's next worker
     * number, the same range {@link #rangeOperation} covers for delete and read. The previous
     * loop started at 0 and stopped before the last bucket, so workers in the highest bucket were
     * never archived once a job had used more than 1000 worker numbers.
     *
     * @param jobId id of the job to archive
     * @throws IOException if the job row is missing or invalid, or a store operation fails
     */
    @Override
    public void archiveJob(String jobId) throws IOException {
        final Map<String, String> jobColumns = this.kvStore.getAll(JOB_STAGEDATA_NS, jobId);
        if (jobColumns == null) {
            // Fail with a checked, descriptive error instead of an NPE further down.
            throw new IOException("No job data found to archive for jobId=" + jobId);
        }
        final int workerMaxPartitionKey = this.workerMaxPartitionKey(this.readJobStageData(jobId, jobColumns));

        this.kvStore.upsertAll(ARCHIVED_JOB_STAGEDATA_NS, jobId, jobColumns, this.getArchiveDataTtlInMs());
        this.kvStore.deleteAll(JOB_STAGEDATA_NS, jobId);

        final int lastBucket = this.bucketizePartitionKey(workerMaxPartitionKey);
        for (int bucket = WORKER_BATCH_SIZE; bucket <= lastBucket; bucket += WORKER_BATCH_SIZE) {
            final String pkey = this.makeBucketizedPartitionKey(jobId, bucket);
            final Map<String, String> workersData = this.kvStore.getAll(WORKERS_NS, pkey);
            if (workersData == null || workersData.isEmpty()) {
                // Nothing stored in this bucket; avoid an empty archive write.
                continue;
            }
            // Write the archive copy before removing the active copy.
            this.kvStore.upsertAll(ARCHIVED_WORKERS_NS, pkey, workersData, this.getArchiveDataTtlInMs());
            this.kvStore.deleteAll(WORKERS_NS, pkey);
        }
    }

    /**
     * Deletes a job's rows and worker partitions from both the active and archive namespaces.
     *
     * <p>Nothing is deleted when no row for the job is found in the active job namespace. Failures
     * while deleting individual worker partitions are logged and do not stop the remaining deletes.
     *
     * @param jobId id of the job to delete
     * @throws IOException if reading the job or deleting a job row fails
     */
    @Override
    public void deleteJob(String jobId) throws IOException {
        final MantisJobMetadataWritable activeJob = this.readJobStageData(JOB_STAGEDATA_NS, jobId);
        if (activeJob == null) {
            return;
        }
        final int maxWorkerKey = this.workerMaxPartitionKey(activeJob);

        this.kvStore.deleteAll(JOB_STAGEDATA_NS, jobId);
        this.rangeOperation(maxWorkerKey, bucket -> {
            try {
                this.kvStore.deleteAll(WORKERS_NS, this.makeBucketizedPartitionKey(jobId, bucket));
            } catch (IOException e) {
                logger.warn("failed to delete worker for jobId {} with index {}", new Object[]{jobId, bucket, e});
            }
        });

        this.kvStore.deleteAll(ARCHIVED_JOB_STAGEDATA_NS, jobId);
        this.rangeOperation(maxWorkerKey, bucket -> {
            try {
                this.kvStore.deleteAll(ARCHIVED_WORKERS_NS, this.makeBucketizedPartitionKey(jobId, bucket));
            } catch (IOException e) {
                logger.warn("failed to delete worker for jobId {} with index {}", new Object[]{jobId, bucket, e});
            }
        });
    }

    /**
     * Serializes a stage and upserts it into its job's row under the stage-specific column.
     *
     * @param msmd stage to store
     * @throws IOException if serialization or the store write fails
     */
    @Override
    public void storeMantisStage(IMantisStageMetadata msmd) throws IOException {
        final MantisStageMetadataWritable writable =
            DataFormatAdapter.convertMantisStageMetadataToMantisStageMetadataWriteable(msmd);
        final String partitionKey = msmd.getJobId().toString();
        final String column = this.getJobStageFieldName(msmd.getStageNum());
        final String json = mapper.writeValueAsString(writable);
        this.kvStore.upsert(JOB_STAGEDATA_NS, partitionKey, column, json);
    }

    /**
     * Updates a stored stage. Identical to {@link #storeMantisStage}, since the write is an upsert.
     *
     * @param msmd stage to store
     * @throws IOException if serialization or the store write fails
     */
    @Override
    public void updateMantisStage(IMantisStageMetadata msmd) throws IOException {
        // Store and update share the same upsert path.
        this.storeMantisStage(msmd);
    }

    /**
     * Returns the upper bound used when iterating over a job's worker buckets.
     *
     * @param jobMetadata job whose next worker number bounds the range
     * @return {@code jobMetadata.getNextWorkerNumberToUse()}, or {@code WORKER_MAX_INDEX} (30000)
     *         if obtaining that number throws
     */
    private int workerMaxPartitionKey(MantisJobMetadataWritable jobMetadata) {
        int upperBound;
        try {
            upperBound = jobMetadata.getNextWorkerNumberToUse();
        } catch (Exception ignored) {
            // Fall back to the fixed maximum so callers still cover a bounded range of buckets.
            upperBound = WORKER_MAX_INDEX;
        }
        return upperBound;
    }

    /**
     * Rounds a number up to the next multiple of 1000, treating anything below 1 as 1.
     * For example 1 and 1000 map to 1000, and 1001 maps to 2000.
     *
     * @param num number to bucketize
     * @return the smallest multiple of 1000 that is greater than or equal to {@code max(1, num)}
     */
    private int bucketizePartitionKey(int num) {
        final int atLeastOne = Math.max(1, num);
        final double bucketCount = Math.ceil(1.0 * (double) atLeastOne / 1000.0);
        final double bucketUpperBound = 1000.0 * bucketCount;
        return (int) bucketUpperBound;
    }

    /**
     * Builds a partition key of the form {@code <pkeyPart>-<bucket>}, where the bucket is
     * {@code suffix} rounded by {@link #bucketizePartitionKey}.
     *
     * @param pkeyPart leading part of the key (the job id in the visible callers)
     * @param suffix   number to bucketize
     * @return the bucketized partition key
     */
    private String makeBucketizedPartitionKey(String pkeyPart, int suffix) {
        return String.format("%s-%d", pkeyPart, this.bucketizePartitionKey(suffix));
    }

    /**
     * Builds a worker's secondary key: {@code <stageNum>-<workerIdx>-<workerNum>}.
     *
     * @param stageNum  stage number
     * @param workerIdx worker index
     * @param workerNum worker number
     * @return the three values joined with dashes
     */
    private String makeBucketizedSecondaryKey(int stageNum, int workerIdx, int workerNum) {
        final Object[] parts = {stageNum, workerIdx, workerNum};
        return String.format("%d-%d-%d", parts);
    }

    /**
     * Invokes {@code fn} once for every bucket key 1000, 2000, ... up to and including the bucket
     * that {@code nextJobNumber} rounds up to.
     *
     * @param nextJobNumber number whose bucket is the last one visited
     * @param fn            action to run with each bucket key
     */
    private void rangeOperation(int nextJobNumber, Consumer<Integer> fn) {
        final int lastBucket = this.bucketizePartitionKey(nextJobNumber);
        int bucket = 1000;
        while (bucket <= lastBucket) {
            fn.accept(bucket);
            bucket += 1000;
        }
    }

    /**
     * Stores a single worker by delegating to the {@code storeWorkers(jobId, workers)} overload
     * with a one-element list. That overload is not defined in the visible part of this file.
     *
     * @param workerMetadata worker to store
     * @throws IOException if the delegated store fails
     */
    @Override
    public void storeWorker(IMantisWorkerMetadata workerMetadata) throws IOException {
        final List<IMantisWorkerMetadata> single = Collections.singletonList(workerMetadata);
        this.storeWorkers(workerMetadata.getJobId(), single);
    }

    /**
     * Upserts each worker into the active workers namespace, one write per worker.
     *
     * <p>The partition key is the job id plus the bucket of the worker number; the secondary key
     * combines stage number, worker index and worker number.
     *
     * @param workers workers to store
     * @throws IOException if serialization or a store write fails; earlier writes are not undone
     */
    @Override
    public void storeWorkers(List<IMantisWorkerMetadata> workers) throws IOException {
        for (IMantisWorkerMetadata source : workers) {
            final MantisWorkerMetadataWritable writable =
                DataFormatAdapter.convertMantisWorkerMetadataToMantisWorkerMetadataWritable(source);
            final String partitionKey =
                this.makeBucketizedPartitionKey(writable.getJobId(), writable.getWorkerNumber());
            final String secondaryKey = this.makeBucketizedSecondaryKey(
                writable.getStageNum(), writable.getWorkerIndex(), writable.getWorkerNumber());
            final String json = mapper.writeValueAsString(writable);
            this.kvStore.upsert(WORKERS_NS, partitionKey, secondaryKey, json);
        }
    }

    /**
     * Stores both workers through {@link #storeWorkers(List)}, the existing worker first.
     *
     * @param existingWorker worker written first
     * @param newWorker      worker written second
     * @throws IOException if serialization or a store write fails
     */
    @Override
    public void storeAndUpdateWorkers(IMantisWorkerMetadata existingWorker, IMantisWorkerMetadata newWorker) throws IOException {
        final List<IMantisWorkerMetadata> pair = ImmutableList.of(existingWorker, newWorker);
        this.storeWorkers(pair);
    }

    /**
     * Updates a stored worker. Delegates to {@link #storeWorker}.
     *
     * @param worker worker to store
     * @throws IOException if the delegated store fails
     */
    @Override
    public void updateWorker(IMantisWorkerMetadata worker) throws IOException {
        // Update shares the store path.
        this.storeWorker(worker);
    }

    /**
     * Loads every worker record in {@code namespace} and groups the records by job id.
     *
     * <p>Each partition's values are parsed individually; values that fail to deserialize are
     * logged and dropped. A partition's workers are grouped under the job id of the first
     * successfully parsed worker in that partition. Partitions in which no value could be parsed
     * are skipped. Previously such a partition caused an {@link IndexOutOfBoundsException} that
     * aborted the whole load.
     *
     * @param namespace workers namespace to scan
     * @return job id to the workers found for that job; never {@code null}
     * @throws IOException if reading the rows from the store fails
     */
    private Map<String, List<MantisWorkerMetadataWritable>> getAllWorkersByJobId(String namespace) throws IOException {
        final Map<String, List<MantisWorkerMetadataWritable>> workersByJobId = new HashMap<>();
        for (Map.Entry<String, Map<String, String>> partition : this.kvStore.getAllRows(namespace).entrySet()) {
            final Map<String, String> columns = partition.getValue();
            if (columns == null || columns.isEmpty()) {
                continue;
            }

            final List<MantisWorkerMetadataWritable> workers = new ArrayList<>(columns.size());
            for (String data : columns.values()) {
                try {
                    final MantisWorkerMetadataWritable worker = mapper.readValue(data, MantisWorkerMetadataWritable.class);
                    if (worker != null) {
                        workers.add(worker);
                    }
                } catch (JsonProcessingException e) {
                    logger.warn("failed to parse worker against pkey {} json {}", new Object[]{partition.getKey(), data, e});
                }
            }

            if (workers.isEmpty()) {
                // Every record in this partition was unreadable; there is no job id to group under.
                continue;
            }
            workersByJobId
                .computeIfAbsent(workers.get(0).getJobId(), k -> new ArrayList<>())
                .addAll(workers);
        }
        return workersByJobId;
    }

    /**
     * Loads all active jobs together with their workers.
     *
     * <p>For every row in the active job namespace the job is assembled and the workers found for
     * its id are attached. Jobs without any worker are skipped and counted; a worker that replaces
     * an already attached one is logged and counted as stale. Any exception while processing a job
     * is logged and counted, and loading continues with the next job.
     *
     * @return the jobs that were loaded successfully
     * @throws IOException if reading the worker or job rows from the store fails
     */
    @Override
    public List<IMantisJobMetadata> loadAllJobs() throws IOException {
        logger.info("MantisStorageProviderAdapter:Enter loadAllJobs");
        final Map<String, List<MantisWorkerMetadataWritable>> workersByJob = this.getAllWorkersByJobId(WORKERS_NS);
        final List<IMantisJobMetadata> loadedJobs = new ArrayList<>();
        final Map<String, Map<String, String>> jobRows = this.kvStore.getAllRows(JOB_STAGEDATA_NS);

        for (Map.Entry<String, Map<String, String>> row : jobRows.entrySet()) {
            final String jobId = row.getKey();
            try {
                final MantisJobMetadataWritable job = this.readJobStageData(jobId, row.getValue());
                final List<MantisWorkerMetadataWritable> jobWorkers = workersByJob.get(jobId);

                if (jobWorkers == null || jobWorkers.isEmpty()) {
                    logger.warn("No workers found for job {}, skipping", (Object) jobId);
                    this.noWorkersFoundCounter.increment();
                } else {
                    this.workersFoundCounter.increment();
                    for (MantisWorkerMetadataWritable worker : jobWorkers) {
                        final MantisWorkerMetadata replaced = job.tryAddOrReplaceWorker(worker.getStageNum(), worker);
                        if (replaced != null) {
                            logger.error("Encountered duplicate worker {} when adding {}", (Object) replaced, (Object) worker);
                            this.staleWorkersFoundCounter.increment();
                        }
                    }
                    loadedJobs.add(DataFormatAdapter.convertMantisJobWriteableToMantisJobMetadata(job, this.eventPublisher));
                }
            } catch (Exception e) {
                logger.error("Exception loading job {}", (Object) jobId, (Object) e);
                this.failedToLoadJobCounter.increment();
            }
        }

        logger.info("MantisStorageProviderAdapter:Exit loadAllJobs {}", (Object) loadedJobs.size());
        return loadedJobs;
    }

    /**
     * Returns an observable that, on subscription, iterates the partition keys of the archived job
     * namespace, emits each archived job that could be loaded, and then completes.
     * An {@link IOException} during iteration is delivered through {@code onError}.
     *
     * @return observable of archived jobs
     */
    @Override
    public Observable<IMantisJobMetadata> loadAllArchivedJobs() {
        return Observable.create(subscriber -> {
            try {
                for (String archivedJobId : this.kvStore.getAllPartitionKeys(ARCHIVED_JOB_STAGEDATA_NS)) {
                    final Optional<IMantisJobMetadata> archived = this.loadArchivedJob(archivedJobId);
                    if (archived.isPresent()) {
                        subscriber.onNext(archived.get());
                    }
                }
                subscriber.onCompleted();
            } catch (IOException e) {
                subscriber.onError(e);
            }
        });
    }

    /**
     * Loads every job cluster definition stored in the job cluster namespace.
     *
     * <p>Each row's cluster column is deserialized and converted. Rows that fail are logged and
     * skipped. The number of loaded and failed clusters is logged once at the end; previously
     * these counts were accumulated but never reported.
     *
     * @return the job clusters that were loaded successfully
     * @throws IOException if reading the rows from the store fails
     */
    @Override
    public List<IJobClusterMetadata> loadAllJobClusters() throws IOException {
        // Plain ints suffice: the counters are only touched from this loop, never from a lambda.
        int failedCount = 0;
        int successCount = 0;
        final List<IJobClusterMetadata> jobClusters = new ArrayList<>();
        for (Map.Entry<String, Map<String, String>> rows : this.kvStore.getAllRows(NAMED_JOBS_NS).entrySet()) {
            final String name = rows.getKey();
            try {
                final String data = rows.getValue().get(this.getJobClusterFieldName());
                final NamedJob jobCluster = this.getJobCluster(NAMED_JOBS_NS, name, data);
                jobClusters.add(DataFormatAdapter.convertNamedJobToJobClusterMetadata(jobCluster));
                successCount++;
            } catch (Exception e) {
                logger.error("Exception {} getting job cluster for {} ", new Object[]{e.getMessage(), name, e});
                failedCount++;
            }
        }
        logger.info("Loaded {} job clusters, failed to load {}", (Object) successCount, (Object) failedCount);
        return jobClusters;
    }

    /**
     * Moves one worker record from the active workers namespace to the archived workers namespace.
     *
     * <p>The record is serialized and written to the archive (with the archive TTL) before the
     * active copy is deleted, the same copy-then-delete order {@link #archiveJob} uses. With the
     * previous delete-first order, a serialization or archive-write failure left the worker in
     * neither namespace.
     *
     * @param mwmd worker to archive
     * @throws IOException if serialization or a store operation fails
     */
    @Override
    public void archiveWorker(IMantisWorkerMetadata mwmd) throws IOException {
        final MantisWorkerMetadataWritable worker =
            DataFormatAdapter.convertMantisWorkerMetadataToMantisWorkerMetadataWritable(mwmd);
        final String pkey = this.makeBucketizedPartitionKey(worker.getJobId(), worker.getWorkerNumber());
        final String skey = this.makeBucketizedSecondaryKey(worker.getStageNum(), worker.getWorkerIndex(), worker.getWorkerNumber());

        // Serialize first so a serialization error cannot happen after data has been removed.
        final String json = mapper.writeValueAsString(worker);
        this.kvStore.upsert(ARCHIVED_WORKERS_NS, pkey, skey, json, this.getArchiveDataTtlInMs());
        this.kvStore.delete(WORKERS_NS, pkey, skey);
    }

    /**
     * Returns the archived workers of a job.
     *
     * <p>The job is looked up in the active job namespace first and, when it is not found there
     * or reading it fails, in the archived job namespace. Previously the archived namespace was
     * only consulted when the active read threw, but a missing row is reported as {@code null},
     * so jobs that had already been archived always produced an empty result. The job's next
     * worker number bounds the worker buckets that are read. Worker records that cannot be read
     * or converted are logged and skipped.
     *
     * @param jobId id of the job
     * @return the archived workers found; empty when the job exists in neither namespace
     * @throws IOException if the fallback read of the archived job fails
     */
    @Override
    public List<IMantisWorkerMetadata> getArchivedWorkers(String jobId) throws IOException {
        MantisJobMetadataWritable jobInfo = null;
        try {
            jobInfo = this.readJobStageData(JOB_STAGEDATA_NS, jobId);
        } catch (Exception e) {
            // Best effort: an unreadable active job falls through to the archived copy below.
            logger.debug("Could not read active job {}, falling back to archived job", (Object) jobId, (Object) e);
        }
        if (jobInfo == null) {
            jobInfo = this.readJobStageData(ARCHIVED_JOB_STAGEDATA_NS, jobId);
        }
        if (jobInfo == null) {
            return Collections.emptyList();
        }

        final int workerMaxPartitionKey = this.workerMaxPartitionKey(jobInfo);
        final List<IMantisWorkerMetadata> archivedWorkers = new ArrayList<>();
        this.rangeOperation(workerMaxPartitionKey, idx -> {
            final String pkey = this.makeBucketizedPartitionKey(jobId, idx);
            try {
                final Map<String, String> items = this.kvStore.getAll(ARCHIVED_WORKERS_NS, pkey);
                if (items == null) {
                    return;
                }
                for (Map.Entry<String, String> entry : items.entrySet()) {
                    try {
                        final JobWorker jobWorker = DataFormatAdapter.convertMantisWorkerMetadataWriteableToMantisWorkerMetadata(
                            mapper.readValue(entry.getValue(), MantisWorkerMetadataWritable.class), this.eventPublisher);
                        archivedWorkers.add(jobWorker.getMetadata());
                    } catch (Exception e) {
                        logger.warn("Exception converting worker for jobId {} ({}, {})", new Object[]{jobId, pkey, entry.getKey(), e});
                    }
                }
            } catch (IOException e) {
                logger.warn("Error reading archive workers for jobId {} for pkey {}", new Object[]{jobId, pkey, e});
            }
        });
        return archivedWorkers;
    }

    /**
     * Stores a new job cluster. Delegates to {@link #updateJobCluster}, which performs an upsert.
     *
     * @param jobCluster job cluster to store
     * @throws Exception if serialization or the store write fails
     */
    @Override
    public void createJobCluster(IJobClusterMetadata jobCluster) throws Exception {
        // Create and update share the same upsert path.
        this.updateJobCluster(jobCluster);
    }

    /**
     * Serializes the job cluster and upserts it under its name in the job cluster namespace.
     *
     * @param jobCluster job cluster to store
     * @throws Exception if serialization or the store write fails
     */
    @Override
    public void updateJobCluster(IJobClusterMetadata jobCluster) throws Exception {
        final String clusterName = jobCluster.getJobClusterDefinition().getName();
        final String column = this.getJobClusterFieldName();
        final String json = mapper.writeValueAsString(DataFormatAdapter.convertJobClusterMetadataToNamedJob(jobCluster));
        this.kvStore.upsert(NAMED_JOBS_NS, clusterName, column, json);
    }

    /**
     * Deletes a job cluster's definition and then its completed-job records.
     *
     * @param name job cluster name
     * @throws Exception if a store delete fails
     */
    @Override
    public void deleteJobCluster(String name) throws Exception {
        // Definition first, then the completed-jobs history kept under the same name.
        this.kvStore.deleteAll(NAMED_JOBS_NS, name);
        this.kvStore.deleteAll(NAMED_COMPLETEDJOBS_NS, name);
    }

    /**
     * Reads the cluster column for {@code name} from {@code namespace} and deserializes it.
     *
     * @param namespace namespace to read from
     * @param name      job cluster name (partition key)
     * @return the deserialized job cluster
     * @throws Exception if the read or deserialization fails
     */
    private NamedJob getJobCluster(String namespace, String name) throws Exception {
        final String data = this.kvStore.get(namespace, name, this.getJobClusterFieldName());
        return this.getJobCluster(namespace, name, data);
    }

    /**
     * Deserializes already-fetched job cluster JSON.
     *
     * @param namespace not used by this method
     * @param name      not used by this method
     * @param data      JSON to deserialize
     * @return the deserialized job cluster
     * @throws Exception if deserialization fails
     */
    private NamedJob getJobCluster(String namespace, String name, String data) throws Exception {
        final NamedJob parsed = mapper.readValue(data, NamedJob.class);
        return parsed;
    }

    /**
     * Parses the integer that follows the last {@code '-'} in a job id. When the id contains no
     * dash the whole string is parsed.
     *
     * @param jobId job id string
     * @return the parsed number
     * @throws NumberFormatException if the text after the last dash is not an integer
     */
    private int parseJobId(String jobId) {
        final int lastDash = jobId.lastIndexOf('-');
        final String numberPart = jobId.substring(lastDash + 1);
        return Integer.parseInt(numberPart);
    }

    /**
     * Stores a completed-job record for a cluster using the ordered upsert, keyed by the job's
     * number and written with the archive TTL.
     *
     * @param name job cluster name
     * @param job  completed job to record
     * @throws IOException if serialization or the store write fails
     */
    @Override
    public void storeCompletedJobForCluster(String name, JobClusterDefinitionImpl.CompletedJob job) throws IOException {
        final NamedJob.CompletedJob record = DataFormatAdapter.convertCompletedJobToNamedJobCompletedJob(job);
        final JobId parsedId = JobId.fromId(job.getJobId()).get();
        final String json = mapper.writeValueAsString(record);
        this.kvStore.upsertOrdered(NAMED_COMPLETEDJOBS_NS, name, parsedId.getJobNum(), json, this.getArchiveDataTtlInMs());
    }

    /**
     * Deletes all completed-job records stored for a cluster.
     *
     * @param name job cluster name
     * @throws IOException if the store delete fails
     */
    @Override
    public void deleteCompletedJobsForCluster(String name) throws IOException {
        final String partitionKey = name;
        this.kvStore.deleteAll(NAMED_COMPLETEDJOBS_NS, partitionKey);
    }

    /**
     * Loads up to {@code limit} completed-job records for a cluster, sorted by job number in
     * descending order.
     *
     * @param name                job cluster name
     * @param limit               maximum number of records requested from the store
     * @param startJobIdExclusive when non-null, its job number is passed to the store as the
     *                            starting point of the ordered read
     * @return completed jobs, highest job number first
     * @throws IOException      if the store read fails
     * @throws RuntimeException if a stored record cannot be deserialized
     */
    @Override
    public List<JobClusterDefinitionImpl.CompletedJob> loadLatestCompletedJobsForCluster(String name, int limit, @Nullable JobId startJobIdExclusive) throws IOException {
        final Map<Long, String> items;
        if (startJobIdExclusive != null) {
            items = this.kvStore.getAllOrdered(NAMED_COMPLETEDJOBS_NS, name, limit, startJobIdExclusive.getJobNum());
        } else {
            items = this.kvStore.getAllOrdered(NAMED_COMPLETEDJOBS_NS, name, limit);
        }

        final Comparator<JobClusterDefinitionImpl.CompletedJob> highestJobNumFirst = Comparator
            .comparingLong((JobClusterDefinitionImpl.CompletedJob job) -> JobId.fromId(job.getJobId()).get().getJobNum())
            .reversed();

        return items.values().stream()
            .map(value -> {
                try {
                    return mapper.readValue(value, NamedJob.CompletedJob.class);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            })
            .map(DataFormatAdapter::convertNamedJobCompletedJobToCompletedJob)
            .sorted(highestJobNumFirst)
            .collect(Collectors.toList());
    }

    /**
     * Loads one job from the archived job namespace.
     *
     * @param jobId id of the archived job
     * @return the job, or empty when no row exists or when loading fails (the failure is logged)
     * @throws IOException declared by the interface; failures inside this method are caught and logged
     */
    @Override
    public Optional<IMantisJobMetadata> loadArchivedJob(String jobId) throws IOException {
        try {
            final MantisJobMetadataWritable archived = this.readJobStageData(ARCHIVED_JOB_STAGEDATA_NS, jobId);
            if (archived != null) {
                return Optional.of(DataFormatAdapter.convertMantisJobWriteableToMantisJobMetadata(archived, this.eventPublisher));
            }
            return Optional.empty();
        } catch (Exception e) {
            logger.error("Exception loading archived job {}", (Object) jobId, (Object) e);
            return Optional.empty();
        }
    }

    /**
     * Reads the stored list of active VM attribute values.
     *
     * @return the stored list, or an empty list when the stored value is blank or absent
     * @throws IOException if the store read or JSON parsing fails
     */
    @Override
    public List<String> initActiveVmAttributeValuesList() throws IOException {
        final String json = this.kvStore.get(ACTIVE_ASGS_NS, "activeASGs", "thelist");
        logger.info("read active VMs data {} from Cass", (Object) json);
        if (!StringUtils.isBlank(json)) {
            return mapper.readValue(json, new TypeReference<List<String>>() {});
        }
        return Collections.emptyList();
    }

    /**
     * Serializes and stores the list of active VM attribute values, replacing the stored value.
     *
     * @param vmAttributesList values to store
     * @throws IOException if serialization or the store write fails
     */
    @Override
    public void setActiveVmAttributeValuesList(List<String> vmAttributesList) throws IOException {
        logger.info("Setting active ASGs {}", vmAttributesList);
        final String json = mapper.writeValueAsString(vmAttributesList);
        this.kvStore.upsert(ACTIVE_ASGS_NS, "activeASGs", "thelist", json);
    }

    /**
     * Loads the stored registration of a task executor from the control plane namespace.
     * The partition key is {@code "TaskExecutorRegistration-" + resourceId}; the secondary key is
     * the resource id.
     *
     * @param taskExecutorID id of the task executor
     * @return the deserialized registration
     * @throws IOException wrapping any exception raised while reading or deserializing
     */
    @Override
    public TaskExecutorRegistration getTaskExecutorFor(TaskExecutorID taskExecutorID) throws IOException {
        try {
            final String partitionKey = TASK_EXECUTOR_REGISTRATION + "-" + taskExecutorID.getResourceId();
            final String json = this.kvStore.get(CONTROLPLANE_NS, partitionKey, taskExecutorID.getResourceId());
            return mapper.readValue(json, TaskExecutorRegistration.class);
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    /**
     * Serializes a task executor registration and upserts it into the control plane namespace
     * under {@code "TaskExecutorRegistration-<resourceId>"} / {@code <resourceId>}.
     *
     * @param registration registration to store
     * @throws IOException if serialization or the store write fails
     */
    @Override
    public void storeNewTaskExecutor(TaskExecutorRegistration registration) throws IOException {
        final String executorId = registration.getTaskExecutorID().getResourceId();
        final String partitionKey = String.format("%s-%s", TASK_EXECUTOR_REGISTRATION, executorId);
        final String json = mapper.writeValueAsString(registration);
        this.kvStore.upsert(CONTROLPLANE_NS, partitionKey, executorId, json);
    }

    /**
     * Serializes a disable-task-executors request and upserts it under its cluster's resource id,
     * using the request hash as the secondary key.
     *
     * @param request request to store
     * @throws IOException if serialization or the store write fails
     */
    @Override
    public void storeNewDisableTaskExecutorRequest(DisableTaskExecutorsRequest request) throws IOException {
        final String json = mapper.writeValueAsString(request);
        final String clusterKey = request.getClusterID().getResourceID();
        this.kvStore.upsert(DISABLE_TASK_EXECUTOR_REQUESTS, clusterKey, request.getHash(), json);
    }

    /**
     * Deletes the stored entry for a disable-task-executors request, addressed by the request's
     * cluster resource id and hash.
     *
     * @param request request whose stored entry is removed
     * @throws IOException if the store delete fails
     */
    @Override
    public void deleteExpiredDisableTaskExecutorRequest(DisableTaskExecutorsRequest request) throws IOException {
        final String clusterKey = request.getClusterID().getResourceID();
        this.kvStore.delete(DISABLE_TASK_EXECUTOR_REQUESTS, clusterKey, request.getHash());
    }

    /**
     * Loads every disable-task-executors request stored for a cluster.
     *
     * @param clusterID cluster whose requests are loaded
     * @return the deserialized requests
     * @throws IOException      if the store read fails
     * @throws RuntimeException if a stored request cannot be deserialized
     */
    @Override
    public List<DisableTaskExecutorsRequest> loadAllDisableTaskExecutorsRequests(ClusterID clusterID) throws IOException {
        final Collection<String> stored =
            this.kvStore.getAll(DISABLE_TASK_EXECUTOR_REQUESTS, clusterID.getResourceID()).values();
        final List<DisableTaskExecutorsRequest> requests = new ArrayList<>();
        for (String json : stored) {
            final DisableTaskExecutorsRequest parsed;
            try {
                parsed = mapper.readValue(json, DisableTaskExecutorsRequest.class);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            requests.add(parsed);
        }
        return requests;
    }

    /**
     * Checks whether an artifact entry exists in the job artifacts namespace with the resource id
     * as both partition key and secondary key.
     *
     * @param resourceId artifact resource id
     * @return the store's answer for that row
     * @throws IOException if the store check fails
     */
    @Override
    public boolean isArtifactExists(String resourceId) throws IOException {
        final boolean exists = this.kvStore.isRowExists(JOB_ARTIFACTS_NS, resourceId, resourceId);
        return exists;
    }

    /**
     * Loads an artifact stored with its resource id as both partition key and secondary key.
     *
     * @param resourceId artifact resource id
     * @return the deserialized artifact
     * @throws IOException if the store read or deserialization fails
     */
    @Override
    public JobArtifact getArtifactById(String resourceId) throws IOException {
        final String json = this.kvStore.get(JOB_ARTIFACTS_NS, resourceId, resourceId);
        final JobArtifact artifact = mapper.readValue(json, JobArtifact.class);
        return artifact;
    }

    /**
     * Lists the artifacts stored under an artifact name.
     *
     * <p>With a {@code null} version every entry under the name is returned; otherwise only the
     * entry for that version. Entries that fail to deserialize are logged and omitted. A missing
     * value (no entry for the requested version, or a null stored value) is treated as "no
     * artifact" instead of raising a {@link NullPointerException}.
     *
     * @param name    artifact name (partition key)
     * @param version artifact version (secondary key), or {@code null} for all versions
     * @return the artifacts that could be loaded; possibly empty
     * @throws IOException if the store read fails
     */
    @Override
    public List<JobArtifact> listJobArtifacts(String name, String version) throws IOException {
        final Collection<String> artifacts;
        if (version == null) {
            artifacts = this.kvStore.getAll(JOB_ARTIFACTS_NS, name).values();
        } else {
            final String single = this.kvStore.get(JOB_ARTIFACTS_NS, name, version);
            artifacts = single == null
                ? Collections.<String>emptyList()
                : Collections.singletonList(single);
        }
        return artifacts.stream()
            .filter(Objects::nonNull)
            .map(e -> {
                try {
                    return mapper.readValue(e, JobArtifact.class);
                } catch (JsonProcessingException ex) {
                    logger.warn("Failed to deserialize job artifact metadata for {} (data={})", new Object[]{name, e, ex});
                    return null;
                }
            })
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    }

    /**
     * Adds artifact ids to a cluster's to-cache list, one upsert per artifact. Each artifact's
     * resource id is used as both the secondary key and the stored value.
     *
     * @param clusterID cluster whose list is extended
     * @param artifacts artifacts to add
     * @throws IOException if a store write fails; earlier writes are not undone
     */
    @Override
    public void addNewJobArtifactsToCache(ClusterID clusterID, List<ArtifactID> artifacts) throws IOException {
        for (ArtifactID toCache : artifacts) {
            final String clusterKey = clusterID.getResourceID();
            this.kvStore.upsert(JOB_ARTIFACTS_TO_CACHE_PER_CLUSTER_ID_NS, clusterKey, toCache.getResourceID(), toCache.getResourceID());
        }
    }

    /**
     * Removes artifact ids from a cluster's to-cache list, one delete per artifact.
     *
     * @param clusterID cluster whose list is reduced
     * @param artifacts artifacts to remove
     * @throws IOException if a store delete fails; earlier deletes are not undone
     */
    @Override
    public void removeJobArtifactsToCache(ClusterID clusterID, List<ArtifactID> artifacts) throws IOException {
        for (ArtifactID toRemove : artifacts) {
            final String clusterKey = clusterID.getResourceID();
            this.kvStore.delete(JOB_ARTIFACTS_TO_CACHE_PER_CLUSTER_ID_NS, clusterKey, toRemove.getResourceID());
        }
    }

    /**
     * Returns the values stored in a cluster's to-cache list as a new list.
     *
     * @param clusterID cluster whose list is read
     * @return a fresh list containing the stored values
     * @throws IOException if the store read fails
     */
    @Override
    public List<String> listJobArtifactsToCache(ClusterID clusterID) throws IOException {
        final Map<String, String> stored =
            this.kvStore.getAll(JOB_ARTIFACTS_TO_CACHE_PER_CLUSTER_ID_NS, clusterID.getResourceID());
        return new ArrayList<String>(stored.values());
    }

    /**
     * Lists artifact names from the by-name index, optionally restricted by a prefix and by a
     * case-insensitive substring.
     *
     * @param prefix   when non-empty, passed to the store's prefix query; when empty, all names are read
     * @param contains when non-empty, only names containing it (compared in lower case) are kept
     * @return the matching artifact names
     * @throws IOException if the store read fails
     */
    @Override
    public List<String> listJobArtifactsByName(String prefix, String contains) throws IOException {
        final String indexKey = this.getJobArtifactsByNamePartitionKey();
        final Set<String> names;
        if (prefix.isEmpty()) {
            names = this.kvStore.getAll(JOB_ARTIFACTS_NS, indexKey).keySet();
        } else {
            names = this.kvStore.getAllWithPrefix(JOB_ARTIFACTS_NS, indexKey, prefix).keySet();
        }

        if (contains.isEmpty()) {
            return new ArrayList<String>(names);
        }
        return names.stream()
            .filter(candidate -> candidate.toLowerCase().contains(contains.toLowerCase()))
            .distinct()
            .collect(Collectors.toList());
    }

    /**
     * Serializes an artifact and upserts it into the job artifacts namespace under the given keys.
     * Failures are logged and rethrown unchecked so the method can be used inside a {@code Runnable}.
     *
     * @param partitionKey partition key to write under
     * @param secondaryKey secondary key to write under
     * @param jobArtifact  artifact to store
     * @throws RuntimeException wrapping any {@link IOException} from serialization or the store
     */
    private void addNewJobArtifact(String partitionKey, String secondaryKey, JobArtifact jobArtifact) {
        try {
            final String json = mapper.writeValueAsString(jobArtifact);
            this.kvStore.upsert(JOB_ARTIFACTS_NS, partitionKey, secondaryKey, json);
        } catch (IOException ioFailure) {
            logger.error("Error while storing keyId {} for artifact {}", new Object[]{partitionKey, jobArtifact, ioFailure});
            throw new RuntimeException(ioFailure);
        }
    }

    /**
     * Stores a job artifact under three keys and blocks until all three writes finish.
     *
     * <p>The writes are chained as asynchronous stages, so each starts only after the
     * previous one completes normally:
     * <ol>
     *   <li>by-name partition, keyed by the artifact name;</li>
     *   <li>partition = artifact name, keyed by the artifact version;</li>
     *   <li>partition = artifact resource id, keyed by that same resource id.</li>
     * </ol>
     *
     * @param jobArtifact artifact to persist
     * @throws IOException if any write fails or the calling thread is interrupted while
     *                     waiting; on interruption the thread's interrupt flag is restored
     */
    @Override
    public void addNewJobArtifact(JobArtifact jobArtifact) throws IOException {
        CompletableFuture<Void> writes = CompletableFuture
            .runAsync(() -> this.addNewJobArtifact(this.getJobArtifactsByNamePartitionKey(), jobArtifact.getName(), jobArtifact))
            .thenRunAsync(() -> this.addNewJobArtifact(jobArtifact.getName(), jobArtifact.getVersion(), jobArtifact))
            .thenRunAsync(() -> this.addNewJobArtifact(jobArtifact.getArtifactID().getResourceID(), jobArtifact.getArtifactID().getResourceID(), jobArtifact));
        try {
            writes.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            logger.error("Error while storing job artifact {} to Cassandra Storage Provider.", jobArtifact, e);
            throw new IOException(e);
        } catch (ExecutionException e) {
            logger.error("Error while storing job artifact {} to Cassandra Storage Provider.", jobArtifact, e);
            throw new IOException(e);
        }
    }

    /**
     * Persists a resource cluster spec and records the cluster in the registry of
     * registered clusters.
     *
     * <p>The spec is written first under the cluster's own key, then the updated registry
     * is written under the registration key. The returned value is re-read from the store.
     *
     * @param clusterSpecW spec to store; its id and version are copied into the registry entry
     * @return the spec as read back from the store after both writes
     * @throws IOException declared by this method; raised by serialization or store calls
     */
    @Override
    public ResourceClusterSpecWritable registerAndUpdateClusterSpec(ResourceClusterSpecWritable clusterSpecW) throws IOException {
        RegisteredResourceClustersWritable currentRegistry = this.getRegisteredResourceClustersWritable();
        RegisteredResourceClustersWritable.RegisteredResourceClustersWritableBuilder registryBuilder;
        if (currentRegistry != null) {
            registryBuilder = currentRegistry.toBuilder();
        } else {
            registryBuilder = RegisteredResourceClustersWritable.builder();
        }

        RegisteredResourceClustersWritable.ClusterRegistration registration = RegisteredResourceClustersWritable.ClusterRegistration.builder()
            .clusterId(clusterSpecW.getId())
            .version(clusterSpecW.getVersion())
            .build();
        RegisteredResourceClustersWritable updatedRegistry = registryBuilder
            .cluster(clusterSpecW.getId().getResourceID(), registration)
            .build();

        this.kvStore.upsert(CONTROLPLANE_NS, getClusterKeyFromId(clusterSpecW.getId()), "", mapper.writeValueAsString(clusterSpecW));
        this.kvStore.upsert(CONTROLPLANE_NS, CLUSTER_REGISTRATION_KEY, "", mapper.writeValueAsString(updatedRegistry));
        return this.getResourceClusterSpecWritable(clusterSpecW.getId());
    }

    /**
     * Removes a cluster from the registry of registered clusters and deletes its stored spec.
     *
     * <p>A new registry is built from every existing entry whose key differs from the given
     * cluster's resource id. That registry is written first, then the cluster's spec entry
     * is deleted.
     *
     * @param clusterId cluster to deregister
     * @return the registry that was written, without the given cluster
     * @throws IOException declared by this method; raised by serialization or store calls
     */
    @Override
    public RegisteredResourceClustersWritable deregisterCluster(ClusterID clusterId) throws IOException {
        RegisteredResourceClustersWritable currentRegistry = this.getRegisteredResourceClustersWritable();
        RegisteredResourceClustersWritable.RegisteredResourceClustersWritableBuilder remaining = RegisteredResourceClustersWritable.builder();
        currentRegistry.getClusters().entrySet().forEach(entry -> {
            if (!Objects.equals(clusterId.getResourceID(), entry.getKey())) {
                remaining.cluster((String)entry.getKey(), (RegisteredResourceClustersWritable.ClusterRegistration)entry.getValue());
            }
        });
        RegisteredResourceClustersWritable updatedRegistry = remaining.build();

        this.kvStore.upsert(CONTROLPLANE_NS, CLUSTER_REGISTRATION_KEY, "", mapper.writeValueAsString(updatedRegistry));
        this.kvStore.delete(CONTROLPLANE_NS, getClusterKeyFromId(clusterId), "");
        return updatedRegistry;
    }

    /**
     * Reads the registry of registered resource clusters from the control-plane namespace.
     *
     * @return the deserialized registry, or a freshly built default instance when nothing
     *         is stored under the registration key
     * @throws IOException declared by this method; raised by the store or by deserialization
     */
    @Override
    public RegisteredResourceClustersWritable getRegisteredResourceClustersWritable() throws IOException {
        String stored = this.kvStore.get(CONTROLPLANE_NS, CLUSTER_REGISTRATION_KEY, "");
        if (stored != null) {
            return mapper.readValue(stored, RegisteredResourceClustersWritable.class);
        }
        return RegisteredResourceClustersWritable.builder().build();
    }

    /**
     * Reads the stored spec for a resource cluster from the control-plane namespace.
     *
     * @param clusterID cluster whose spec is looked up
     * @return the deserialized spec, or {@code null} when nothing is stored for the cluster
     * @throws IOException declared by this method; raised by the store or by deserialization
     */
    @Override
    public ResourceClusterSpecWritable getResourceClusterSpecWritable(ClusterID clusterID) throws IOException {
        String stored = this.kvStore.get(CONTROLPLANE_NS, getClusterKeyFromId(clusterID), "");
        if (stored != null) {
            return mapper.readValue(stored, ResourceClusterSpecWritable.class);
        }
        return null;
    }

    /**
     * Reads the stored scale rules for a resource cluster from the control-plane namespace.
     *
     * @param clusterId cluster whose scale rules are looked up
     * @return the deserialized rules, or a freshly built instance carrying only the cluster
     *         id when nothing is stored for the cluster
     * @throws IOException declared by this method; raised by the store or by deserialization
     */
    @Override
    public ResourceClusterScaleRulesWritable getResourceClusterScaleRules(ClusterID clusterId) throws IOException {
        String stored = this.kvStore.get(CONTROLPLANE_NS, getClusterRuleKeyFromId(clusterId), "");
        if (stored != null) {
            return mapper.readValue(stored, ResourceClusterScaleRulesWritable.class);
        }
        return ResourceClusterScaleRulesWritable.builder().clusterId(clusterId).build();
    }

    /**
     * Writes the complete set of scale rules for a cluster, replacing whatever is stored
     * under that cluster's rule key, and returns the rules as read back from the store.
     *
     * @param ruleSpec rules to persist; its cluster id determines the storage key
     * @return the rules re-read from the store after the write
     * @throws IOException declared by this method; raised by serialization or store calls
     */
    @Override
    public ResourceClusterScaleRulesWritable registerResourceClusterScaleRule(ResourceClusterScaleRulesWritable ruleSpec) throws IOException {
        String storageKey = getClusterRuleKeyFromId(ruleSpec.getClusterId());
        String payload = mapper.writeValueAsString(ruleSpec);
        this.kvStore.upsert(CONTROLPLANE_NS, storageKey, "", payload);
        return this.getResourceClusterScaleRules(ruleSpec.getClusterId());
    }

    /**
     * Adds a single scale rule to a cluster's stored rules, keyed by the rule's SKU
     * resource id, and persists the result.
     *
     * <p>The current rules are loaded first. The rule is then added to a builder derived
     * from them, or to a new builder for the rule's cluster when the lookup yields
     * {@code null}.
     *
     * @param rule scale rule to register
     * @return the rules re-read from the store after the write
     * @throws IOException declared by this method; raised by serialization or store calls
     */
    @Override
    public ResourceClusterScaleRulesWritable registerResourceClusterScaleRule(ResourceClusterScaleSpec rule) throws IOException {
        ResourceClusterScaleRulesWritable current = this.getResourceClusterScaleRules(rule.getClusterId());
        ResourceClusterScaleRulesWritable merged;
        if (current != null) {
            merged = current.toBuilder()
                .scaleRule(rule.getSkuId().getResourceID(), rule)
                .build();
        } else {
            merged = ResourceClusterScaleRulesWritable.builder()
                .clusterId(rule.getClusterId())
                .scaleRule(rule.getSkuId().getResourceID(), rule)
                .build();
        }
        return this.registerResourceClusterScaleRule(merged);
    }

    /**
     * Builds the store key for a cluster's spec entry: a fixed prefix followed by the
     * cluster's resource id.
     */
    private static String getClusterKeyFromId(ClusterID id) {
        final String resourceId = id.getResourceID();
        return "ResourceClusterRegistration_" + resourceId;
    }

    /**
     * Builds the store key for a cluster's scale-rules entry: a fixed prefix followed by
     * the cluster's resource id.
     */
    private static String getClusterRuleKeyFromId(ClusterID id) {
        final String resourceId = id.getResourceID();
        return "ResourceClusterRulePrefix_" + resourceId;
    }

    static {
        // Configure the shared mapper once at class load: ignore unknown properties when
        // deserializing, and register the Jdk8 and JavaTime modules.
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        mapper.registerModule(new Jdk8Module());
        mapper.registerModule(new JavaTimeModule());
    }
}

