package com.cloudimpl.cluster4j.core;

import com.cloudimpl.cluster4j.core.FluxStream;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/* loaded from: input_file:com/cloudimpl/cluster4j/core/FluxMap.class */
/**
 * A {@link ConcurrentMap} decorator that publishes the mutations made through it as
 * {@link FluxStream.Event}s.
 *
 * <p>All reads and writes are delegated to the backing map supplied at construction time.
 * Every successful mutation performed through this class is additionally pushed to each
 * currently registered subscriber of {@link #flux()}.
 *
 * <p>Caveats:
 * <ul>
 *   <li>{@link #keySet()}, {@link #values()} and {@link #entrySet()} return the backing map's
 *       own views. Mutations made through those views (or directly on the backing map) are
 *       not observed and produce no events.</li>
 *   <li>Thread-safety of the map operations is whatever the backing map provides; the no-arg
 *       constructor uses a {@link ConcurrentHashMap}.</li>
 *   <li>Events are only emitted for non-null previous/removed values, mirroring the
 *       null-hostile contract of {@link ConcurrentHashMap}.</li>
 * </ul>
 *
 * @param <K> key type
 * @param <V> value type
 */
public class FluxMap<K, V> implements FluxStream<K, V>, ConcurrentMap<K, V> {
    private final Map<K, V> map;
    private final Flux<FluxStream.Event<K, V>> flux;
    /** Sinks of the currently active subscribers; copy-on-write so emission can iterate safely. */
    private final List<FluxSink<FluxStream.Event<K, V>>> emitters = new CopyOnWriteArrayList<>();

    /**
     * Wraps the given map. The map is used as-is (not copied).
     *
     * @param map backing map; must not be {@code null}
     * @throws NullPointerException if {@code map} is {@code null}
     */
    public FluxMap(Map<K, V> map) {
        if (map == null) {
            // Fail fast instead of deferring the failure to the first map operation.
            throw new NullPointerException("map");
        }
        this.map = map;
        // The iterator is requested lazily, so each subscription replays the entries present
        // at subscription time rather than at construction time.
        Iterable<Map.Entry<K, V>> currentEntries = () -> map.entrySet().iterator();
        Flux<FluxStream.Event<K, V>> snapshot = Flux.fromIterable(currentEntries)
                .map(entry -> new FluxStream.Event<>(FluxStream.Event.Type.ADD, entry.getKey(), entry.getValue()));
        Flux<FluxStream.Event<K, V>> liveUpdates = Flux.create(this::registerEmitter);
        // NOTE(review): the live sink is only registered once the snapshot part has completed,
        // so a mutation racing with that hand-over may not reach the new subscriber.
        this.flux = snapshot.concatWith(liveUpdates);
    }

    /** Creates a map backed by a new {@link ConcurrentHashMap}. */
    public FluxMap() {
        this(new ConcurrentHashMap<>());
    }

    /**
     * Returns the event stream of this map: one {@code ADD} event per entry currently in the
     * map, followed by the events of subsequent mutations made through this instance.
     *
     * @return the shared flux; each subscription registers its own sink
     */
    @Override
    public Flux<FluxStream.Event<K, V>> flux() {
        return this.flux;
    }

    /** Returns the backing map's value view; changes made through it emit no events. */
    @Override
    public Collection<V> values() {
        return this.map.values();
    }

    /**
     * Stores the mapping and emits {@code ADD} when no previous value was returned by the
     * backing map, {@code UPDATE} otherwise.
     *
     * @return the previous value, or {@code null} if there was none
     */
    @Override
    public V put(K key, V value) {
        V previous = this.map.put(key, value);
        FluxStream.Event.Type type = previous == null ? FluxStream.Event.Type.ADD : FluxStream.Event.Type.UPDATE;
        sinkNext(new FluxStream.Event<>(type, key, value));
        return previous;
    }

    /**
     * Stores the mapping only if the key is absent; emits {@code ADD} when it was stored.
     *
     * @return the existing value, or {@code null} if the new value was stored
     */
    @Override
    public V putIfAbsent(K key, V value) {
        V existing = this.map.putIfAbsent(key, value);
        if (existing == null) {
            sinkNext(new FluxStream.Event<>(FluxStream.Event.Type.ADD, key, value));
        }
        return existing;
    }

