package io.synadia.jnats.extension;

import io.nats.client.JetStream;
import io.nats.client.Message;
import io.nats.client.NUID;
import io.nats.client.PublishOptions;
import io.nats.client.impl.Headers;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/* loaded from: input_file:io/synadia/jnats/extension/ManagedAsyncJsPublisher.class */
/**
 * Queues JetStream publishes and tracks their acknowledgements on two background threads.
 *
 * <p>Messages handed to {@code publishAsync} are placed on a pre-flight queue. The
 * <em>publish runner</em> thread takes them off that queue, publishes them through the
 * {@link JetStream} context (via {@code Retrier} when a {@link RetryConfig} was supplied) and
 * moves them to the in-flight queue. The <em>flights runner</em> thread polls the in-flight queue
 * and reports each flight to the optional {@link PublisherListener} as acked, completed
 * exceptionally or timed out.
 *
 * <p>Back-pressure: once the in-flight queue reaches {@code maxInFlight} entries the publish
 * runner stops taking new messages until the flights runner observes the in-flight queue at or
 * below {@code refillAllowedAt}.
 */
public class ManagedAsyncJsPublisher implements AutoCloseable {
    public static final int DEFAULT_MAX_IN_FLIGHT = 50;
    public static final int DEFAULT_REFILL_AMOUNT = 0;
    public static final long DEFAULT_POLL_TIME = 100;
    public static final long DEFAULT_PAUSE_TIME = 100;
    public static final long DEFAULT_WAIT_TIMEOUT = 5000;

    /** Sentinel queued by {@link #drain()}; compared by identity, never published. */
    private static final PreFlight DRAIN_MARKER = new PreFlight("DRAIN_MARKER", null, null, null, null);

    private final AtomicLong messageIdGenerator;
    private final JetStream js;
    private final String idPrefix;
    private final int maxInFlight;
    private final int refillAllowedAt;
    private final RetryConfig retryConfig;
    private final PublisherListener publisherListener;
    private final long pollTime;
    private final long holdPauseTime;
    private final long waitTimeout;
    private final LinkedBlockingQueue<PreFlight> preFlight;
    private final LinkedBlockingQueue<Flight> inFlight;
    private final AtomicBoolean notHolding;
    private final AtomicBoolean keepGoingPublishRunner;
    private final AtomicBoolean keepGoingFlightsRunner;
    private final AtomicBoolean draining;
    private final ExecutorService notificationExecutorService;
    private final boolean notificationExecutorServiceWasNotSupplied;
    private final AtomicReference<Thread> publishRunnerThread;
    private final AtomicReference<Thread> flightsRunnerThread;
    private final CountDownLatch publishRunnerDone;
    private final CountDownLatch flightsRunnerDone;

    /**
     * Fluent builder for {@link ManagedAsyncJsPublisher}. Setters store the given value as-is;
     * no range or null validation is performed here.
     */
    public static class Builder {
        JetStream js;
        RetryConfig retryConfig;
        PublisherListener publisherListener;
        ExecutorService notificationExecutorService;
        String idPrefix = NUID.nextGlobal();
        int maxInFlight = DEFAULT_MAX_IN_FLIGHT;
        int refillAllowedAt = DEFAULT_REFILL_AMOUNT;
        long pollTime = DEFAULT_POLL_TIME;
        long holdPauseTime = DEFAULT_PAUSE_TIME;
        long waitTimeout = DEFAULT_WAIT_TIMEOUT;

        /**
         * @param jetStream the JetStream context used for publishing
         * @throws IllegalArgumentException if {@code jetStream} is null
         */
        public Builder(JetStream jetStream) {
            if (jetStream == null) {
                throw new IllegalArgumentException("JetStream context is required.");
            }
            this.js = jetStream;
        }

        /**
         * @param str prefix of the generated message ids ({@code prefix + "-" + sequence})
         * @return this builder
         */
        public Builder idPrefix(String str) {
            this.idPrefix = str;
            return this;
        }

        /**
         * @param i in-flight queue size at which the publish runner stops taking new messages
         * @return this builder
         */
        public Builder maxInFlight(int i) {
            this.maxInFlight = i;
            return this;
        }

        /**
         * @param i in-flight queue size at or below which publishing is allowed to resume
         * @return this builder
         */
        public Builder refillAllowedAt(int i) {
            this.refillAllowedAt = i;
            return this;
        }

