package org.springframework.cloud.stream.binder.nats;

import io.nats.client.Connection;
import io.nats.client.Message;
import io.nats.client.Subscription;
import java.time.Duration;
import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.Lifecycle;
import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.messaging.support.GenericMessage;

/* loaded from: input_file:BOOT-INF/lib/spring-cloud-stream-binder-nats-0.3.1.jar:org/springframework/cloud/stream/binder/nats/NatsMessageSource.class */
public class NatsMessageSource extends AbstractMessageSource<Object> implements Lifecycle {
    private static final Log logger = LogFactory.getLog((Class<?>) NatsMessageHandler.class);
    private NatsConsumerDestination destination;
    private Connection connection;
    private Subscription sub;

    public NatsMessageSource(NatsConsumerDestination natsConsumerDestination, Connection connection) {
        this.destination = natsConsumerDestination;
        this.connection = connection;
    }

    @Override // org.springframework.integration.endpoint.AbstractMessageSource
    protected Object doReceive() {
        if (this.sub == null) {
            return null;
        }
        try {
            Message nextMessage = this.sub.nextMessage(Duration.ZERO);
            if (nextMessage == null) {
                return null;
            }
            HashMap hashMap = new HashMap();
            hashMap.put(NatsMessageProducer.SUBJECT, nextMessage.getSubject());
            hashMap.put(NatsMessageProducer.REPLY_TO, nextMessage.getReplyTo());
            return new GenericMessage(nextMessage.getData(), hashMap);
        } catch (InterruptedException e) {
            logger.info("wait for message interrupted");
            return null;
        }
    }

    @Override // org.springframework.context.Lifecycle
    public boolean isRunning() {
        return this.sub != null;
    }

    @Override // org.springframework.context.Lifecycle
    public void start() {
        if (this.sub != null) {
            return;
        }
        String subject = this.destination.getSubject();
        String queueGroup = this.destination.getQueueGroup();
        if (queueGroup == null || queueGroup.length() <= 0) {
            this.sub = this.connection.subscribe(subject);
        } else {
            this.sub = this.connection.subscribe(subject, queueGroup);
        }
    }

    @Override // org.springframework.context.Lifecycle
    public void stop() {
        if (this.sub == null) {
            return;
        }
        this.sub.unsubscribe();
        this.sub = null;
    }

    @Override // org.springframework.integration.support.context.NamedComponent
    public String getComponentType() {
        return "nats:message-source";
    }
}
