package io.basestar.stream;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import io.basestar.auth.Caller;
import io.basestar.database.Database;
import io.basestar.database.event.ObjectCreatedEvent;
import io.basestar.database.event.ObjectDeletedEvent;
import io.basestar.database.event.ObjectUpdatedEvent;
import io.basestar.database.options.ReadOptions;
import io.basestar.event.Emitter;
import io.basestar.event.Event;
import io.basestar.event.Handler;
import io.basestar.event.Handlers;
import io.basestar.expression.Context;
import io.basestar.expression.Expression;
import io.basestar.schema.Index;
import io.basestar.schema.Instance;
import io.basestar.schema.Namespace;
import io.basestar.schema.ObjectSchema;
import io.basestar.storage.PartitionedStorage;
import io.basestar.storage.exception.UnsupportedQueryException;
import io.basestar.storage.query.DisjunctionVisitor;
import io.basestar.storage.query.RangeVisitor;
import io.basestar.stream.Change;
import io.basestar.stream.Subscription;
import io.basestar.stream.event.SubscriptionPublishEvent;
import io.basestar.stream.event.SubscriptionQueryEvent;
import io.basestar.util.Name;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/* loaded from: input_file:io/basestar/stream/DefaultNexus.class */
public class DefaultNexus implements Nexus, Handler<Event> {
    private static final int SUBSCRIPTION_PAGE_SIZE = 50;
    private static final Handlers<DefaultNexus> HANDLERS = Handlers.builder().on(ObjectCreatedEvent.class, (v0, v1) -> {
        return v0.onObjectCreated(v1);
    }).on(ObjectUpdatedEvent.class, (v0, v1) -> {
        return v0.onObjectUpdated(v1);
    }).on(ObjectDeletedEvent.class, (v0, v1) -> {
        return v0.onObjectDeleted(v1);
    }).on(SubscriptionQueryEvent.class, (v0, v1) -> {
        return v0.onSubscriptionQuery(v1);
    }).on(SubscriptionPublishEvent.class, (v0, v1) -> {
        return v0.onSubscriptionPublish(v1);
    }).build();
    private final Subscriber subscriber;
    private final Publisher publisher;
    private final Database database;
    private final Namespace namespace;
    private final Emitter emitter;

    /* loaded from: input_file:io/basestar/stream/DefaultNexus$Builder.class */
    public static class Builder {
        private Subscriber subscriber;
        private Publisher publisher;
        private Database database;
        private Namespace namespace;
        private Emitter emitter;

        Builder() {
        }

        public Builder subscriber(Subscriber subscriber) {
            this.subscriber = subscriber;
            return this;
        }

        public Builder publisher(Publisher publisher) {
            this.publisher = publisher;
            return this;
        }

        public Builder database(Database database) {
            this.database = database;
            return this;
        }

        public Builder namespace(Namespace namespace) {
            this.namespace = namespace;
            return this;
        }

        public Builder emitter(Emitter emitter) {
            this.emitter = emitter;
            return this;
        }

        public DefaultNexus build() {
            return new DefaultNexus(this.subscriber, this.publisher, this.database, this.namespace, this.emitter);
        }

        public String toString() {
            return "DefaultNexus.Builder(subscriber=" + this.subscriber + ", publisher=" + this.publisher + ", database=" + this.database + ", namespace=" + this.namespace + ", emitter=" + this.emitter + ")";
        }
    }

    DefaultNexus(Subscriber subscriber, Publisher publisher, Database database, Namespace namespace, Emitter emitter) {
        this.subscriber = subscriber;
        this.publisher = publisher;
        this.database = database;
        this.namespace = namespace;
        this.emitter = emitter;
    }

    public CompletableFuture<?> handle(Event event, Map<String, String> map) {
        return HANDLERS.handle(this, event, map);
    }

    @Override // io.basestar.stream.Nexus
    public CompletableFuture<?> subscribe(Caller caller, String str, String str2, String str3, Expression expression, Set<Name> set) {
        ObjectSchema requireObjectSchema = this.namespace.requireObjectSchema(str3);
        return this.subscriber.create(caller, str, str2, expression.bind(Context.init()), keys(requireObjectSchema, expression), set);
    }

    @Override // io.basestar.stream.Nexus
    public CompletableFuture<?> unsubscribe(Caller caller, String str, String str2) {
        throw new UnsupportedOperationException();
    }

    @Override // io.basestar.stream.Nexus
    public CompletableFuture<?> unsubscribeAll(Caller caller, String str) {
        throw new UnsupportedOperationException();
    }

    private CompletableFuture<?> onObjectCreated(ObjectCreatedEvent objectCreatedEvent) {
        ObjectSchema requireObjectSchema = this.namespace.requireObjectSchema(objectCreatedEvent.getSchema());
        Map after = objectCreatedEvent.getAfter();
        return this.emitter.emit(SubscriptionQueryEvent.of(requireObjectSchema.getQualifiedName(), objectCreatedEvent.getId(), Change.Event.CREATE, null, Instance.getVersion(after), keys(requireObjectSchema, (Map<String, Object>) after)));
    }

    private CompletableFuture<?> onObjectUpdated(ObjectUpdatedEvent objectUpdatedEvent) {
        ObjectSchema requireObjectSchema = this.namespace.requireObjectSchema(objectUpdatedEvent.getSchema());
        Map before = objectUpdatedEvent.getBefore();
        Map after = objectUpdatedEvent.getAfter();
        return this.emitter.emit(SubscriptionQueryEvent.of(requireObjectSchema.getQualifiedName(), objectUpdatedEvent.getId(), Change.Event.UPDATE, Instance.getVersion(before), Instance.getVersion(after), Sets.union(keys(requireObjectSchema, (Map<String, Object>) after), keys(requireObjectSchema, (Map<String, Object>) before))));
    }

