package io.micrc.core.message;

import io.micrc.core.message.error.ErrorMessage;
import io.micrc.core.message.error.ErrorMessageRepository;
import io.micrc.core.message.store.EventMessage;
import io.micrc.core.message.store.EventMessageRepository;
import io.micrc.core.message.store.IdempotentMessage;
import io.micrc.core.message.store.IdempotentMessageRepository;
import io.micrc.lib.JsonUtil;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.camel.Body;
import org.apache.camel.Consume;
import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.support.ExpressionAdapter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

/* loaded from: input_file:io/micrc/core/message/MessageRouteConfiguration.class */
public class MessageRouteConfiguration extends RouteBuilder implements ApplicationContextAware {
    private ApplicationContext applicationContext;

    @EndpointInject
    private ProducerTemplate producerTemplate;

    @Autowired
    Environment environment;

    /* loaded from: input_file:io/micrc/core/message/MessageRouteConfiguration$EventsInfo.class */
    public static class EventsInfo {
        private final List<Event> eventsCache = new ArrayList();
        private final HashMap<String, Event> eventsInfo = new HashMap<>();

        /* loaded from: input_file:io/micrc/core/message/MessageRouteConfiguration$EventsInfo$Event.class */
        /**
         * Mutable description of a single event: the topic it is published to, its event
         * name and the list of {@link EventMapping}s attached to it.
         *
         * <p>Instances are created through {@link #builder()}; equality, hash code and
         * string form are all derived from the three properties.
         */
        public static class Event {
            private String topicName;
            private String eventName;
            private List<EventMapping> eventMappings;

            /**
             * Self-typed fluent builder for {@link Event} and its subclasses.
             *
             * @param <C> the concrete event type produced by {@link #build()}
             * @param <B> the concrete builder type returned by the fluent setters
             */
            public static abstract class EventBuilder<C extends Event, B extends EventBuilder<C, B>> {
                private String topicName;
                private String eventName;
                private List<EventMapping> eventMappings;

                /** Sets the topic name and returns this builder. */
                public B topicName(String topicName) {
                    this.topicName = topicName;
                    return self();
                }

                /** Sets the event name and returns this builder. */
                public B eventName(String eventName) {
                    this.eventName = eventName;
                    return self();
                }

                /** Sets the event mappings (the list is stored as given, not copied). */
                public B eventMappings(List<EventMapping> eventMappings) {
                    this.eventMappings = eventMappings;
                    return self();
                }

                /** Returns {@code this} typed as the concrete builder. */
                protected abstract B self();

                /** Creates the event from the values collected so far. */
                public abstract C build();

                @Override
                public String toString() {
                    return "MessageRouteConfiguration.EventsInfo.Event.EventBuilder(topicName=" + this.topicName
                            + ", eventName=" + this.eventName
                            + ", eventMappings=" + this.eventMappings
                            + ")";
                }
            }

            /** Default builder that produces plain {@link Event} instances. */
            public static final class EventBuilderImpl extends EventBuilder<Event, EventBuilderImpl> {
                private EventBuilderImpl() {
                }

                @Override
                public EventBuilderImpl self() {
                    return this;
                }

                @Override
                public Event build() {
                    return new Event(this);
                }
            }

            /**
             * Copies the three properties out of the given builder.
             *
             * @param eventBuilder builder holding the values for the new event
             */
            protected Event(EventBuilder<?, ?> eventBuilder) {
                this.topicName = eventBuilder.topicName;
                this.eventName = eventBuilder.eventName;
                this.eventMappings = eventBuilder.eventMappings;
            }

            /** Returns a new builder for {@link Event}. */
            public static EventBuilder<?, ?> builder() {
                return new EventBuilderImpl();
            }

            public String getTopicName() {
                return this.topicName;
            }

            public String getEventName() {
                return this.eventName;
            }

            public List<EventMapping> getEventMappings() {
                return this.eventMappings;
            }

            public void setTopicName(String topicName) {
                this.topicName = topicName;
            }

            public void setEventName(String eventName) {
                this.eventName = eventName;
            }

            public void setEventMappings(List<EventMapping> eventMappings) {
                this.eventMappings = eventMappings;
            }

            /**
             * Two events are equal when both sides accept each other via
             * {@link #canEqual(Object)} and all three properties are equal (null-safe).
             */
            @Override
            public boolean equals(Object obj) {
                if (this == obj) {
                    return true;
                }
                if (!(obj instanceof Event)) {
                    return false;
                }
                Event other = (Event) obj;
                return other.canEqual(this)
                        && Objects.equals(getTopicName(), other.getTopicName())
                        && Objects.equals(getEventName(), other.getEventName())
                        && Objects.equals(getEventMappings(), other.getEventMappings());
            }

            /** Hook that lets subclasses opt out of equality with plain {@link Event}s. */
            protected boolean canEqual(Object obj) {
                return obj instanceof Event;
            }

            /**
             * Combines the properties with multiplier 59; a {@code null} property
             * contributes the constant 43.
             */
            @Override
            public int hashCode() {
                Object[] properties = {getTopicName(), getEventName(), getEventMappings()};
                int result = 1;
                for (Object property : properties) {
                    result = result * 59 + (property == null ? 43 : property.hashCode());
                }
                return result;
            }

