/*
 * Decompiled with CFR 0.152.
 */
package net.spals.appbuilder.message.core.blockingqueue;

import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.typesafe.config.Config;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import net.spals.appbuilder.annotations.config.ServiceConfig;
import net.spals.appbuilder.annotations.service.AutoBindInMap;
import net.spals.appbuilder.config.message.MessageConsumerConfig;
import net.spals.appbuilder.executor.core.ExecutorServiceFactory;
import net.spals.appbuilder.message.core.MessageConsumerCallback;
import net.spals.appbuilder.message.core.blockingqueue.BlockingQueueMessage;
import net.spals.appbuilder.message.core.consumer.MessageConsumerPlugin;
import net.spals.appbuilder.model.core.ModelSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link MessageConsumerPlugin} that consumes messages from an in-process
 * {@link BlockingQueue}.
 *
 * <p>Each call to {@link #start(MessageConsumerConfig, ModelSerializer)} submits one polling
 * task to an executor. The task polls the shared queue, deserializes each message payload and
 * hands it to the callback registered for the payload's class.
 *
 * <p>Optional service configuration read by the constructor:
 * <ul>
 *   <li>{@code messageConsumer.blockingQueue.pollTimeout} - poll timeout, default {@code 10}</li>
 *   <li>{@code messageConsumer.blockingQueue.pollTimeoutUnit} - a {@link TimeUnit} name,
 *       default {@link TimeUnit#MILLISECONDS}</li>
 * </ul>
 */
@AutoBindInMap(baseClass=MessageConsumerPlugin.class, key="blockingQueue")
class BlockingQueueMessageConsumerPlugin
implements MessageConsumerPlugin {
    private static final Logger LOGGER = LoggerFactory.getLogger(BlockingQueueMessageConsumerPlugin.class);

    private static final String POLL_TIMEOUT_PATH = "messageConsumer.blockingQueue.pollTimeout";
    private static final String POLL_TIMEOUT_UNIT_PATH = "messageConsumer.blockingQueue.pollTimeoutUnit";
    private static final long DEFAULT_POLL_TIMEOUT = 10L;
    private static final TimeUnit DEFAULT_POLL_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;

    private final Set<MessageConsumerCallback<?>> consumerCallbackSet;
    private final BlockingQueue<BlockingQueueMessage> blockingMessageQueue;
    private final ExecutorService executorService;
    // Primitive on purpose: the value is read on every poll, so boxing would be wasted work.
    private final long pollTimeout;
    private final TimeUnit pollTimeoutUnit;

    /**
     * Creates the plugin and the fixed thread pool that runs its polling tasks.
     *
     * @param serviceConfig service configuration; the poll timeout and its unit are read from
     *                      it when present, otherwise the defaults are used
     * @param consumerCallbackSet all known consumer callbacks; its size (at least 1) is used as
     *                            the thread pool size
     * @param executorServiceFactory factory used to create the thread pool
     * @param blockingMessageQueue the queue from which messages are polled
     */
    @Inject
    BlockingQueueMessageConsumerPlugin(@ServiceConfig Config serviceConfig, Set<MessageConsumerCallback<?>> consumerCallbackSet, ExecutorServiceFactory executorServiceFactory, @Named(value="message.blockingQueue") BlockingQueue<BlockingQueueMessage> blockingMessageQueue) {
        this.pollTimeout = serviceConfig.hasPath(POLL_TIMEOUT_PATH)
            ? serviceConfig.getLong(POLL_TIMEOUT_PATH)
            : DEFAULT_POLL_TIMEOUT;
        this.pollTimeoutUnit = serviceConfig.hasPath(POLL_TIMEOUT_UNIT_PATH)
            ? serviceConfig.getEnum(TimeUnit.class, POLL_TIMEOUT_UNIT_PATH)
            : DEFAULT_POLL_TIMEOUT_UNIT;
        this.consumerCallbackSet = consumerCallbackSet;
        this.blockingMessageQueue = blockingMessageQueue;
        this.executorService = executorServiceFactory.createFixedThreadPool(Math.max(consumerCallbackSet.size(), 1), new ExecutorServiceFactory.Key.Builder(this.getClass()).build());
    }

    /**
     * Submits a polling task for the given consumer to the executor.
     *
     * @param consumerConfig the consumer configuration; its tag selects the callbacks to use
     * @param modelSerializer used to deserialize each message's serialized payload
     */
    @Override
    public synchronized void start(MessageConsumerConfig consumerConfig, ModelSerializer modelSerializer) {
        Map<Class<?>, MessageConsumerCallback<?>> consumerCallbacks = MessageConsumerCallback.loadCallbacksForTag(consumerConfig.getTag(), this.consumerCallbackSet);
        this.executorService.submit(new BlockingQueueConsumerRunnable(consumerCallbacks, consumerConfig, modelSerializer));
    }

    /**
     * Does nothing: the body is empty, so a task submitted by
     * {@link #start(MessageConsumerConfig, ModelSerializer)} keeps polling until its thread is
     * interrupted or it hits an unexpected error.
     *
     * @param consumerConfig the consumer configuration (unused)
     */
    @Override
    public void stop(MessageConsumerConfig consumerConfig) {
    }

    /**
     * Polling task: repeatedly takes a message from the enclosing plugin's queue and dispatches
     * its deserialized payload to the callback keyed by the payload's class.
     */
    class BlockingQueueConsumerRunnable
    implements Runnable {
        private final Map<Class<?>, MessageConsumerCallback<?>> consumerCallbacks;
        private final MessageConsumerConfig consumerConfig;
        private final ModelSerializer modelSerializer;

        /**
         * @param consumerCallbacks callbacks keyed by payload class
         * @param consumerConfig the consumer configuration passed to each callback
         * @param modelSerializer used to deserialize message payloads
         */
        BlockingQueueConsumerRunnable(Map<Class<?>, MessageConsumerCallback<?>> consumerCallbacks, MessageConsumerConfig consumerConfig, ModelSerializer modelSerializer) {
            this.consumerCallbacks = consumerCallbacks;
            this.consumerConfig = consumerConfig;
            this.modelSerializer = modelSerializer;
        }

        /**
         * Polls until the thread is interrupted or an unexpected error occurs.
         *
         * <p>A poll that times out yields {@code null} and simply triggers the next poll. A
         * payload with no registered callback is logged at warn level and skipped. Any
         * {@link Throwable} raised while handling a message is logged and ends the loop.
         *
         * <p>When the loop ends because of an interrupt, the thread's interrupt status is
         * restored before returning so that code further up the stack can still observe it.
         */
        @Override
        public void run() {
            boolean interrupted = false;
            try {
                while (!Thread.interrupted()) {
                    BlockingQueueMessage message = blockingMessageQueue.poll(pollTimeout, pollTimeoutUnit);
                    if (message == null) {
                        // Poll timed out with nothing queued; try again.
                        continue;
                    }
                    Object payload = this.modelSerializer.deserialize(message.getSerializedPayload());
                    LOGGER.trace("Received '{}' message: {}", message.getTag(), payload);
                    MessageConsumerCallback<?> consumerCallback = this.consumerCallbacks.get(payload.getClass());
                    if (consumerCallback == null) {
                        LOGGER.warn(MessageConsumerCallback.unregisteredCallbackMessage(this.consumerConfig.getTag(), payload.getClass()));
                        continue;
                    }
                    consumerCallback.processMessage(this.consumerConfig, payload);
                }
                // The loop can only end normally when Thread.interrupted() returned true,
                // which also cleared the interrupt flag.
                interrupted = true;
            }
            catch (InterruptedException e) {
                // poll() clears the interrupt flag when it throws.
                interrupted = true;
                LOGGER.warn("BlockingQueue message queue was interrupted!");
            }
            catch (Throwable t) {
                LOGGER.error("Encountered unexpected error during callback of BlockingQueue messages", t);
            }
            LOGGER.info("Stopping blocking queue message consumer thread");
            if (interrupted) {
                // Re-assert the interrupt only after logging, so the log calls above run on a
                // thread whose interrupt flag is still clear.
                Thread.currentThread().interrupt();
            }
        }
    }
}