        /**
         * @param retryConfig retry settings; when null the message is published directly
         *                    through the JetStream context
         * @return this builder
         */
        public Builder retryConfig(RetryConfig retryConfig) {
            this.retryConfig = retryConfig;
            return this;
        }

        /**
         * @param publisherListener receiver of publish/ack/error/timeout notifications; may be null
         * @return this builder
         */
        public Builder publisherListener(PublisherListener publisherListener) {
            this.publisherListener = publisherListener;
            return this;
        }

        /**
         * @param j milliseconds each runner waits on its queue per poll; also the time
         *          {@code close()} waits for each runner before interrupting it
         * @return this builder
         */
        public Builder pollTime(long j) {
            this.pollTime = j;
            return this;
        }

        /**
         * @param j milliseconds the publish runner sleeps per loop while publishing is held
         * @return this builder
         */
        public Builder holdPauseTime(long j) {
            this.holdPauseTime = j;
            return this;
        }

        /**
         * @param j milliseconds after a flight's publish time at which a still-pending
         *          acknowledgement is treated as timed out
         * @return this builder
         */
        public Builder waitTimeout(long j) {
            this.waitTimeout = j;
            return this;
        }

        /**
         * @param executorService executor used to deliver listener notifications; when null a
         *                        single-thread pool is created and owned by the publisher
         * @return this builder
         */
        public Builder notificationExecutorService(ExecutorService executorService) {
            this.notificationExecutorService = executorService;
            return this;
        }

        /**
         * Creates the publisher without starting its runner threads.
         *
         * @return a new, not yet started publisher
         */
        public ManagedAsyncJsPublisher build() {
            return new ManagedAsyncJsPublisher(this);
        }

        /**
         * Creates the publisher and starts its runner threads.
         *
         * @return a new, started publisher
         */
        public ManagedAsyncJsPublisher start() {
            ManagedAsyncJsPublisher publisher = new ManagedAsyncJsPublisher(this);
            publisher.start();
            return publisher;
        }
    }

    private ManagedAsyncJsPublisher(Builder builder) {
        this.messageIdGenerator = new AtomicLong(0L);
        this.js = builder.js;
        this.idPrefix = builder.idPrefix;
        this.maxInFlight = builder.maxInFlight;
        this.refillAllowedAt = builder.refillAllowedAt;
        this.retryConfig = builder.retryConfig;
        this.publisherListener = builder.publisherListener;
        this.pollTime = builder.pollTime;
        this.holdPauseTime = builder.holdPauseTime;
        this.waitTimeout = builder.waitTimeout;

        // Remember whether we own the executor so close() only shuts down one we created.
        this.notificationExecutorServiceWasNotSupplied = builder.notificationExecutorService == null;
        this.notificationExecutorService = this.notificationExecutorServiceWasNotSupplied
            ? Executors.newFixedThreadPool(1)
            : builder.notificationExecutorService;

        this.preFlight = new LinkedBlockingQueue<>();
        this.inFlight = new LinkedBlockingQueue<>();
        this.notHolding = new AtomicBoolean(true);
        this.keepGoingPublishRunner = new AtomicBoolean(true);
        this.keepGoingFlightsRunner = new AtomicBoolean(true);
        this.draining = new AtomicBoolean(false);
        this.publishRunnerThread = new AtomicReference<>();
        this.flightsRunnerThread = new AtomicReference<>();
        this.publishRunnerDone = new CountDownLatch(1);
        this.flightsRunnerDone = new CountDownLatch(1);
    }

    /**
     * Starts the publish runner and the flights runner, each on its own new thread.
     */
    public void start() {
        Thread publishThread = new Thread(this::publishRunner);
        publishThread.start();
        this.publishRunnerThread.set(publishThread);

        Thread flightsThread = new Thread(this::flightsRunner);
        flightsThread.start();
        this.flightsRunnerThread.set(flightsThread);
    }

    /**
     * @return the thread running {@link #publishRunner()}, or null if {@link #start()} has not
     *         recorded one yet
     */
    public Thread getPublishRunnerThread() {
        return this.publishRunnerThread.get();
    }

    /**
     * @return the thread running {@link #flightsRunner()}, or null if {@link #start()} has not
     *         recorded one yet
     */
    public Thread getFlightsRunnerThread() {
        return this.flightsRunnerThread.get();
    }

