package org.icij.datashare.com;

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.icij.datashare.CollectionUtils;
import org.icij.datashare.com.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/icij/datashare/com/MemoryDataBus.class */
public class MemoryDataBus implements DataBus {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final Map<Consumer<Message>, MessageListener> subscribers = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/icij/datashare/com/MemoryDataBus$MessageListener.class */
    public static class MessageListener implements Consumer<Message> {
        private final Consumer<Message> subscriber;
        private final LinkedHashSet<Channel> channels;
        final AtomicReference<Message> message = new AtomicReference<>();
        final AtomicInteger nbMessages = new AtomicInteger(0);

        public MessageListener(Consumer<Message> consumer, Channel... channelArr) {
            this.subscriber = consumer;
            this.channels = CollectionUtils.asSet(channelArr);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public boolean hasSubscribedTo(Channel channel) {
            return this.channels.contains(channel);
        }

        @Override // java.util.function.Consumer
        public void accept(Message message) {
            this.subscriber.accept(message);
            synchronized (this.message) {
                this.message.set(message);
                this.message.notify();
            }
            this.nbMessages.getAndIncrement();
        }

        boolean shutdownAsked() {
            Message message = this.message.get();
            return message != null && message.type == Message.Type.SHUTDOWN;
        }

        int loopUntilShutdown() throws InterruptedException {
            synchronized (this.message) {
                while (!shutdownAsked()) {
                    this.message.wait();
                }
            }
            return this.nbMessages.get();
        }
    }

    @Override // org.icij.datashare.com.Publisher
    public void publish(Channel channel, Message message) {
        Message message2 = (Message) Objects.requireNonNull(message, "cannot publish a null message");
        this.subscribers.values().stream().filter(messageListener -> {
            return messageListener.hasSubscribedTo(channel);
        }).forEach(messageListener2 -> {
            messageListener2.accept(message2);
        });
    }

    @Override // org.icij.datashare.com.DataBus
    public int subscribe(Consumer<Message> consumer, Channel... channelArr) throws InterruptedException {
        return subscribe(consumer, () -> {
            this.logger.debug("subscribed {} to {}", consumer, Arrays.toString(channelArr));
        }, channelArr);
    }

    @Override // org.icij.datashare.com.DataBus
    public int subscribe(Consumer<Message> consumer, Runnable runnable, Channel... channelArr) throws InterruptedException {
        MessageListener messageListener = new MessageListener(consumer, channelArr);
        this.subscribers.put(consumer, messageListener);
        runnable.run();
        int loopUntilShutdown = messageListener.loopUntilShutdown();
        this.logger.info("exiting {}", consumer);
        return loopUntilShutdown;
    }

    @Override // org.icij.datashare.com.DataBus
    public void unsubscribe(Consumer<Message> consumer) {
        Optional.ofNullable(this.subscribers.remove(consumer)).ifPresent(messageListener -> {
            messageListener.accept((Message) new ShutdownMessage());
            this.logger.debug("unsubscribed {}", consumer);
        });
    }

    @Override // org.icij.datashare.com.DataBus
    public boolean getHealth() {
        return true;
    }
}
