package br.net.eventstore.publisher;

import br.net.eventstore.model.Message;
import com.google.gson.Gson;
import io.lettuce.core.RedisClient;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/* loaded from: input_file:br/net/eventstore/publisher/RedisPublisher.class */
/**
 * {@link Publisher} backed by Redis Pub/Sub.
 *
 * <p>Messages are serialized to JSON with Gson and published on the Redis channel named after
 * {@code message.getAggregation()}. Local {@link Subscriber}s are grouped per channel; the Redis
 * {@code SUBSCRIBE} is issued when the first subscriber for a channel registers and the
 * {@code UNSUBSCRIBE} when the last one leaves.
 */
public class RedisPublisher implements Publisher, HasSubscribers {
    /** Guards the one-time creation and registration of {@link #listener}. */
    private final Lock listenerLock = new ReentrantLock();
    /** Local subscribers keyed by Redis channel (the message aggregation). */
    private final ConcurrentHashMap<String, List<Subscriber>> listeners;
    private final StatefulRedisPubSubConnection<String, String> connection;
    private final RedisPubSubAsyncCommands<String, String> commands;
    private final Gson serializer;
    /** Lazily created; volatile so the unlocked fast-path read in ensureRedisListener is safe. */
    private volatile RedisPubSubListener<String, String> listener;

    /**
     * Creates a publisher on top of an existing Redis client.
     *
     * @param redisClient client used to open the Pub/Sub connection
     */
    public RedisPublisher(RedisClient redisClient) {
        this.listeners = new ConcurrentHashMap<>();
        this.connection = redisClient.connectPubSub();
        this.commands = this.connection.async();
        this.serializer = new Gson();
    }

    /**
     * Creates a publisher with a new client built via {@code RedisClient.create(str)}.
     *
     * @param str value handed to {@code RedisClient.create}
     */
    public RedisPublisher(String str) {
        this(RedisClient.create(str));
    }

    /**
     * Publishes the JSON form of {@code message} on the channel named by its aggregation.
     *
     * @param message message to serialize and publish
     */
    @Override
    public void publish(Message message) {
        this.commands.publish(message.getAggregation(), this.serializer.toJson(message));
    }

    /**
     * Registers {@code subscriber} for messages whose aggregation equals {@code str}.
     *
     * @param str        channel / aggregation name to listen on
     * @param subscriber callback invoked for every message received on that channel
     * @return handle that removes this subscriber; the Redis channel itself is only unsubscribed
     *         once no local subscriber remains for it
     */
    @Override
    public Subscription subscribe(String str, Subscriber subscriber) {
        ensureRedisListener();
        final String channel = str;
        // compute() makes "create list + SUBSCRIBE + add" atomic with respect to the removal
        // path below, so a concurrent last-unsubscribe cannot drop a freshly added subscriber.
        this.listeners.compute(channel, (key, existing) -> {
            List<Subscriber> subscribers = existing;
            if (subscribers == null) {
                // Copy-on-write: a subscriber may cancel itself from inside on(...) while the
                // dispatcher is iterating the list.
                subscribers = new CopyOnWriteArrayList<>();
                this.commands.subscribe(key);
            }
            subscribers.add(subscriber);
            return subscribers;
        });
        return () -> {
            this.listeners.computeIfPresent(channel, (key, subscribers) -> {
                subscribers.remove(subscriber);
                if (!subscribers.isEmpty()) {
                    // Other local subscribers still need this channel.
                    return subscribers;
                }
                this.commands.unsubscribe(key);
                // Returning null drops the map entry so a later subscribe() re-issues SUBSCRIBE.
                return null;
            });
        };
    }

    /**
     * Creates the Redis listener and attaches it to the connection the first time it is needed.
     * The field is assigned only after {@code addListener} returns, so a non-null value always
     * refers to a listener that was registered.
     */
    private void ensureRedisListener() {
        if (this.listener != null) {
            return;
        }
        this.listenerLock.lock();
        try {
            if (this.listener == null) {
                RedisPubSubListener<String, String> created = newDispatchingListener();
                this.connection.addListener(created);
                this.listener = created;
            }
        } finally {
            // Always release, including when another thread won the initialization race.
            this.listenerLock.unlock();
        }
    }

    /** Builds the listener that decodes channel messages and forwards them to subscribers. */
    private RedisPubSubListener<String, String> newDispatchingListener() {
        return new RedisPubSubListener<String, String>() {
            @Override
            public void message(String channel, String payload) {
                Message received = RedisPublisher.this.serializer.fromJson(payload, Message.class);
                if (received == null) {
                    // Gson yields null for an empty or literal-null payload; nothing to deliver.
                    return;
                }
                String aggregation = received.getAggregation();
                if (aggregation == null) {
                    // ConcurrentHashMap rejects null keys.
                    return;
                }
                List<Subscriber> subscribers = RedisPublisher.this.listeners.get(aggregation);
                if (subscribers != null) {
                    subscribers.forEach(subscriber -> subscriber.on(received));
                }
            }

            @Override
            public void message(String pattern, String channel, String payload) {
                // Pattern subscriptions are never issued by this class.
            }

            @Override
            public void subscribed(String channel, long count) {
            }

            @Override
            public void psubscribed(String pattern, long count) {
            }

            @Override
            public void unsubscribed(String channel, long count) {
            }

            @Override
            public void punsubscribed(String pattern, long count) {
            }
        };
    }
}