    /**
     * Asks both runners to exit the next time they check their loop condition. Does not wait
     * for them and does not touch the queues.
     */
    public void stop() {
        this.keepGoingPublishRunner.set(false);
        this.keepGoingFlightsRunner.set(false);
    }

    /**
     * Switches to draining mode: further {@code publishAsync} calls are rejected and a marker is
     * queued behind the already queued messages so the publish runner stops when it reaches it.
     * The flights runner stops once it finds both queues empty.
     */
    public void drain() {
        this.draining.set(true);
        this.preFlight.offer(DRAIN_MARKER);
    }

    /**
     * Stops the runners, shuts down the notification executor if it was created by this
     * publisher, then gives each runner up to {@code pollTime} milliseconds to finish before
     * interrupting its thread.
     *
     * @throws Exception an {@link InterruptedException} if the calling thread is interrupted
     *                   while waiting for a runner
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        stop();
        if (this.notificationExecutorServiceWasNotSupplied) {
            this.notificationExecutorService.shutdown();
        }
        interruptIfStillRunning(this.publishRunnerDone, this.publishRunnerThread);
        interruptIfStillRunning(this.flightsRunnerDone, this.flightsRunnerThread);
    }

    /** Waits up to {@code pollTime} ms on the runner's latch and interrupts its thread on timeout. */
    private void interruptIfStillRunning(CountDownLatch done, AtomicReference<Thread> runnerThread)
            throws InterruptedException {
        if (done.await(this.pollTime, TimeUnit.MILLISECONDS)) {
            return;
        }
        Thread runner = runnerThread.get();
        if (runner != null) {
            runner.interrupt();
        }
    }

    /**
     * @return current number of entries in the in-flight queue
     */
    public int inFlightSize() {
        return this.inFlight.size();
    }

    /**
     * @return current number of entries in the pre-flight queue
     */
    public int preFlightSize() {
        return this.preFlight.size();
    }

