package org.autumnframework.service.pubsub.client.listener;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.pubsub.v1.PubsubMessage;
import io.micrometer.core.annotation.Timed;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.autumnframework.service.event.metadata.EventMetaData;
import org.autumnframework.service.queue.api.client.listener.CrudQueueListener;
import org.autumnframework.service.queue.api.client.listener.OnQueueCreateListener;
import org.autumnframework.service.queue.api.client.listener.OnQueueDeleteListener;
import org.autumnframework.service.queue.api.client.listener.OnQueueUpdateListener;
import org.autumnframework.service.queue.api.messages.IdentifiableMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:org/autumnframework/service/pubsub/client/listener/CrudMessageListenerAndDelegationService.class */
/**
 * Receives {@link PubsubMessage}s, deserializes their body into the registered
 * {@link IdentifiableMessage} class and hands the result to the listeners that were
 * registered for that message class.
 *
 * <p>The operation is read from the {@code __Operation__} message attribute and the target
 * message class from the {@code __TypeId__} attribute (matched against the fully qualified
 * class name reported by each listener's {@code getMessageType()}).
 *
 * <p>NOTE(review): the {@code @Timed} methods below are invoked through {@code this}; if the
 * timing is applied by a proxy-based aspect those self-invocations may not be measured -
 * confirm against the metrics configuration.
 */
public class CrudMessageListenerAndDelegationService {
    private static final Logger log = LoggerFactory.getLogger(CrudMessageListenerAndDelegationService.class);

    /** Message attribute carrying the operation (create / update / delete). */
    private static final String OPERATION_ATTRIBUTE = "__Operation__";
    /** Message attribute carrying the class name of the message body. */
    private static final String TYPE_ID_ATTRIBUTE = "__TypeId__";

    private final ObjectMapper objectMapper;
    private final Map<String, Class<? extends IdentifiableMessage<?>>> classIdToClass;
    private final Map<Class<? extends IdentifiableMessage<?>>, List<CrudQueueListener<?, ?>>> classToCrudQueue;
    private final Map<Class<? extends IdentifiableMessage<?>>, List<OnQueueCreateListener>> msgClassToOnCreateListener;
    private final Map<Class<? extends IdentifiableMessage<?>>, List<OnQueueUpdateListener>> msgClassToOnUpdateListener;
    private final Map<Class<? extends IdentifiableMessage<?>>, List<OnQueueDeleteListener>> msgClassToOnDeleteListener;

    /**
     * Indexes all listeners by the message class they report through {@code getMessageType()}.
     *
     * <p>Several listeners may be registered for the same message class; all of them are
     * kept in the per-class listener lists and the class is registered once in the
     * type-id lookup.
     *
     * @param objectMapper mapper used to deserialize the JSON message body
     * @param list         listeners handling create, update and delete for a message class
     * @param list2        listeners interested in create events only
     * @param list3        listeners interested in update events only
     * @param list4        listeners interested in delete events only
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public CrudMessageListenerAndDelegationService(ObjectMapper objectMapper, List<CrudQueueListener<?, ?>> list, List<OnQueueCreateListener<?, ?>> list2, List<OnQueueUpdateListener<?, ?>> list3, List<OnQueueDeleteListener<?, ?>> list4) {
        this.objectMapper = objectMapper;

        // Built with plain puts rather than Collectors.toMap without a merge function:
        // toMap throws IllegalStateException as soon as two listeners share a message type.
        Map<String, Class<? extends IdentifiableMessage<?>>> typesById = new HashMap<>();
        for (CrudQueueListener<?, ?> listener : list) {
            registerMessageType(typesById, listener.getMessageType());
        }
        for (OnQueueCreateListener<?, ?> listener : list2) {
            registerMessageType(typesById, listener.getMessageType());
        }
        for (OnQueueUpdateListener<?, ?> listener : list3) {
            registerMessageType(typesById, listener.getMessageType());
        }
        for (OnQueueDeleteListener<?, ?> listener : list4) {
            registerMessageType(typesById, listener.getMessageType());
        }
        this.classIdToClass = typesById;

        // Raw casts: the collector's inferred key/value types do not line up with the
        // declared field types, although the runtime contents are the same objects.
        this.classToCrudQueue = (Map) list.stream()
                .collect(Collectors.groupingBy(listener -> listener.getMessageType()));
        this.msgClassToOnCreateListener = (Map) list2.stream()
                .collect(Collectors.groupingBy(listener -> listener.getMessageType()));
        this.msgClassToOnUpdateListener = (Map) list3.stream()
                .collect(Collectors.groupingBy(listener -> listener.getMessageType()));
        this.msgClassToOnDeleteListener = (Map) list4.stream()
                .collect(Collectors.groupingBy(listener -> listener.getMessageType()));
    }

    /** Stores {@code messageType} in {@code target} under its fully qualified class name. */
    @SuppressWarnings("unchecked")
    private static void registerMessageType(Map<String, Class<? extends IdentifiableMessage<?>>> target, Class<?> messageType) {
        target.put(messageType.getName(), (Class<? extends IdentifiableMessage<?>>) messageType);
    }

