package org.springframework.cloud.fn.consumer.elasticsearch;

import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.InlineGet;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.elasticsearch.core.IndexResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import java.io.IOException;
import java.io.StringReader;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchClientAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.aggregator.AbstractAggregatingMessageGroupProcessor;
import org.springframework.integration.aggregator.MessageCountReleaseStrategy;
import org.springframework.integration.config.AggregatorFactoryBean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.gateway.AnnotationGatewayProxyFactoryBean;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.store.SimpleMessageStore;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

@EnableConfigurationProperties({ElasticsearchConsumerProperties.class})
@AutoConfiguration(after = {ElasticsearchClientAutoConfiguration.class})
/* loaded from: input_file:org/springframework/cloud/fn/consumer/elasticsearch/ElasticsearchConsumerConfiguration.class */
public class ElasticsearchConsumerConfiguration {
    private static final Log LOGGER = LogFactory.getLog(ElasticsearchConsumerConfiguration.class);
    public static final String INDEX_ID_HEADER = "INDEX_ID";
    public static final String INDEX_NAME_HEADER = "INDEX_NAME";

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/springframework/cloud/fn/consumer/elasticsearch/ElasticsearchConsumerConfiguration$MessageWrapper.class */
    public static final class MessageWrapper extends Record {
        private final Message<?> message;