            @Override
            public String toString() {
                return "MessageRouteConfiguration.EventsInfo.Event(topicName=" + getTopicName()
                        + ", eventName=" + getEventName()
                        + ", eventMappings=" + getEventMappings()
                        + ")";
            }
        }

        /* loaded from: input_file:io/micrc/core/message/MessageRouteConfiguration$EventsInfo$EventMapping.class */
        /**
         * Mutable value object with four string properties: mapping key, mapping path,
         * receiver address and batch model.
         *
         * <p>It can be created through {@link #builder()}, the no-argument constructor
         * plus setters, or the all-arguments constructor. Equality, hash code and string
         * form are derived from the four properties.
         */
        public static class EventMapping {
            private String mappingKey;
            private String mappingPath;
            private String receiverAddress;
            private String batchModel;

            /**
             * Self-typed fluent builder for {@link EventMapping} and its subclasses.
             *
             * @param <C> the concrete mapping type produced by {@link #build()}
             * @param <B> the concrete builder type returned by the fluent setters
             */
            public static abstract class EventMappingBuilder<C extends EventMapping, B extends EventMappingBuilder<C, B>> {
                private String mappingKey;
                private String mappingPath;
                private String receiverAddress;
                private String batchModel;

                /** Sets the mapping key and returns this builder. */
                public B mappingKey(String mappingKey) {
                    this.mappingKey = mappingKey;
                    return self();
                }

                /** Sets the mapping path and returns this builder. */
                public B mappingPath(String mappingPath) {
                    this.mappingPath = mappingPath;
                    return self();
                }

                /** Sets the receiver address and returns this builder. */
                public B receiverAddress(String receiverAddress) {
                    this.receiverAddress = receiverAddress;
                    return self();
                }

                /** Sets the batch model and returns this builder. */
                public B batchModel(String batchModel) {
                    this.batchModel = batchModel;
                    return self();
                }

                /** Returns {@code this} typed as the concrete builder. */
                protected abstract B self();

                /** Creates the mapping from the values collected so far. */
                public abstract C build();

                @Override
                public String toString() {
                    return "MessageRouteConfiguration.EventsInfo.EventMapping.EventMappingBuilder(mappingKey=" + this.mappingKey
                            + ", mappingPath=" + this.mappingPath
                            + ", receiverAddress=" + this.receiverAddress
                            + ", batchModel=" + this.batchModel
                            + ")";
                }
            }

            /** Default builder that produces plain {@link EventMapping} instances. */
            private static final class EventMappingBuilderImpl extends EventMappingBuilder<EventMapping, EventMappingBuilderImpl> {
                private EventMappingBuilderImpl() {
                }

                @Override
                public EventMappingBuilderImpl self() {
                    return this;
                }

                @Override
                public EventMapping build() {
                    return new EventMapping(this);
                }
            }

            /**
             * Copies the four properties out of the given builder.
             *
             * @param eventMappingBuilder builder holding the values for the new mapping
             */
            protected EventMapping(EventMappingBuilder<?, ?> eventMappingBuilder) {
                this.mappingKey = eventMappingBuilder.mappingKey;
                this.mappingPath = eventMappingBuilder.mappingPath;
                this.receiverAddress = eventMappingBuilder.receiverAddress;
                this.batchModel = eventMappingBuilder.batchModel;
            }

            /** Returns a new builder for {@link EventMapping}. */
            public static EventMappingBuilder<?, ?> builder() {
                return new EventMappingBuilderImpl();
            }

            public String getMappingKey() {
                return this.mappingKey;
            }

            public String getMappingPath() {
                return this.mappingPath;
            }

            public String getReceiverAddress() {
                return this.receiverAddress;
            }

            public String getBatchModel() {
                return this.batchModel;
            }

            public void setMappingKey(String mappingKey) {
                this.mappingKey = mappingKey;
            }

            public void setMappingPath(String mappingPath) {
                this.mappingPath = mappingPath;
            }

            public void setReceiverAddress(String receiverAddress) {
                this.receiverAddress = receiverAddress;
            }

            public void setBatchModel(String batchModel) {
                this.batchModel = batchModel;
            }

            /**
             * Two mappings are equal when both sides accept each other via
             * {@link #canEqual(Object)} and all four properties are equal (null-safe).
             */
            @Override
            public boolean equals(Object obj) {
                if (this == obj) {
                    return true;
                }
                if (!(obj instanceof EventMapping)) {
                    return false;
                }
                EventMapping other = (EventMapping) obj;
                return other.canEqual(this)
                        && Objects.equals(getMappingKey(), other.getMappingKey())
                        && Objects.equals(getMappingPath(), other.getMappingPath())
                        && Objects.equals(getReceiverAddress(), other.getReceiverAddress())
                        && Objects.equals(getBatchModel(), other.getBatchModel());
            }

            /** Hook that lets subclasses opt out of equality with plain {@link EventMapping}s. */
            protected boolean canEqual(Object obj) {
                return obj instanceof EventMapping;
            }

