package com.netflix.conductor.contribs.queue.nats;

import com.netflix.conductor.contribs.queue.nats.config.JetStreamProperties;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import io.nats.client.Connection;
import io.nats.client.ConnectionListener;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamSubscription;
import io.nats.client.NUID;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.api.AckPolicy;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.DeliverPolicy;
import io.nats.client.api.RetentionPolicy;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.availability.AvailabilityChangeEvent;
import org.springframework.boot.availability.LivenessState;
import org.springframework.context.ApplicationEventPublisher;
import rx.Observable;
import rx.Scheduler;
import rx.functions.Action1;

/* loaded from: input_file:com/netflix/conductor/contribs/queue/nats/JetStreamObservableQueue.class */
public class JetStreamObservableQueue implements ObservableQueue {
    private static final Logger LOG = LoggerFactory.getLogger(JetStreamObservableQueue.class);
    private final String queueType;
    private final String subject;
    private final String queueUri;
    private final JetStreamProperties properties;
    private final Scheduler scheduler;
    private final ApplicationEventPublisher eventPublisher;
    private Connection nc;
    private JetStreamSubscription sub;
    private Observable<Long> interval;
    private final String queueGroup;
    private final LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
    private final Lock mu = new ReentrantLock();
    private final AtomicBoolean running = new AtomicBoolean(false);

    public JetStreamObservableQueue(ConductorProperties conductorProperties, JetStreamProperties jetStreamProperties, String str, String str2, Scheduler scheduler, ApplicationEventPublisher applicationEventPublisher) {
        LOG.debug("JSM obs queue create, qtype={}, quri={}", str, str2);
        this.queueUri = str2;
        if (str2.contains(":")) {
            this.subject = getQueuePrefix(conductorProperties, jetStreamProperties) + str2.substring(0, str2.indexOf(58));
            this.queueGroup = str2.substring(str2.indexOf(58) + 1);
        } else {
            this.subject = getQueuePrefix(conductorProperties, jetStreamProperties) + str2;
            this.queueGroup = null;
        }
        this.queueType = str;
        this.properties = jetStreamProperties;
        this.scheduler = scheduler;
        this.eventPublisher = applicationEventPublisher;
    }

    public static String getQueuePrefix(ConductorProperties conductorProperties, JetStreamProperties jetStreamProperties) {
        String str = "";
        if (conductorProperties.getStack() != null && conductorProperties.getStack().length() > 0) {
            str = conductorProperties.getStack() + "_";
        }
        return StringUtils.isBlank(jetStreamProperties.getListenerQueuePrefix()) ? conductorProperties.getAppId() + "_jsm_notify_" + str : jetStreamProperties.getListenerQueuePrefix();
    }

    public Observable<Message> observe() {
        return Observable.create(getOnSubscribe());
    }

    private Observable.OnSubscribe<Message> getOnSubscribe() {
        return subscriber -> {
            this.interval = Observable.interval(this.properties.getPollTimeDuration().toMillis(), TimeUnit.MILLISECONDS, this.scheduler);
            Observable flatMap = this.interval.flatMap(l -> {
                if (!isRunning()) {
                    LOG.debug("Component stopped, skip listening for messages from JSM Queue '{}'", this.subject);
                    return Observable.from(Collections.emptyList());
                }
                ArrayList arrayList = new ArrayList();
                this.messages.drainTo(arrayList);
                if (!arrayList.isEmpty()) {
                    LOG.debug("Processing JSM queue '{}' batch messages count={}", this.subject, Integer.valueOf(arrayList.size()));
                }
                return Observable.from(arrayList);
            });
            Objects.requireNonNull(subscriber);
            Action1 action1 = (v1) -> {
                r1.onNext(v1);
            };
            Objects.requireNonNull(subscriber);
            flatMap.subscribe(action1, subscriber::onError);
        };
    }

    public String getType() {
        return this.queueType;
    }

    public String getName() {
        return this.queueUri;
    }

    public String getURI() {
        return getName();
    }

    public List<String> ack(List<Message> list) {
        list.forEach(message -> {
            ((JsmMessage) message).getJsmMsg().ack();
        });
        return Collections.emptyList();
    }

    public void publish(List<Message> list) {
        try {
            Connection connect = Nats.connect(this.properties.getUrl());
            try {
                JetStream jetStream = connect.jetStream();
                Iterator<Message> it = list.iterator();
                while (it.hasNext()) {
                    jetStream.publish(this.subject, it.next().getPayload().getBytes());
                }
                if (connect != null) {
                    connect.close();
                }
            } finally {
            }
        } catch (IOException | JetStreamApiException e) {
            throw new NatsException("Failed to publish to jsm", e);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            throw new NatsException("Failed to publish to jsm", e2);
        }
    }