    private CompletableFuture<?> onObjectDeleted(ObjectDeletedEvent objectDeletedEvent) {
        ObjectSchema requireObjectSchema = this.namespace.requireObjectSchema(objectDeletedEvent.getSchema());
        Map before = objectDeletedEvent.getBefore();
        return this.emitter.emit(SubscriptionQueryEvent.of(requireObjectSchema.getQualifiedName(), objectDeletedEvent.getId(), Change.Event.DELETE, Instance.getVersion(before), null, keys(requireObjectSchema, (Map<String, Object>) before)));
    }

    private CompletableFuture<?> onSubscriptionQuery(SubscriptionQueryEvent subscriptionQueryEvent) {
        ObjectSchema requireObjectSchema = this.namespace.requireObjectSchema(subscriptionQueryEvent.getSchema());
        return this.subscriber.listByKeys(subscriptionQueryEvent.getKeys(), subscriptionQueryEvent.getPaging()).page(SUBSCRIPTION_PAGE_SIZE).thenCompose(pagedList -> {
            ArrayList arrayList = new ArrayList();
            pagedList.forEach(subscription -> {
                arrayList.add(SubscriptionPublishEvent.of(requireObjectSchema.getQualifiedName(), subscriptionQueryEvent.getId(), subscriptionQueryEvent.getEvent(), subscriptionQueryEvent.getBefore(), subscriptionQueryEvent.getAfter(), subscription));
            });
            if (pagedList.hasPaging()) {
                arrayList.add(subscriptionQueryEvent.withPaging(pagedList.getPaging()));
            }
            return this.emitter.emit(arrayList);
        });
    }

    private CompletableFuture<?> onSubscriptionPublish(SubscriptionPublishEvent subscriptionPublishEvent) {
        ObjectSchema requireObjectSchema = this.namespace.requireObjectSchema(subscriptionPublishEvent.getSchema());
        String id = subscriptionPublishEvent.getId();
        Subscription subscription = subscriptionPublishEvent.getSubscription();
        Caller caller = subscription.getCaller();
        Expression expression = subscription.getExpression();
        Set<Name> expand = subscription.getExpand();
        CompletableFuture<Instance> load = load(caller, requireObjectSchema, id, subscriptionPublishEvent.getBefore(), expand);
        CompletableFuture<Instance> load2 = load(caller, requireObjectSchema, id, subscriptionPublishEvent.getAfter(), expand);
        return CompletableFuture.allOf(load, load2).thenCompose(r17 -> {
            Instance instance = (Instance) load.getNow(null);
            Instance instance2 = (Instance) load2.getNow(null);
            return (match(instance, expression) || match(instance2, expression)) ? this.publisher.publish(subscription.getSub(), subscription.getChannel(), Change.of(subscriptionPublishEvent.getEvent(), requireObjectSchema.getQualifiedName(), id, instance, instance2)) : CompletableFuture.completedFuture(null);
        });
    }

    private boolean match(Map<String, Object> map, Expression expression) {
        if (map != null) {
            return expression.evaluatePredicate(Context.init(map));
        }
        return false;
    }

    private CompletableFuture<Instance> load(Caller caller, ObjectSchema objectSchema, String str, Long l, Set<Name> set) {
        return l != null ? this.database.read(caller, ReadOptions.builder().schema(objectSchema.getQualifiedName()).id(str).version(l).expand(set).build()) : CompletableFuture.completedFuture(null);
    }

    private static Subscription.Key idKey(ObjectSchema objectSchema, String str) {
        return new Subscription.Key(objectSchema.getQualifiedName(), "__id", ImmutableList.of(str));
    }

    private static Set<Subscription.Key> keys(ObjectSchema objectSchema, Expression expression) {
        Set set = (Set) expression.visit(new DisjunctionVisitor());
        HashSet hashSet = new HashSet();
        Iterator it = set.iterator();
        while (it.hasNext()) {
            Map map = (Map) ((Expression) it.next()).visit(new RangeVisitor());
            Optional constantId = PartitionedStorage.constantId(map);
            if (constantId.isPresent()) {
                hashSet.add(idKey(objectSchema, (String) constantId.get()));
            } else {
                Optional satisfy = PartitionedStorage.satisfy(objectSchema.getIndexes().values(), map, Collections.emptyList());
                if (!satisfy.isPresent()) {
                    throw new UnsupportedQueryException(objectSchema.getQualifiedName(), expression, "no index");
                }
                PartitionedStorage.SatisfyResult satisfyResult = (PartitionedStorage.SatisfyResult) satisfy.get();
                hashSet.add(new Subscription.Key(objectSchema.getQualifiedName(), satisfyResult.getIndex().getName(), satisfyResult.getPartition()));
            }
        }
        return hashSet;
    }

    private static Set<Subscription.Key> keys(ObjectSchema objectSchema, Map<String, Object> map) {
        HashSet hashSet = new HashSet();
        hashSet.add(idKey(objectSchema, Instance.getId(map)));
        for (Index index : objectSchema.getIndexes().values()) {
            index.readValues(map).forEach((key, map2) -> {
                hashSet.add(new Subscription.Key(objectSchema.getQualifiedName(), index.getName(), key.getPartition()));
            });
        }
        return hashSet;
    }

    public static Builder builder() {
        return new Builder();
    }
}