            /**
             * Combines the properties with multiplier 59; a {@code null} property
             * contributes the constant 43.
             */
            @Override
            public int hashCode() {
                String[] properties = {getMappingKey(), getMappingPath(), getReceiverAddress(), getBatchModel()};
                int result = 1;
                for (String property : properties) {
                    result = result * 59 + (property == null ? 43 : property.hashCode());
                }
                return result;
            }

            @Override
            public String toString() {
                return "MessageRouteConfiguration.EventsInfo.EventMapping(mappingKey=" + getMappingKey()
                        + ", mappingPath=" + getMappingPath()
                        + ", receiverAddress=" + getReceiverAddress()
                        + ", batchModel=" + getBatchModel()
                        + ")";
            }

            /** Creates a mapping with every property {@code null}. */
            public EventMapping() {
            }

            /**
             * Creates a fully populated mapping.
             *
             * @param mappingKey      the mapping key
             * @param mappingPath     the mapping path
             * @param receiverAddress the receiver address
             * @param batchModel      the batch model
             */
            public EventMapping(String mappingKey, String mappingPath, String receiverAddress, String batchModel) {
                this.mappingKey = mappingKey;
                this.mappingPath = mappingPath;
                this.receiverAddress = receiverAddress;
                this.batchModel = batchModel;
            }
        }

        /**
         * Registers an event under the given key.
         *
         * <p>If the key was already registered, the event it previously pointed to is
         * dropped from the list returned by {@link #getAllEvents()} so that the list and
         * the lookup map stay in sync instead of accumulating a stale entry.
         *
         * @param str   key used for later lookup via {@link #get(String)}
         * @param event event to register
         */
        public void put(String str, Event event) {
            Event replaced = this.eventsInfo.put(str, event);
            if (replaced != null) {
                // Without this, re-registering a key would leave the old event in the list.
                this.eventsCache.remove(replaced);
            }
            this.eventsCache.add(event);
        }

        /**
         * Returns every event added through {@code put}, in insertion order.
         *
         * <p>The returned list is the internal list itself, not a copy, so changes made
         * by the caller are visible to this object.
         *
         * @return the live list of registered events
         */
        public List<Event> getAllEvents() {
            return this.eventsCache;
        }

        /**
         * Looks up the event registered under the given key.
         *
         * @param str key that was passed to {@code put}
         * @return the registered event, or {@code null} when the key is unknown
         */
        public Event get(String str) {
            return this.eventsInfo.get(str);
        }
    }

    /* loaded from: input_file:io/micrc/core/message/MessageRouteConfiguration$SplitList.class */
    public class SplitList extends ExpressionAdapter {
        public SplitList() {
        }

        public Object evaluate(Exchange exchange) {
            List list = (List) exchange.getIn().getBody();
            if (null != list) {
                return list.iterator();
            }
            return null;
        }
    }