    /**
     * Lower-cases the operation header and keeps only the part before the first dot.
     *
     * <p>Unlike {@code split("\\.")[0]} this cannot fail for values consisting solely of
     * dots; such values yield an empty string, which is then reported as unsupported.
     */
    private static String baseOperation(String rawOperation) {
        String lowered = rawOperation.toLowerCase(Locale.ROOT);
        int dot = lowered.indexOf('.');
        return dot < 0 ? lowered : lowered.substring(0, dot);
    }

    /**
     * Routes a message to the create, update or delete handling based on its
     * {@code __Operation__} attribute.
     *
     * <p>A missing attribute is treated as {@code create} (with a warning); any other
     * unknown operation is logged as an error and the message is dropped.
     *
     * @param pubsubMessage the received message
     */
    public void handle(PubsubMessage pubsubMessage) {
        String rawOperation = pubsubMessage.getAttributesMap().get(OPERATION_ATTRIBUTE);
        String operation;
        if (rawOperation == null) {
            log.warn("Received message without operation type header, defaulting to 'create'");
            operation = "create";
        } else {
            operation = baseOperation(rawOperation);
        }
        switch (operation) {
            case "create":
                handleCreate(pubsubMessage);
                return;
            case "update":
                handleUpdate(pubsubMessage);
                return;
            case "delete":
                handleDelete(pubsubMessage);
                return;
            default:
                log.error("Operation not supported: {}, message will be dropped", rawOperation);
                return;
        }
    }

    /**
     * Decodes the Base64 message data, deserializes it into the class registered for the
     * message's {@code __TypeId__} attribute and passes both to {@code biConsumer}.
     *
     * <p>Messages whose type id has no registered listener are logged and ignored.
     *
     * @param pubsubMessage the received message
     * @param biConsumer    callback receiving the resolved message class and the deserialized message
     * @throws RuntimeException if decoding, deserialization or the callback fails; the
     *                          original failure is attached as the cause
     */
    @Timed(value = "autumn.messaging.pubsub.received", description = "Number of messages received", extraTags = {"routing-key", "in"})
    void handle(PubsubMessage pubsubMessage, BiConsumer<Class<? extends IdentifiableMessage<?>>, IdentifiableMessage> biConsumer) {
        String typeId = pubsubMessage.getAttributesMap().get(TYPE_ID_ATTRIBUTE);
        log.trace("Received {} request for {}", pubsubMessage.getAttributesMap().get(OPERATION_ATTRIBUTE), typeId);
        try {
            // Decode explicitly as UTF-8 so the result does not depend on the JVM default charset.
            String body = new String(Base64.getDecoder().decode(pubsubMessage.getData().toByteArray()), StandardCharsets.UTF_8);
            Class<? extends IdentifiableMessage<?>> messageClass = this.classIdToClass.get(typeId);
            if (messageClass == null) {
                log.warn("Received a message, but no handler was registered in the context that can handle this message. Consider registering a class of OnQueueCreateListener<?, {}>, OnQueueUpdateListener<?, {}>, OnQueueDeleteListener<?, {}> or CrudQueueListener<?, {}> if you need direct access to the IdentifiableMessage instance.Message is ignored: {}.", typeId, typeId, typeId, typeId, body);
                return;
            }
            try {
                biConsumer.accept(messageClass, (IdentifiableMessage) this.objectMapper.readValue(body, messageClass));
            } catch (JsonProcessingException e) {
                log.error("JsonProcessingException while mapping message: {}", e.getMessage());
                throw new IllegalStateException("Failed to deserialize message body into class" + messageClass, e);
            }
        } catch (Exception e) {
            // Trailing throwable argument makes SLF4J log the stack trace as well.
            log.error("Caught Exeption while decoding message: {}", e.getMessage(), e);
            // Keep the cause so the real failure is not lost to callers.
            throw new RuntimeException("Failed to parse message body to String for classId: " + typeId, e);
        }
    }

