package com.netflix.conductor.contribs.queue.nats;

import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import io.nats.client.NUID;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.functions.Action1;

/* loaded from: input_file:com/netflix/conductor/contribs/queue/nats/NATSAbstractQueue.class */
public abstract class NATSAbstractQueue implements ObservableQueue {
    private static final Logger LOGGER = LoggerFactory.getLogger(NATSAbstractQueue.class);
    protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
    protected final Lock mu = new ReentrantLock();
    private final String queueType;
    private ScheduledExecutorService execs;
    private final Scheduler scheduler;
    protected final String queueURI;
    protected final String subject;
    protected String queue;
    private boolean observable;
    private boolean isOpened;
    private volatile boolean running;

    /* JADX INFO: Access modifiers changed from: package-private */
    public NATSAbstractQueue(String str, String str2, Scheduler scheduler) {
        this.queueURI = str;
        this.queueType = str2;
        this.scheduler = scheduler;
        if (str.contains(":")) {
            this.subject = str.substring(0, str.indexOf(58));
            this.queue = str.substring(str.indexOf(58) + 1);
        } else {
            this.subject = str;
            this.queue = null;
        }
        LOGGER.info(String.format("Initialized with queueURI=%s, subject=%s, queue=%s", str, this.subject, this.queue));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onMessage(String str, byte[] bArr) {
        String str2 = new String(bArr);
        LOGGER.info(String.format("Received message for %s: %s", str, str2));
        Message message = new Message();
        message.setId(NUID.nextGlobal());
        message.setPayload(str2);
        this.messages.add(message);
    }

    public Observable<Message> observe() {
        LOGGER.info("Observe invoked for queueURI " + this.queueURI);
        this.observable = true;
        this.mu.lock();
        try {
            subscribe();
            return Observable.create(subscriber -> {
                Observable flatMap = Observable.interval(100L, TimeUnit.MILLISECONDS, this.scheduler).flatMap(l -> {
                    if (!isRunning()) {
                        LOGGER.debug("Component stopped, skip listening for messages from NATS Queue");
                        return Observable.from(Collections.emptyList());
                    }
                    LinkedList linkedList = new LinkedList();
                    this.messages.drainTo(linkedList);
                    if (!linkedList.isEmpty()) {
                        AtomicInteger atomicInteger = new AtomicInteger(0);
                        StringBuilder sb = new StringBuilder();
                        linkedList.forEach(message -> {
                            sb.append(message.getId()).append("=").append(message.getPayload());
                            atomicInteger.incrementAndGet();
                            if (atomicInteger.get() < linkedList.size()) {
                                sb.append(",");
                            }
                        });
                        LOGGER.info(String.format("Batch from %s to conductor is %s", this.subject, sb.toString()));
                    }
                    return Observable.from(linkedList);
                });
                Objects.requireNonNull(subscriber);
                Action1 action1 = (v1) -> {
                    r1.onNext(v1);
                };
                Objects.requireNonNull(subscriber);
                flatMap.subscribe(action1, subscriber::onError);
            });
        } finally {
            this.mu.unlock();
        }
    }

    public String getType() {
        return this.queueType;
    }

    public String getName() {
        return this.queueURI;
    }

    public String getURI() {
        return this.queueURI;
    }

    public List<String> ack(List<Message> list) {
        return Collections.emptyList();
    }

    public void setUnackTimeout(Message message, long j) {
    }

    public long size() {
        return this.messages.size();
    }

    public void publish(List<Message> list) {
        list.forEach(message -> {
            try {
                String payload = message.getPayload();
                publish(this.subject, payload.getBytes());
                LOGGER.info(String.format("Published message to %s: %s", this.subject, payload));
            } catch (Exception e) {
                LOGGER.error("Failed to publish message " + message.getPayload() + " to " + this.subject, e);
                throw new RuntimeException(e);
            }
        });
    }

    public boolean rePublishIfNoAck() {
        return true;
    }

    public void close() {
        LOGGER.info("Closing connection for " + this.queueURI);
        this.mu.lock();
        try {
            if (this.execs != null) {
                this.execs.shutdownNow();
                this.execs = null;
            }
            closeSubs();
            closeConn();
            this.isOpened = false;
        } finally {
            this.mu.unlock();
        }
    }

    public void open() {
        if (this.isOpened) {
            return;
        }
        this.mu.lock();
        try {
            try {
                connect();
                if (this.observable) {
                    subscribe();
                }
            } finally {
                this.mu.unlock();
            }
        } catch (Exception e) {
        }
        this.execs = Executors.newScheduledThreadPool(1);
        this.execs.scheduleAtFixedRate(this::monitor, 0L, 500L, TimeUnit.MILLISECONDS);
        this.isOpened = true;
    }

    private void monitor() {
        if (isConnected()) {
            return;
        }
        LOGGER.error("Monitor invoked for " + this.queueURI);
        this.mu.lock();
        try {
            closeSubs();
            closeConn();
            connect();
            if (this.observable) {
                subscribe();
            }
        } catch (Exception e) {
            LOGGER.error("Monitor failed with " + e.getMessage() + " for " + this.queueURI, e);
        } finally {
            this.mu.unlock();
        }
    }

    public boolean isClosed() {
        return !this.isOpened;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void ensureConnected() {
        if (!isConnected()) {
            throw new RuntimeException("No nats connection");
        }
    }

    public void start() {
        LOGGER.info("Started listening to {}:{}", getClass().getSimpleName(), this.queueURI);
        this.running = true;
    }

    public void stop() {
        LOGGER.info("Stopped listening to {}:{}", getClass().getSimpleName(), this.queueURI);
        this.running = false;
    }

    public boolean isRunning() {
        return this.running;
    }

    abstract void connect();

    abstract boolean isConnected();

    abstract void publish(String str, byte[] bArr) throws Exception;

    abstract void subscribe();

    abstract void closeSubs();

    abstract void closeConn();
}