    /**
     * Stores the Spring application context so that beans (the Kafka templates used by
     * {@code send}) can be looked up by name later.
     *
     * @param applicationContext the context this bean runs in
     * @throws BeansException declared by {@link ApplicationContextAware}; not thrown here
     */
    @Override
    public void setApplicationContext(@NotNull ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Consumes records from the {@code deadLetter} topic and, for records whose
     * {@code senderHost} header equals this service's {@code micrc.x-host} property,
     * records them as an {@link ErrorMessage} with status {@code STOP} through the
     * {@code publish://error-sending-resolve} route.
     *
     * <p>Records from other senders, and records without a {@code senderHost} header,
     * are acknowledged without further processing. Any exception while handling a record
     * is logged and the record is negatively acknowledged with zero delay.
     *
     * @param consumerRecord the dead-letter record; its headers carry the message metadata
     * @param acknowledgment manual acknowledgment handle for the record
     */
    @KafkaListener(topics = {"deadLetter"}, autoStartup = "true", concurrency = "3", containerFactory = "kafkaListenerContainerFactory")
    public void deadLetter(ConsumerRecord<?, ?> consumerRecord, Acknowledgment acknowledgment) {
        try {
            Map<String, String> headers = new HashMap<>();
            for (Header header : consumerRecord.headers()) {
                byte[] raw = header.value();
                // Decode with an explicit charset so the result does not depend on the platform default.
                headers.put(header.key(), raw == null ? null : new String(raw, StandardCharsets.UTF_8));
            }
            String senderHost = headers.get("senderHost");
            // Null-safe: a record without the header is simply not ours. Throwing here would
            // nack the record with zero delay and have it redelivered over and over.
            if (senderHost != null && senderHost.equals(this.environment.getProperty("micrc.x-host"))) {
                ErrorMessage errorMessage = new ErrorMessage();
                errorMessage.setMessageId(Long.valueOf(headers.get("messageId")));
                errorMessage.setEvent(headers.get("event"));
                errorMessage.setContent(consumerRecord.value().toString());
                errorMessage.setGroupId(headers.get("kafka_dlt-original-consumer-group"));
                if (Boolean.parseBoolean(headers.get("isCopyEvent"))) {
                    errorMessage.setOriginalTopic(headers.get("kafka_dlt-original-topic"));
                    // The mappingMap header is a JSON object; only its first value is kept.
                    HashMap<?, ?> mappings = (HashMap<?, ?>) JsonUtil.writeValueAsObject(headers.get("mappingMap"), HashMap.class);
                    errorMessage.setOriginalMapping(JsonUtil.writeValueAsString(mappings.values().iterator().next()));
                }
                errorMessage.setErrorCount(1);
                errorMessage.setErrorStatus("STOP");
                errorMessage.setErrorMessage(headers.get("kafka_dlt-exception-message"));
                this.producerTemplate.requestBody("publish://error-sending-resolve", errorMessage);
                this.log.info("死信保存: " + errorMessage.getMessageId());
            }
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // Keep the original retry behaviour, but do not lose the cause.
            this.log.error("Failed to process dead-letter record, topic=" + consumerRecord.topic()
                    + ", partition=" + consumerRecord.partition()
                    + ", offset=" + consumerRecord.offset(), e);
            acknowledgment.nack(Duration.ofMillis(0L));
        }
    }

    /**
     * Builds a new {@link EventMessage} in status {@code WAITING} for the
     * {@code eventstore://get-event-message} endpoint.
     *
     * @param region             the message body; stored as the event's region
     * @param currentCommandJson value of the {@code currentCommandJson} header; stored as
     *                           the event's content
     * @return the populated, not yet persisted event message
     */
    @Consume("eventstore://get-event-message")
    public EventMessage getEventMessage(@Body String region, @org.apache.camel.Header("currentCommandJson") String currentCommandJson) {
        final EventMessage waiting = new EventMessage();
        waiting.setContent(currentCommandJson);
        waiting.setRegion(region);
        waiting.setStatus("WAITING");
        return waiting;
    }

    /**
     * Publishes one stored message to Kafka and reports the outcome asynchronously.
     *
     * <p>The body is converted to a map from which {@code content}, {@code messageId} and
     * {@code groupId} are read. The Kafka template is chosen as follows: when the
     * {@code application.profiles} property contains {@code default} or {@code local},
     * the bean named {@code kafkaTemplate} is used; otherwise the
     * {@code micrc.broker.topics.*} entry whose comma-separated value contains the
     * event's topic selects the bean {@code kafkaTemplate-<suffix>} (or plain
     * {@code kafkaTemplate} when the suffix is {@code public}).
     *
     * <p>On send failure an {@link ErrorMessage} is handed to
     * {@code publish://error-sending-resolve}; on success one carrying the message id and
     * group id is handed to {@code publish://success-sending-resolve}.
     *
     * @param obj   the stored message (event or error message) to publish
     * @param event value of the {@code eventInfo} header: topic, event name and mappings
     * @param bool  value of the {@code isCopyEvent} header; forwarded as a Kafka header
     * @throws java.util.NoSuchElementException when no {@code micrc.broker.topics.*} entry
     *         lists the event's topic (non-local profiles only)
     */
    @Consume("publish://sending-message")
    public void send(@Body Object obj, @org.apache.camel.Header("eventInfo") EventsInfo.Event event, @org.apache.camel.Header("isCopyEvent") Boolean bool) {
        HashMap<?, ?> stored = (HashMap<?, ?>) JsonUtil.writeObjectAsObject(obj, HashMap.class);
        String content = (String) stored.get("content");
        // Read the id as a Number: depending on how the map was produced, a small id may
        // arrive as Integer rather than Long, and a direct (Long) cast would then fail.
        Number rawMessageId = (Number) stored.get("messageId");
        Long messageId = rawMessageId == null ? null : Long.valueOf(rawMessageId.longValue());
        Object groupId = stored.get("groupId");

        // Mappings keyed by mapping key; on duplicate keys the first mapping wins.
        Map<String, EventsInfo.EventMapping> mappingMap = event.getEventMappings().stream()
                .collect(Collectors.toMap(EventsInfo.EventMapping::getMappingKey, mapping -> mapping, (first, second) -> first));
        Message<String> message = MessageBuilder.withPayload(content)
                .setHeader("kafka_topic", event.getTopicName())
                .setHeader("groupId", "".equals(groupId) ? null : groupId)
                .setHeader("senderHost", this.environment.getProperty("micrc.x-host"))
                .setHeader("messageId", messageId)
                .setHeader("isCopyEvent", bool)
                .setHeader("event", event.getEventName())
                .setHeader("mappingMap", mappingMap)
                .build();

        List<String> profiles = Arrays.asList(Optional.ofNullable(this.environment.getProperty("application.profiles")).orElse("").split(","));
        KafkaTemplate<?, ?> kafkaTemplate;
        if (profiles.contains("default") || profiles.contains("local")) {
            kafkaTemplate = this.applicationContext.getBean("kafkaTemplate", KafkaTemplate.class);
        } else {
            String topicName = event.getTopicName();
            // NOTE(review): getPropertySources() is declared on Spring's ConfigurableEnvironment,
            // not on Environment — confirm the declared type of the injected field.
            String broker = ((Properties) this.environment.getPropertySources().get("micrc").getSource()).entrySet().stream()
                    .filter(entry -> entry.getKey().toString().startsWith("micrc.broker.topics.")
                            && Arrays.asList(entry.getValue().toString().split(",")).contains(topicName))
                    .map(entry -> {
                        String[] parts = entry.getKey().toString().split("micrc\\.broker\\.topics\\.");
                        return parts[parts.length - 1];
                    })
                    .findFirst()
                    .orElseThrow();
            String beanName = "kafkaTemplate" + ("public".equalsIgnoreCase(broker) ? "" : "-" + broker);
            kafkaTemplate = this.applicationContext.getBean(beanName, KafkaTemplate.class);
        }

        kafkaTemplate.send(message).completable().whenCompleteAsync((sendResult, failure) -> {
            if (failure != null) {
                this.producerTemplate.requestBody("publish://error-sending-resolve", constructErrorMessage(event, content, messageId, bool, failure.getLocalizedMessage()));
                // Pass the throwable along so the stack trace is not lost.
                this.log.error("发送失败: " + messageId + "，是否死信" + (groupId != null), failure);
                return;
            }
            ErrorMessage resolved = new ErrorMessage();
            resolved.setMessageId(messageId);
            resolved.setGroupId((String) groupId);
            this.producerTemplate.requestBody("publish://success-sending-resolve", resolved);
            this.log.info("发送成功: " + messageId + "，是否死信" + (groupId != null));
        });
    }

    /**
     * Builds the {@link ErrorMessage} recorded when a send attempt fails.
     *
     * <p>The result has an empty group id, an error count of 1 and status
     * {@code WAITING}. For copy events the original topic and the first event mapping
     * (serialized as JSON) are stored as well.
     *
     * @param event       event the message belongs to
     * @param content     payload that failed to send
     * @param messageId   id of the message that failed to send
     * @param isCopyEvent whether this is a copy event; {@code null} counts as {@code false}
     * @param cause       text describing the failure
     * @return the populated error message
     */
    private ErrorMessage constructErrorMessage(EventsInfo.Event event, String content, Long messageId, Boolean isCopyEvent, String cause) {
        final ErrorMessage failed = new ErrorMessage();
        failed.setMessageId(messageId);
        failed.setEvent(event.getEventName());
        failed.setContent(content);
        failed.setGroupId("");
        if (Boolean.TRUE.equals(isCopyEvent)) {
            failed.setOriginalTopic(event.getTopicName());
            final EventsInfo.EventMapping firstMapping = event.getEventMappings().get(0);
            failed.setOriginalMapping(JsonUtil.writeValueAsString(firstMapping));
        }
        failed.setErrorCount(1);
        failed.setErrorStatus("WAITING");
        failed.setErrorMessage(cause);
        return failed;
    }

    /**
     * Handler for {@code publish://error-sending-resolve-create}: returns the error
     * message carried in the {@code current} header unchanged, so it becomes the
     * result of the endpoint.
     *
     * @param errorMessage value of the {@code current} header
     * @return the same instance
     */
    @Consume("publish://error-sending-resolve-create")
    public ErrorMessage createErrorMessage(@org.apache.camel.Header("current") ErrorMessage errorMessage) {
        return errorMessage;
    }

    /**
     * Handler for {@code publish://error-resolving}: sets the error message's status to
     * {@code SENDING}.
     *
     * @param errorMessage the message body; modified in place
     * @return the same instance with the updated status
     */
    @Consume("publish://error-resolving")
    public ErrorMessage errorResolving(@Body ErrorMessage errorMessage) {
        errorMessage.setErrorStatus("SENDING");
        return errorMessage;
    }

    /**
     * Handler for {@code publish://normal-resolving}: sets the event message's status to
     * {@code SENT}.
     *
     * @param eventMessage the message body; modified in place
     * @return the same instance with the updated status
     */
    @Consume("publish://normal-resolving")
    public EventMessage normalResolving(@Body EventMessage eventMessage) {
        eventMessage.setStatus("SENT");
        return eventMessage;
    }

    /**
     * Handler for {@code publish://error-sending-resolve-update}: folds a new failure
     * into an existing error record.
     *
     * <p>The record's error count is incremented by one, and its status and error text
     * are overwritten with those of the message in the {@code current} header.
     *
     * @param stored  the message body (the existing record); modified in place
     * @param current value of the {@code current} header (the latest failure)
     * @return {@code stored} after the update
     */
    @Consume("publish://error-sending-resolve-update")
    public ErrorMessage updateErrorMessage(@Body ErrorMessage stored, @org.apache.camel.Header("current") ErrorMessage current) {
        final int previousCount = stored.getErrorCount().intValue();
        stored.setErrorCount(Integer.valueOf(previousCount + 1));
        stored.setErrorStatus(current.getErrorStatus());
        stored.setErrorMessage(current.getErrorMessage());
        return stored;
    }

    /**
     * Handler for {@code subscribe://idempotent-message}: converts message details into
     * an {@link IdempotentMessage}.
     *
     * <p>Reads the {@code senderHost}, {@code messageId} and {@code serviceName} entries;
     * {@code messageId} is parsed from its string form and must be present.
     * NOTE(review): the lookup in the {@code subscribe://idempotent-check} route reads
     * {@code sequence} and {@code servicePath} from the same map — confirm that both sets
     * of keys are supplied by the caller.
     *
     * @param details message details keyed by name
     * @return the populated, not yet persisted idempotency record
     */
    @Consume("subscribe://idempotent-message")
    public IdempotentMessage idempotent(Map<String, Object> details) {
        final String sender = (String) details.get("senderHost");
        final Long sequence = Long.valueOf(details.get("messageId").toString());
        final String receiver = (String) details.get("serviceName");

        final IdempotentMessage record = new IdempotentMessage();
        record.setSender(sender);
        record.setSequence(sequence);
        record.setReceiver(receiver);
        return record;
    }

    /**
     * Handler for {@code clean://idempotent-consumed-filter}: narrows a list of message
     * ids by asking each receiver of the event in turn.
     *
     * <p>For every event mapping, the current ids and the mapping key are posted to the
     * receiver's {@code /api/check-idempotent-consumed} endpoint and the ids in the
     * response replace the current list. Iteration stops early once the list is empty.
     * A non-empty final list is logged.
     *
     * @param list  the message body: candidate message ids
     * @param event value of the {@code eventInfo} header; supplies the mappings to query
     * @return the ids returned by the last receiver queried, or {@code list} itself when
     *         no receiver was queried
     */
    @Consume("clean://idempotent-consumed-filter")
    public List<Long> filter(@Body List<Long> list, @org.apache.camel.Header("eventInfo") EventsInfo.Event event) {
        List<Long> remaining = list;
        for (EventsInfo.EventMapping mapping : event.getEventMappings()) {
            if (remaining.isEmpty()) {
                break;
            }
            HashMap<String, Object> payload = new HashMap<>();
            payload.put("messageIds", remaining);
            payload.put("receiver", mapping.getMappingKey());
            String endpoint = "rest://post:/api/check-idempotent-consumed?host=" + spliceHost(mapping.getReceiverAddress());
            String response = this.producerTemplate.requestBodyAndHeaders(endpoint, JsonUtil.writeValueAsString(payload), constructHeaders(), String.class);
            remaining = JsonUtil.writeValueAsList(response, Long.class);
        }
        if (!remaining.isEmpty()) {
            this.log.info("消息表清理：" + JsonUtil.writeValueAsString(remaining));
        }
        return remaining;
    }

    /**
     * Builds the headers sent with the internal REST calls made by the two
     * {@code filter} handlers: a single {@code Authorization} entry.
     *
     * <p>NOTE(review): the token is a credential hard-coded in source; consider moving
     * it to configuration or a secret store.
     *
     * @return a new mutable map containing the {@code Authorization} header
     */
    private HashMap<String, Object> constructHeaders() {
        final HashMap<String, Object> headers = new HashMap<>();
        headers.put(
                "Authorization",
                "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwZXJtaXNzaW9ucyI6WyIqOioiXSwidXNlcm5hbWUiOiItMSJ9.N97J1cv1Io02TLwAekzOoDHRFrnGOYeXCUiDhbAYBYY");
        return headers;
    }

    /**
     * Turns a three-part, dot-separated address into the base URL used for REST calls.
     *
     * <p>When the second part equals the {@code micrc.domain} property and the third part
     * equals {@code spring.application.name} with {@code -service} removed, the address
     * refers to this service and {@code http://localhost:<local.server.port>} is
     * returned. Otherwise the result is
     * {@code http://<third>-service.<first>-<second>-<profile>.svc.cluster.local}, where
     * {@code <profile>} is the first entry of the comma-separated
     * {@code application.profiles} property (empty when the property is unset).
     *
     * @param address address of the form {@code first.second.third}
     * @return the base URL for the address
     * @throws RuntimeException when the address does not consist of exactly three parts
     */
    private String spliceHost(String address) {
        final String configuredProfiles = Optional.ofNullable(this.environment.getProperty("application.profiles")).orElse("");
        final List<String> profiles = Arrays.asList(configuredProfiles.split(","));
        final String[] parts = address.split("\\.");
        if (parts.length != 3) {
            throw new RuntimeException("x-host invalid");
        }
        final String leading = parts[0];
        final String domain = parts[1];
        final String service = parts[2];

        // The application name is only required (and null-checked) once the domain matches.
        final boolean isThisService = domain.equals(this.environment.getProperty("micrc.domain"))
                && service.equals(Objects.requireNonNull(this.environment.getProperty("spring.application.name")).replace("-service", ""));
        if (isThisService) {
            return "http://localhost:" + this.environment.getProperty("local.server.port");
        }
        return "http://" + service + "-service." + leading + "-" + domain + "-" + profiles.get(0) + ".svc.cluster.local";
    }

    /**
     * Handler for {@code clean://store-removed-filter}: removes from the given ids those
     * that the sender reports back.
     *
     * <p>The ids are posted to the sender's {@code /api/check-store-removed} endpoint and
     * every id contained in the response is removed from {@code list} in place. A
     * non-empty remainder is logged. An empty input is returned without any call.
     *
     * @param list the message body: candidate message ids; modified in place
     * @param str  value of the {@code senderAddress} header; resolved to a host via
     *             {@code spliceHost}
     * @return {@code list} after the removal
     */
    @Consume("clean://store-removed-filter")
    public List<Long> filter(@Body List<Long> list, @org.apache.camel.Header("senderAddress") String str) {
        if (!list.isEmpty()) {
            HashMap<String, Object> payload = new HashMap<>();
            payload.put("messageIds", list);
            String endpoint = "rest://post:/api/check-store-removed?host=" + spliceHost(str);
            String response = this.producerTemplate.requestBodyAndHeaders(endpoint, JsonUtil.writeValueAsString(payload), constructHeaders(), String.class);
            list.removeAll(JsonUtil.writeValueAsList(response, Long.class));
            if (!list.isEmpty()) {
                this.log.info("幂等仓清理：" + JsonUtil.writeValueAsString(list));
            }
        }
        return list;
    }

    /**
     * Declares the Camel routes for storing, sending, de-duplicating and cleaning up
     * event messages. Each route id equals its {@code from} URI.
     *
     * @throws Exception declared by {@link RouteBuilder#configure()}
     */
    public void configure() throws Exception {
        // Store: keep the incoming body in the "currentCommandJson" header, select the value at
        // pointer "/event/eventName" via json-patch://select and, when the selection is non-null,
        // build an EventMessage (eventstore://get-event-message) and save it.
        from("eventstore://store").routeId("eventstore://store").setHeader("currentCommandJson", body()).setHeader("pointer", constant("/event/eventName")).to("json-patch://select").choice().when(body().isNotNull()).to("eventstore://get-event-message").bean(EventMessageRepository.class, "save").endChoice().end().end();
        // Send a normal message (transacted): pass it through publish://normal-resolving, save it,
        // copy the "eventInfo" exchange property into a header and hand over to publish://sending-message.
        from("publish://send-normal").routeId("publish://send-normal").transacted().to("publish://normal-resolving").bean(EventMessageRepository.class, "save").setHeader("eventInfo", exchangeProperty("eventInfo")).to("publish://sending-message").end();
        // Send an error message (transacted): same shape as above, using publish://error-resolving
        // and the error repository.
        from("publish://send-error").routeId("publish://send-error").transacted().to("publish://error-resolving").bean(ErrorMessageRepository.class, "save").setHeader("eventInfo", exchangeProperty("eventInfo")).to("publish://sending-message").end();
        // Idempotency check: look up a record by the body's "sequence" and "servicePath". When none
        // exists, create and save one from the original body and answer false; otherwise answer true.
        from("subscribe://idempotent-check").routeId("subscribe://idempotent-check").setHeader("messageDetail", body()).bean(IdempotentMessageRepository.class, "findFirstBySequenceAndReceiver(${body.get(sequence)}, ${body.get(servicePath)})").choice().when(body().isNull()).setBody(header("messageDetail")).to("subscribe://idempotent-message").bean(IdempotentMessageRepository.class, "save").setBody(constant(false)).endChoice().otherwise().setBody(constant(true)).endChoice().end();
        // Record a send failure (transacted): keep the incoming error message in the "current" header,
        // look up an existing record by message id and group id, then create or update it, and save.
        from("publish://error-sending-resolve").routeId("publish://error-sending-resolve").transacted().setHeader("current", body()).bean(ErrorMessageRepository.class, "findFirstByMessageIdAndGroupId(${body.messageId}, ${body.groupId})").choice().when(body().isNull()).to("publish://error-sending-resolve-create").endChoice().otherwise().to("publish://error-sending-resolve-update").endChoice().end().bean(ErrorMessageRepository.class, "save").end();
        // Record a send success (transacted): when the body has a non-null group id, delete the
        // error record with that message id and group id.
        from("publish://success-sending-resolve").routeId("publish://success-sending-resolve").transacted().choice().when(simple("${body.groupId}").isNotNull()).bean(ErrorMessageRepository.class, "deleteByMessageIdAndGroupId(${body.messageId}, ${body.groupId})").endChoice().end();
        // REST endpoint: parse the JSON body into a map, call the idempotent repository with its
        // "messageIds" and "receiver" entries and return the result as a JSON string.
        from("rest:post:check-idempotent-consumed").routeId("rest:post:check-idempotent-consumed").convertBodyTo(String.class).unmarshal().json(HashMap.class).bean(IdempotentMessageRepository.class, "filterMessageIdByMessageIdsAndReceiver(${body.get(messageIds)}, ${body.get(receiver)})").marshal().json().convertBodyTo(String.class).end();
        // REST endpoint: parse the JSON body into a map, call the event repository with its
        // "messageIds" entry and return the result as a JSON string.
        from("rest:post:check-store-removed").routeId("rest:post:check-store-removed").convertBodyTo(String.class).unmarshal().json(HashMap.class).bean(EventMessageRepository.class, "findUnRemoveIdsByMessageIds(${body.get(messageIds)})").marshal().json().convertBodyTo(String.class).end();
        // Sender: for every registered event (split in parallel) load error messages (count argument
        // 100) and normal messages for the event name, then send the error messages through
        // publish://send-error and the normal ones through publish://send-normal. Afterwards, with the
        // "isCopyEvent" header set to true, do the same for the messages returned by the two
        // "...ByOriginalExists" queries, rebuilding the event info from each message.
        from("eventstore://sender").routeId("eventstore://sender").bean(EventsInfo.class, "getAllEvents").split(new SplitList()).parallelProcessing().setProperty("eventInfo", body()).bean(ErrorMessageRepository.class, "findErrorMessageByEventLimitByCount(${exchange.properties.get(eventInfo).getEventName()}, 100)").setHeader("errorMessageCount", simple("${body.size}")).setProperty("errorEvents", body()).process(exchange -> {
            // The number of normal messages requested is 1000 minus the number of error messages found.
            exchange.getIn().setHeader("normalMessageCount", Integer.valueOf(1000 - ((Integer) exchange.getIn().getHeader("errorMessageCount")).intValue()));
        }).bean(EventMessageRepository.class, "findEventMessageByRegionLimitByCount(${exchange.properties.get(eventInfo).getEventName()}, ${header.normalMessageCount})").setProperty("normalEvents", body()).setBody(exchangeProperty("errorEvents")).split(new SplitList()).parallelProcessing().to("publish://send-error").end().setBody(exchangeProperty("normalEvents")).split(new SplitList()).parallelProcessing().to("publish://send-normal").end().end().setHeader("isCopyEvent", constant(true)).bean(EventMessageRepository.class, "findEventMessageByOriginalExists()").split(new SplitList()).parallelProcessing().process(exchange2 -> {
            // Rebuild the event info from the event message: original topic, region as the event
            // name, and the single mapping parsed from the stored original-mapping JSON.
            EventMessage eventMessage = (EventMessage) exchange2.getIn().getBody(EventMessage.class);
            exchange2.setProperty("eventInfo", EventsInfo.Event.builder().topicName(eventMessage.getOriginalTopic()).eventName(eventMessage.getRegion()).eventMappings(Arrays.asList((EventsInfo.EventMapping) JsonUtil.writeValueAsObject(eventMessage.getOriginalMapping(), EventsInfo.EventMapping.class))).build());
        }).to("publish://send-normal").end().bean(ErrorMessageRepository.class, "findErrorMessageByOriginalExists()").split(new SplitList()).parallelProcessing().process(exchange3 -> {
            // Same reconstruction for error messages; here the event name comes from getEvent().
            ErrorMessage errorMessage = (ErrorMessage) exchange3.getIn().getBody(ErrorMessage.class);
            exchange3.setProperty("eventInfo", EventsInfo.Event.builder().topicName(errorMessage.getOriginalTopic()).eventName(errorMessage.getEvent()).eventMappings(Arrays.asList((EventsInfo.EventMapping) JsonUtil.writeValueAsObject(errorMessage.getOriginalMapping(), EventsInfo.EventMapping.class))).build());
        }).to("publish://send-error").end().end();
        // Clear (transacted), three passes:
        //  1. For every registered event, load sent ids (count argument 1000), filter them through
        //     clean://idempotent-consumed-filter and batch-delete the ids that remain.
        //  2. The same for the results of findSentIdByOriginalExists(), with the event info rebuilt
        //     from each item.
        //  3. For every sender known to the idempotent repository, load its message ids (count
        //     argument 1000), filter them through clean://store-removed-filter and delete the
        //     remaining sequences.
        from("eventstore://clear").routeId("eventstore://clear").transacted().bean(EventsInfo.class, "getAllEvents").split(new SplitList()).parallelProcessing().setHeader("eventInfo", body()).bean(EventMessageRepository.class, "findSentIdByRegionLimitCount(${body.getEventName()},1000)").to("clean://idempotent-consumed-filter").choice().when(simple("${body.size} > 0")).bean(EventMessageRepository.class, "deleteAllByIdInBatch").endChoice().end().end().bean(EventMessageRepository.class, "findSentIdByOriginalExists()").split(new SplitList()).parallelProcessing().process(exchange4 -> {
            // NOTE(review): the split item is read as an EventMessage here and left as the body,
            // while the filter handler that follows declares a List<Long> body — confirm how the
            // body is converted.
            EventMessage eventMessage = (EventMessage) exchange4.getIn().getBody(EventMessage.class);
            exchange4.getIn().setHeader("eventInfo", EventsInfo.Event.builder().topicName(eventMessage.getOriginalTopic()).eventName(eventMessage.getRegion()).eventMappings(Arrays.asList((EventsInfo.EventMapping) JsonUtil.writeValueAsObject(eventMessage.getOriginalMapping(), EventsInfo.EventMapping.class))).build());
        }).to("clean://idempotent-consumed-filter").choice().when(simple("${body.size} > 0")).bean(EventMessageRepository.class, "deleteAllByIdInBatch").endChoice().end().end().bean(IdempotentMessageRepository.class, "findSender").split(new SplitList()).parallelProcessing().setHeader("senderAddress", body()).bean(IdempotentMessageRepository.class, "findMessageIdsBySenderLimitCount(${body},1000)").to("clean://store-removed-filter").choice().when(simple("${body.size} > 0")).bean(IdempotentMessageRepository.class, "deleteAllBySequenceIn").endChoice().end().end();
    }
}
