package io.basestar.stream;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.basestar.auth.Caller;
import io.basestar.expression.Context;
import io.basestar.expression.Expression;
import io.basestar.expression.constant.Constant;
import io.basestar.expression.constant.NameConstant;
import io.basestar.expression.function.In;
import io.basestar.expression.logical.Or;
import io.basestar.schema.Consistency;
import io.basestar.schema.Index;
import io.basestar.schema.Instance;
import io.basestar.schema.Namespace;
import io.basestar.schema.ObjectSchema;
import io.basestar.schema.Property;
import io.basestar.schema.use.UseArray;
import io.basestar.schema.use.UseBinary;
import io.basestar.schema.use.UseString;
import io.basestar.storage.Storage;
import io.basestar.storage.Versioning;
import io.basestar.stream.Subscription;
import io.basestar.util.Name;
import io.basestar.util.Page;
import io.basestar.util.Pager;
import io.basestar.util.Sort;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

/* loaded from: input_file:io/basestar/stream/StorageSubscriptions.class */
public class StorageSubscriptions implements Subscriptions {
    private static final int UNSUBSCRIBE_PAGE_SIZE = 50;
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final String SCHEMA_NAME = "Subscription";
    private static final Namespace NAMESPACE = Namespace.builder().setSchema(Name.of(new String[]{SCHEMA_NAME}), ObjectSchema.builder().setProperty("sub", Property.builder().setType(UseString.DEFAULT)).setProperty("channel", Property.builder().setType(UseString.DEFAULT)).setProperty("caller", Property.builder().setType(UseString.DEFAULT)).setProperty("expression", Property.builder().setType(UseString.DEFAULT)).setProperty("info", Property.builder().setType(UseString.DEFAULT)).setProperty("keys", Property.builder().setType(new UseArray(UseBinary.DEFAULT))).setIndex("sub", Index.builder().setConsistency(Consistency.ATOMIC).setPartition(ImmutableList.of(Name.of(new String[]{"sub"}))).setSort(ImmutableList.of(Sort.asc(Name.of(new String[]{"channel"}))))).setIndex("keys", Index.builder().setConsistency(Consistency.ATOMIC).setOver(ImmutableMap.of("key", Name.of(new String[]{"keys"}))).setPartition(ImmutableList.of(Name.of(new String[]{"key"}))))).build();
    private static final ObjectSchema SCHEMA = NAMESPACE.requireObjectSchema(SCHEMA_NAME);
    private final Storage storage;

    public StorageSubscriptions(Storage storage) {
        this.storage = storage;
        storage.validate(SCHEMA);
    }

    @Override // io.basestar.stream.Subscriptions
    public CompletableFuture<?> subscribe(Caller caller, String str, String str2, Set<Subscription.Key> set, Expression expression, SubscriptionInfo subscriptionInfo) {
        try {
            Instant now = Instant.now();
            HashMap hashMap = new HashMap();
            String id = id(str, str2);
            Instance.setSchema(hashMap, Name.of(new String[]{SCHEMA_NAME}));
            Instance.setId(hashMap, id);
            Instance.setCreated(hashMap, now);
            Instance.setUpdated(hashMap, now);
            Instance.setVersion(hashMap, 1L);
            hashMap.put("sub", str);
            hashMap.put("channel", str2);
            hashMap.put("caller", OBJECT_MAPPER.writeValueAsString(caller));
            hashMap.put("expression", expression.toString());
            hashMap.put("info", subscriptionInfo == null ? null : OBJECT_MAPPER.writeValueAsString(subscriptionInfo));
            hashMap.put("keys", set.stream().map(StorageSubscriptions::binaryKey).collect(Collectors.toList()));
            return this.storage.write(Consistency.ATOMIC, Versioning.CHECKED).createObject(SCHEMA, id, (Instance) SCHEMA.create(hashMap)).write();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    private static byte[] binaryKey(Subscription.Key key) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(key.getSchema().toString());
        arrayList.add(key.getIndex());
        arrayList.addAll(key.getPartition());
        return UseBinary.binaryKey(arrayList);
    }

    private static Expression keyExpression(Subscription.Key key) {
        return new In(new Constant(binaryKey(key)), new NameConstant(Name.of(new String[]{"keys"})));
    }

    private static String id(String str, String str2) {
        return str + "_" + str2;
    }

    private static List<Sort> sort() {
        return ImmutableList.of(Sort.asc(ObjectSchema.ID_NAME));
    }

    @Override // io.basestar.stream.Subscriptions
    public List<Pager.Source<Subscription>> query(Set<Subscription.Key> set) {
        return Pager.map(this.storage.query(SCHEMA, new Or((Expression[]) set.stream().map(StorageSubscriptions::keyExpression).toArray(i -> {
            return new Expression[i];
        })), sort(), Collections.emptySet()), this::fromMap);
    }

    private Subscription fromMap(Map<String, Object> map) {
        try {
            Subscription subscription = new Subscription();
            subscription.setSub((String) map.get("sub"));
            subscription.setChannel((String) map.get("channel"));
            subscription.setExpression(Expression.parse((String) map.get("expression")));
            subscription.setCaller((Caller) OBJECT_MAPPER.readValue((String) map.get("caller"), Caller.class));
            String str = (String) map.get("info");
            if (str != null) {
                subscription.setInfo((SubscriptionInfo) OBJECT_MAPPER.readValue(str, SubscriptionInfo.class));
            }
            return subscription;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    @Override // io.basestar.stream.Subscriptions
    public CompletableFuture<?> unsubscribe(String str, String str2) {
        return this.storage.readObject(SCHEMA, id(str, str2), Collections.emptySet()).thenCompose(map -> {
            Storage.WriteTransaction write = this.storage.write(Consistency.ATOMIC, Versioning.CHECKED);
            write.deleteObject(SCHEMA, id(str, str2), map);
            return write.write();
        });
    }

    @Override // io.basestar.stream.Subscriptions
    public CompletableFuture<?> unsubscribeAll(String str) {
        return unsubscribeAll(this.storage.query(SCHEMA, Expression.parseAndBind(Context.init(ImmutableMap.of("s", str)), "sub == s"), sort(), Collections.emptySet()), null);
    }

    private CompletableFuture<?> unsubscribeAll(List<Pager.Source<Map<String, Object>>> list, Page.Token token) {
        return new Pager(Instance.comparator(sort()), list, token).page(UNSUBSCRIBE_PAGE_SIZE).thenCompose(page -> {
            Storage.WriteTransaction write = this.storage.write(Consistency.NONE, Versioning.CHECKED);
            page.forEach(map -> {
                write.deleteObject(SCHEMA, Instance.getId(map), map);
            });
            return page.hasMore() ? write.write().thenCompose(batchResponse -> {
                return unsubscribeAll(list, page.getPaging());
            }).thenApply(obj -> {
                return null;
            }) : write.write();
        });
    }
}
