package org.entur.pubsub.camel;

import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.support.BasicAcknowledgeablePubsubMessage;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.Processor;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultConsumer;
import org.entur.pubsub.base.EnturGooglePubSubUtils;
import org.entur.pubsub.camel.EnturGooglePubSubConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/entur/pubsub/camel/EnturGooglePubSubConsumer.class */
public class EnturGooglePubSubConsumer extends DefaultConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(EnturGooglePubSubConsumer.class);
    private final EnturGooglePubSubEndpoint endpoint;
    private final Processor processor;
    private final Synchronization ackStrategy;
    private final PubSubTemplate pubSubTemplate;
    private List<Subscriber> subscribers;

    public EnturGooglePubSubConsumer(EnturGooglePubSubEndpoint enturGooglePubSubEndpoint, Processor processor, PubSubTemplate pubSubTemplate) {
        super(enturGooglePubSubEndpoint, processor);
        this.subscribers = new ArrayList();
        this.endpoint = enturGooglePubSubEndpoint;
        this.processor = processor;
        this.ackStrategy = new EnturExchangeAckTransaction();
        this.pubSubTemplate = pubSubTemplate;
    }

    protected void doStart() throws Exception {
        super.doStart();
        LOGGER.info("Starting Google PubSub consumer for {}/{}", this.endpoint.getProjectId(), this.endpoint.getDestinationName());
        Consumer<BasicAcknowledgeablePubsubMessage> consumer = new Consumer<BasicAcknowledgeablePubsubMessage>() { // from class: org.entur.pubsub.camel.EnturGooglePubSubConsumer.1
            @Override // java.util.function.Consumer
            public void accept(BasicAcknowledgeablePubsubMessage basicAcknowledgeablePubsubMessage) {
                PubsubMessage pubsubMessage = basicAcknowledgeablePubsubMessage.getPubsubMessage();
                byte[] byteArray = pubsubMessage.getData().toByteArray();
                if (EnturGooglePubSubConsumer.LOGGER.isTraceEnabled()) {
                    EnturGooglePubSubConsumer.LOGGER.trace("Received message ID : {}", pubsubMessage.getMessageId());
                }
                Exchange createExchange = EnturGooglePubSubConsumer.this.endpoint.createExchange();
                createExchange.getIn().setBody(byteArray);
                createExchange.getIn().setHeader(EnturGooglePubSubConstants.ACK_ID, basicAcknowledgeablePubsubMessage);
                createExchange.getIn().setHeader(EnturGooglePubSubConstants.MESSAGE_ID, pubsubMessage.getMessageId());
                createExchange.getIn().setHeader(EnturGooglePubSubConstants.PUBLISH_TIME, pubsubMessage.getPublishTime());
                Map attributesMap = pubsubMessage.getAttributesMap();
                if (attributesMap != null) {
                    attributesMap.entrySet().stream().filter(entry -> {
                        return !((String) entry.getKey()).startsWith(EnturGooglePubSubConstants.GOOGLE_PUB_SUB_HEADER_PREFIX);
                    }).forEach(entry2 -> {
                        createExchange.getIn().setHeader((String) entry2.getKey(), entry2.getValue());
                    });
                }
                if (EnturGooglePubSubConsumer.this.endpoint.getAckMode() != EnturGooglePubSubConstants.AckMode.NONE) {
                    createExchange.adapt(ExtendedExchange.class).addOnCompletion(EnturGooglePubSubConsumer.this.ackStrategy);
                }
                try {
                    EnturGooglePubSubConsumer.this.processor.process(createExchange);
                } catch (Exception e) {
                    createExchange.setException(e);
                }
            }
        };
        for (int i = 0; i < this.endpoint.getConcurrentConsumers().intValue(); i++) {
            this.subscribers.add(this.pubSubTemplate.subscribe(this.endpoint.getDestinationName(), consumer));
        }
    }

    protected void doStop() throws Exception {
        super.doStop();
        LOGGER.info("Stopping Google PubSub consumer for subscription {}/{}", this.endpoint.getProjectId(), this.endpoint.getDestinationName());
        Iterator<Subscriber> it = this.subscribers.iterator();
        while (it.hasNext()) {
            EnturGooglePubSubUtils.closeSubscriber(it.next());
        }
        LOGGER.info("Stopped Google PubSub consumer for subscription {}/{}", this.endpoint.getProjectId(), this.endpoint.getDestinationName());
    }
}
