package io.domainlifecycles.events.mq.consume;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.domainlifecycles.access.classes.ClassProvider;
import io.domainlifecycles.domain.types.DomainEvent;
import io.domainlifecycles.events.consume.execution.detector.ExecutionContextDetector;
import io.domainlifecycles.events.consume.execution.processor.ExecutionContextProcessor;
import io.domainlifecycles.events.exception.DLCEventsException;
import io.domainlifecycles.mirror.api.Domain;
import io.domainlifecycles.mirror.api.DomainEventMirror;
import io.domainlifecycles.mirror.api.DomainType;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/domainlifecycles/events/mq/consume/AbstractMqDomainEventConsumer.class */
public abstract class AbstractMqDomainEventConsumer<CONSUMER, MESSAGE> implements MqDomainEventConsumer {
    protected final ObjectMapper objectMapper;
    protected final ExecutionContextDetector executionContextDetector;
    protected final ExecutionContextProcessor executionContextProcessor;
    protected final ClassProvider classProvider;
    protected ExecutorService consumerThreadPool;
    private final Logger log = LoggerFactory.getLogger(AbstractMqDomainEventConsumer.class);
    protected List<MqDomainEventHandler> handlers = new ArrayList();
    protected List<CONSUMER> consumers = new ArrayList();
    protected List<Future<Void>> processingFutures = new ArrayList();
    protected AtomicBoolean runFlag = new AtomicBoolean(true);
    protected boolean initialized = false;

    public AbstractMqDomainEventConsumer(ObjectMapper objectMapper, ExecutionContextDetector executionContextDetector, ExecutionContextProcessor executionContextProcessor, ClassProvider classProvider) {
        this.objectMapper = (ObjectMapper) Objects.requireNonNull(objectMapper, "ObjectMapper is required!");
        this.executionContextDetector = (ExecutionContextDetector) Objects.requireNonNull(executionContextDetector, "An ExecutionContextDetector is required!");
        this.executionContextProcessor = (ExecutionContextProcessor) Objects.requireNonNull(executionContextProcessor, "An ExecutionContextProcessor is required!");
        this.classProvider = (ClassProvider) Objects.requireNonNull(classProvider, "A ClassProvider is required!");
    }

    @Override // io.domainlifecycles.events.mq.consume.MqDomainEventConsumer
    public void initialize() {
        connect();
        initializeHandlers();
        this.initialized = true;
    }

    protected abstract void connect();

    protected void subscribe(MqDomainEventHandler mqDomainEventHandler) {
        this.log.info("Subscribing handler {}", mqDomainEventHandler.getHandlerId());
        this.handlers.add(mqDomainEventHandler);
        CONSUMER createConsumer = createConsumer(mqDomainEventHandler.getDomainEventType().getName(), mqDomainEventHandler.getHandlerId());
        this.consumers.add(createConsumer);
        this.processingFutures.add(CompletableFuture.supplyAsync(() -> {
            return process(createConsumer, mqDomainEventHandler);
        }, this.consumerThreadPool));
        this.log.info("Subscribed handler {}", mqDomainEventHandler.getHandlerId());
    }

    protected abstract CONSUMER createConsumer(String str, String str2);

    protected Void process(CONSUMER consumer, MqDomainEventHandler mqDomainEventHandler) {
        this.log.info("Consumer starting processing. HandlerId: {}", mqDomainEventHandler.getHandlerId());
        while (this.runFlag.get()) {
            if (!mqDomainEventHandler.isPaused()) {
                this.log.trace("Consuming message for HandlerId: {}", mqDomainEventHandler.getHandlerId());
                MESSAGE consumeMessage = consumeMessage(consumer);
                if (consumeMessage != null) {
                    DomainEvent parseMessage = parseMessage(consumeMessage, mqDomainEventHandler.getDomainEventType());
                    if (parseMessage != null) {
                        try {
                            this.log.trace("Invoking handler {}", mqDomainEventHandler.getHandlerId());
                            mqDomainEventHandler.handle(parseMessage);
                            this.log.trace("Handled message {}", mqDomainEventHandler.getHandlerId());
                        } catch (Throwable th) {
                            this.log.error("Handling message failed {}", mqDomainEventHandler.getHandlerId(), th);
                        }
                    }
                    acknowledge(consumeMessage);
                }
            }
        }
        this.log.info("Processing finished");
        closeConsumer(consumer);
        return null;
    }

    protected abstract MESSAGE consumeMessage(CONSUMER consumer);

    protected DomainEvent parseMessage(MESSAGE message, Class<? extends DomainEvent> cls) {
        String messageBody = messageBody(message);
        try {
            return (DomainEvent) this.objectMapper.readValue(messageBody, cls);
        } catch (JsonProcessingException e) {
            String format = String.format("DomainEvent '%s' deserialialization failed!", messageBody);
            this.log.error(format, e);
            throw DLCEventsException.fail(format, e);
        }
    }

    protected abstract void acknowledge(MESSAGE message);

    protected abstract String messageBody(MESSAGE message);

    protected void initializeHandlers() {
        List list = Domain.getInitializedDomain().allTypeMirrors().values().stream().filter(domainTypeMirror -> {
            return !domainTypeMirror.isAbstract() && domainTypeMirror.getDomainType().equals(DomainType.DOMAIN_EVENT);
        }).map(domainTypeMirror2 -> {
            return (DomainEventMirror) domainTypeMirror2;
        }).flatMap(domainEventMirror -> {
            return handlersForDomainEvent(domainEventMirror).stream();
        }).toList();
        this.log.info("Subscribing handlers count = {}", Integer.valueOf(list.size()));
        this.consumerThreadPool = Executors.newFixedThreadPool(list.size());
        list.forEach(this::subscribe);
    }