        MessageWrapper(Message<?> message) {
            this.message = message;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, MessageWrapper.class), MessageWrapper.class, "message", "FIELD:Lorg/springframework/cloud/fn/consumer/elasticsearch/ElasticsearchConsumerConfiguration$MessageWrapper;->message:Lorg/springframework/messaging/Message;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, MessageWrapper.class), MessageWrapper.class, "message", "FIELD:Lorg/springframework/cloud/fn/consumer/elasticsearch/ElasticsearchConsumerConfiguration$MessageWrapper;->message:Lorg/springframework/messaging/Message;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, MessageWrapper.class, Object.class), MessageWrapper.class, "message", "FIELD:Lorg/springframework/cloud/fn/consumer/elasticsearch/ElasticsearchConsumerConfiguration$MessageWrapper;->message:Lorg/springframework/messaging/Message;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public Message<?> message() {
            return this.message;
        }
    }

    @Bean
    FactoryBean<MessageHandler> elasticsearchAggregator(ElasticsearchConsumerProperties elasticsearchConsumerProperties) {
        AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
        aggregatorFactoryBean.setCorrelationStrategy(message -> {
            return "";
        });
        aggregatorFactoryBean.setReleaseStrategy(new MessageCountReleaseStrategy(elasticsearchConsumerProperties.getBatchSize()));
        if (elasticsearchConsumerProperties.getGroupTimeout() >= 0) {
            aggregatorFactoryBean.setGroupTimeoutExpression(new ValueExpression(Long.valueOf(elasticsearchConsumerProperties.getGroupTimeout())));
        }
        SimpleMessageStore simpleMessageStore = new SimpleMessageStore();
        simpleMessageStore.setTimeoutOnIdle(true);
        simpleMessageStore.setCopyOnGet(false);
        aggregatorFactoryBean.setMessageStore(simpleMessageStore);
        aggregatorFactoryBean.setProcessorBean(new AbstractAggregatingMessageGroupProcessor() { // from class: org.springframework.cloud.fn.consumer.elasticsearch.ElasticsearchConsumerConfiguration.1
            protected Object aggregatePayloads(MessageGroup messageGroup, Map<String, Object> map) {
                Collection messages = messageGroup.getMessages();
                Assert.notEmpty(messages, getClass().getSimpleName() + " cannot process empty message groups");
                ArrayList arrayList = new ArrayList(messages.size());
                Iterator it = messages.iterator();
                while (it.hasNext()) {
                    arrayList.add(new MessageWrapper((Message) it.next()));
                }
                return arrayList;
            }
        });
        aggregatorFactoryBean.setExpireGroupsUponCompletion(true);
        aggregatorFactoryBean.setSendPartialResultOnExpiry(true);
        return aggregatorFactoryBean;
    }

    @Bean
    IntegrationFlow elasticsearchConsumerFlow(@Qualifier("elasticsearchAggregator") MessageHandler messageHandler, ElasticsearchConsumerProperties elasticsearchConsumerProperties, @Qualifier("elasticsearchIndexingHandler") MessageHandler messageHandler2) {
        return integrationFlowDefinition -> {
            if (elasticsearchConsumerProperties.getBatchSize() > 1) {
                integrationFlowDefinition.handle(messageHandler);
            }
            integrationFlowDefinition.handle(messageHandler2);
        };
    }

    @Bean
    AnnotationGatewayProxyFactoryBean<Consumer<Message<?>>> elasticsearchConsumer() {
        AnnotationGatewayProxyFactoryBean<Consumer<Message<?>>> annotationGatewayProxyFactoryBean = new AnnotationGatewayProxyFactoryBean<>(Consumer.class);
        annotationGatewayProxyFactoryBean.setDefaultRequestChannelName("elasticsearchConsumerFlow.input");
        return annotationGatewayProxyFactoryBean;
    }

    @Bean
    public MessageHandler elasticsearchIndexingHandler(ElasticsearchClient elasticsearchClient, ElasticsearchConsumerProperties elasticsearchConsumerProperties) {
        return message -> {
            Object payload = message.getPayload();
            if (!(payload instanceof Iterable)) {
                index(elasticsearchClient, (IndexRequest<?>) buildIndexRequest(message, elasticsearchConsumerProperties), elasticsearchConsumerProperties.isAsync());
                return;
            }
            Iterable iterable = (Iterable) payload;
            BulkRequest.Builder builder = new BulkRequest.Builder();
            Stream stream = StreamSupport.stream(iterable.spliterator(), false);
            Class<MessageWrapper> cls = MessageWrapper.class;
            Objects.requireNonNull(MessageWrapper.class);
            stream.filter(cls::isInstance).map(obj -> {
                return ((MessageWrapper) obj).message();
            }).map(message -> {
                return buildIndexRequest(message, elasticsearchConsumerProperties);
            }).forEach(indexRequest -> {
                builder.operations(builder2 -> {
                    return builder2.index(builder2 -> {
                        return builder2.index(indexRequest.index()).id(indexRequest.id()).document(indexRequest.document());
                    });
                });
            });
            index(elasticsearchClient, builder.build(), elasticsearchConsumerProperties.isAsync());
        };
    }

    private IndexRequest<Object> buildIndexRequest(Message<?> message, ElasticsearchConsumerProperties elasticsearchConsumerProperties) {
        IndexRequest.Builder builder = new IndexRequest.Builder();
        String index = elasticsearchConsumerProperties.getIndex();
        if (message.getHeaders().containsKey(INDEX_NAME_HEADER)) {
            index = (String) message.getHeaders().get(INDEX_NAME_HEADER);
        }
        builder.index(index);
        String str = "";
        if (message.getHeaders().containsKey(INDEX_ID_HEADER)) {
            str = (String) message.getHeaders().get(INDEX_ID_HEADER);
        } else if (elasticsearchConsumerProperties.getId() != null) {
            str = (String) elasticsearchConsumerProperties.getId().getValue(message, String.class);
        }
        builder.id(str);
        if (message.getPayload() instanceof String) {
            builder.withJson(new StringReader((String) message.getPayload()));
        } else if (message.getPayload() instanceof Map) {
            builder.document(message.getPayload());
        }
        if (StringUtils.hasText(elasticsearchConsumerProperties.getRouting())) {
            builder.routing(elasticsearchConsumerProperties.getRouting());
        }
        if (elasticsearchConsumerProperties.getTimeoutSeconds() > 0) {
            builder.timeout((Time) new Time.Builder().time(elasticsearchConsumerProperties.getTimeoutSeconds() + "s").build());
        }
        return builder.build();
    }

    private void index(ElasticsearchClient elasticsearchClient, BulkRequest bulkRequest, boolean z) {
        if (z) {
            new ElasticsearchAsyncClient(elasticsearchClient._transport()).bulk(bulkRequest).whenComplete((bulkResponse, th) -> {
                if (th != null) {
                    throw new IllegalStateException("Error occurred while performing bulk index operation: " + th.getMessage(), th);
                }
                handleBulkResponse(bulkResponse);
            });
            return;
        }
        try {
            handleBulkResponse(elasticsearchClient.bulk(bulkRequest));
        } catch (IOException e) {
            throw new IllegalStateException("Error occurred while performing bulk index operation: " + e.getMessage(), e);
        }
    }

    private void index(ElasticsearchClient elasticsearchClient, IndexRequest<?> indexRequest, boolean z) {
        if (z) {
            new ElasticsearchAsyncClient(elasticsearchClient._transport()).index(indexRequest).whenComplete((indexResponse, th) -> {
                if (th != null) {
                    throw new IllegalStateException("Error occurred while indexing document: " + th.getMessage(), th);
                }
                handleResponse(indexResponse);
            });
            return;
        }
        try {
            handleResponse(elasticsearchClient.index(indexRequest));
        } catch (IOException e) {
            throw new IllegalStateException("Error occurred while indexing document: " + e.getMessage(), e);
        }
    }

    private void handleBulkResponse(BulkResponse bulkResponse) {
        if (LOGGER.isDebugEnabled() || bulkResponse.errors()) {
            for (BulkResponseItem bulkResponseItem : bulkResponse.items()) {
                if (bulkResponseItem.error() != null) {
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("itemResponse.error=" + bulkResponseItem.error());
                    }
                    LOGGER.error(String.format("Index operation [id=%s, index=%s] failed: %s", bulkResponseItem.id(), bulkResponseItem.index(), bulkResponseItem.error().toString()));
                } else {
                    InlineGet inlineGet = bulkResponseItem.get();
                    if (inlineGet != null) {
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("itemResponse:" + inlineGet);
                        }
                        LOGGER.debug(String.format("Index operation [id=%s, index=%s] succeeded: document [id=%s, version=%s] was written on shard %s.", bulkResponseItem.id(), bulkResponseItem.index(), ((Map) inlineGet.source()).get("id"), ((Map) inlineGet.source()).get("version"), ((Map) inlineGet.source()).get("shardId")));
                    } else {
                        LOGGER.debug(String.format("Index operation [id=%s, index=%s] succeeded", bulkResponseItem.id(), bulkResponseItem.index()));
                    }
                }
            }
        }
        if (bulkResponse.errors()) {
            Optional reduce = bulkResponse.items().stream().map(bulkResponseItem2 -> {
                return bulkResponseItem2.error() != null ? bulkResponseItem2.error().toString() : "";
            }).reduce((str, str2) -> {
                return str + " : " + str2;
            });
            Objects.requireNonNull(bulkResponse);
            throw new IllegalStateException("Bulk indexing operation completed with failures: " + ((String) reduce.orElseGet(bulkResponse::toString)));
        }
    }

    private void handleResponse(IndexResponse indexResponse) {
        LOGGER.debug(String.format("Index operation [index=%s] succeeded: document [id=%s, version=%d] was written on shard %s.", indexResponse.index(), indexResponse.id(), Long.valueOf(indexResponse.version()), indexResponse.shards().toString()));
    }
}