    /**
     * Applies {@code biConsumer} to every {@link CrudQueueListener} registered for {@code cls}.
     *
     * @param cls                 message class used to look up the listeners
     * @param identifiableMessage the deserialized message passed to each listener
     * @param biConsumer          action invoking the appropriate listener method
     */
    public void delegateToCrudQueueListeners(Class<? extends IdentifiableMessage<?>> cls, IdentifiableMessage<?> identifiableMessage, BiConsumer<CrudQueueListener, IdentifiableMessage> biConsumer) {
        List<CrudQueueListener<?, ?>> listeners = this.classToCrudQueue.get(cls);
        if (CollectionUtils.isEmpty(listeners)) {
            log.trace("No CrudQueueListener for message {}", cls.getSimpleName());
            return;
        }
        listeners.forEach(listener -> biConsumer.accept(listener, identifiableMessage));
    }

    /**
     * Calls {@code onCreate} with the message payload and chain id on every
     * {@link OnQueueCreateListener} registered for {@code cls}.
     *
     * @param cls                 message class used to look up the listeners
     * @param identifiableMessage the deserialized message
     */
    public void delegateToOnCreateListeners(Class<? extends IdentifiableMessage<?>> cls, IdentifiableMessage<?> identifiableMessage) {
        List<OnQueueCreateListener> listeners = this.msgClassToOnCreateListener.get(cls);
        if (CollectionUtils.isEmpty(listeners)) {
            log.trace("No OnQueueCreateListener for message {}", cls.getSimpleName());
            return;
        }
        listeners.forEach(listener ->
                listener.onCreate(identifiableMessage.getPayload(), EventMetaData.builder().messageChainId(identifiableMessage.getMessageChainId()).build()));
    }

    /**
     * Calls {@code onUpdate} with the message payload and chain id on every
     * {@link OnQueueUpdateListener} registered for {@code cls}.
     *
     * @param cls                 message class used to look up the listeners
     * @param identifiableMessage the deserialized message
     */
    protected void delegateToOnUpdateListeners(Class<? extends IdentifiableMessage<?>> cls, IdentifiableMessage<?> identifiableMessage) {
        List<OnQueueUpdateListener> listeners = this.msgClassToOnUpdateListener.get(cls);
        if (CollectionUtils.isEmpty(listeners)) {
            log.trace("No OnQueueUpdateListener for message {}", cls.getSimpleName());
            return;
        }
        listeners.forEach(listener ->
                listener.onUpdate(identifiableMessage.getPayload(), EventMetaData.builder().messageChainId(identifiableMessage.getMessageChainId()).build()));
    }

    /**
     * Calls {@code onDelete} with the message payload and chain id on every
     * {@link OnQueueDeleteListener} registered for {@code cls}.
     *
     * @param cls                 message class used to look up the listeners
     * @param identifiableMessage the deserialized message
     */
    protected void delegateToOnDeleteListeners(Class<? extends IdentifiableMessage<?>> cls, IdentifiableMessage<?> identifiableMessage) {
        List<OnQueueDeleteListener> listeners = this.msgClassToOnDeleteListener.get(cls);
        if (CollectionUtils.isEmpty(listeners)) {
            log.trace("No OnQueueDeleteListener for message {}", cls.getSimpleName());
            return;
        }
        listeners.forEach(listener ->
                listener.onDelete(identifiableMessage.getPayload(), EventMetaData.builder().messageChainId(identifiableMessage.getMessageChainId()).build()));
    }

    /** Dispatches a create message to the CRUD listeners first, then to the create-only listeners. */
    @Timed(value = "autumn.messaging.pubsub.received.create", description = "Number of create messages received", extraTags = {"routing-key", "create"})
    void handleCreate(PubsubMessage pubsubMessage) {
        handle(pubsubMessage, (cls, identifiableMessage) -> {
            delegateToCrudQueueListeners(cls, identifiableMessage, (listener, message) -> listener.handleCreate(message));
            delegateToOnCreateListeners(cls, identifiableMessage);
        });
    }

    /** Dispatches an update message to the CRUD listeners first, then to the update-only listeners. */
    @Timed(value = "autumn.messaging.pubsub.received.update", description = "Number of update messages received", extraTags = {"routing-key", "update"})
    void handleUpdate(PubsubMessage pubsubMessage) {
        handle(pubsubMessage, (cls, identifiableMessage) -> {
            delegateToCrudQueueListeners(cls, identifiableMessage, (listener, message) -> listener.handleUpdate(message));
            delegateToOnUpdateListeners(cls, identifiableMessage);
        });
    }

    /** Dispatches a delete message to the CRUD listeners first, then to the delete-only listeners. */
    @Timed(value = "autumn.messaging.pubsub.received.delete", description = "Number of delete messages received", extraTags = {"routing-key", "delete"})
    void handleDelete(PubsubMessage pubsubMessage) {
        handle(pubsubMessage, (cls, identifiableMessage) -> {
            delegateToCrudQueueListeners(cls, identifiableMessage, (listener, message) -> listener.handleDelete(message));
            delegateToOnDeleteListeners(cls, identifiableMessage);
        });
    }
}
