/*
 * Decompiled with CFR 0.152.
 */
package com.spotify.styx.storage;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.cloud.Timestamp;
import com.google.cloud.datastore.DatastoreException;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.EntityQuery;
import com.google.cloud.datastore.Key;
import com.google.cloud.datastore.KeyFactory;
import com.google.cloud.datastore.PathElement;
import com.google.cloud.datastore.Query;
import com.google.cloud.datastore.StringValue;
import com.google.cloud.datastore.StructuredQuery;
import com.google.cloud.datastore.Value;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import com.google.common.io.Closer;
import com.spotify.styx.model.Backfill;
import com.spotify.styx.model.BackfillBuilder;
import com.spotify.styx.model.ExecutionDescription;
import com.spotify.styx.model.Resource;
import com.spotify.styx.model.Schedule;
import com.spotify.styx.model.StyxConfig;
import com.spotify.styx.model.TriggerParameters;
import com.spotify.styx.model.Workflow;
import com.spotify.styx.model.WorkflowId;
import com.spotify.styx.model.WorkflowInstance;
import com.spotify.styx.model.WorkflowState;
import com.spotify.styx.serialization.Json;
import com.spotify.styx.state.Message;
import com.spotify.styx.state.RunState;
import com.spotify.styx.state.StateData;
import com.spotify.styx.storage.CheckedDatastore;
import com.spotify.styx.storage.CheckedDatastoreReaderWriter;
import com.spotify.styx.storage.CheckedDatastoreTransaction;
import com.spotify.styx.storage.DatastoreIOException;
import com.spotify.styx.storage.DatastoreStorageTransaction;
import com.spotify.styx.storage.IOOperation;
import com.spotify.styx.storage.StorageTransaction;
import com.spotify.styx.storage.TransactionException;
import com.spotify.styx.storage.TransactionFunction;
import com.spotify.styx.util.CloserUtil;
import com.spotify.styx.util.FnWithException;
import com.spotify.styx.util.FutureUtil;
import com.spotify.styx.util.MDCUtil;
import com.spotify.styx.util.ResourceNotFoundException;
import com.spotify.styx.util.TimeUtil;
import com.spotify.styx.util.TriggerInstantSpec;
import com.spotify.styx.util.TriggerUtil;
import io.grpc.Context;
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DatastoreStorage
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(DatastoreStorage.class);
    public static final String KIND_STYX_CONFIG = "StyxConfig";
    public static final String KIND_COMPONENT = "Component";
    public static final String KIND_WORKFLOW = "Workflow";
    public static final String KIND_ACTIVE_WORKFLOW_INSTANCE = "ActiveWorkflowInstance";
    public static final String KIND_ACTIVE_WORKFLOW_INSTANCE_INDEX_SHARD = "ActiveWorkflowInstanceIndexShard";
    public static final String KIND_ACTIVE_WORKFLOW_INSTANCE_INDEX_SHARD_ENTRY = "ActiveWorkflowInstanceIndexShardEntry";
    public static final String KIND_BACKFILL = "Backfill";
    public static final String PROPERTY_CONFIG_ENABLED = "enabled";
    public static final String PROPERTY_CONFIG_DOCKER_RUNNER_ID = "dockerRunnerId";
    public static final String PROPERTY_CONFIG_CONCURRENCY = "concurrency";
    public static final String PROPERTY_CONFIG_CLIENT_BLACKLIST = "clientBlacklist";
    public static final String PROPERTY_CONFIG_EXECUTION_GATING_ENABLED = "executionGatingEnabled";
    public static final String PROPERTY_CONFIG_DEBUG_ENABLED = "debug";
    public static final String PROPERTY_WORKFLOW_JSON = "json";
    public static final String PROPERTY_WORKFLOW_ENABLED = "enabled";
    public static final String PROPERTY_NEXT_NATURAL_TRIGGER = "nextNaturalTrigger";
    public static final String PROPERTY_NEXT_NATURAL_OFFSET_TRIGGER = "nextNaturalOffsetTrigger";
    public static final String PROPERTY_COUNTER = "counter";
    public static final String PROPERTY_COMPONENT = "component";
    public static final String PROPERTY_WORKFLOW = "workflow";
    public static final String PROPERTY_PARAMETER = "parameter";
    public static final String PROPERTY_CONCURRENCY = "concurrency";
    public static final String PROPERTY_START = "start";
    public static final String PROPERTY_END = "end";
    public static final String PROPERTY_NEXT_TRIGGER = "nextTrigger";
    public static final String PROPERTY_SCHEDULE = "schedule";
    public static final String PROPERTY_ALL_TRIGGERED = "allTriggered";
    public static final String PROPERTY_HALTED = "halted";
    public static final String PROPERTY_REVERSE = "reverse";
    public static final String PROPERTY_DESCRIPTION = "description";
    public static final String PROPERTY_TRIGGER_PARAMETERS = "triggerParameters";
    public static final String PROPERTY_SUBMISSION_RATE_LIMIT = "submissionRateLimit";
    public static final String PROPERTY_STATE = "state";
    public static final String PROPERTY_STATE_TIMESTAMP = "stateTimestamp";
    public static final String PROPERTY_STATE_TRIGGER_TYPE = "triggerType";
    public static final String PROPERTY_STATE_TRIGGER_ID = "triggerId";
    public static final String PROPERTY_STATE_TRIES = "tries";
    public static final String PROPERTY_STATE_CONSECUTIVE_FAILURES = "consecutiveFailures";
    public static final String PROPERTY_STATE_RETRY_COST = "retryCost";
    public static final String PROPERTY_STATE_MESSAGES = "messages";
    public static final String PROPERTY_STATE_RETRY_DELAY_MILLIS = "retryDelayMillis";
    public static final String PROPERTY_STATE_LAST_EXIT = "lastExit";
    public static final String PROPERTY_STATE_EXECUTION_ID = "executionId";
    public static final String PROPERTY_STATE_EXECUTION_DESCRIPTION = "executionDescription";
    public static final String PROPERTY_STATE_RESOURCE_IDS = "resourceIds";
    public static final String PROPERTY_STATE_TRIGGER_PARAMETERS = "triggerParameters";
    public static final String KEY_GLOBAL_CONFIG = "styxGlobal";
    public static final boolean DEFAULT_CONFIG_ENABLED = true;
    public static final String DEFAULT_CONFIG_DOCKER_RUNNER_ID = "default";
    public static final boolean DEFAULT_WORKFLOW_ENABLED = false;
    public static final boolean DEFAULT_CONFIG_DEBUG_ENABLED = false;
    public static final boolean DEFAULT_CONFIG_EXECUTION_GATING_ENABLED = false;
    private static final boolean DEFAULT_CONFIG_BOOTSTRAP_ACTIVE_WFI_ENABLED = false;
    public static final int ACTIVE_WORKFLOW_INSTANCE_INDEX_SHARDS = 128;
    public static final int MAX_RETRIES = 100;
    public static final int MAX_NUMBER_OF_ENTITIES_IN_ONE_BATCH_READ = 1000;
    public static final int MAX_NUMBER_OF_ENTITIES_IN_ONE_BATCH_WRITE = 500;
    private static final int REQUEST_CONCURRENCY = 32;
    private final Closer closer = Closer.create();
    private final CheckedDatastore datastore;
    private final Duration retryBaseDelay;
    private final Function<CheckedDatastoreTransaction, DatastoreStorageTransaction> storageTransactionFactory;
    private final Executor executor;

    DatastoreStorage(CheckedDatastore datastore, Duration retryBaseDelay) {
        this(datastore, retryBaseDelay, DatastoreStorageTransaction::new);
    }

    @VisibleForTesting
    DatastoreStorage(CheckedDatastore datastore, Duration retryBaseDelay, Function<CheckedDatastoreTransaction, DatastoreStorageTransaction> storageTransactionFactory) {
        this.datastore = Objects.requireNonNull(datastore);
        this.retryBaseDelay = Objects.requireNonNull(retryBaseDelay);
        this.storageTransactionFactory = Objects.requireNonNull(storageTransactionFactory);
        ForkJoinPool forkJoinPool = CloserUtil.register(this.closer, new ForkJoinPool(32), "datastore-storage");
        this.executor = MDCUtil.withMDC(Context.currentContextExecutor((Executor)forkJoinPool));
    }

    @Override
    public void close() throws IOException {
        this.closer.close();
    }

    StyxConfig config() throws IOException {
        Entity entity = DatastoreStorage.asBuilderOrNew(DatastoreStorage.getOpt(this.datastore, DatastoreStorage.globalConfigKey(this.datastore.newKeyFactory())), DatastoreStorage.globalConfigKey(this.datastore.newKeyFactory())).build();
        return this.entityToConfig(entity);
    }

    private StyxConfig entityToConfig(Entity entity) {
        return StyxConfig.newBuilder().globalConcurrency(DatastoreStorage.readOpt(entity, "concurrency")).globalEnabled(DatastoreStorage.read(entity, "enabled", true)).debugEnabled(DatastoreStorage.read(entity, PROPERTY_CONFIG_DEBUG_ENABLED, false)).submissionRateLimit(DatastoreStorage.readOpt(entity, PROPERTY_SUBMISSION_RATE_LIMIT)).globalDockerRunnerId(DatastoreStorage.read(entity, PROPERTY_CONFIG_DOCKER_RUNNER_ID, DEFAULT_CONFIG_DOCKER_RUNNER_ID)).clientBlacklist(this.readStream(entity, PROPERTY_CONFIG_CLIENT_BLACKLIST).collect(Collectors.toList())).executionGatingEnabled(DatastoreStorage.read(entity, PROPERTY_CONFIG_EXECUTION_GATING_ENABLED, false)).build();
    }

    boolean enabled(WorkflowId workflowId) throws IOException {
        Key workflowKey = DatastoreStorage.workflowKey(this.datastore.newKeyFactory(), workflowId);
        return DatastoreStorage.getOpt(this.datastore, workflowKey).filter(w -> w.contains("enabled")).map(workflow -> workflow.getBoolean("enabled")).orElse(false);
    }

    Set<WorkflowId> enabled() throws IOException {
        EntityQuery queryWorkflows = ((EntityQuery.Builder)EntityQuery.newEntityQueryBuilder().setKind(KIND_WORKFLOW)).build();
        HashSet enabledWorkflows = Sets.newHashSet();
        this.datastore.query(queryWorkflows, workflow -> {
            boolean enabled;
            boolean bl = enabled = workflow.contains("enabled") && workflow.getBoolean("enabled");
            if (enabled) {
                enabledWorkflows.add(this.parseWorkflowId((Entity)workflow));
            }
        });
        return enabledWorkflows;
    }

    void store(Workflow workflow) throws IOException {
        this.storeWithRetries(() -> this.runInTransaction(tx -> tx.store(workflow)));
    }

    Optional<Workflow> workflow(WorkflowId workflowId) throws IOException {
        Optional<Entity> entityOptional = DatastoreStorage.getOpt(this.datastore, DatastoreStorage.workflowKey(this.datastore.newKeyFactory(), workflowId)).filter(e -> e.contains(PROPERTY_WORKFLOW_JSON));
        if (entityOptional.isPresent()) {
            return Optional.of(DatastoreStorage.parseWorkflowJson(entityOptional.get(), workflowId));
        }
        return Optional.empty();
    }

    void delete(WorkflowId workflowId) throws IOException {
        this.storeWithRetries(() -> {
            this.datastore.delete(DatastoreStorage.workflowKey(this.datastore.newKeyFactory(), workflowId));
            return null;
        });
    }

    public void updateNextNaturalTrigger(WorkflowId workflowId, TriggerInstantSpec triggerSpec) throws IOException {
        this.storeWithRetries(() -> this.runInTransaction(tx -> tx.updateNextNaturalTrigger(workflowId, triggerSpec)));
    }

    public Map<Workflow, TriggerInstantSpec> workflowsWithNextNaturalTrigger() throws IOException {
        HashMap map = Maps.newHashMap();
        EntityQuery query = ((EntityQuery.Builder)Query.newEntityQueryBuilder().setKind(KIND_WORKFLOW)).build();
        this.datastore.query(query, entity -> {
            Workflow workflow;
            try {
                workflow = (Workflow)Json.OBJECT_MAPPER.readValue(entity.getString(PROPERTY_WORKFLOW_JSON), Workflow.class);
            }
            catch (IOException e) {
                LOG.warn("Failed to read workflow {}.", (Object)entity.getKey(), (Object)e);
                return;
            }
            if (entity.contains(PROPERTY_NEXT_NATURAL_TRIGGER)) {
                Instant triggerInstant;
                Instant instant = DatastoreStorage.timestampToInstant(entity.getTimestamp(PROPERTY_NEXT_NATURAL_TRIGGER));
                if (!entity.contains(PROPERTY_NEXT_NATURAL_OFFSET_TRIGGER)) {
                    Schedule schedule = workflow.configuration().schedule();
                    if (TimeUtil.isAligned(instant, schedule)) {
                        instant = TimeUtil.previousInstant(instant, schedule);
                    }
                    triggerInstant = workflow.configuration().addOffset(instant);
                } else {
                    triggerInstant = DatastoreStorage.timestampToInstant(entity.getTimestamp(PROPERTY_NEXT_NATURAL_OFFSET_TRIGGER));
                }
                map.put(workflow, TriggerInstantSpec.create(instant, triggerInstant));
            }
        });
        return map;
    }

    public Map<WorkflowId, Workflow> workflows() throws IOException {
        HashMap map = Maps.newHashMap();
        EntityQuery query = ((EntityQuery.Builder)Query.newEntityQueryBuilder().setKind(KIND_WORKFLOW)).build();
        this.datastore.query(query, entity -> {
            Workflow workflow;
            try {
                workflow = (Workflow)Json.OBJECT_MAPPER.readValue(entity.getString(PROPERTY_WORKFLOW_JSON), Workflow.class);
            }
            catch (IOException e) {
                LOG.warn("Failed to read workflow {}.", (Object)entity.getKey(), (Object)e);
                return;
            }
            map.put(workflow.id(), workflow);
        });
        return map;
    }

    public Map<WorkflowId, Workflow> workflows(Set<WorkflowId> workflowIds) {
        Iterable batches = Iterables.partition(workflowIds, (int)1000);
        return StreamSupport.stream(batches.spliterator(), false).map(batch -> this.asyncIO(() -> this.getBatchOfWorkflows((List<WorkflowId>)batch))).collect(Collectors.toList()).stream().flatMap(task -> ((List)task.join()).stream()).collect(Collectors.toMap(Workflow::id, Function.identity()));
    }

    private List<Workflow> getBatchOfWorkflows(List<WorkflowId> batch) throws IOException {
        List<Key> keys = batch.stream().map(workflowId -> DatastoreStorage.workflowKey(this.datastore.newKeyFactory(), workflowId)).collect(Collectors.toList());
        ArrayList<Workflow> workflows = new ArrayList<Workflow>();
        this.datastore.get(keys, entity -> {
            try {
                workflows.add((Workflow)Json.OBJECT_MAPPER.readValue(entity.getString(PROPERTY_WORKFLOW_JSON), Workflow.class));
            }
            catch (IOException e) {
                LOG.warn("Failed to read workflow {}.", (Object)entity.getKey(), (Object)e);
            }
        });
        return workflows;
    }

    public List<Workflow> workflows(String componentId) throws IOException {
        Key componentKey = DatastoreStorage.componentKey(this.datastore.newKeyFactory(), componentId);
        ArrayList workflows = Lists.newArrayList();
        EntityQuery query = ((EntityQuery.Builder)((EntityQuery.Builder)Query.newEntityQueryBuilder().setKind(KIND_WORKFLOW)).setFilter((StructuredQuery.Filter)StructuredQuery.PropertyFilter.hasAncestor((Key)componentKey))).build();
        this.datastore.query(query, entity -> {
            if (entity.contains(PROPERTY_WORKFLOW_JSON)) {
                Workflow workflow;
                try {
                    workflow = (Workflow)Json.OBJECT_MAPPER.readValue(entity.getString(PROPERTY_WORKFLOW_JSON), Workflow.class);
                }
                catch (IOException e) {
                    LOG.warn("Failed to read workflow {}.", (Object)entity.getKey(), (Object)e);
                    return;
                }
                workflows.add(workflow);
            }
        });
        return workflows;
    }

    Map<WorkflowInstance, RunState> readActiveStates() throws IOException {
        List keys = FutureUtil.gatherIO(DatastoreStorage.activeWorkflowInstanceIndexShardKeys(this.datastore.newKeyFactory()).stream().map(key -> this.asyncIO(() -> this.datastore.query(((EntityQuery.Builder)((EntityQuery.Builder)Query.newEntityQueryBuilder().setFilter((StructuredQuery.Filter)StructuredQuery.PropertyFilter.hasAncestor((Key)key))).setKind(KIND_ACTIVE_WORKFLOW_INSTANCE_INDEX_SHARD_ENTRY)).build()))).collect(Collectors.toList()), 30L, TimeUnit.SECONDS).stream().flatMap(Collection::stream).map(entity -> ((Key)entity.getKey()).getName()).map(name -> DatastoreStorage.activeWorkflowInstanceKey(this.datastore.newKeyFactory(), name)).collect(Collectors.toList());
        return FutureUtil.gatherIO(Lists.partition(keys, (int)1000).stream().map(batch -> this.asyncIO(() -> this.readRunStateBatch((List<Key>)batch))).collect(Collectors.toList()), 30L, TimeUnit.SECONDS).stream().flatMap(Collection::stream).collect(Collectors.toMap(RunState::workflowInstance, Function.identity()));
    }

    private List<RunState> readRunStateBatch(List<Key> keys) throws IOException {
        assert (keys.size() <= 1000);
        ArrayList<RunState> runStates = new ArrayList<RunState>();
        this.datastore.get(keys, entity -> runStates.add(DatastoreStorage.entityToRunState(entity, this.parseWorkflowInstance((Entity)entity))));
        return runStates;
    }

    Map<WorkflowInstance, RunState> readActiveStates(String componentId) throws IOException {
        EntityQuery query = ((EntityQuery.Builder)((EntityQuery.Builder)Query.newEntityQueryBuilder().setKind(KIND_ACTIVE_WORKFLOW_INSTANCE)).setFilter((StructuredQuery.Filter)StructuredQuery.PropertyFilter.eq((String)PROPERTY_COMPONENT, (String)componentId))).build();
        return this.queryActiveStates(query);
    }

    public Map<WorkflowInstance, RunState> activeStatesByTriggerId(String triggerId) throws IOException {
        EntityQuery query = ((EntityQuery.Builder)((EntityQuery.Builder)Query.newEntityQueryBuilder().setKind(KIND_ACTIVE_WORKFLOW_INSTANCE)).setFilter((StructuredQuery.Filter)StructuredQuery.PropertyFilter.eq((String)PROPERTY_STATE_TRIGGER_ID, (String)triggerId))).build();
        return this.queryActiveStates(query);
    }

    private Map<WorkflowInstance, RunState> queryActiveStates(EntityQuery activeStatesQuery) throws IOException {
        ImmutableMap.Builder mapBuilder = ImmutableMap.builder();
        this.datastore.query(activeStatesQuery, entity -> {
            WorkflowInstance instance = this.parseWorkflowInstance((Entity)entity);
            mapBuilder.put((Object)instance, (Object)DatastoreStorage.entityToRunState(entity, instance));
        });
        return mapBuilder.build();
    }

    Optional<RunState> readActiveState(WorkflowInstance instance) throws IOException {
        Entity entity = this.datastore.get(this.activeWorkflowInstanceKey(instance));
        if (entity == null) {
            return Optional.empty();
        }
        return Optional.of(DatastoreStorage.entityToRunState(entity, instance));
    }

    static RunState entityToRunState(Entity entity, WorkflowInstance instance) throws IOException {
        long counter = entity.getLong(PROPERTY_COUNTER);
        RunState.State state = RunState.State.valueOf(entity.getString(PROPERTY_STATE));
        long timestamp = entity.getLong(PROPERTY_STATE_TIMESTAMP);
        StateData data = StateData.newBuilder().tries((int)entity.getLong(PROPERTY_STATE_TRIES)).consecutiveFailures((int)entity.getLong(PROPERTY_STATE_CONSECUTIVE_FAILURES)).retryCost(entity.getDouble(PROPERTY_STATE_RETRY_COST)).trigger(DatastoreStorage.readOpt(entity, PROPERTY_STATE_TRIGGER_TYPE).map(type -> TriggerUtil.trigger(type, entity.getString(PROPERTY_STATE_TRIGGER_ID)))).messages((List)Json.OBJECT_MAPPER.readValue(entity.getString(PROPERTY_STATE_MESSAGES), (TypeReference)new TypeReference<List<Message>>(){})).retryDelayMillis(DatastoreStorage.readOpt(entity, PROPERTY_STATE_RETRY_DELAY_MILLIS)).lastExit(DatastoreStorage.readOpt(entity, PROPERTY_STATE_LAST_EXIT).map(Long::intValue)).executionId(DatastoreStorage.readOpt(entity, PROPERTY_STATE_EXECUTION_ID)).executionDescription(DatastoreStorage.readOptJson(entity, PROPERTY_STATE_EXECUTION_DESCRIPTION, ExecutionDescription.class)).resourceIds(DatastoreStorage.readOptJson(entity, PROPERTY_STATE_RESOURCE_IDS, (TypeReference)new TypeReference<Set<String>>(){})).triggerParameters(DatastoreStorage.readOptJson(entity, "triggerParameters", TriggerParameters.class)).build();
        return RunState.create(instance, state, data, Instant.ofEpochMilli(timestamp), counter);
    }

    WorkflowInstance writeActiveState(WorkflowInstance workflowInstance, RunState state) throws IOException {
        return this.storeWithRetries(() -> this.runInTransaction(tx -> tx.writeActiveState(workflowInstance, state)));
    }

    static List<Key> activeWorkflowInstanceIndexShardKeys(KeyFactory keyFactory) {
        return IntStream.range(0, 128).mapToObj(DatastoreStorage::activeWorkflowInstanceIndexShardName).map(name -> ((KeyFactory)keyFactory.setKind(KIND_ACTIVE_WORKFLOW_INSTANCE_INDEX_SHARD)).newKey(name)).collect(Collectors.toList());
    }

    private static String activeWorkflowInstanceIndexShardName(String workflowInstanceKey) {
        long hash = Hashing.murmur3_32().hashString((CharSequence)workflowInstanceKey, StandardCharsets.UTF_8).asInt();
        long index = Long.remainderUnsigned(hash, 128L);
        return DatastoreStorage.activeWorkflowInstanceIndexShardName(index);
    }

    private static String activeWorkflowInstanceIndexShardName(long index) {
        return "shard-" + index;
    }

    static Key activeWorkflowInstanceIndexShardEntryKey(KeyFactory keyFactory, WorkflowInstance workflowInstance) {
        String workflowInstanceKey = workflowInstance.toKey();
        return DatastoreStorage.activeWorkflowInstanceIndexShardEntryKey(keyFactory, workflowInstanceKey);
    }

    private static Key activeWorkflowInstanceIndexShardEntryKey(KeyFactory keyFactory, String workflowInstanceKey) {
        return ((KeyFactory)((KeyFactory)keyFactory.setKind(KIND_ACTIVE_WORKFLOW_INSTANCE_INDEX_SHARD_ENTRY)).addAncestor(PathElement.of((String)KIND_ACTIVE_WORKFLOW_INSTANCE_INDEX_SHARD, (String)DatastoreStorage.activeWorkflowInstanceIndexShardName(workflowInstanceKey)))).newKey(workflowInstanceKey);
    }

    static Entity runStateToEntity(KeyFactory keyFactory, WorkflowInstance wfi, RunState state) throws JsonProcessingException {
        Key key = DatastoreStorage.activeWorkflowInstanceKey(keyFactory, wfi);
        Entity.Builder entity = (Entity.Builder)((Entity.Builder)((Entity.Builder)((Entity.Builder)Entity.newBuilder((Key)key).set(PROPERTY_COMPONENT, wfi.workflowId().componentId())).set(PROPERTY_WORKFLOW, wfi.workflowId().id())).set(PROPERTY_PARAMETER, wfi.parameter())).set(PROPERTY_COUNTER, state.counter());
        ((Entity.Builder)((Entity.Builder)((Entity.Builder)((Entity.Builder)((Entity.Builder)entity.set(PROPERTY_STATE, state.state().toString())).set(PROPERTY_STATE_TIMESTAMP, state.timestamp())).set(PROPERTY_STATE_TRIES, (long)state.data().tries())).set(PROPERTY_STATE_CONSECUTIVE_FAILURES, (long)state.data().consecutiveFailures())).set(PROPERTY_STATE_RETRY_COST, state.data().retryCost())).set(PROPERTY_STATE_MESSAGES, (Value)DatastoreStorage.jsonValue(state.data().messages()));
        state.data().retryDelayMillis().ifPresent(v -> {
            Entity.Builder cfr_ignored_0 = (Entity.Builder)entity.set(PROPERTY_STATE_RETRY_DELAY_MILLIS, v.longValue());
        });
        state.data().lastExit().ifPresent(v -> {
            Entity.Builder cfr_ignored_0 = (Entity.Builder)entity.set(PROPERTY_STATE_LAST_EXIT, (long)v.intValue());
        });
        state.data().trigger().ifPresent(trigger -> {
            entity.set(PROPERTY_STATE_TRIGGER_TYPE, TriggerUtil.triggerType(trigger));
            entity.set(PROPERTY_STATE_TRIGGER_ID, TriggerUtil.triggerId(trigger));
        });
        state.data().executionId().ifPresent(v -> {
            Entity.Builder cfr_ignored_0 = (Entity.Builder)entity.set(PROPERTY_STATE_EXECUTION_ID, v);
        });
        if (state.data().triggerParameters().isPresent()) {
            entity.set("triggerParameters", (Value)DatastoreStorage.jsonValue(state.data().triggerParameters().get()));
        }
        if (state.data().executionDescription().isPresent()) {
            entity.set(PROPERTY_STATE_EXECUTION_DESCRIPTION, (Value)DatastoreStorage.jsonValue(state.data().executionDescription().get()));
        }
        if (state.data().resourceIds().isPresent()) {
            entity.set(PROPERTY_STATE_RESOURCE_IDS, (Value)DatastoreStorage.jsonValue(state.data().resourceIds().get()));
        }
        return entity.build();
    }

    void deleteActiveState(WorkflowInstance workflowInstance) throws IOException {
        this.storeWithRetries(() -> this.runInTransaction(tx -> tx.deleteActiveState(workflowInstance)));
    }

    void patchState(WorkflowId workflowId, WorkflowState state) throws IOException {
        this.storeWithRetries(() -> this.runInTransaction(tx -> tx.patchState(workflowId, state)));
    }

    public WorkflowState workflowState(WorkflowId workflowId) throws IOException {
        WorkflowState.Builder builder = WorkflowState.builder();
        Optional<Entity> workflowEntity = DatastoreStorage.getOpt(this.datastore, DatastoreStorage.workflowKey(this.datastore.newKeyFactory(), workflowId));
        builder.enabled(workflowEntity.filter(w -> w.contains("enabled")).map(workflow -> workflow.getBoolean("enabled")).orElse(false));
        DatastoreStorage.getOptInstantProperty(workflowEntity, PROPERTY_NEXT_NATURAL_TRIGGER).ifPresent(builder::nextNaturalTrigger);
        DatastoreStorage.getOptInstantProperty(workflowEntity, PROPERTY_NEXT_NATURAL_OFFSET_TRIGGER).ifPresent(builder::nextNaturalOffsetTrigger);
        return builder.build();
    }

    private <T> T storeWithRetries(FnWithException<T, IOException> storingOperation) throws IOException {
        int storeRetries = 0;
        while (storeRetries < 100) {
            try {
                return storingOperation.apply();
            }
            catch (ResourceNotFoundException e) {
                throw e;
            }
            catch (DatastoreException | IOException e) {
                if (e.getCause() instanceof ResourceNotFoundException) {
                    throw (ResourceNotFoundException)e.getCause();
                }
                if (++storeRetries == 100) {
                    throw e;
                }
                LOG.warn(String.format("Failed to read/write from/to Datastore (attempt #%d)", storeRetries), e);
                try {
                    Thread.sleep(this.retryBaseDelay.toMillis());
                }
                catch (InterruptedException e1) {
                    throw Throwables.propagate((Throwable)e1);
                }
            }
        }
        throw new IOException("This should never happen");
    }

    private Key activeWorkflowInstanceKey(WorkflowInstance workflowInstance) {
        return DatastoreStorage.activeWorkflowInstanceKey(this.datastore.newKeyFactory(), workflowInstance);
    }

    static Key activeWorkflowInstanceKey(KeyFactory keyFactory, WorkflowInstance workflowInstance) {
        String name = workflowInstance.toKey();
        return DatastoreStorage.activeWorkflowInstanceKey(keyFactory, name);
    }

    private static Key activeWorkflowInstanceKey(KeyFactory keyFactory, String name) {
        return ((KeyFactory)keyFactory.setKind(KIND_ACTIVE_WORKFLOW_INSTANCE)).newKey(name);
    }

    private WorkflowInstance parseWorkflowInstance(Entity activeWorkflowInstance) {
        String componentId = activeWorkflowInstance.getString(PROPERTY_COMPONENT);
        String workflowId = activeWorkflowInstance.getString(PROPERTY_WORKFLOW);
        String parameter = activeWorkflowInstance.getString(PROPERTY_PARAMETER);
        return WorkflowInstance.create(WorkflowId.create(componentId, workflowId), parameter);
    }

    private WorkflowId parseWorkflowId(Entity workflow) {
        String componentId = ((PathElement)((Key)workflow.getKey()).getAncestors().get(0)).getName();
        String id = ((Key)workflow.getKey()).getName();
        return WorkflowId.create(componentId, id);
    }

    static Workflow parseWorkflowJson(Entity entity, WorkflowId workflowId) throws IOException {
        try {
            return (Workflow)Json.OBJECT_MAPPER.readValue(entity.getString(PROPERTY_WORKFLOW_JSON), Workflow.class);
        }
        catch (IOException e) {
            LOG.error("Failed to read workflow for {}, {}", new Object[]{workflowId.componentId(), workflowId.id(), e});
            throw e;
        }
    }

    static Optional<Entity> getOpt(CheckedDatastoreReaderWriter datastore, Key key) throws IOException {
        return Optional.ofNullable(datastore.get(key));
    }

    static Optional<Instant> getOptInstantProperty(Optional<Entity> entity, String property) {
        return entity.filter(w -> w.contains(property)).map(workflow -> DatastoreStorage.timestampToInstant(workflow.getTimestamp(property)));
    }

    static Entity.Builder asBuilderOrNew(Optional<Entity> entityOpt, Key key) {
        return entityOpt.map(c -> Entity.newBuilder((Entity)c)).orElse(Entity.newBuilder((Key)key));
    }

    static Key workflowKey(KeyFactory keyFactory, WorkflowId workflowId) {
        return ((KeyFactory)((KeyFactory)keyFactory.addAncestor(PathElement.of((String)KIND_COMPONENT, (String)workflowId.componentId()))).setKind(KIND_WORKFLOW)).newKey(workflowId.id());
    }

    static Key componentKey(KeyFactory keyFactory, String componentId) {
        return ((KeyFactory)keyFactory.setKind(KIND_COMPONENT)).newKey(componentId);
    }

    static Key backfillKey(KeyFactory keyFactory, String backfillId) {
        return ((KeyFactory)keyFactory.setKind(KIND_BACKFILL)).newKey(backfillId);
    }

    static Key globalConfigKey(KeyFactory keyFactory) {
        return ((KeyFactory)keyFactory.setKind(KIND_STYX_CONFIG)).newKey(KEY_GLOBAL_CONFIG);
    }

    void setEnabled(WorkflowId workflowId1, boolean enabled) throws IOException {
        this.patchState(workflowId1, WorkflowState.patchEnabled(enabled));
    }

    static Timestamp instantToTimestamp(Instant instant) {
        return Timestamp.of((Date)Date.from(instant));
    }

    private static Instant timestampToInstant(Timestamp ts) {
        return Instant.ofEpochSecond(ts.getSeconds(), ts.getNanos());
    }

    Optional<Resource> getResource(String id) throws IOException {
        Entity entity = this.datastore.get(((KeyFactory)this.datastore.newKeyFactory().setKind("CounterLimit")).newKey(id));
        if (entity == null) {
            return Optional.empty();
        }
        return Optional.of(this.entityToResource(entity));
    }

    void storeResource(Resource resource) throws IOException {
        this.storeWithRetries(() -> this.runInTransaction(transaction -> {
            transaction.store(resource);
            return null;
        }));
    }

    List<Resource> getResources() throws IOException {
        EntityQuery query = ((EntityQuery.Builder)Query.newEntityQueryBuilder().setKind("CounterLimit")).build();
        ArrayList resources = Lists.newArrayList();
        this.datastore.query(query, entity -> resources.add(this.entityToResource((Entity)entity)));
        return resources;
    }

    void deleteResource(String id) throws IOException {
        this.storeWithRetries(() -> {
            this.datastore.delete(((KeyFactory)this.datastore.newKeyFactory().setKind("CounterLimit")).newKey(id));
            return null;
        });
        this.deleteShardsForCounter(id);
    }

    private void deleteShardsForCounter(String counterId) throws IOException {
        ArrayList shards = new ArrayList();
        this.datastore.query(((EntityQuery.Builder)((EntityQuery.Builder)EntityQuery.newEntityQueryBuilder().setKind("CounterShard")).setFilter((StructuredQuery.Filter)StructuredQuery.PropertyFilter.eq((String)"counterId", (String)counterId))).build(), entity -> shards.add((Key)entity.getKey()));
        for (List batch : Lists.partition(shards, (int)500)) {
            this.storeWithRetries(() -> {
                this.datastore.delete(batch.toArray(new Key[0]));
                return null;
            });
        }
    }

    private Resource entityToResource(Entity entity) {
        return Resource.create(((Key)entity.getKey()).getName(), entity.getLong("limit"));
    }

    Optional<Backfill> getBackfill(String id) throws IOException {
        Entity entity = this.datastore.get(((KeyFactory)this.datastore.newKeyFactory().setKind(KIND_BACKFILL)).newKey(id));
        if (entity == null) {
            return Optional.empty();
        }
        return Optional.of(DatastoreStorage.entityToBackfill(entity));
    }

    private EntityQuery.Builder backfillQueryBuilder(boolean showAll, StructuredQuery.Filter ... filters) {
        EntityQuery.Builder queryBuilder = (EntityQuery.Builder)Query.newEntityQueryBuilder().setKind(KIND_BACKFILL);
        ArrayList andedFilters = Lists.newArrayList((Object[])filters);
        if (!showAll) {
            andedFilters.add(StructuredQuery.PropertyFilter.eq((String)PROPERTY_ALL_TRIGGERED, (boolean)false));
            andedFilters.add(StructuredQuery.PropertyFilter.eq((String)PROPERTY_HALTED, (boolean)false));
        }
        if (!andedFilters.isEmpty()) {
            StructuredQuery.Filter head = (StructuredQuery.Filter)andedFilters.get(0);
            StructuredQuery.Filter[] tail = (StructuredQuery.Filter[])andedFilters.stream().skip(1L).toArray(StructuredQuery.Filter[]::new);
            queryBuilder.setFilter((StructuredQuery.Filter)StructuredQuery.CompositeFilter.and((StructuredQuery.Filter)head, (StructuredQuery.Filter[])tail));
        }
        return queryBuilder;
    }

    private List<Backfill> backfillsForQuery(EntityQuery query) throws IOException {
        ArrayList backfills = Lists.newArrayList();
        this.datastore.query(query, entity -> backfills.add(DatastoreStorage.entityToBackfill(entity)));
        return backfills;
    }

    List<Backfill> getBackfills(boolean showAll) throws IOException {
        return this.backfillsForQuery(this.backfillQueryBuilder(showAll, new StructuredQuery.Filter[0]).build());
    }

    List<Backfill> getBackfillsForComponent(boolean showAll, String component) throws IOException {
        EntityQuery query = this.backfillQueryBuilder(showAll, new StructuredQuery.Filter[]{StructuredQuery.PropertyFilter.eq((String)PROPERTY_COMPONENT, (String)component)}).build();
        return this.backfillsForQuery(query);
    }

    List<Backfill> getBackfillsForWorkflow(boolean showAll, String workflow) throws IOException {
        EntityQuery query = this.backfillQueryBuilder(showAll, new StructuredQuery.Filter[]{StructuredQuery.PropertyFilter.eq((String)PROPERTY_WORKFLOW, (String)workflow)}).build();
        return this.backfillsForQuery(query);
    }

    List<Backfill> getBackfillsForWorkflowId(boolean showAll, WorkflowId workflowId) throws IOException {
        EntityQuery query = this.backfillQueryBuilder(showAll, new StructuredQuery.Filter[]{StructuredQuery.PropertyFilter.eq((String)PROPERTY_COMPONENT, (String)workflowId.componentId()), StructuredQuery.PropertyFilter.eq((String)PROPERTY_WORKFLOW, (String)workflowId.id())}).build();
        return this.backfillsForQuery(query);
    }

    static Backfill entityToBackfill(Entity entity) throws IOException {
        WorkflowId workflowId = WorkflowId.create(entity.getString(PROPERTY_COMPONENT), entity.getString(PROPERTY_WORKFLOW));
        BackfillBuilder builder = Backfill.newBuilder().id(((Key)entity.getKey()).getName()).start(DatastoreStorage.timestampToInstant(entity.getTimestamp(PROPERTY_START))).end(DatastoreStorage.timestampToInstant(entity.getTimestamp(PROPERTY_END))).workflowId(workflowId).concurrency((int)entity.getLong("concurrency")).nextTrigger(DatastoreStorage.timestampToInstant(entity.getTimestamp(PROPERTY_NEXT_TRIGGER))).schedule(Schedule.parse(entity.getString(PROPERTY_SCHEDULE))).allTriggered(entity.getBoolean(PROPERTY_ALL_TRIGGERED)).halted(entity.getBoolean(PROPERTY_HALTED)).reverse(DatastoreStorage.read(entity, PROPERTY_REVERSE, Boolean.FALSE));
        if (entity.contains(PROPERTY_DESCRIPTION)) {
            builder.description(entity.getString(PROPERTY_DESCRIPTION));
        }
        if (entity.contains("triggerParameters")) {
            builder.triggerParameters((TriggerParameters)Json.OBJECT_MAPPER.readValue(entity.getString("triggerParameters"), TriggerParameters.class));
        }
        return builder.build();
    }

    void storeBackfill(Backfill backfill) throws IOException {
        this.storeWithRetries(() -> this.runInTransaction(tx -> tx.store(backfill)));
    }

    private <T> Stream<T> readStream(Entity entity, String property) {
        return DatastoreStorage.read(entity, property, Collections.emptyList()).stream().map(Value::get);
    }

    static <T> Optional<T> readOpt(Entity entity, String property) {
        return entity.contains(property) ? Optional.of(entity.getValue(property).get()) : Optional.empty();
    }

    static <T> Optional<T> readOptJson(Entity entity, String property, Class<T> cls) throws IOException {
        return entity.contains(property) ? Optional.of(Json.OBJECT_MAPPER.readValue(entity.getString(property), cls)) : Optional.empty();
    }

    static <T> Optional<T> readOptJson(Entity entity, String property, TypeReference valueTypeRef) throws IOException {
        return entity.contains(property) ? Optional.of(Json.OBJECT_MAPPER.readValue(entity.getString(property), valueTypeRef)) : Optional.empty();
    }

    private static <T> T read(Entity entity, String property, T defaultValue) {
        return DatastoreStorage.readOpt(entity, property).orElse(defaultValue);
    }

    static StringValue jsonValue(Object o) throws JsonProcessingException {
        return ((StringValue.Builder)StringValue.newBuilder((String)Json.OBJECT_MAPPER.writeValueAsString(o)).setExcludeFromIndexes(true)).build();
    }

    public <T, E extends Exception> T runInTransaction(TransactionFunction<T, E> f) throws IOException, E {
        StorageTransaction tx = this.newTransaction();
        try {
            T value = f.apply(tx);
            tx.commit();
            T t = value;
            return t;
        }
        catch (DatastoreException e) {
            tx.rollback();
            throw new TransactionException(e);
        }
        finally {
            if (tx.isActive()) {
                tx.rollback();
            }
        }
    }

    private StorageTransaction newTransaction() throws TransactionException {
        CheckedDatastoreTransaction transaction;
        try {
            transaction = this.datastore.newTransaction();
        }
        catch (DatastoreIOException e) {
            throw new TransactionException(e.getCause());
        }
        return this.storageTransactionFactory.apply(transaction);
    }

    Map<Integer, Long> shardsForCounter(String counterId) throws IOException {
        List<Key> shardKeys = IntStream.range(0, 128).mapToObj(index -> ((KeyFactory)this.datastore.newKeyFactory().setKind("CounterShard")).newKey(String.format("%s-%d", counterId, index))).collect(Collectors.toList());
        HashMap<Integer, Long> fetchedShards = new HashMap<Integer, Long>();
        this.datastore.get(shardKeys, shard -> fetchedShards.put((int)shard.getLong("index"), shard.getLong("value")));
        return fetchedShards;
    }

    long getLimitForCounter(String counterId) throws IOException {
        if ("GLOBAL_STYX_CLUSTER".equals(counterId)) {
            return this.config().globalConcurrency().orElse(Long.MAX_VALUE);
        }
        Key limitKey = ((KeyFactory)this.datastore.newKeyFactory().setKind("CounterLimit")).newKey(counterId);
        Entity limitEntity = this.datastore.get(limitKey);
        if (limitEntity == null) {
            throw new IllegalArgumentException("No limit found in Datastore for " + counterId);
        }
        return limitEntity.getLong("limit");
    }

    private <T> CompletableFuture<T> asyncIO(IOOperation<T> f) {
        return f.executeAsync(this.executor);
    }
}

