package io.streamnative.oxia.client.notify;

import io.streamnative.oxia.client.CompositeConsumer;
import io.streamnative.oxia.client.api.Notification;
import io.streamnative.oxia.client.grpc.OxiaStubManager;
import io.streamnative.oxia.client.metrics.NotificationMetrics;
import io.streamnative.oxia.client.metrics.api.Metrics;
import io.streamnative.oxia.client.notify.ShardNotificationReceiver;
import io.streamnative.oxia.client.shard.ShardManager;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/oxia/client/notify/NotificationManager.class */
/**
 * Keeps one {@link ShardNotificationReceiver} per shard and routes shard-assignment changes to
 * them.
 *
 * <p>Receivers are created lazily: nothing is connected until the first callback is registered
 * through {@link #registerCallback(Consumer)}. Until then (and after {@link #close()}) incoming
 * {@link ShardManager.ShardAssignmentChanges} are ignored. All receivers are built by a
 * {@link ShardNotificationReceiver.Factory} that is handed the shared composite callback to which
 * registered consumers are added.
 */
public class NotificationManager implements AutoCloseable, Consumer<ShardManager.ShardAssignmentChanges> {
    private static final Logger log = LoggerFactory.getLogger(NotificationManager.class);

    @NonNull
    private final ShardNotificationReceiver.Factory recieverFactory;

    @NonNull
    private final ShardManager shardManager;

    @NonNull
    private final CompositeConsumer<Notification> compositeCallback;

    @NonNull
    private final NotificationMetrics metrics;

    /** Receivers currently tracked, keyed by shard id. */
    private final ConcurrentMap<Long, ShardNotificationReceiver> shardReceivers = new ConcurrentHashMap<>();

    /** Set once the initial receivers have been connected by the first registered callback. */
    private volatile boolean started = false;

    /** Set by {@link #close()}; afterwards changes are ignored and registration is rejected. */
    private volatile boolean closed = false;

    /**
     * Creates a manager with a fresh composite callback and a receiver factory bound to it.
     *
     * @param oxiaStubManager stub manager passed to the receiver factory
     * @param shardManager source of the known shards and their leaders
     * @param metrics metrics registry used to create the notification metrics
     * @throws NullPointerException if any argument is {@code null}
     */
    public NotificationManager(@NonNull OxiaStubManager oxiaStubManager, @NonNull ShardManager shardManager, @NonNull Metrics metrics) {
        if (oxiaStubManager == null) {
            throw new NullPointerException("stubManager is marked non-null but is null");
        }
        if (shardManager == null) {
            throw new NullPointerException("shardManager is marked non-null but is null");
        }
        if (metrics == null) {
            throw new NullPointerException("metrics is marked non-null but is null");
        }
        this.compositeCallback = new CompositeConsumer<>();
        this.recieverFactory = new ShardNotificationReceiver.Factory(oxiaStubManager, this.compositeCallback);
        this.shardManager = shardManager;
        this.metrics = NotificationMetrics.create(metrics);
    }

    /**
     * Package-private constructor that accepts every collaborator directly.
     *
     * @param factory factory used to create per-shard receivers
     * @param shardManager source of the known shards and their leaders
     * @param compositeConsumer composite callback that registered consumers are added to
     * @param notificationMetrics metrics handed to every receiver created by the factory
     * @throws NullPointerException if any argument is {@code null}
     */
    NotificationManager(@NonNull ShardNotificationReceiver.Factory factory, @NonNull ShardManager shardManager, @NonNull CompositeConsumer<Notification> compositeConsumer, @NonNull NotificationMetrics notificationMetrics) {
        if (factory == null) {
            throw new NullPointerException("recieverFactory is marked non-null but is null");
        }
        if (shardManager == null) {
            throw new NullPointerException("shardManager is marked non-null but is null");
        }
        if (compositeConsumer == null) {
            throw new NullPointerException("compositeCallback is marked non-null but is null");
        }
        if (notificationMetrics == null) {
            throw new NullPointerException("metrics is marked non-null but is null");
        }
        this.recieverFactory = factory;
        this.shardManager = shardManager;
        this.compositeCallback = compositeConsumer;
        this.metrics = notificationMetrics;
    }

