package com.fluxtion.runtime.server.dutycycle;

import com.fluxtion.agrona.concurrent.DynamicCompositeAgent;
import com.fluxtion.agrona.concurrent.OneToOneConcurrentArrayQueue;
import com.fluxtion.runtime.StaticEventProcessor;
import com.fluxtion.runtime.annotations.feature.Experimental;
import com.fluxtion.runtime.input.EventFeed;
import com.fluxtion.runtime.lifecycle.Lifecycle;
import com.fluxtion.runtime.server.FluxtionServer;
import com.fluxtion.runtime.server.service.scheduler.DeadWheelScheduler;
import com.fluxtion.runtime.server.service.scheduler.SchedulerService;
import com.fluxtion.runtime.server.subscription.EventFlowManager;
import com.fluxtion.runtime.server.subscription.EventSubscriptionKey;
import com.fluxtion.runtime.service.Service;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.logging.Logger;

/**
 * Composite agent that starts queued {@link StaticEventProcessor}s on its duty cycle and acts as
 * their {@link EventFeed} for {@link EventSubscriptionKey} subscriptions.
 *
 * <p>Processors are handed over as suppliers through {@link #addEventFeedConsumer(Supplier)} and
 * are built, wired with services and started inside {@link #doWork()}. Each distinct subscription
 * key is served by one {@link EventQueueToEventProcessor} obtained from the
 * {@link EventFlowManager}; newly created readers are parked in a pending list and added to this
 * composite agent one per duty cycle.
 *
 * <p>NOTE(review): the pending-reader list is a plain {@link ArrayList} that is mutated from both
 * {@link #subscribe} and {@link #doWork()}. This presumably relies on both running on the agent
 * thread - confirm against callers.
 */
@Experimental
public class ComposingEventProcessorAgent extends DynamicCompositeAgent implements EventFeed<EventSubscriptionKey<?>> {
    private static final Logger log = Logger.getLogger(ComposingEventProcessorAgent.class.getName());
    private final EventFlowManager eventFlowManager;
    /** Services registered with every processor started by this agent. */
    private final ConcurrentHashMap<String, Service<?>> registeredServices;
    /** One queue reader per subscription key that currently has at least one subscriber. */
    private final ConcurrentHashMap<EventSubscriptionKey<?>, EventQueueToEventProcessor> queueProcessorMap;
    /** Processor suppliers waiting to be started on the next duty cycle. */
    private final OneToOneConcurrentArrayQueue<Supplier<StaticEventProcessor>> toStartList;
    /** Queue readers created by {@link #subscribe} that have not yet been added to this composite. */
    private final List<EventQueueToEventProcessor> queueReadersToAdd;
    private final FluxtionServer fluxtionServer;
    private final DeadWheelScheduler scheduler;
    /** The scheduler wrapped as a {@link SchedulerService} service, registered with each processor. */
    private final Service<SchedulerService> schedulerService;

    /**
     * Creates the agent with the scheduler as an initial member of the composite.
     *
     * @param str               passed to the superclass constructor together with the scheduler
     *                          (presumably the agent role name - confirm)
     * @param eventFlowManager  source of queue readers and target of subscribe/unSubscribe calls
     * @param fluxtionServer    owning server instance
     * @param deadWheelScheduler scheduler exposed to processors as a {@link SchedulerService}
     * @param concurrentHashMap services to register with every processor that is started
     */
    public ComposingEventProcessorAgent(String str, EventFlowManager eventFlowManager, FluxtionServer fluxtionServer, DeadWheelScheduler deadWheelScheduler, ConcurrentHashMap<String, Service<?>> concurrentHashMap) {
        super(str, deadWheelScheduler);
        this.queueProcessorMap = new ConcurrentHashMap<>();
        this.toStartList = new OneToOneConcurrentArrayQueue<>(128);
        this.queueReadersToAdd = new ArrayList<>();
        this.eventFlowManager = eventFlowManager;
        this.fluxtionServer = fluxtionServer;
        this.scheduler = deadWheelScheduler;
        this.registeredServices = concurrentHashMap;
        this.schedulerService = new Service<>(deadWheelScheduler, SchedulerService.class);
    }

    /**
     * Queues a processor supplier; the processor is created and started on a later
     * {@link #doWork()} call.
     *
     * <p>Uses {@code Queue.add}, so per the {@link java.util.Queue#add} contract this presumably
     * throws {@link IllegalStateException} when the queue (requested capacity 128) is full.
     *
     * @param supplier factory for the processor to start
     */
    public void addEventFeedConsumer(Supplier<StaticEventProcessor> supplier) {
        this.toStartList.add(supplier);
    }

    /** Logs the start and delegates to the superclass. */
    @Override
    public void onStart() {
        log.info("onStart");
        super.onStart();
    }