    protected List<MqDomainEventHandler> handlersForDomainEvent(DomainEventMirror domainEventMirror) {
        ArrayList arrayList = new ArrayList();
        domainEventMirror.getListeningDomainServices().forEach(domainServiceMirror -> {
            arrayList.addAll(domainServiceMirror.getMethods().stream().filter(methodMirror -> {
                return methodMirror.listensTo(domainEventMirror);
            }).map(methodMirror2 -> {
                return newHandler(domainServiceMirror.getTypeName(), methodMirror2.getName(), domainEventMirror.getTypeName());
            }).toList());
        });
        domainEventMirror.getListeningApplicationServices().forEach(applicationServiceMirror -> {
            arrayList.addAll(applicationServiceMirror.getMethods().stream().filter(methodMirror -> {
                return methodMirror.listensTo(domainEventMirror);
            }).map(methodMirror2 -> {
                return newHandler(applicationServiceMirror.getTypeName(), methodMirror2.getName(), domainEventMirror.getTypeName());
            }).toList());
        });
        domainEventMirror.getListeningOutboundServices().forEach(outboundServiceMirror -> {
            arrayList.addAll(outboundServiceMirror.getMethods().stream().filter(methodMirror -> {
                return methodMirror.listensTo(domainEventMirror);
            }).map(methodMirror2 -> {
                return newHandler(outboundServiceMirror.getTypeName(), methodMirror2.getName(), domainEventMirror.getTypeName());
            }).toList());
        });
        domainEventMirror.getListeningRepositories().forEach(repositoryMirror -> {
            arrayList.addAll(repositoryMirror.getMethods().stream().filter(methodMirror -> {
                return methodMirror.listensTo(domainEventMirror);
            }).map(methodMirror2 -> {
                return newHandler(repositoryMirror.getTypeName(), methodMirror2.getName(), domainEventMirror.getTypeName());
            }).toList());
        });
        domainEventMirror.getListeningQueryHandlers().forEach(queryHandlerMirror -> {
            arrayList.addAll(queryHandlerMirror.getMethods().stream().filter(methodMirror -> {
                return methodMirror.listensTo(domainEventMirror);
            }).map(methodMirror2 -> {
                return newHandler(queryHandlerMirror.getTypeName(), methodMirror2.getName(), domainEventMirror.getTypeName());
            }).toList());
        });
        domainEventMirror.getListeningAggregates().forEach(aggregateRootMirror -> {
            arrayList.addAll(aggregateRootMirror.getMethods().stream().filter(methodMirror -> {
                return methodMirror.listensTo(domainEventMirror);
            }).map(methodMirror2 -> {
                return newHandler(aggregateRootMirror.getTypeName(), methodMirror2.getName(), domainEventMirror.getTypeName());
            }).toList());
        });
        return arrayList;
    }

    protected MqDomainEventHandler newHandler(String str, String str2, String str3) {
        return new MqDomainEventHandler(str, str2, this.classProvider.getClassForName(str3), this.executionContextDetector, this.executionContextProcessor);
    }

    @Override // io.domainlifecycles.events.mq.consume.MqDomainEventConsumer
    public void closeAll() {
        this.runFlag.set(false);
        this.processingFutures.forEach(future -> {
            try {
                future.get();
            } catch (InterruptedException | ExecutionException e) {
                this.log.error("Getting data from future failed", e);
            }
        });
        this.consumers.forEach(obj -> {
            closeConsumer(obj);
        });
        closeConnection();
        this.consumerThreadPool.shutdown();
        this.log.info("Closed session and connection");
    }

    protected abstract void closeConnection();

    protected abstract void closeConsumer(CONSUMER consumer);

    public boolean isInitialized() {
        return this.initialized;
    }

    @Override // io.domainlifecycles.events.mq.consume.MqDomainEventConsumer
    public void pauseHandler(String str, String str2, String str3) {
        this.log.info("Pause handler {}.{}({})", new Object[]{str, str2, str3});
        MqDomainEventHandler mqDomainEventHandler = get(str, str2, str3);
        if (mqDomainEventHandler != null) {
            mqDomainEventHandler.pause();
        }
    }

    @Override // io.domainlifecycles.events.mq.consume.MqDomainEventConsumer
    public void resumeHandler(String str, String str2, String str3) {
        this.log.info("Resume handler {}.{}({})", new Object[]{str, str2, str3});
        MqDomainEventHandler mqDomainEventHandler = get(str, str2, str3);
        if (mqDomainEventHandler != null) {
            mqDomainEventHandler.resume();
        }
    }

    @Override // io.domainlifecycles.events.mq.consume.MqDomainEventConsumer
    public boolean isHandlerPaused(String str, String str2, String str3) {
        MqDomainEventHandler mqDomainEventHandler = get(str, str2, str3);
        if (mqDomainEventHandler != null) {
            return mqDomainEventHandler.isPaused();
        }
        return false;
    }

    private MqDomainEventHandler get(String str, String str2, String str3) {
        Objects.requireNonNull(str, "A handlerClassName must be defined!");
        Objects.requireNonNull(str, "A handlerMethodName must be defined!");
        Objects.requireNonNull(str, "A domainEventTypeName must be defined!");
        Optional<MqDomainEventHandler> findFirst = this.handlers.stream().filter(mqDomainEventHandler -> {
            return mqDomainEventHandler.getHandlerClassName().equals(str) && mqDomainEventHandler.getHandlerMethodName().equals(str2) && mqDomainEventHandler.getDomainEventType().getName().equals(str3);
        }).findFirst();
        if (findFirst.isPresent()) {
            return findFirst.get();
        }
        this.log.error("No handler found: {}.{}({})", new Object[]{str, str2, str3});
        return null;
    }
}
