package io.synadia.jnats.extension;

import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.Message;
import io.nats.client.NUID;
import io.nats.client.PublishOptions;
import io.nats.client.impl.Headers;
import io.synadia.retrier.RetryConfig;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/* loaded from: input_file:io/synadia/jnats/extension/AsyncJsPublisher.class */
public class AsyncJsPublisher implements AutoCloseable {
    public static final int DEFAULT_MAX_IN_FLIGHT = 50;
    public static final int DEFAULT_REFILL_AMOUNT = 0;
    public static final long DEFAULT_POLL_TIME = 100;
    public static final long DEFAULT_HOLD_PAUSE_TIME = 100;
    public static final long DEFAULT_WAIT_TIMEOUT = 5000;
    private static final PreFlight STOP_MARKER = new PreFlight("STOP", null, null, null, null);
    private final AtomicLong messageIdGenerator;
    private final JetStream js;
    private final Supplier<String> messageIdSupplier;
    private final String idPrefix;
    private final int maxInFlight;
    private final int refillAllowedAt;
    private final RetryConfig retryConfig;
    private final AsyncJsPublishListener publishListener;
    private final long pollTime;
    private final long holdPauseTime;
    private final long waitTimeout;
    private final boolean processAcksInOrder;
    private final LinkedBlockingQueue<PreFlight> preFlight;
    private final LinkedBlockingQueue<InFlight> inFlights;
    private final AtomicBoolean notInHoldingPattern;
    private final AtomicBoolean draining;
    private final AtomicBoolean keepGoingPublishRunner;
    private final AtomicBoolean keepGoingFlightsRunner;
    private final ExecutorService notificationExecutorService;
    private final boolean executorWasntUserSupplied;
    private final AtomicReference<Thread> publishRunnerThread;
    private final AtomicReference<Thread> flightsRunnerThread;
    private final CountDownLatch publishRunnerDoneLatch;
    private final CountDownLatch flightsRunnerDoneLatch;

    /* loaded from: input_file:io/synadia/jnats/extension/AsyncJsPublisher$Builder.class */
    public static class Builder {
        JetStream js;
        Supplier<String> messageIdSupplier;
        RetryConfig retryConfig;
        AsyncJsPublishListener publishListener;
        ExecutorService notificationExecutorService;
        int maxInFlight = 50;
        int refillAllowedAt = 0;
        long pollTime = 100;
        long holdPauseTime = 100;
        long waitTimeout = AsyncJsPublisher.DEFAULT_WAIT_TIMEOUT;
        boolean processAcksInOrder = true;

        public Builder(JetStream jetStream) {
            if (jetStream == null) {
                throw new IllegalArgumentException("JetStream context is required.");
            }
            this.js = jetStream;
        }

        public Builder messageIdSupplier(Supplier<String> supplier) {
            this.messageIdSupplier = supplier;
            return this;
        }

        public Builder maxInFlight(int i) {
            this.maxInFlight = i;
            return this;
        }

        public Builder refillAllowedAt(int i) {
            this.refillAllowedAt = i;
            return this;
        }

        public Builder retryConfig(RetryConfig retryConfig) {
            this.retryConfig = retryConfig;
            return this;
        }

        public Builder publishListener(AsyncJsPublishListener asyncJsPublishListener) {
            this.publishListener = asyncJsPublishListener;
            return this;
        }

        public Builder pollTime(long j) {
            this.pollTime = j;
            return this;
        }

        public Builder holdPauseTime(long j) {
            this.holdPauseTime = j;
            return this;
        }

        public Builder waitTimeout(long j) {
            this.waitTimeout = j;
            return this;
        }

        public Builder processAcksInOrder(boolean z) {
            this.processAcksInOrder = z;
            return this;
        }

        public Builder notificationExecutorService(ExecutorService executorService) {
            this.notificationExecutorService = executorService;
            return this;
        }

        public AsyncJsPublisher build() {
            return new AsyncJsPublisher(this);
        }

        public AsyncJsPublisher start() {
            AsyncJsPublisher asyncJsPublisher = new AsyncJsPublisher(this);
            asyncJsPublisher.start();
            return asyncJsPublisher;
        }
    }