    /**
     * Runs one duty cycle: starts every queued processor, tries to add at most one pending queue
     * reader to this composite, then delegates to the superclass.
     *
     * @return the work count reported by the superclass
     * @throws Exception propagated from the superclass duty cycle
     */
    @Override
    public int doWork() throws Exception {
        this.toStartList.drain(this::startProcessor);
        // Only one reader is attempted per cycle; it stays queued until tryAdd accepts it.
        if (!this.queueReadersToAdd.isEmpty() && tryAdd(this.queueReadersToAdd.get(0))) {
            this.queueReadersToAdd.remove(0);
        }
        return super.doWork();
    }

    /**
     * Builds one processor from its supplier, registers the scheduler and all shared services,
     * attaches this agent as its event feed and starts it when it is a {@link Lifecycle}.
     */
    private void startProcessor(Supplier<StaticEventProcessor> supplier) {
        StaticEventProcessor processor = Objects.requireNonNull(supplier.get(), "supplied event processor is null");
        EventFlowManager.setCurrentProcessor(processor);
        try {
            processor.registerService(this.schedulerService);
            this.registeredServices.values().forEach(processor::registerService);
            processor.addEventFeed(this);
            if (processor instanceof Lifecycle) {
                ((Lifecycle) processor).start();
            }
        } finally {
            // Always clear the current-processor context, even when wiring or start() throws.
            EventFlowManager.removeCurrentProcessor();
        }
    }

    /** Logs the close and delegates to the superclass. */
    @Override
    public void onClose() {
        log.info("onClose");
        super.onClose();
    }

    /**
     * Logs the registration; no other state is changed here.
     *
     * @param staticEventProcessor the processor being registered
     */
    @Override
    public void registerSubscriber(StaticEventProcessor staticEventProcessor) {
        log.info("registerSubscriber:" + staticEventProcessor);
    }

    /**
     * Subscribes a processor to a key, creating and queueing a queue reader for the key when none
     * exists yet, and forwards the subscription to the {@link EventFlowManager}.
     *
     * @param staticEventProcessor the subscribing processor
     * @param eventSubscriptionKey the key to subscribe to
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override
    public void subscribe(StaticEventProcessor staticEventProcessor, EventSubscriptionKey<?> eventSubscriptionKey) {
        Objects.requireNonNull(staticEventProcessor, "subscriber is null");
        Objects.requireNonNull(eventSubscriptionKey, "subscriptionKey is null");
        log.info("subscribe subscriptionKey:" + eventSubscriptionKey + " subscriber:" + staticEventProcessor);
        EventQueueToEventProcessor eventQueueToEventProcessor = this.queueProcessorMap.get(eventSubscriptionKey);
        if (eventQueueToEventProcessor == null) {
            eventQueueToEventProcessor = this.eventFlowManager.getMappingAgent(eventSubscriptionKey, this);
            this.queueProcessorMap.put(eventSubscriptionKey, eventQueueToEventProcessor);
            // Added to the composite later, from doWork().
            this.queueReadersToAdd.add(eventQueueToEventProcessor);
            log.info("added new subscribe subscriptionKey:" + eventSubscriptionKey + " subscriber:" + staticEventProcessor);
        }
        eventQueueToEventProcessor.registerProcessor(staticEventProcessor);
        this.eventFlowManager.subscribe(eventSubscriptionKey);
    }

    /**
     * Deregisters a processor from a key. When the reader for that key reports zero remaining
     * processors, the key is dropped from the local map and unsubscribed from the
     * {@link EventFlowManager}. Unknown keys are ignored.
     *
     * <p>NOTE(review): the reader is not removed from the pending list or from this composite
     * agent here - confirm whether that is intended.
     *
     * @param staticEventProcessor the processor to deregister
     * @param eventSubscriptionKey the key to unsubscribe from
     */
    @Override
    public void unSubscribe(StaticEventProcessor staticEventProcessor, EventSubscriptionKey<?> eventSubscriptionKey) {
        // Single lookup: avoids a containsKey/get window in which the entry could disappear.
        EventQueueToEventProcessor eventQueueToEventProcessor = this.queueProcessorMap.get(eventSubscriptionKey);
        if (eventQueueToEventProcessor != null && eventQueueToEventProcessor.deregisterProcessor(staticEventProcessor) == 0) {
            log.info("EventQueueToEventProcessor listener count = 0, removing subscription:" + eventSubscriptionKey);
            this.queueProcessorMap.remove(eventSubscriptionKey);
            this.eventFlowManager.unSubscribe(eventSubscriptionKey);
        }
    }

    /**
     * Currently a no-op: existing subscriptions for the processor are left in place.
     *
     * @param staticEventProcessor the processor whose subscriptions would be removed
     */
    @Override
    public void removeAllSubscriptions(StaticEventProcessor staticEventProcessor) {
    }
}
