package io.kestra.runner.memory;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.services.FlowListenersInterface;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@MemoryQueueEnabled
/* loaded from: input_file:io/kestra/runner/memory/MemoryFlowListeners.class */
public class MemoryFlowListeners implements FlowListenersInterface {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(MemoryFlowListeners.class);
    private static final ObjectMapper MAPPER = JacksonMapper.ofJson();
    private static final TypeReference<List<Flow>> TYPE_REFERENCE = new TypeReference<List<Flow>>() { // from class: io.kestra.runner.memory.MemoryFlowListeners.1
    };
    private final QueueInterface<Flow> flowQueue;
    private final List<Flow> flows;
    private final List<Consumer<List<Flow>>> consumers = new ArrayList();

    @Inject
    public MemoryFlowListeners(FlowRepositoryInterface flowRepositoryInterface, @Named("flowQueue") QueueInterface<Flow> queueInterface) {
        this.flowQueue = queueInterface;
        this.flows = flowRepositoryInterface.findAll();
    }

    public void run() {
        this.flowQueue.receive(flow -> {
            if (flow.isDeleted()) {
                remove(flow);
            } else {
                upsert(flow);
            }
            if (log.isTraceEnabled()) {
                Logger logger = log;
                Object[] objArr = new Object[3];
                objArr[0] = flow.isDeleted() ? "deletion" : "update";
                objArr[1] = flow.getNamespace();
                objArr[2] = flow.getId();
                logger.trace("Received {} flow '{}.{}'", objArr);
            }
            notifyConsumers();
        });
        notifyConsumers();
        if (log.isTraceEnabled()) {
            log.trace("FlowListenersService started with {} flows", Integer.valueOf(this.flows.size()));
        }
    }

    private boolean remove(Flow flow) {
        boolean removeIf;
        synchronized (this) {
            removeIf = this.flows.removeIf(flow2 -> {
                return flow2.getNamespace().equals(flow.getNamespace()) && flow2.getId().equals(flow.getId());
            });
            if (!removeIf && flow.isDeleted()) {
                log.warn("Can't remove flow {}.{}", flow.getNamespace(), flow.getId());
            }
        }
        return removeIf;
    }

    private synchronized void upsert(Flow flow) {
        remove(flow);
        this.flows.add(flow);
    }

    private void notifyConsumers() {
        this.consumers.forEach(consumer -> {
            consumer.accept(new ArrayList(this.flows));
        });
    }

    public void listen(Consumer<List<Flow>> consumer) {
        this.consumers.add(consumer);
        consumer.accept(new ArrayList(flows()));
    }

    public List<Flow> flows() {
        return (List) MAPPER.readValue(MAPPER.writeValueAsString(this.flows), TYPE_REFERENCE);
    }
}