    private AsyncJsPublisher(Builder builder) {
        this.js = builder.js;
        if (builder.messageIdSupplier == null) {
            this.idPrefix = new NUID().nextSequence();
            this.messageIdGenerator = new AtomicLong(0L);
            this.messageIdSupplier = () -> {
                return this.idPrefix + "-" + this.messageIdGenerator.incrementAndGet();
            };
        } else {
            this.idPrefix = null;
            this.messageIdGenerator = null;
            this.messageIdSupplier = builder.messageIdSupplier;
        }
        this.maxInFlight = builder.maxInFlight;
        this.refillAllowedAt = builder.refillAllowedAt;
        this.retryConfig = builder.retryConfig;
        this.publishListener = builder.publishListener;
        this.pollTime = builder.pollTime;
        this.holdPauseTime = builder.holdPauseTime;
        this.waitTimeout = builder.waitTimeout;
        this.processAcksInOrder = builder.processAcksInOrder;
        if (builder.notificationExecutorService == null) {
            this.notificationExecutorService = Executors.newFixedThreadPool(1);
            this.executorWasntUserSupplied = true;
        } else {
            this.notificationExecutorService = builder.notificationExecutorService;
            this.executorWasntUserSupplied = false;
        }
        this.preFlight = new LinkedBlockingQueue<>();
        this.inFlights = new LinkedBlockingQueue<>();
        this.notInHoldingPattern = new AtomicBoolean(true);
        this.draining = new AtomicBoolean(false);
        this.keepGoingPublishRunner = new AtomicBoolean(true);
        this.keepGoingFlightsRunner = new AtomicBoolean(true);
        this.publishRunnerThread = new AtomicReference<>();
        this.flightsRunnerThread = new AtomicReference<>();
        this.publishRunnerDoneLatch = new CountDownLatch(1);
        this.flightsRunnerDoneLatch = new CountDownLatch(1);
    }

    public void start() {
        Thread thread = new Thread(this::publishRunner);
        thread.start();
        this.publishRunnerThread.set(thread);
        Thread thread2 = new Thread(this::flightsRunner);
        thread2.start();
        this.flightsRunnerThread.set(thread2);
    }

    public void stop() {
        this.keepGoingPublishRunner.set(false);
        this.keepGoingFlightsRunner.set(false);
    }

    public void drain() {
        this.preFlight.offer(STOP_MARKER);
        this.draining.set(true);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        Thread thread;
        Thread thread2;
        stop();
        if (this.executorWasntUserSupplied) {
            this.notificationExecutorService.shutdown();
        }
        if (!this.publishRunnerDoneLatch.await(this.pollTime, TimeUnit.MILLISECONDS) && (thread2 = this.publishRunnerThread.get()) != null && thread2.isAlive()) {
            thread2.interrupt();
        }
        if (this.flightsRunnerDoneLatch.await(this.pollTime, TimeUnit.MILLISECONDS) || (thread = this.flightsRunnerThread.get()) == null || !thread.isAlive()) {
            return;
        }
        thread.interrupt();
    }

    public int inFlightSize() {
        return this.inFlights.size();
    }

    public int preFlightSize() {
        return this.preFlight.size();
    }

    public CountDownLatch getPublishRunnerDoneLatch() {
        return this.publishRunnerDoneLatch;
    }

    public CountDownLatch getFlightsRunnerDoneLatch() {
        return this.flightsRunnerDoneLatch;
    }

    public int getMaxInFlight() {
        return this.maxInFlight;
    }

    public int getRefillAllowedAt() {
        return this.refillAllowedAt;
    }

    public long getPollTime() {
        return this.pollTime;
    }

    public long getHoldPauseTime() {
        return this.holdPauseTime;
    }

    public long getWaitTimeout() {
        return this.waitTimeout;
    }

    public boolean getProcessAcksInOrder() {
        return this.processAcksInOrder;
    }