    @Override
    public V get(Object key) {
        return this.map.get(key);
    }

    /** Pushes the event to every currently registered subscriber sink. */
    private void sinkNext(FluxStream.Event<K, V> event) {
        for (FluxSink<FluxStream.Event<K, V>> emitter : this.emitters) {
            emitter.next(event);
        }
    }

    /** Registers a new subscriber sink and arranges for its removal on cancel/dispose. */
    private void registerEmitter(FluxSink<FluxStream.Event<K, V>> sink) {
        this.emitters.add(sink);
        // Removal is idempotent, so it is harmless if both callbacks fire.
        sink.onCancel(() -> removeEmitter(sink));
        sink.onDispose(() -> removeEmitter(sink));
    }

    private void removeEmitter(FluxSink<FluxStream.Event<K, V>> sink) {
        this.emitters.remove(sink);
    }

    /**
     * Removes the mapping and emits {@code REMOVE} with the removed value when a non-null
     * value was removed.
     *
     * @return the removed value, or {@code null} if there was none
     */
    @Override
    @SuppressWarnings("unchecked") // cast is erased; only reached when the key matched an entry
    public V remove(Object key) {
        V removed = this.map.remove(key);
        if (removed != null) {
            sinkNext(new FluxStream.Event<>(FluxStream.Event.Type.REMOVE, (K) key, removed));
        }
        return removed;
    }

    /**
     * Removes the entry only if it is currently mapped to the given value; emits
     * {@code REMOVE} when the entry was removed.
     *
     * @return {@code true} if the entry was removed
     */
    @Override
    @SuppressWarnings("unchecked") // casts are erased; only reached when key and value matched
    public boolean remove(Object key, Object value) {
        boolean removed = this.map.remove(key, value);
        if (removed) {
            sinkNext(new FluxStream.Event<>(FluxStream.Event.Type.REMOVE, (K) key, (V) value));
        }
        return removed;
    }

    /**
     * Replaces the value only if the key is currently mapped to {@code oldValue}; emits
     * {@code UPDATE} carrying {@code newValue} on success.
     *
     * @return {@code true} if the value was replaced
     */
    @Override
    public boolean replace(K key, V oldValue, V newValue) {
        boolean replaced = this.map.replace(key, oldValue, newValue);
        if (replaced) {
            sinkNext(new FluxStream.Event<>(FluxStream.Event.Type.UPDATE, key, newValue));
        }
        return replaced;
    }

    /**
     * Replaces the value only if the key is currently mapped; emits {@code UPDATE} carrying
     * the new value when a previous value was returned.
     *
     * @return the previous value, or {@code null} if the key was not mapped
     */
    @Override
    public V replace(K key, V value) {
        V previous = this.map.replace(key, value);
        if (previous != null) {
            sinkNext(new FluxStream.Event<>(FluxStream.Event.Type.UPDATE, key, value));
        }
        return previous;
    }

    @Override
    public int size() {
        return this.map.size();
    }

    @Override
    public boolean isEmpty() {
        return this.map.isEmpty();
    }

    @Override
    public boolean containsKey(Object key) {
        return this.map.containsKey(key);
    }

    @Override
    public boolean containsValue(Object value) {
        return this.map.containsValue(value);
    }

    /**
     * Copies every mapping of the given map into this one, entry by entry, via
     * {@link #put(Object, Object)} so that overwritten keys are reported as {@code UPDATE}
     * and new keys as {@code ADD}.
     */
    @Override
    public void putAll(Map<? extends K, ? extends V> map) {
        for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
            put(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Removes the entries present when the call starts, one key at a time via
     * {@link #remove(Object)}, so that a {@code REMOVE} event is emitted exactly for the
     * entries this call actually removed. Entries added concurrently while the call is in
     * progress may remain in the map.
     */
    @Override
    public void clear() {
        // Iterate over a key snapshot so removal is also safe on non-concurrent backing maps.
        for (Object key : this.map.keySet().toArray()) {
            remove(key);
        }
    }

    /** Returns the backing map's key view; changes made through it emit no events. */
    @Override
    public Set<K> keySet() {
        return this.map.keySet();
    }

    /** Returns the backing map's entry view; changes made through it emit no events. */
    @Override
    public Set<Map.Entry<K, V>> entrySet() {
        return this.map.entrySet();
    }
}