    public void setUnackTimeout(Message message, long j) {
    }

    public long size() {
        try {
            return this.sub.getConsumerInfo().getNumPending();
        } catch (IOException | JetStreamApiException e) {
            LOG.warn("Failed to get stream '{}' info", this.subject);
            return 0L;
        }
    }

    public void start() {
        this.mu.lock();
        try {
            natsConnect();
        } finally {
            this.mu.unlock();
        }
    }

    public void stop() {
        this.interval.unsubscribeOn(this.scheduler);
        try {
            if (this.nc != null) {
                this.nc.close();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.error("Failed to close Nats connection", e);
        }
        this.running.set(false);
    }

    public boolean isRunning() {
        return this.running.get();
    }

    private void natsConnect() {
        if (this.running.get()) {
            return;
        }
        LOG.info("Starting JSM observable, name={}", this.queueUri);
        try {
            Nats.connectAsynchronously(new Options.Builder().connectionListener((connection, events) -> {
                LOG.info("Connection to JSM updated: {}", events);
                if (ConnectionListener.Events.CLOSED.equals(events)) {
                    LOG.error("Could not reconnect to NATS! Changing liveness status to {}!", LivenessState.BROKEN);
                    AvailabilityChangeEvent.publish(this.eventPublisher, events, LivenessState.BROKEN);
                }
                this.nc = connection;
                subscribeOnce(connection, events);
            }).errorListener(new LoggingNatsErrorListener()).server(this.properties.getUrl()).maxReconnects(this.properties.getMaxReconnects()).build(), true);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new NatsException("Failed to connect to JSM", e);
        }
    }

    private void createStream(JetStreamManagement jetStreamManagement) {
        StreamConfiguration build = StreamConfiguration.builder().name(this.subject).replicas(this.properties.getReplicas()).retentionPolicy(RetentionPolicy.Limits).maxBytes(this.properties.getStreamMaxBytes()).storageType(StorageType.get(this.properties.getStreamStorageType())).build();
        try {
            LOG.debug("Updated stream, info: {}", jetStreamManagement.addStream(build));
        } catch (IOException | JetStreamApiException e) {
            LOG.error("Failed to add stream: " + build, e);
            AvailabilityChangeEvent.publish(this.eventPublisher, e, LivenessState.BROKEN);
        }
    }

    private void subscribeOnce(Connection connection, ConnectionListener.Events events) {
        if (events.equals(ConnectionListener.Events.CONNECTED) || events.equals(ConnectionListener.Events.RECONNECTED)) {
            try {
                JetStreamManagement jetStreamManagement = connection.jetStreamManagement();
                createStream(jetStreamManagement);
                subscribe(connection, createConsumer(jetStreamManagement));
            } catch (IOException e) {
                throw new NatsException("Failed to get jsm management", e);
            }
        }
    }

    private ConsumerConfiguration createConsumer(JetStreamManagement jetStreamManagement) {
        ConsumerConfiguration build = ConsumerConfiguration.builder().name(this.properties.getDurableName()).deliverGroup(this.queueGroup).durable(this.properties.getDurableName()).ackWait(this.properties.getAckWait()).maxDeliver(this.properties.getMaxDeliver()).maxAckPending(this.properties.getMaxAckPending()).ackPolicy(AckPolicy.Explicit).deliverSubject(this.subject + "-deliver").deliverPolicy(DeliverPolicy.New).build();
        try {
            jetStreamManagement.addOrUpdateConsumer(this.subject, build);
            return build;
        } catch (IOException | JetStreamApiException e) {
            throw new NatsException("Failed to add/update consumer", e);
        }
    }

    private void subscribe(Connection connection, ConsumerConfiguration consumerConfiguration) {
        try {
            JetStream jetStream = connection.jetStream();
            PushSubscribeOptions build = ((PushSubscribeOptions.Builder) ((PushSubscribeOptions.Builder) ((PushSubscribeOptions.Builder) PushSubscribeOptions.builder().configuration(consumerConfiguration)).stream(this.subject)).bind(true)).build();
            LOG.debug("Subscribing jsm, subject={}, options={}", this.subject, build);
            this.sub = jetStream.subscribe(this.subject, this.queueGroup, connection.createDispatcher(), message -> {
                JsmMessage jsmMessage = new JsmMessage();
                jsmMessage.setJsmMsg(message);
                jsmMessage.setId(NUID.nextGlobal());
                jsmMessage.setPayload(new String(message.getData()));
                this.messages.add(jsmMessage);
            }, false, build);
            LOG.debug("Subscribed successfully {}", this.sub.getConsumerInfo());
            this.running.set(true);
        } catch (IOException | JetStreamApiException e) {
            throw new NatsException("Failed to subscribe", e);
        }
    }
}
