package io.streamnative.oxia.client.shard;

import io.streamnative.oxia.client.CompositeConsumer;
import io.streamnative.oxia.client.grpc.CustomStatusCode;
import io.streamnative.oxia.client.grpc.GrpcResponseStream;
import io.streamnative.oxia.client.grpc.OxiaStub;
import io.streamnative.oxia.client.metrics.ShardAssignmentMetrics;
import io.streamnative.oxia.client.metrics.api.Metrics;
import io.streamnative.oxia.proto.NamespaceShardsAssignment;
import io.streamnative.oxia.proto.ShardAssignments;
import io.streamnative.oxia.proto.ShardAssignmentsRequest;
import io.streamnative.oxia.shaded.com.google.common.annotations.VisibleForTesting;
import io.streamnative.oxia.shaded.com.google.common.base.Strings;
import io.streamnative.oxia.shaded.io.grpc.Status;
import io.streamnative.oxia.shaded.io.grpc.StatusRuntimeException;
import io.streamnative.oxia.shaded.reactor.core.Disposable;
import io.streamnative.oxia.shaded.reactor.core.publisher.ConnectableFlux;
import io.streamnative.oxia.shaded.reactor.core.publisher.Flux;
import io.streamnative.oxia.shaded.reactor.core.publisher.Mono;
import io.streamnative.oxia.shaded.reactor.core.scheduler.Scheduler;
import io.streamnative.oxia.shaded.reactor.core.scheduler.Schedulers;
import io.streamnative.oxia.shaded.reactor.util.retry.Retry;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/oxia/client/shard/ShardManager.class */
public class ShardManager extends GrpcResponseStream implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ShardManager.class);

    @NonNull
    private final Assignments assignments;

    @NonNull
    private final CompositeConsumer<ShardAssignmentChanges> callbacks;

    @NonNull
    private final ShardAssignmentMetrics metrics;
    private final Scheduler scheduler;

    /* loaded from: input_file:io/streamnative/oxia/client/shard/ShardManager$Assignments.class */
    public static class Assignments {
        private final Lock rLock;
        private final Lock wLock;
        private Map<Long, Shard> shards;
        private final ShardStrategy shardStrategy;
        private final String namespace;

        Assignments(ShardStrategy shardStrategy, String str) {
            this(new ReentrantReadWriteLock(), shardStrategy, str);
        }

        Assignments(ReadWriteLock readWriteLock, ShardStrategy shardStrategy, String str) {
            this.shards = new HashMap();
            if (Strings.isNullOrEmpty(str)) {
                throw new IllegalArgumentException("namespace must not be null or empty");
            }
            this.shardStrategy = shardStrategy;
            this.namespace = str;
            this.rLock = readWriteLock.readLock();
            this.wLock = readWriteLock.writeLock();
        }

        public long get(String str) {
            try {
                this.rLock.lock();
                long longValue = ((Long) this.shards.values().stream().filter(this.shardStrategy.acceptsKeyPredicate(str)).findAny().map((v0) -> {
                    return v0.id();
                }).orElseThrow(() -> {
                    return new NoShardAvailableException(str);
                })).longValue();
                this.rLock.unlock();
                return longValue;
            } catch (Throwable th) {
                this.rLock.unlock();
                throw th;
            }
        }

        public List<Long> getAll() {
            try {
                this.rLock.lock();
                return this.shards.keySet().stream().toList();
            } finally {
                this.rLock.unlock();
            }
        }

        public String leader(long j) {
            try {
                this.rLock.lock();
                String str = (String) Optional.ofNullable(this.shards.get(Long.valueOf(j))).map((v0) -> {
                    return v0.leader();
                }).orElseThrow(() -> {
                    return new NoShardAvailableException(j);
                });
                this.rLock.unlock();
                return str;
            } catch (Throwable th) {
                this.rLock.unlock();
                throw th;
            }
        }

        void update(List<Shard> list) {
            try {
                this.wLock.lock();
                this.shards = ShardManager.recomputeShardHashBoundaries(this.shards, list);
            } finally {
                this.wLock.unlock();
            }
        }
    }

    /* loaded from: input_file:io/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChange.class */
    public interface ShardAssignmentChange {

        /* loaded from: input_file:io/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChange$Added.class */
        public static final class Added extends Record implements ShardAssignmentChange {
            private final long shardId;

            @NonNull
            private final String leader;

            public Added(long j, @NonNull String str) {
                if (str == null) {
                    throw new NullPointerException("leader is marked non-null but is null");
                }
                this.shardId = j;
                this.leader = str;
            }

            @Override // java.lang.Record
            public final String toString() {
                return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, Added.class), Added.class, "shardId;leader", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChange$Added;->shardId:J", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChange$Added;->leader:Ljava/lang/String;").dynamicInvoker().invoke(this) /* invoke-custom */;
            }

            @Override // java.lang.Record
            public final int hashCode() {
                return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, Added.class), Added.class, "shardId;leader", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChange$Added;->shardId:J", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChange$Added;->leader:Ljava/lang/String;").dynamicInvoker().invoke(this) /* invoke-custom */;
            }

            @Override // java.lang.Record
            public final boolean equals(Object obj) {
                return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, Added.class, Object.class), Added.class, "shardId;leader", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChange$Added;->shardId:J", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChange$Added;->leader:Ljava/lang/String;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
            }

            public long shardId() {
                return this.shardId;
            }

            @NonNull
            public String leader() {
                return this.leader;
            }
        }

        /* loaded from: input_file:io/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChange$Reassigned.class */
        public static final class Reassigned extends Record implements ShardAssignmentChange {
            private final long shardId;

            @NonNull
            private final String fromLeader;

            @NonNull
            private final String toLeader;

            public Reassigned(long j, @NonNull String str, @NonNull String str2) {
                if (str == null) {
                    throw new NullPointerException("fromLeader is marked non-null but is null");
                }
                if (str2 == null) {
                    throw new NullPointerException("toLeader is marked non-null but is null");
                }
                this.shardId = j;
                this.fromLeader = str;
                this.toLeader = str2;
            }

            @Override // java.lang.Record
            public final String toString() {
                return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, Reassigned.class), Reassigned.class, "shardId;fromLeader;toLeader", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChange$Reassigned;->shardId:J", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChange$Reassigned;->fromLeader:Ljava/lang/String;", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChange$Reassigned;->toLeader:Ljava/lang/String;").dynamicInvoker().invoke(this) /* invoke-custom */;
            }

            @Override // java.lang.Record
            public final int hashCode() {
                return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, Reassigned.class), Reassigned.class, "shardId;fromLeader;toLeader", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChange$Reassigned;->shardId:J", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChange$Reassigned;->fromLeader:Ljava/lang/String;", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChange$Reassigned;->toLeader:Ljava/lang/String;").dynamicInvoker().invoke(this) /* invoke-custom */;
            }

            @Override // java.lang.Record
            public final boolean equals(Object obj) {
                return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, Reassigned.class, Object.class), Reassigned.class, "shardId;fromLeader;toLeader", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChange$Reassigned;->shardId:J", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChange$Reassigned;->fromLeader:Ljava/lang/String;", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChange$Reassigned;->toLeader:Ljava/lang/String;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
            }

            public long shardId() {
                return this.shardId;
            }

            @NonNull
            public String fromLeader() {
                return this.fromLeader;
            }

            @NonNull
            public String toLeader() {
                return this.toLeader;
            }
        }

        /* loaded from: input_file:io/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChange$Removed.class */
        public static final class Removed extends Record implements ShardAssignmentChange {
            private final long shardId;

            @NonNull
            private final String leader;

            public Removed(long j, @NonNull String str) {
                if (str == null) {
                    throw new NullPointerException("leader is marked non-null but is null");
                }
                this.shardId = j;
                this.leader = str;
            }

            @Override // java.lang.Record
            public final String toString() {
                return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, Removed.class), Removed.class, "shardId;leader", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChange$Removed;->shardId:J", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChange$Removed;->leader:Ljava/lang/String;").dynamicInvoker().invoke(this) /* invoke-custom */;
            }

            @Override // java.lang.Record
            public final int hashCode() {
                return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, Removed.class), Removed.class, "shardId;leader", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChange$Removed;->shardId:J", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChange$Removed;->leader:Ljava/lang/String;").dynamicInvoker().invoke(this) /* invoke-custom */;
            }

            @Override // java.lang.Record
            public final boolean equals(Object obj) {
                return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, Removed.class, Object.class), Removed.class, "shardId;leader", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChange$Removed;->shardId:J", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChange$Removed;->leader:Ljava/lang/String;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
            }

            public long shardId() {
                return this.shardId;
            }

            @NonNull
            public String leader() {
                return this.leader;
            }
        }
    }

    /* loaded from: input_file:io/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChanges.class */
    public static final class ShardAssignmentChanges extends Record {
        private final Set<ShardAssignmentChange.Added> added;
        private final Set<ShardAssignmentChange.Removed> removed;
        private final Set<ShardAssignmentChange.Reassigned> reassigned;

        public ShardAssignmentChanges(Set<ShardAssignmentChange.Added> set, Set<ShardAssignmentChange.Removed> set2, Set<ShardAssignmentChange.Reassigned> set3) {
            this.added = set;
            this.removed = set2;
            this.reassigned = set3;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, ShardAssignmentChanges.class), ShardAssignmentChanges.class, "added;removed;reassigned", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChanges;->added:Ljava/util/Set;", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChanges;->removed:Ljava/util/Set;", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChanges;->reassigned:Ljava/util/Set;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, ShardAssignmentChanges.class), ShardAssignmentChanges.class, "added;removed;reassigned", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChanges;->added:Ljava/util/Set;", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChanges;->removed:Ljava/util/Set;", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChanges;->reassigned:Ljava/util/Set;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, ShardAssignmentChanges.class, Object.class), ShardAssignmentChanges.class, "added;removed;reassigned", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChanges;->added:Ljava/util/Set;", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChanges;->removed:Ljava/util/Set;", "FIELD:Lio/streamnative/oxia/client/shard/ShardManager$ShardAssignmentChanges;->reassigned:Ljava/util/Set;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public Set<ShardAssignmentChange.Added> added() {
            return this.added;
        }

        public Set<ShardAssignmentChange.Removed> removed() {
            return this.removed;
        }

        public Set<ShardAssignmentChange.Reassigned> reassigned() {
            return this.reassigned;
        }
    }

    @VisibleForTesting
    ShardManager(@NonNull OxiaStub oxiaStub, @NonNull Assignments assignments, @NonNull CompositeConsumer<ShardAssignmentChanges> compositeConsumer, @NonNull ShardAssignmentMetrics shardAssignmentMetrics) {
        super(oxiaStub);
        if (oxiaStub == null) {
            throw new NullPointerException("stub is marked non-null but is null");
        }
        if (assignments == null) {
            throw new NullPointerException("assignments is marked non-null but is null");
        }
        if (compositeConsumer == null) {
            throw new NullPointerException("callbacks is marked non-null but is null");
        }
        if (shardAssignmentMetrics == null) {
            throw new NullPointerException("metrics is marked non-null but is null");
        }
        this.assignments = assignments;
        this.callbacks = compositeConsumer;
        this.metrics = shardAssignmentMetrics;
        this.scheduler = Schedulers.newSingle("shard-assignments");
    }

    public ShardManager(@NonNull OxiaStub oxiaStub, @NonNull Metrics metrics, @NonNull String str) {
        this(oxiaStub, new Assignments(HashRangeShardStrategy.Xxh332HashRangeShardStrategy, str), new CompositeConsumer(), ShardAssignmentMetrics.create(metrics));
        if (oxiaStub == null) {
            throw new NullPointerException("stub is marked non-null but is null");
        }
        if (metrics == null) {
            throw new NullPointerException("metrics is marked non-null but is null");
        }
        if (str == null) {
            throw new NullPointerException("namespace is marked non-null but is null");
        }
    }

    @Override // io.streamnative.oxia.client.grpc.GrpcResponseStream, java.lang.AutoCloseable
    public void close() {
        super.close();
        this.scheduler.dispose();
    }

    @Override // io.streamnative.oxia.client.grpc.GrpcResponseStream
    protected CompletableFuture<Void> start(OxiaStub oxiaStub, Consumer<Disposable> consumer) {
        Flux doOnNext = Flux.defer(() -> {
            return oxiaStub.reactor().getShardAssignments(ShardAssignmentsRequest.newBuilder().setNamespace(this.assignments.namespace).build());
        }).doOnError(this::processError).retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100L)).filter(this::isErrorRetryable).doBeforeRetry(retrySignal -> {
            log.warn("Retrying receiving shard assignments: {}", retrySignal);
        })).repeat().publishOn(this.scheduler).doOnNext(this::updateAssignments);
        ShardAssignmentMetrics shardAssignmentMetrics = this.metrics;
        Objects.requireNonNull(shardAssignmentMetrics);
        ConnectableFlux publish = doOnNext.doOnEach(shardAssignmentMetrics::recordAssignments).publish();
        CompletableFuture<Void> future = Mono.from(publish).then().toFuture();
        consumer.accept(publish.connect());
        return future;
    }

    private void processError(@NonNull Throwable th) {
        String description;
        if (th == null) {
            throw new NullPointerException("error is marked non-null but is null");
        }
        if (th instanceof StatusRuntimeException) {
            Status status = ((StatusRuntimeException) th).getStatus();
            if (status.getCode() == Status.Code.UNKNOWN && (description = status.getDescription()) != null && CustomStatusCode.fromDescription(description) == CustomStatusCode.ErrorNamespaceNotFound) {
                NamespaceNotFoundException namespaceNotFoundException = new NamespaceNotFoundException(this.assignments.namespace);
                log.error("Failed receiving shard assignments", (Throwable) namespaceNotFoundException);
                throw namespaceNotFoundException;
            }
        }
        log.warn("Failed receiving shard assignments", th);
    }

    private void updateAssignments(ShardAssignments shardAssignments) {
        NamespaceShardsAssignment namespaceShardsAssignment = shardAssignments.getNamespacesMap().get(this.assignments.namespace);
        if (namespaceShardsAssignment == null) {
            throw new NamespaceNotFoundException(this.assignments.namespace, true);
        }
        List<Shard> list = (List) namespaceShardsAssignment.getAssignmentsList().stream().map(Shard::fromProto).collect(Collectors.toList());
        ShardAssignmentChanges computeShardLeaderChanges = computeShardLeaderChanges(this.assignments.shards, recomputeShardHashBoundaries(this.assignments.shards, list));
        this.assignments.update(list);
        this.callbacks.accept(computeShardLeaderChanges);
        this.metrics.recordChanges(computeShardLeaderChanges);
    }

    @VisibleForTesting
    static Map<Long, Shard> recomputeShardHashBoundaries(Map<Long, Shard> map, List<Shard> list) {
        ArrayList arrayList = new ArrayList();
        list.forEach(shard -> {
            shard.findOverlapping(map.values()).forEach(shard -> {
                log.info("Deleting shard {} as it overlaps with {}", shard, shard);
                arrayList.add(Long.valueOf(shard.id()));
            });
        });
        return Collections.unmodifiableMap((Map) Stream.concat(map.entrySet().stream().filter(entry -> {
            return !arrayList.contains(entry.getKey());
        }).map((v0) -> {
            return v0.getValue();
        }), list.stream()).collect(Collectors.toMap((v0) -> {
            return v0.id();
        }, Function.identity())));
    }

    @VisibleForTesting
    static ShardAssignmentChanges computeShardLeaderChanges(Map<Long, Shard> map, Map<Long, Shard> map2) {
        Set set = (Set) map.entrySet().stream().filter(entry -> {
            return !map2.containsKey(entry.getKey());
        }).map(entry2 -> {
            return new ShardAssignmentChange.Removed(((Long) entry2.getKey()).longValue(), ((Shard) entry2.getValue()).leader());
        }).collect(Collectors.toSet());
        return new ShardAssignmentChanges(Collections.unmodifiableSet((Set) map2.entrySet().stream().filter(entry3 -> {
            return !map.containsKey(entry3.getKey());
        }).map(entry4 -> {
            return new ShardAssignmentChange.Added(((Long) entry4.getKey()).longValue(), ((Shard) entry4.getValue()).leader());
        }).collect(Collectors.toSet())), Collections.unmodifiableSet(set), Collections.unmodifiableSet((Set) map.entrySet().stream().filter(entry5 -> {
            return map2.containsKey(entry5.getKey());
        }).filter(entry6 -> {
            return !((Shard) map2.get(entry6.getKey())).leader().equals(((Shard) entry6.getValue()).leader());
        }).map(entry7 -> {
            Long l = (Long) entry7.getKey();
            return new ShardAssignmentChange.Reassigned(l.longValue(), ((Shard) entry7.getValue()).leader(), ((Shard) map2.get(entry7.getKey())).leader());
        }).collect(Collectors.toSet())));
    }

    public long get(String str) {
        return this.assignments.get(str);
    }

    public List<Long> getAll() {
        return this.assignments.getAll();
    }

    public String leader(long j) {
        return this.assignments.leader(j);
    }

    public void addCallback(@NonNull Consumer<ShardAssignmentChanges> consumer) {
        if (consumer == null) {
            throw new NullPointerException("callback is marked non-null but is null");
        }
        this.callbacks.add(consumer);
    }

    private boolean isErrorRetryable(@NonNull Throwable th) {
        if (th == null) {
            throw new NullPointerException("ex is marked non-null but is null");
        }
        if (th instanceof NamespaceNotFoundException) {
            return ((NamespaceNotFoundException) th).isRetryable();
        }
        return true;
    }
}
