package net.spals.appbuilder.message.core.blockingqueue;

import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.typesafe.config.Config;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import net.spals.appbuilder.annotations.config.ServiceConfig;
import net.spals.appbuilder.annotations.service.AutoBindInMap;
import net.spals.appbuilder.config.message.MessageConsumerConfig;
import net.spals.appbuilder.executor.core.ExecutorServiceFactory;
import net.spals.appbuilder.message.core.MessageConsumerCallback;
import net.spals.appbuilder.message.core.consumer.MessageConsumerPlugin;
import net.spals.appbuilder.model.core.ModelSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link MessageConsumerPlugin} which reads {@link BlockingQueueMessage}s from an
 * injected {@link BlockingQueue} and hands each deserialized payload to the
 * {@link MessageConsumerCallback} registered for the payload's runtime class.
 *
 * <p>Each call to {@link #start(MessageConsumerConfig, ModelSerializer)} submits one
 * polling task to an executor created in the constructor. The poll timeout and its unit
 * are read from the service configuration and default to 10 milliseconds.
 */
@AutoBindInMap(baseClass = MessageConsumerPlugin.class, key = "blockingQueue")
class BlockingQueueMessageConsumerPlugin implements MessageConsumerPlugin {
    private static final Logger LOGGER = LoggerFactory.getLogger(BlockingQueueMessageConsumerPlugin.class);

    /** Configuration path of the optional poll timeout value. */
    private static final String POLL_TIMEOUT_KEY = "messageConsumer.blockingQueue.pollTimeout";
    /** Configuration path of the optional poll timeout unit. */
    private static final String POLL_TIMEOUT_UNIT_KEY = "messageConsumer.blockingQueue.pollTimeoutUnit";

    private final Set<MessageConsumerCallback<?>> consumerCallbackSet;
    private final BlockingQueue<BlockingQueueMessage> blockingMessageQueue;
    private final ExecutorService executorService;
    private final long pollTimeout;
    private final TimeUnit pollTimeoutUnit;

    /**
     * Polling task which drains the enclosing plugin's queue until the executing
     * thread is interrupted.
     */
    class BlockingQueueConsumerRunnable implements Runnable {
        private final Map<Class<?>, MessageConsumerCallback<?>> consumerCallbacks;
        private final MessageConsumerConfig consumerConfig;
        private final ModelSerializer modelSerializer;

        /**
         * @param map callbacks keyed by the payload class they handle
         * @param messageConsumerConfig consumer configuration passed through to each callback
         * @param modelSerializer serializer used to deserialize message payloads
         */
        BlockingQueueConsumerRunnable(Map<Class<?>, MessageConsumerCallback<?>> map, MessageConsumerConfig messageConsumerConfig, ModelSerializer modelSerializer) {
            this.consumerCallbacks = map;
            this.consumerConfig = messageConsumerConfig;
            this.modelSerializer = modelSerializer;
        }

        /**
         * Polls the queue in a loop, dispatching every received message to its callback.
         * Errors raised while handling a message are logged and the loop continues;
         * an interrupt ends the loop.
         */
        // The callback map is keyed by payload class, so the raw invocation below hands
        // each callback a payload of the class it was looked up with.
        @SuppressWarnings({"rawtypes", "unchecked"})
        @Override // java.lang.Runnable
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    final BlockingQueueMessage message = blockingMessageQueue.poll(pollTimeout, pollTimeoutUnit);
                    if (message == null) {
                        // Poll timed out with nothing available; re-check the interrupt flag and retry.
                        continue;
                    }

                    final Object payload = modelSerializer.deserialize(message.getSerializedPayload());
                    LOGGER.trace("Received '{}' message: {}", message.getTag(), payload);

                    final MessageConsumerCallback callback = consumerCallbacks.get(payload.getClass());
                    if (callback != null) {
                        callback.processMessage(consumerConfig, payload);
                    } else {
                        LOGGER.warn(MessageConsumerCallback.unregisteredCallbackMessage(consumerConfig.getTag(), payload.getClass()));
                    }
                } catch (InterruptedException e) {
                    LOGGER.warn("BlockingQueue message queue was interrupted!");
                    // poll() clears the interrupt status when it throws. Restore it so the
                    // loop condition sees the interrupt and the task actually terminates
                    // instead of going back to polling forever.
                    Thread.currentThread().interrupt();
                } catch (Throwable th) {
                    // Deliberately broad: a failing callback must not kill the consumer thread.
                    LOGGER.error("Encountered unexpected error during callback of BlockingQueue messages", th);
                }
            }
            LOGGER.info("Stopping blocking queue message consumer thread");
        }
    }

    /**
     * @param config service configuration; may define
     *     {@code messageConsumer.blockingQueue.pollTimeout} (default {@code 10}) and
     *     {@code messageConsumer.blockingQueue.pollTimeoutUnit} (default {@link TimeUnit#MILLISECONDS})
     * @param set all known consumer callbacks; filtered per tag in {@link #start}
     * @param executorServiceFactory factory used to create the polling thread pool
     * @param blockingQueue the queue from which messages are consumed
     */
    @Inject
    BlockingQueueMessageConsumerPlugin(@ServiceConfig Config config, Set<MessageConsumerCallback<?>> set, ExecutorServiceFactory executorServiceFactory, @Named("message.blockingQueue") BlockingQueue<BlockingQueueMessage> blockingQueue) {
        this.pollTimeout = Optional.of(config)
            .filter(c -> c.hasPath(POLL_TIMEOUT_KEY))
            .map(c -> c.getLong(POLL_TIMEOUT_KEY))
            .orElse(10L);
        this.pollTimeoutUnit = Optional.of(config)
            .filter(c -> c.hasPath(POLL_TIMEOUT_UNIT_KEY))
            .map(c -> c.getEnum(TimeUnit.class, POLL_TIMEOUT_UNIT_KEY))
            .orElse(TimeUnit.MILLISECONDS);
        this.consumerCallbackSet = set;
        this.blockingMessageQueue = blockingQueue;
        // Size the pool by the number of callbacks, but always request at least one thread.
        this.executorService = executorServiceFactory.createFixedThreadPool(Math.max(set.size(), 1), getClass(), new String[0]);
    }

    /**
     * Submits a new polling task for the given consumer configuration.
     *
     * @param messageConsumerConfig consumer configuration whose tag selects the callbacks to use
     * @param modelSerializer serializer used to deserialize message payloads
     */
    @Override // net.spals.appbuilder.message.core.consumer.MessageConsumerPlugin
    public synchronized void start(MessageConsumerConfig messageConsumerConfig, ModelSerializer modelSerializer) {
        final Map<Class<?>, MessageConsumerCallback<?>> callbacksForTag =
            MessageConsumerCallback.loadCallbacksForTag(messageConsumerConfig.getTag(), this.consumerCallbackSet);
        this.executorService.submit(new BlockingQueueConsumerRunnable(callbacksForTag, messageConsumerConfig, modelSerializer));
    }

    /**
     * No-op: this plugin performs no per-consumer teardown.
     * NOTE(review): presumably the executor is shut down by whoever owns the
     * {@link ExecutorServiceFactory} — confirm.
     *
     * @param messageConsumerConfig ignored
     */
    @Override // net.spals.appbuilder.message.core.consumer.MessageConsumerPlugin
    public void stop(MessageConsumerConfig messageConsumerConfig) {
    }
}
