package io.camunda.zeebe.transport.stream.impl;

import io.atomix.cluster.MemberId;
import io.camunda.zeebe.transport.stream.api.RemoteStreamMetrics;
import io.camunda.zeebe.transport.stream.impl.AggregatedRemoteStream;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import org.agrona.concurrent.UnsafeBuffer;

/* loaded from: input_file:io/camunda/zeebe/transport/stream/impl/RemoteStreamRegistry.class */
public class RemoteStreamRegistry<M> implements ImmutableStreamRegistry<M> {
    private final RemoteStreamMetrics metrics;
    private final ConcurrentMap<UnsafeBuffer, Set<AggregatedRemoteStream<M>>> typeToConsumers = new ConcurrentHashMap();
    private final ConcurrentMap<LogicalId<M>, AggregatedRemoteStream<M>> logicalIdToConsumers = new ConcurrentHashMap();
    private final Map<AggregatedRemoteStream.StreamId, AggregatedRemoteStream.StreamConsumer<M>> idToConsumer = new HashMap();

    public RemoteStreamRegistry(RemoteStreamMetrics remoteStreamMetrics) {
        this.metrics = remoteStreamMetrics;
    }

    public void add(UnsafeBuffer unsafeBuffer, UUID uuid, MemberId memberId, M m) {
        AggregatedRemoteStream.StreamId streamId = new AggregatedRemoteStream.StreamId(uuid, memberId);
        if (this.idToConsumer.containsKey(streamId)) {
            return;
        }
        this.typeToConsumers.putIfAbsent(unsafeBuffer, new CopyOnWriteArraySet());
        LogicalId<M> logicalId = new LogicalId<>(unsafeBuffer, m);
        this.logicalIdToConsumers.computeIfAbsent(logicalId, logicalId2 -> {
            AggregatedRemoteStream<M> aggregatedRemoteStream = new AggregatedRemoteStream<>(logicalId, new CopyOnWriteArrayList());
            this.typeToConsumers.get(unsafeBuffer).add(aggregatedRemoteStream);
            return aggregatedRemoteStream;
        });
        AggregatedRemoteStream.StreamConsumer<M> streamConsumer = new AggregatedRemoteStream.StreamConsumer<>(streamId, logicalId);
        this.logicalIdToConsumers.get(logicalId).addConsumer(streamConsumer);
        this.idToConsumer.put(streamId, streamConsumer);
        this.metrics.addStream();
    }

    public void remove(UUID uuid, MemberId memberId) {
        AggregatedRemoteStream.StreamConsumer<M> remove = this.idToConsumer.remove(new AggregatedRemoteStream.StreamId(uuid, memberId));
        if (remove != null) {
            this.logicalIdToConsumers.computeIfPresent(remove.logicalId(), (logicalId, aggregatedRemoteStream) -> {
                aggregatedRemoteStream.removeConsumer(remove);
                if (!aggregatedRemoteStream.streamConsumers().isEmpty()) {
                    return aggregatedRemoteStream;
                }
                this.typeToConsumers.get(remove.logicalId().streamType()).remove(aggregatedRemoteStream);
                return null;
            });
            this.metrics.removeStream();
        }
    }

    public void removeAll(MemberId memberId) {
        this.idToConsumer.keySet().stream().filter(streamId -> {
            return streamId.receiver().equals(memberId);
        }).toList().forEach(streamId2 -> {
            remove(streamId2.streamId(), streamId2.receiver());
        });
    }

    @Override // io.camunda.zeebe.transport.stream.impl.ImmutableStreamRegistry
    public Set<AggregatedRemoteStream<M>> get(UnsafeBuffer unsafeBuffer) {
        return this.typeToConsumers.getOrDefault(unsafeBuffer, Collections.emptySet());
    }

    public void clear() {
        this.typeToConsumers.clear();
        this.idToConsumer.clear();
        this.logicalIdToConsumers.clear();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Collection<AggregatedRemoteStream<M>> list() {
        return this.logicalIdToConsumers.values();
    }
}
