package pl.bristleback.server.bristle.message.akka;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;
import pl.bristleback.server.bristle.api.WebsocketConnector;
import pl.bristleback.server.bristle.api.WebsocketMessage;
import pl.bristleback.server.bristle.message.AbstractMessageDispatcher;

/**
 * Message dispatcher that drains a queue of outgoing {@link WebsocketMessage}s on a single
 * background thread and hands each (message, recipient) pair to an Akka actor for sending.
 *
 * <p>Lifecycle: {@link #startDispatching()} creates the actor system, the sending actor and the
 * background thread; {@link #stopDispatching()} releases all three. Messages may be queued with
 * {@link #addMessage(WebsocketMessage)} at any time; they are only consumed while the dispatcher
 * is running.
 */
@Component("system.dispatcher.single.threaded")
public class SingleThreadMessageDispatcher extends AbstractMessageDispatcher {
    private static final Logger log = Logger.getLogger(SingleThreadMessageDispatcher.class.getName());

    /** Maximum time, in milliseconds, a single {@link #dispatchMessages()} call waits for a message. */
    private static final long DELAY = 1000;

    /** Outgoing messages waiting to be handed to the sending actor. */
    private final BlockingQueue<WebsocketMessage> messages = new LinkedBlockingQueue<WebsocketMessage>();

    /**
     * Written by the thread calling start/stop and read by the dispatcher thread, hence volatile:
     * without it the dispatcher loop is not guaranteed to ever observe a stop request.
     */
    private volatile boolean dispatcherRunning;

    /** Actor system created in {@link #startDispatching()}; kept so it can be shut down on stop. */
    private ActorSystem actorSystem;

    private ActorRef sendMessageActor;

    /** Executor owning the dispatcher thread; kept so the thread can be released on stop. */
    private ExecutorService dispatcherExecutor;

    /** Loop executed on the dispatcher thread: keeps dispatching until the dispatcher is stopped. */
    private class Dispatcher implements Runnable {
        private Dispatcher() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (SingleThreadMessageDispatcher.this.dispatcherRunning) {
                try {
                    SingleThreadMessageDispatcher.this.dispatchMessages();
                } catch (InterruptedException e) {
                    // Interrupted while waiting on the queue (e.g. by stopDispatching()):
                    // restore the interrupt status and leave the loop.
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    // The failing message has already been removed from the queue, so keep the
                    // loop alive instead of silently ending all dispatching while the running
                    // flag still reports true. Pass the throwable so the stack trace is logged.
                    SingleThreadMessageDispatcher.log.error("Exception while dispatching a server message", e);
                }
            }
        }
    }

    /**
     * Queues a message for dispatching.
     *
     * @param websocketMessage message to be sent to its recipients
     */
    @Override // pl.bristleback.server.bristle.api.MessageDispatcher
    public void addMessage(WebsocketMessage websocketMessage) {
        this.messages.add(websocketMessage);
    }

    /**
     * Takes at most one message from the queue, waiting up to {@link #DELAY} milliseconds for one
     * to arrive, and forwards it to the sending actor. Messages with a null or empty recipients
     * collection are logged and dropped.
     *
     * @throws Exception if the wait is interrupted or forwarding the message fails
     */
    @Override // pl.bristleback.server.bristle.api.MessageDispatcher
    public void dispatchMessages() throws Exception {
        WebsocketMessage poll = this.messages.poll(DELAY, TimeUnit.MILLISECONDS);
        if (poll == null) {
            // Nothing arrived within the timeout; the caller's loop simply polls again.
            return;
        }
        log.debug("Sending a server message: " + poll.getContent());
        if (CollectionUtils.isEmpty(poll.getRecipients())) {
            log.debug("Empty or null recipients collection: " + poll.getRecipients());
        } else {
            sendMessage(poll);
        }
    }

    /** Hands the message to the sending actor once per recipient. */
    private void sendMessage(WebsocketMessage websocketMessage) throws Exception {
        Iterator<WebsocketConnector> it = websocketMessage.getRecipients().iterator();
        while (it.hasNext()) {
            this.sendMessageActor.tell(new MessageForConnector(websocketMessage, it.next()));
        }
    }

    /**
     * Creates the actor system and sending actor, then starts the background dispatcher thread.
     *
     * @throws IllegalStateException if the dispatcher is already running
     */
    @Override // pl.bristleback.server.bristle.api.MessageDispatcher
    public void startDispatching() {
        if (this.dispatcherRunning) {
            throw new IllegalStateException("Dispatcher already running.");
        }
        log.info("Starting single threaded message dispatcher");
        this.actorSystem = ActorSystem.create("BristlebackSystem");
        this.sendMessageActor = this.actorSystem.actorOf(new Props(new UntypedActorFactory() {
            // Must be named create() to implement the factory's abstract method; the decompiled
            // name "m24create" left the factory without an implementation.
            @Override
            public UntypedActor create() {
                return new SendMessageActor(SingleThreadMessageDispatcher.this.getServer());
            }
        }), "MessageDispatcherActor");
        this.dispatcherExecutor = Executors.newSingleThreadExecutor();
        // Raise the flag before submitting, otherwise the loop could exit immediately.
        setDispatcherRunning(true);
        this.dispatcherExecutor.execute(new Dispatcher());
    }

    /**
     * Stops the background dispatcher thread and releases the executor and the actor system
     * created by {@link #startDispatching()}. Messages still in the queue stay queued.
     *
     * @throws IllegalStateException if the dispatcher is not running
     */
    @Override // pl.bristleback.server.bristle.api.MessageDispatcher
    public void stopDispatching() {
        if (!this.dispatcherRunning) {
            throw new IllegalStateException("Dispatcher is not running yet");
        }
        setDispatcherRunning(false);
        if (this.dispatcherExecutor != null) {
            // Interrupts a pending poll() so the thread exits promptly instead of waiting out
            // DELAY, and lets the executor's worker thread terminate.
            this.dispatcherExecutor.shutdownNow();
            this.dispatcherExecutor = null;
        }
        if (this.actorSystem != null) {
            // Without this every start/stop cycle leaks a whole actor system and its threads.
            // NOTE(review): shutdown() is the Akka 2.0-2.3 API matching the Props/tell calls used
            // here; messages the actor has not processed yet are not delivered - confirm that is
            // acceptable for callers of stopDispatching().
            this.actorSystem.shutdown();
            this.actorSystem = null;
        }
    }

    private void setDispatcherRunning(boolean z) {
        this.dispatcherRunning = z;
    }
}