    /**
     * Body of the publish runner thread. While publishing is not held it polls the pre-flight
     * queue, publishes each message, moves it to the in-flight queue, completes the caller's
     * flight future and notifies the listener. It starts holding once the in-flight queue
     * reaches {@code maxInFlight}. The loop ends on {@link #stop()}, on the drain marker, or on
     * interruption.
     */
    public void publishRunner() {
        try {
            while (this.keepGoingPublishRunner.get()) {
                if (!this.notHolding.get()) {
                    // Too many messages in flight; pause until the flights runner releases the hold.
                    Thread.sleep(this.holdPauseTime);
                    continue;
                }
                PreFlight pre = this.preFlight.poll(this.pollTime, TimeUnit.MILLISECONDS);
                if (pre == null) {
                    continue;
                }
                if (pre == DRAIN_MARKER) {
                    this.keepGoingPublishRunner.set(false);
                    return;
                }
                final Flight flight = launch(pre);
                this.inFlight.offer(flight);
                pre.flightFuture.complete(flight);
                notifyListener(() -> this.publisherListener.published(flight));
                if (this.inFlight.size() >= this.maxInFlight) {
                    this.notHolding.set(false);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Release this runner's own latch exactly once, however the loop ended,
            // so close() can tell that the publish runner has finished.
            this.publishRunnerDone.countDown();
        }
    }

    /** Publishes the queued message, through the retrier when a retry config is present. */
    private Flight launch(PreFlight pre) {
        return new Flight(pre, this.retryConfig == null
            ? this.js.publishAsync(pre.subject, pre.headers, pre.body, pre.options)
            : Retrier.publishAsync(this.retryConfig, this.js, pre.subject, pre.headers, pre.body, pre.options));
    }

    /** Submits the notification to the notification executor, if a listener is configured. */
    private void notifyListener(Runnable notification) {
        if (this.publisherListener != null) {
            this.notificationExecutorService.submit(notification);
        }
    }

    /**
     * Body of the flights runner thread. Polls the in-flight queue and, per flight, notifies the
     * listener of an ack or exceptional completion, fails and reports flights that have waited
     * longer than {@code waitTimeout}, or puts a still-pending flight back on the queue. After
     * handling a flight it releases the publish hold when the in-flight queue is at or below
     * {@code refillAllowedAt}. The loop ends on {@link #stop()}, on interruption, or while
     * draining once an empty poll finds both queues empty.
     */
    public void flightsRunner() {
        try {
            while (this.keepGoingFlightsRunner.get()) {
                final Flight flight = this.inFlight.poll(this.pollTime, TimeUnit.MILLISECONDS);
                if (flight == null) {
                    if (this.draining.get() && this.preFlight.isEmpty() && this.inFlight.isEmpty()) {
                        this.keepGoingFlightsRunner.set(false);
                        return;
                    }
                    continue;
                }
                handleFlight(flight);
                if (this.inFlight.size() <= this.refillAllowedAt) {
                    this.notHolding.set(true);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Signal completion only when the runner really exits, not after every iteration.
            this.flightsRunnerDone.countDown();
        }
    }

    /** Reports a finished or timed-out flight to the listener, or re-queues a pending one. */
    private void handleFlight(final Flight flight) {
        if (flight.publishAckFuture.isDone()) {
            if (flight.publishAckFuture.isCompletedExceptionally()) {
                notifyListener(() -> this.publisherListener.completedExceptionally(flight));
            } else {
                notifyListener(() -> this.publisherListener.acked(flight));
            }
        } else if (System.currentTimeMillis() - flight.publishTime > this.waitTimeout) {
            flight.publishAckFuture.completeExceptionally(new IOException("Timeout or no response waiting for publish acknowledgement."));
            notifyListener(() -> this.publisherListener.timeout(flight));
        } else {
            // Still waiting for the ack and not timed out yet: check it again later.
            this.inFlight.offer(flight);
        }
    }

    /**
     * @param jetStream the JetStream context used for publishing
     * @return a new builder
     * @throws IllegalArgumentException if {@code jetStream} is null
     */
    public static Builder builder(JetStream jetStream) {
        return new Builder(jetStream);
    }

    /**
     * Queues a message for publishing. The message id is {@code idPrefix + "-" + sequence}.
     *
     * @param str            the subject
     * @param headers        message headers, may be null
     * @param bArr           message body
     * @param publishOptions publish options, may be null
     * @return a future completed with the {@code Flight} once the publish runner has published
     *         the message
     * @throws IllegalStateException if {@link #drain()} has been called
     */
    public CompletableFuture<Flight> publishAsync(String str, Headers headers, byte[] bArr, PublishOptions publishOptions) {
        if (this.draining.get()) {
            throw new IllegalStateException("Cannot publish after drain");
        }
        String messageId = this.idPrefix + "-" + this.messageIdGenerator.incrementAndGet();
        PreFlight queued = new PreFlight(messageId, str, headers, bArr, publishOptions);
        this.preFlight.offer(queued);
        return queued.flightFuture;
    }

    /**
     * Queues a message without headers or publish options.
     *
     * @see #publishAsync(String, Headers, byte[], PublishOptions)
     */
    public CompletableFuture<Flight> publishAsync(String str, byte[] bArr) {
        return publishAsync(str, null, bArr, null);
    }

    /**
     * Queues a message with headers and no publish options.
     *
     * @see #publishAsync(String, Headers, byte[], PublishOptions)
     */
    public CompletableFuture<Flight> publishAsync(String str, Headers headers, byte[] bArr) {
        return publishAsync(str, headers, bArr, null);
    }

    /**
     * Queues a message with publish options and no headers.
     *
     * @see #publishAsync(String, Headers, byte[], PublishOptions)
     */
    public CompletableFuture<Flight> publishAsync(String str, byte[] bArr, PublishOptions publishOptions) {
        return publishAsync(str, null, bArr, publishOptions);
    }

    /**
     * Queues the subject, headers and data of the given message, without publish options.
     *
     * @see #publishAsync(String, Headers, byte[], PublishOptions)
     */
    public CompletableFuture<Flight> publishAsync(Message message) {
        return publishAsync(message.getSubject(), message.getHeaders(), message.getData(), null);
    }

    /**
     * Queues the subject, headers and data of the given message with the given publish options.
     *
     * @see #publishAsync(String, Headers, byte[], PublishOptions)
     */
    public CompletableFuture<Flight> publishAsync(Message message, PublishOptions publishOptions) {
        return publishAsync(message.getSubject(), message.getHeaders(), message.getData(), publishOptions);
    }
}