    /**
     * Applies a set of shard-assignment changes to the tracked receivers.
     *
     * <p>The changes are ignored when no callback has been registered yet or when this manager has
     * been closed.
     *
     * @param shardAssignmentChanges the added, removed and reassigned shards
     * @throws NullPointerException if {@code shardAssignmentChanges} is {@code null}
     */
    @Override // java.util.function.Consumer
    public void accept(@NonNull ShardManager.ShardAssignmentChanges shardAssignmentChanges) {
        if (shardAssignmentChanges == null) {
            throw new NullPointerException("changes is marked non-null but is null");
        }
        if (!this.started || this.closed) {
            return;
        }
        connectNotificationReceivers(shardAssignmentChanges);
    }

    /**
     * Adds a notification callback and, on the first registration, connects a receiver for every
     * shard currently known to the shard manager.
     *
     * @param consumer callback to add to the composite callback
     * @throws NullPointerException if {@code consumer} is {@code null}
     * @throws IllegalStateException if this manager has already been closed
     */
    public void registerCallback(@NonNull Consumer<Notification> consumer) {
        if (consumer == null) {
            throw new NullPointerException("callback is marked non-null but is null");
        }
        if (this.closed) {
            throw new IllegalStateException("Notification manager has been closed");
        }
        this.compositeCallback.add(consumer);
        if (this.started) {
            return;
        }
        // Re-check under the lock so that concurrent first registrations bootstrap only once.
        synchronized (this) {
            if (!this.started) {
                bootstrap();
                this.started = true;
            }
        }
    }

    /**
     * Connects receivers for all shards the shard manager currently reports, by presenting each
     * of them as an "added" assignment change.
     */
    private void bootstrap() {
        Set<ShardManager.ShardAssignmentChange.Added> initialShards =
                this.shardManager.getAll().stream()
                        .map(id -> new ShardManager.ShardAssignmentChange.Added(id, this.shardManager.leader(id)))
                        .collect(Collectors.toSet());
        connectNotificationReceivers(new ShardManager.ShardAssignmentChanges(initialShards, Set.of(), Set.of()));
    }

    /**
     * Closes receivers of removed shards, starts receivers for added shards, and replaces the
     * receivers of reassigned shards with new ones pointing at the new leader.
     *
     * @param shardAssignmentChanges the changes to apply
     * @throws NullPointerException if {@code shardAssignmentChanges} is {@code null}
     */
    private void connectNotificationReceivers(@NonNull ShardManager.ShardAssignmentChanges shardAssignmentChanges) {
        if (shardAssignmentChanges == null) {
            throw new NullPointerException("changes is marked non-null but is null");
        }
        shardAssignmentChanges.removed().forEach(removed -> {
            long shardId = removed.shardId();
            ShardNotificationReceiver receiver = this.shardReceivers.remove(shardId);
            // remove() yields null when no receiver is tracked for the shard; closing
            // unconditionally would abort processing of the remaining changes with an NPE.
            if (receiver != null) {
                receiver.close();
            } else {
                log.debug("No notification receiver tracked for removed shard {}", shardId);
            }
        });
        shardAssignmentChanges.added().forEach(added -> {
            long shardId = added.shardId();
            this.shardReceivers
                    .computeIfAbsent(shardId, id -> this.recieverFactory.newReceiver(id, added.leader(), this.metrics))
                    .start();
        });
        shardAssignmentChanges.reassigned().forEach(reassigned -> {
            long shardId = reassigned.shardId();
            Optional<ShardNotificationReceiver> previous =
                    Optional.ofNullable(this.shardReceivers.remove(shardId));
            previous.ifPresent(ShardNotificationReceiver::close);
            // Hand the previous receiver's offset (if there was one) to the replacement's start().
            this.shardReceivers
                    .computeIfAbsent(shardId, id -> this.recieverFactory.newReceiver(id, reassigned.toLeader(), this.metrics))
                    .start(previous.map(ShardNotificationReceiver::getOffset));
        });
    }

    /**
     * Marks this manager as closed and closes every tracked receiver. Calling it again after the
     * closed flag has been set does nothing.
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.shardReceivers.values().parallelStream().forEach(ShardNotificationReceiver::close);
    }
}
