package io.streamnative.oxia.client.notify;

import io.streamnative.oxia.client.api.Notification;
import io.streamnative.oxia.client.grpc.ChannelManager;
import io.streamnative.oxia.client.grpc.GrpcResponseStream;
import io.streamnative.oxia.client.metrics.NotificationMetrics;
import io.streamnative.oxia.proto.NotificationBatch;
import io.streamnative.oxia.proto.NotificationsRequest;
import io.streamnative.oxia.proto.ReactorOxiaClientGrpc;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;

/* loaded from: input_file:io/streamnative/oxia/client/notify/ShardNotificationReceiver.class */
public class ShardNotificationReceiver extends GrpcResponseStream {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ShardNotificationReceiver.class);
    private final long shardId;

    @NonNull
    private final Consumer<Notification> callback;

    @NonNull
    private final NotificationMetrics metrics;

    @NonNull
    private Optional<Long> startingOffset;
    private Scheduler scheduler;
    private long offset;

    /* loaded from: input_file:io/streamnative/oxia/client/notify/ShardNotificationReceiver$Factory.class */
    static class Factory {

        @NonNull
        private final ChannelManager.StubFactory<ReactorOxiaClientGrpc.ReactorOxiaClientStub> reactorStubFactory;

        @NonNull
        private final Consumer<Notification> callback;

        /* JADX INFO: Access modifiers changed from: package-private */
        @NonNull
        public ShardNotificationReceiver newReceiver(long j, @NonNull String str, @NonNull NotificationMetrics notificationMetrics) {
            if (str == null) {
                throw new NullPointerException("leader is marked non-null but is null");
            }
            if (notificationMetrics == null) {
                throw new NullPointerException("metrics is marked non-null but is null");
            }
            return new ShardNotificationReceiver(() -> {
                return this.reactorStubFactory.apply(str);
            }, j, this.callback, notificationMetrics);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Factory(@NonNull ChannelManager.StubFactory<ReactorOxiaClientGrpc.ReactorOxiaClientStub> stubFactory, @NonNull Consumer<Notification> consumer) {
            if (stubFactory == null) {
                throw new NullPointerException("reactorStubFactory is marked non-null but is null");
            }
            if (consumer == null) {
                throw new NullPointerException("callback is marked non-null but is null");
            }
            this.reactorStubFactory = stubFactory;
            this.callback = consumer;
        }
    }

    ShardNotificationReceiver(@NonNull Supplier<ReactorOxiaClientGrpc.ReactorOxiaClientStub> supplier, long j, @NonNull Consumer<Notification> consumer, @NonNull NotificationMetrics notificationMetrics) {
        super(supplier);
        this.startingOffset = Optional.empty();
        if (supplier == null) {
            throw new NullPointerException("stubFactory is marked non-null but is null");
        }
        if (consumer == null) {
            throw new NullPointerException("callback is marked non-null but is null");
        }
        if (notificationMetrics == null) {
            throw new NullPointerException("metrics is marked non-null but is null");
        }
        this.shardId = j;
        this.callback = consumer;
        this.metrics = notificationMetrics;
    }

    public void start(@NonNull Optional<Long> optional) {
        if (optional == null) {
            throw new NullPointerException("offset is marked non-null but is null");
        }
        if (optional.isPresent() && optional.get().longValue() < 0) {
            throw new IllegalArgumentException("Invalid offset: " + optional.get());
        }
        this.startingOffset = optional;
        start();
    }

    @Override // io.streamnative.oxia.client.grpc.GrpcResponseStream, java.lang.AutoCloseable
    public void close() {
        super.close();
        if (this.scheduler != null) {
            this.scheduler.dispose();
        }
    }

    @Override // io.streamnative.oxia.client.grpc.GrpcResponseStream
    @NonNull
    protected CompletableFuture<Void> start(@NonNull ReactorOxiaClientGrpc.ReactorOxiaClientStub reactorOxiaClientStub, @NonNull Consumer<Disposable> consumer) {
        if (reactorOxiaClientStub == null) {
            throw new NullPointerException("stub is marked non-null but is null");
        }
        if (consumer == null) {
            throw new NullPointerException("consumer is marked non-null but is null");
        }
        NotificationsRequest.Builder shardId = NotificationsRequest.newBuilder().setShardId(this.shardId);
        this.startingOffset.ifPresent(l -> {
            shardId.setStartOffsetExclusive(l.longValue());
        });
        RetryBackoffSpec doBeforeRetry = Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100L)).doBeforeRetry(retrySignal -> {
            log.warn("Retrying receiving notifications for shard {}: {}", Long.valueOf(this.shardId), retrySignal);
        });
        this.scheduler = Schedulers.newSingle(String.format("shard-%s-notifications", Long.valueOf(this.shardId)));
        Flux doOnError = Flux.defer(() -> {
            return reactorOxiaClientStub.getNotifications(shardId.build());
        }).doOnError(th -> {
            log.warn("Error receiving notifications for shard {}", Long.valueOf(this.shardId), th);
        });
        NotificationMetrics notificationMetrics = this.metrics;
        Objects.requireNonNull(notificationMetrics);
        consumer.accept(doOnError.doOnEach(notificationMetrics::recordBatch).retryWhen(doBeforeRetry).repeat().publishOn(this.scheduler).subscribe(this::notify));
        return CompletableFuture.completedFuture(null);
    }

    private void notify(@NonNull NotificationBatch notificationBatch) {
        if (notificationBatch == null) {
            throw new NullPointerException("batch is marked non-null but is null");
        }
        this.offset = Math.max(notificationBatch.getOffset(), this.offset);
        Stream filter = notificationBatch.getNotificationsMap().entrySet().stream().map(entry -> {
            String str = (String) entry.getKey();
            io.streamnative.oxia.proto.Notification notification = (io.streamnative.oxia.proto.Notification) entry.getValue();
            switch (notification.getType()) {
                case KEY_CREATED:
                    return new Notification.KeyCreated(str, notification.getVersionId());
                case KEY_MODIFIED:
                    return new Notification.KeyModified(str, notification.getVersionId());
                case KEY_DELETED:
                    return new Notification.KeyDeleted(str);
                default:
                    return null;
            }
        }).filter(obj -> {
            return Objects.nonNull(obj);
        });
        Consumer<Notification> consumer = this.callback;
        Objects.requireNonNull(consumer);
        filter.forEach(obj2 -> {
            consumer.accept(obj2);
        });
    }

    public long getOffset() {
        return this.offset;
    }

    long getShardId() {
        return this.shardId;
    }
}