    public void publishRunner() {
        while (this.keepGoingPublishRunner.get()) {
            try {
                try {
                    if (this.notInHoldingPattern.get()) {
                        PreFlight poll = this.preFlight.poll(this.pollTime, TimeUnit.MILLISECONDS);
                        if (poll != null) {
                            if (poll == STOP_MARKER) {
                                return;
                            }
                            InFlight inFlight = new InFlight(this.retryConfig == null ? this.js.publishAsync(poll.subject, poll.headers, poll.body, poll.options) : PublishRetrier.publishAsync(this.retryConfig, this.js, poll.subject, poll.headers, poll.body, poll.options), poll);
                            this.inFlights.offer(inFlight);
                            poll.inFlightFuture.complete(inFlight);
                            notifyPublished(inFlight);
                            if (this.inFlights.size() >= this.maxInFlight) {
                                this.notInHoldingPattern.set(false);
                            }
                        }
                    } else {
                        Thread.sleep(this.holdPauseTime);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    this.keepGoingPublishRunner.set(false);
                    this.publishRunnerDoneLatch.countDown();
                    return;
                }
            } finally {
                this.keepGoingPublishRunner.set(false);
                this.publishRunnerDoneLatch.countDown();
            }
        }
        this.keepGoingPublishRunner.set(false);
        this.publishRunnerDoneLatch.countDown();
    }

    public void flightsRunner() {
        while (this.keepGoingFlightsRunner.get()) {
            try {
                InFlight poll = this.inFlights.poll(this.pollTime, TimeUnit.MILLISECONDS);
                if (poll != null) {
                    try {
                        if (this.processAcksInOrder || poll.publishAckFuture.isDone()) {
                            notifyCompleted(new PostFlight(poll, poll.publishAckFuture.get(this.waitTimeout, TimeUnit.MILLISECONDS)));
                        } else {
                            this.inFlights.offer(poll);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (ExecutionException e2) {
                        handleExecutionException(e2, poll);
                    } catch (TimeoutException e3) {
                        notifyTimeout(new PostFlight(poll, true, false, e3));
                    }
                    if (this.inFlights.size() <= this.refillAllowedAt) {
                        this.notInHoldingPattern.set(true);
                    }
                } else if (this.draining.get() && this.preFlight.isEmpty() && this.inFlights.isEmpty()) {
                    return;
                }
            } catch (InterruptedException e4) {
                Thread.currentThread().interrupt();
                return;
            } finally {
                this.keepGoingFlightsRunner.set(false);
                this.flightsRunnerDoneLatch.countDown();
            }
        }
    }

    private void handleExecutionException(ExecutionException executionException, InFlight inFlight) {
        Throwable cause = executionException.getCause() == null ? executionException : executionException.getCause();
        if (cause instanceof JetStreamApiException) {
            if (cause.getMessage().contains("10060") || cause.getMessage().contains("10070") || cause.getMessage().contains("10071")) {
                notifyCompletedExceptionally(new PostFlight(inFlight, false, true, cause));
                return;
            } else {
                notifyCompletedExceptionally(new PostFlight(inFlight, cause));
                return;
            }
        }
        if (cause instanceof IOException) {
            if (cause.getMessage().contains("Timeout or no response")) {
                notifyTimeout(new PostFlight(inFlight, true, false, cause));
            } else {
                notifyCompletedExceptionally(new PostFlight(inFlight, cause));
            }
        }
    }

    private void notifyPublished(InFlight inFlight) {
        if (this.publishListener != null) {
            this.notificationExecutorService.submit(() -> {
                this.publishListener.published(inFlight);
            });
        }
    }

    private void notifyCompletedExceptionally(PostFlight postFlight) {
        if (this.publishListener != null) {
            this.notificationExecutorService.submit(() -> {
                this.publishListener.completedExceptionally(postFlight);
            });
        }
    }

    private void notifyCompleted(PostFlight postFlight) {
        if (this.publishListener != null) {
            this.notificationExecutorService.submit(() -> {
                this.publishListener.acked(postFlight);
            });
        }
    }

    private void notifyTimeout(PostFlight postFlight) {
        if (this.publishListener != null) {
            this.notificationExecutorService.submit(() -> {
                this.publishListener.timeout(postFlight);
            });
        }
    }

    public static Builder builder(JetStream jetStream) {
        return new Builder(jetStream);
    }

    public PreFlight publishAsync(String str, Headers headers, byte[] bArr, PublishOptions publishOptions) {
        if (this.draining.get() || !(this.keepGoingPublishRunner.get() || this.keepGoingFlightsRunner.get())) {
            throw new IllegalStateException("Cannot publish once drained or stopped.");
        }
        PreFlight preFlight = new PreFlight((publishOptions == null || publishOptions.getMessageId() == null) ? this.messageIdSupplier.get() : publishOptions.getMessageId(), str, headers, bArr, publishOptions);
        this.preFlight.offer(preFlight);
        return preFlight;
    }

    public PreFlight publishAsync(String str, byte[] bArr) {
        return publishAsync(str, null, bArr, null);
    }

    public PreFlight publishAsync(String str, Headers headers, byte[] bArr) {
        return publishAsync(str, headers, bArr, null);
    }

    public PreFlight publishAsync(String str, byte[] bArr, PublishOptions publishOptions) {
        return publishAsync(str, null, bArr, publishOptions);
    }

    public PreFlight publishAsync(Message message) {
        return publishAsync(message.getSubject(), message.getHeaders(), message.getData(), null);
    }

    public PreFlight publishAsync(Message message, PublishOptions publishOptions) {
        return publishAsync(message.getSubject(), message.getHeaders(), message.getData(), publishOptions);
    }
}
