package io.nats.client;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/* loaded from: input_file:BOOT-INF/lib/jnats-0.5.3.jar:io/nats/client/AsyncSubscriptionImpl.class */
class AsyncSubscriptionImpl extends SubscriptionImpl implements AsyncSubscription {
    private ExecutorService executor;
    private MessageHandler msgHandler;

    /* JADX INFO: Access modifiers changed from: protected */
    public AsyncSubscriptionImpl(ConnectionImpl connectionImpl, String str, String str2, MessageHandler messageHandler, int i, long j) {
        super(connectionImpl, str, str2, i, j);
        this.executor = null;
        this.msgHandler = messageHandler;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.nats.client.SubscriptionImpl
    public boolean processMsg(Message message) {
        this.mu.lock();
        try {
            Connection connection = getConnection();
            MessageHandler messageHandler = this.msgHandler;
            long j = this.max;
            this.mu.unlock();
            if (messageHandler == null) {
                return true;
            }
            if (connection == null) {
                return false;
            }
            long tallyDeliveredMessage = tallyDeliveredMessage(message);
            if (j > 0 && tallyDeliveredMessage > j) {
                return true;
            }
            try {
                messageHandler.onMessage(message);
            } catch (Exception e) {
                this.logger.error("Error in callback", (Throwable) e);
            }
            if (tallyDeliveredMessage != j) {
                return true;
            }
            try {
                unsubscribe();
            } catch (Exception e2) {
                this.logger.error("Error in unsubscribe", (Throwable) e2);
            }
            this.conn = null;
            return true;
        } catch (Throwable th) {
            this.mu.unlock();
            throw th;
        }
    }

    boolean isStarted() {
        return this.executor != null;
    }

    void enable() {
        Runnable runnable = new Runnable() { // from class: io.nats.client.AsyncSubscriptionImpl.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    AsyncSubscriptionImpl.this.logger.trace("msgFeeder starting for subj: {} sid: {}", AsyncSubscriptionImpl.this.subject, Long.valueOf(AsyncSubscriptionImpl.this.sid));
                    if (AsyncSubscriptionImpl.this.conn == null || AsyncSubscriptionImpl.this.mch == null) {
                        AsyncSubscriptionImpl.this.logger.error("Exiting due to NULL connection or NULL message channel");
                    } else {
                        AsyncSubscriptionImpl.this.logger.trace("msgFeeder entering delivery loop for subj: {} sid: {}", AsyncSubscriptionImpl.this.subject, Long.valueOf(AsyncSubscriptionImpl.this.sid));
                        AsyncSubscriptionImpl.this.conn.deliverMsgs(AsyncSubscriptionImpl.this.mch);
                    }
                } catch (Exception e) {
                    AsyncSubscriptionImpl.this.logger.error("Error on async subscription for subject {}", AsyncSubscriptionImpl.this.getSubject());
                    e.printStackTrace();
                }
            }
        };
        if (isStarted()) {
            return;
        }
        this.executor = Executors.newSingleThreadExecutor(new NATSThreadFactory("msgfeeder"));
        this.executor.execute(runnable);
        this.logger.trace("Started msgFeeder for subject: " + getSubject() + " sid: " + getSid());
    }

    void disable() {
        if (isStarted()) {
            this.executor.shutdownNow();
            this.executor = null;
        }
    }

    @Override // io.nats.client.AsyncSubscription
    public void setMessageHandler(MessageHandler messageHandler) {
        this.msgHandler = messageHandler;
    }

    @Override // io.nats.client.AsyncSubscription
    public void start() {
        if (isStarted()) {
            return;
        }
        if (!isValid()) {
            throw new IllegalStateException(Constants.ERR_BAD_SUBSCRIPTION);
        }
        enable();
        this.conn.sendSubscriptionMessage(this);
    }

    @Override // io.nats.client.SubscriptionImpl, io.nats.client.Subscription, java.lang.AutoCloseable
    public void close() {
        super.close();
        disable();
    }

    @Override // io.nats.client.SubscriptionImpl, io.nats.client.Subscription
    public void unsubscribe() throws IOException {
        super.unsubscribe();
        disable();
    }
}
