package io.nats.stan;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import io.nats.client.Channel;
import io.nats.client.NUID;
import io.nats.stan.Options;
import io.nats.stan.protobuf.Ack;
import io.nats.stan.protobuf.CloseRequest;
import io.nats.stan.protobuf.CloseResponse;
import io.nats.stan.protobuf.ConnectRequest;
import io.nats.stan.protobuf.ConnectResponse;
import io.nats.stan.protobuf.MsgProto;
import io.nats.stan.protobuf.PubAck;
import io.nats.stan.protobuf.PubMsg;
import io.nats.stan.protobuf.SubscriptionRequest;
import io.nats.stan.protobuf.SubscriptionResponse;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/nats/stan/ConnectionImpl.class */
public class ConnectionImpl implements Connection, io.nats.client.MessageHandler {
    // ---- Defaults -------------------------------------------------------------------------

    /** NATS server URL used when none is configured. */
    static final String DEFAULT_NATS_URL = "nats://localhost:4222";
    /** Default connect wait; the unit is not visible in this file (presumably seconds — TODO confirm). */
    static final int DEFAULT_CONNECT_WAIT = 2;
    /** Default subject prefix for cluster discovery requests. */
    static final String DEFAULT_DISCOVER_PREFIX = "_STAN.discover";
    /** Subject prefix under which connect() creates the per-connection publish-ack inbox. */
    static final String DEFAULT_ACK_PREFIX = "_STAN.acks";
    /**
     * Default cap on publishes awaiting acknowledgement.
     * NOTE(review): 12 is exactly what {@code 2 ^ 14} evaluates to in Java (XOR, not power);
     * if 2**14 = 16384 was intended this default is far too small — confirm before changing,
     * since other classes may read this constant.
     */
    static final int DEFAULT_MAX_PUB_ACKS_IN_FLIGHT = 12;

    // ---- Client-side error messages ---------------------------------------------------------

    /** Prefix shared by all messages below. */
    static final String PFX = "stan: ";
    static final String ERR_CONNECTION_REQ_TIMEOUT = "stan: connect request timeout";
    static final String ERR_CLOSE_REQ_TIMEOUT = "stan: close request timeout";
    static final String ERR_CONNECTION_CLOSED = "stan: connection closed";
    static final String ERR_TIMEOUT = "stan: publish ack timeout";
    static final String ERR_BAD_ACK = "stan: malformed ack";
    static final String ERR_BAD_SUBSCRIPTION = "stan: invalid subscription";
    static final String ERR_BAD_CONNECTION = "stan: invalid connection";
    static final String ERR_MANUAL_ACK = "stan: cannot manually ack in auto-ack mode";
    static final String ERR_NULL_MSG = "stan: null message";

    // ---- Error messages named after the server (not referenced elsewhere in this class) -----

    static final String SERVER_ERR_BAD_PUB_MSG = "stan: malformed publish message envelope";
    static final String SERVER_ERR_BAD_SUB_REQUEST = "stan: malformed subscription request";
    static final String SERVER_ERR_INVALID_SUBJECT = "stan: invalid subject";
    static final String SERVER_ERR_INVALID_SEQUENCE = "stan: invalid start sequence";
    static final String SERVER_ERR_INVALID_TIME = "stan: invalid start time";
    static final String SERVER_ERR_INVALID_SUB = "stan: invalid subscription";
    static final String SERVER_ERR_INVALID_CONN_REQ = "stan: invalid connection request";
    static final String SERVER_ERR_INVALID_CLIENT = "stan: clientID already registered";
    static final String SERVER_ERR_INVALID_CLOSE_REQ = "stan: invalid close request";
    static final String SERVER_ERR_INVALID_ACK_WAIT = "stan: invalid ack wait time, should be >= 1s";
    static final String SERVER_ERR_DUP_DURABLE = "stan: duplicate durable registration";
    static final String SERVER_ERR_DURABLE_QUEUE = "stan: queue subscribers can't be durable";

    static final Logger logger = LoggerFactory.getLogger(ConnectionImpl.class);

    // ---- Connection state -------------------------------------------------------------------

    /** Lock taken via lock()/unlock() around reads and writes of the maps and the NATS connection. */
    final Lock mu;
    /** Client identifier sent in connect, close, publish and subscription requests. */
    String clientId;
    /** Cluster identifier; appended to the discover prefix to form the connect subject. */
    String clusterId;
    /** Publish subject prefix taken from the ConnectResponse. */
    String pubPrefix;
    /** Subject for subscription requests, taken from the ConnectResponse. */
    String subRequests;
    /** Subject for unsubscribe requests, taken from the ConnectResponse. */
    String unsubRequests;
    /** Subject for close requests, taken from the ConnectResponse. */
    String closeRequests;
    /** Subject on which publish acks are received; also passed as the reply subject of every publish. */
    String ackSubject;
    /** NATS subscription on {@link #ackSubject}; its messages go to processAck. */
    io.nats.client.Subscription ackSubscription;
    /** Inbox advertised in the ConnectRequest as the heartbeat inbox. */
    String hbInbox;
    /** NATS subscription on {@link #hbInbox}. */
    io.nats.client.Subscription hbSubscription;
    /** Handler that forwards heartbeat messages to processHeartBeat. */
    io.nats.client.MessageHandler hbCallback;
    /** Subscriptions keyed by their inbox subject; processMsg looks deliveries up by message subject. */
    Map<String, Subscription> subMap;
    /** Published-but-unacknowledged messages keyed by GUID. */
    Map<String, AckClosure> pubAckMap;
    /**
     * Channel sized from opts.getMaxPubAcksInFlight() in connect(); one element is put per
     * publish and one is taken when an outstanding ack is removed.
     */
    Channel<PubAck> pubAckChan;
    /** Options supplied at construction. */
    Options opts;
    /** Underlying NATS connection; set to null by close(). */
    io.nats.client.Connection nc;
    /** Daemon timer on which publish-ack timeout tasks are scheduled. */
    Timer ackTimer;
    /** True when this class created the NATS connection itself; close() only closes the connection in that case. */
    boolean ncOwned;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/nats/stan/ConnectionImpl$AckClosure.class */
    /**
     * Bookkeeping for one published message that has not been acknowledged yet: the handler to
     * notify, an optional exception channel, and the timeout task guarding the ack.
     */
    public class AckClosure {
        /** Timeout task for this publish; unset until the enclosing connection assigns it. */
        protected TimerTask ackTask;
        /** Handler to invoke when the ack (or its timeout) is processed; may be null. */
        AckHandler ah;
        /** Exception channel supplied by the publisher; may be null. */
        Channel<Exception> ch;

        /**
         * @param handler      callback for the ack outcome, or null
         * @param errorChannel channel supplied by the publishing call, or null
         */
        AckClosure(AckHandler handler, Channel<Exception> errorChannel) {
            ah = handler;
            ch = errorChannel;
        }
    }

    /**
     * Creates a connection object with no cluster, client or options set; only the lock and
     * the daemon ack timer are initialised.
     */
    protected ConnectionImpl() {
        mu = new ReentrantLock();
        ackTimer = new Timer(true);
        ncOwned = false;
    }

    /**
     * Creates a connection object using freshly built default options.
     *
     * @param str  the cluster id
     * @param str2 the client id
     */
    ConnectionImpl(String str, String str2) {
        this(str, str2, new Options.Builder().create());
    }

    /**
     * Creates a connection object for the given cluster and client. When the options carry a
     * NATS connection it is adopted immediately; otherwise one is created later by connect().
     *
     * @param str     the cluster id
     * @param str2    the client id
     * @param options connection options; may be null
     */
    public ConnectionImpl(String str, String str2, Options options) {
        mu = new ReentrantLock();
        ackTimer = new Timer(true);
        ncOwned = false;
        clusterId = str;
        clientId = str2;
        opts = options;
        if (opts != null && opts.getNatsConn() != null) {
            setNatsConnection(opts.getNatsConn());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Performs the streaming handshake: ensures a NATS connection exists (creating and owning
     * one if necessary), subscribes to a heartbeat inbox, sends a ConnectRequest to the
     * discover subject and, on success, records the subjects returned by the server and sets
     * up the publish-ack subscription and bookkeeping maps.
     *
     * <p>If the handshake fails, the subscriptions created here are removed and a NATS
     * connection created by this call is closed, so a failed connect leaves nothing behind.
     *
     * @throws IOException      if the server reports an error or NATS I/O fails
     * @throws TimeoutException if no ConnectResponse arrives within the connect timeout
     */
    public void connect() throws IOException, TimeoutException {
        io.nats.client.Connection natsConnection = getNatsConnection();
        if (natsConnection == null) {
            natsConnection = createNatsConnection();
            setNatsConnection(natsConnection);
            this.ncOwned = true;
        }
        this.hbInbox = natsConnection.newInbox();
        this.hbCallback = new io.nats.client.MessageHandler() { // from class: io.nats.stan.ConnectionImpl.1
            public void onMessage(io.nats.client.Message message) {
                ConnectionImpl.this.processHeartBeat(message);
            }
        };
        this.hbSubscription = natsConnection.subscribe(this.hbInbox, this.hbCallback);
        String discoverSubject = String.format("%s.%s", this.opts.getDiscoverPrefix(), this.clusterId);
        ConnectRequest connectRequest = ConnectRequest.newBuilder().setClientID(this.clientId).setHeartbeatInbox(this.hbInbox).m185build();
        logger.trace("Sending ConnectRequest:\n{}", connectRequest.toString().trim());

        boolean connected = false;
        try {
            ConnectResponse response = ConnectResponse.parseFrom(natsConnection.request(discoverSubject, connectRequest.toByteArray(), this.opts.getConnectTimeout().toMillis()).getData());
            if (!response.getError().isEmpty()) {
                throw new IOException(response.getError());
            }
            logger.trace("Received ConnectResponse:\n{}", response);
            this.pubPrefix = response.getPubPrefix();
            this.subRequests = response.getSubRequests();
            this.unsubRequests = response.getUnsubRequests();
            this.closeRequests = response.getCloseRequests();
            this.ackSubject = String.format("%s.%s", DEFAULT_ACK_PREFIX, NUID.nextGlobal());
            this.ackSubscription = natsConnection.subscribe(this.ackSubject, new io.nats.client.MessageHandler() { // from class: io.nats.stan.ConnectionImpl.2
                public void onMessage(io.nats.client.Message message) {
                    ConnectionImpl.this.processAck(message);
                }
            });
            // The previous literals 1026 and 32770 are what `1024 ^ 2` and `32 * 1024 ^ 2`
            // evaluate to in Java (XOR). The intended limits are 1M messages / 32 MiB.
            this.ackSubscription.setPendingLimits(1024 * 1024, 32 * 1024 * 1024);
            this.pubAckMap = new HashMap();
            this.subMap = new HashMap();
            this.pubAckChan = new Channel<>(this.opts.getMaxPubAcksInFlight());
            connected = true;
        } catch (TimeoutException e) {
            TimeoutException timeout = new TimeoutException(ERR_CONNECTION_REQ_TIMEOUT);
            timeout.initCause(e);
            throw timeout;
        } finally {
            if (!connected) {
                abandonFailedConnect(natsConnection);
            }
        }
    }

    /** Best-effort release of everything connect() acquired before the handshake failed. */
    private void abandonFailedConnect(io.nats.client.Connection natsConnection) {
        if (this.ackSubscription != null) {
            try {
                this.ackSubscription.unsubscribe();
            } catch (Exception e) {
                logger.debug("stan: error unsubscribing from acks after failed connect", e);
            }
            this.ackSubscription = null;
        }
        if (this.hbSubscription != null) {
            try {
                this.hbSubscription.unsubscribe();
            } catch (Exception e) {
                logger.debug("stan: error unsubscribing from heartbeats after failed connect", e);
            }
            this.hbSubscription = null;
        }
        // Only tear down a NATS connection this object created; a caller-supplied one is left alone.
        if (this.ncOwned) {
            try {
                natsConnection.close();
            } catch (Exception e) {
                logger.debug("stan: error closing NATS connection after failed connect", e);
            }
            setNatsConnection(null);
            this.ncOwned = false;
        }
    }

    /**
     * Builds the factory used to create the underlying NATS connection, applying the
     * configured NATS URL when one is set.
     *
     * @return a new connection factory
     */
    io.nats.client.ConnectionFactory createNatsConnectionFactory() {
        io.nats.client.ConnectionFactory factory = new io.nats.client.ConnectionFactory();
        if (opts.getNatsUrl() == null) {
            return factory;
        }
        factory.setUrl(opts.getNatsUrl());
        return factory;
    }

    /**
     * Creates a new NATS connection and marks it as owned by this object, unless a NATS
     * connection is already set.
     *
     * @return the newly created connection, or null when one was already present
     * @throws IOException      if the NATS connection cannot be established
     * @throws TimeoutException if establishing the NATS connection times out
     */
    io.nats.client.Connection createNatsConnection() throws IOException, TimeoutException {
        if (getNatsConnection() != null) {
            return null;
        }
        io.nats.client.Connection created = createNatsConnectionFactory().createConnection();
        ncOwned = true;
        return created;
    }

    /**
     * Closes the streaming connection: detaches the NATS connection from this object, removes
     * the ack and heartbeat subscriptions, sends a CloseRequest to the server and, when this
     * object created the NATS connection itself, closes it. Calling close() on an already
     * closed connection only logs a warning.
     *
     * @throws IOException      if the server answers the close request with an error
     * @throws TimeoutException if the close request is not answered within the connect timeout
     */
    @Override // io.nats.stan.Connection, java.lang.AutoCloseable
    public void close() throws IOException, TimeoutException {
        logger.trace("In STAN close()");
        lock();
        try {
            io.nats.client.Connection natsConnection = getNatsConnection();
            if (natsConnection == null) {
                // The finally block releases the lock; unlocking here as well would throw
                // IllegalMonitorStateException on the second unlock.
                logger.warn("stan: NATS connection already closed");
                return;
            }
            setNatsConnection(null);
            if (getAckSubscription() != null) {
                try {
                    getAckSubscription().unsubscribe();
                } catch (Exception e) {
                    logger.warn("stan: error unsubscribing from acks during connection close");
                    logger.debug("Full stack trace: ", e);
                }
            }
            if (getHbSubscription() != null) {
                try {
                    getHbSubscription().unsubscribe();
                } catch (Exception e) {
                    logger.warn("stan: error unsubscribing from heartbeats during connection close");
                    logger.debug("Full stack trace: ", e);
                }
            }
            CloseRequest closeRequest = CloseRequest.newBuilder().setClientID(this.clientId).m91build();
            logger.trace("CLOSE request: [{}]", closeRequest);
            try {
                io.nats.client.Message reply = natsConnection.request(this.closeRequests, closeRequest.toByteArray(), this.opts.getConnectTimeout().toMillis());
                logger.trace("CLOSE response: [{}]", reply);
                if (reply.getData() != null) {
                    CloseResponse response = CloseResponse.parseFrom(reply.getData());
                    if (!response.getError().isEmpty()) {
                        throw new IOException(response.getError());
                    }
                }
            } catch (TimeoutException e) {
                TimeoutException timeout = new TimeoutException(ERR_CLOSE_REQ_TIMEOUT);
                timeout.initCause(e);
                throw timeout;
            } finally {
                // Close an owned NATS connection even when the close request failed; this
                // object no longer references it, so nobody else could release it.
                if (this.ncOwned) {
                    try {
                        natsConnection.close();
                    } catch (Exception e) {
                        logger.warn("NATS connection was null in close()");
                        logger.debug("Full stack trace: ", e);
                    }
                }
            }
        } finally {
            unlock();
        }
    }

    /**
     * Factory hook for the per-publish bookkeeping object.
     *
     * @param ackHandler handler to notify about the ack outcome; may be null
     * @param channel    exception channel supplied by the publishing call; may be null
     * @return a new AckClosure holding both arguments
     */
    protected AckClosure createAckClosure(AckHandler ackHandler, Channel<Exception> channel) {
        return new AckClosure(ackHandler, channel);
    }

    /**
     * Creates the timer task that runs processAckTimeout for a published message.
     *
     * @param str        GUID of the published message
     * @param ackHandler handler passed through to processAckTimeout; may be null
     * @return a task that, when run, invokes processAckTimeout(str, ackHandler)
     */
    TimerTask createAckTimerTask(final String str, final AckHandler ackHandler) {
        return new TimerTask() { // from class: io.nats.stan.ConnectionImpl.3
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                ConnectionImpl.this.processAckTimeout(str, ackHandler);
            }
        };
    }

    /**
     * Factory hook for subscription objects; forwards all arguments unchanged to the
     * SubscriptionImpl constructor.
     *
     * @param str                 first constructor argument (passed as the subject by subscribe)
     * @param str2                second constructor argument (passed as the queue group by subscribe; may be null)
     * @param messageHandler      handler for delivered messages
     * @param connectionImpl      the owning connection
     * @param subscriptionOptions subscription options; may be null
     * @return a new SubscriptionImpl
     */
    protected SubscriptionImpl createSubscription(String str, String str2, MessageHandler messageHandler, ConnectionImpl connectionImpl, SubscriptionOptions subscriptionOptions) {
        return new SubscriptionImpl(str, str2, messageHandler, connectionImpl, subscriptionOptions);
    }

    /**
     * Answers a server heartbeat by publishing an empty message to the heartbeat's reply
     * subject. Publish failures are logged, not propagated.
     *
     * @param message the heartbeat message received on the heartbeat inbox
     */
    protected void processHeartBeat(io.nats.client.Message message) {
        // close() clears the NATS connection before it removes the heartbeat subscription,
        // so a heartbeat can still arrive once the connection reference is already null.
        io.nats.client.Connection natsConnection = this.nc;
        if (natsConnection == null) {
            logger.debug("stan: ignoring heartbeat received after connection close");
            return;
        }
        try {
            natsConnection.publish(message.getReplyTo(), (byte[]) null);
            logger.debug("Sent heartbeat response");
        } catch (IOException e) {
            logger.warn("stan: error publishing heartbeat response: {}", e.getMessage());
            logger.debug("Full stack trace:", e);
        }
    }

    /**
     * Factory hook for the exception channel handed to the internal publish call.
     *
     * @return a new, empty channel
     */
    Channel<Exception> createExceptionChannel() {
        Channel<Exception> errors = new Channel<>();
        return errors;
    }

    /**
     * Publishes a message without an ack handler. If the exception channel handed to the
     * internal publish holds an entry once that call returns, the entry is rethrown wrapped
     * in an IOException.
     *
     * @param str  subject to publish to
     * @param bArr payload; may be null
     * @throws IOException if publishing fails or an exception was reported on the channel
     */
    @Override // io.nats.stan.Connection
    public void publish(String str, byte[] bArr) throws IOException {
        Channel<Exception> errors = createExceptionChannel();
        publish(str, bArr, null, errors);
        if (errors.getCount() == 0) {
            return;
        }
        throw new IOException((Throwable) errors.get());
    }

    /**
     * Publishes a message and reports the ack outcome to the given handler.
     *
     * @param str        subject to publish to
     * @param bArr       payload; may be null
     * @param ackHandler handler notified about the ack or its timeout; may be null
     * @return the GUID assigned to the published message
     * @throws IOException if publishing fails
     */
    @Override // io.nats.stan.Connection
    public String publish(String str, byte[] bArr, AckHandler ackHandler) throws IOException {
        Channel<Exception> noChannel = null;
        return publish(str, bArr, ackHandler, noChannel);
    }

    /**
     * Core publish path. Registers an AckClosure under a fresh GUID, reserves an in-flight
     * slot on the pub-ack channel (blocking when it is full), publishes the PubMsg with the
     * ack subject as reply subject and finally arms the ack-timeout timer.
     *
     * @param str        subject to publish to
     * @param bArr       payload; may be null
     * @param ackHandler handler notified about the ack or its timeout; may be null
     * @param channel    exception channel stored in the AckClosure; may be null
     * @return the GUID assigned to the published message
     * @throws IOException           if the NATS publish fails (the pending ack is removed first)
     * @throws IllegalStateException if the connection has been closed
     */
    String publish(String str, byte[] bArr, AckHandler ackHandler, Channel<Exception> channel) throws IOException {
        AckClosure ackClosure = createAckClosure(ackHandler, channel);

        io.nats.client.Connection natsConnection;
        String publishSubject;
        String guid;
        byte[] payload;
        String ackInbox;
        Duration ackTimeout;
        Channel<PubAck> inFlight;

        lock();
        try {
            natsConnection = getNatsConnection();
            if (natsConnection == null) {
                throw new IllegalStateException(ERR_CONNECTION_CLOSED);
            }
            publishSubject = String.format("%s.%s", this.pubPrefix, str);
            guid = NUID.nextGlobal();
            PubMsg.Builder builder = PubMsg.newBuilder().setClientID(this.clientId).setGuid(guid).setSubject(str);
            if (bArr != null) {
                builder = builder.setData(ByteString.copyFrom(bArr));
            }
            payload = builder.m374build().toByteArray();
            this.pubAckMap.put(guid, ackClosure);
            ackInbox = this.ackSubject;
            ackTimeout = this.opts.getAckTimeout();
            inFlight = this.pubAckChan;
        } finally {
            // Always release the lock, including on the closed-connection path above.
            unlock();
        }

        boolean interrupted = false;
        try {
            inFlight.put(PubAck.getDefaultInstance());
        } catch (InterruptedException e) {
            logger.warn("stan: interrupted while writing to publish ack channel");
            interrupted = true;
        }

        try {
            try {
                // Use the connection captured under the lock; the field may have been
                // cleared by a concurrent close() in the meantime.
                natsConnection.publish(publishSubject, ackInbox, payload);
            } catch (IOException e) {
                removeAck(guid);
                throw e;
            }

            lock();
            try {
                // If the ack was already processed the closure is gone from the map; arming
                // the timer then would deliver a spurious timeout to the handler later.
                if (this.pubAckMap.get(guid) == ackClosure) {
                    ackClosure.ackTask = createAckTimerTask(guid, ackHandler);
                    this.ackTimer.schedule(ackClosure.ackTask, ackTimeout.toMillis());
                }
            } finally {
                unlock();
            }
            return guid;
        } finally {
            if (interrupted) {
                // Restore the interrupt status for the caller once the publish attempt is done.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Subscribes to a subject with no queue group and no explicit options.
     *
     * @param str            subject to subscribe to
     * @param messageHandler handler for delivered messages
     * @return the new subscription
     * @throws IOException      if the subscription request fails
     * @throws TimeoutException if the subscription request times out
     */
    @Override // io.nats.stan.Connection
    public Subscription subscribe(String str, MessageHandler messageHandler) throws IOException, TimeoutException {
        // The cast selects the (String, MessageHandler, SubscriptionOptions) overload.
        return subscribe(str, messageHandler, (SubscriptionOptions) null);
    }

    /**
     * Subscribes to a subject with the given options and no queue group.
     *
     * @param str                 subject to subscribe to
     * @param messageHandler      handler for delivered messages
     * @param subscriptionOptions subscription options; may be null
     * @return the new subscription
     * @throws IOException      if the subscription request fails
     * @throws TimeoutException if the subscription request times out
     */
    @Override // io.nats.stan.Connection
    public Subscription subscribe(String str, MessageHandler messageHandler, SubscriptionOptions subscriptionOptions) throws IOException, TimeoutException {
        return subscribe(str, null, messageHandler, subscriptionOptions);
    }

    /**
     * Subscribes to a subject as a member of a queue group, with no explicit options.
     *
     * @param str            subject to subscribe to
     * @param str2           queue group name
     * @param messageHandler handler for delivered messages
     * @return the new subscription
     * @throws IOException      if the subscription request fails
     * @throws TimeoutException if the subscription request times out
     */
    @Override // io.nats.stan.Connection
    public Subscription subscribe(String str, String str2, MessageHandler messageHandler) throws IOException, TimeoutException {
        return subscribe(str, str2, messageHandler, null);
    }

    /**
     * Creates a subscription: registers it under its inbox, subscribes to that inbox on the
     * NATS connection, sends a SubscriptionRequest to the server and stores the ack inbox
     * returned in the response.
     *
     * <p>If any step after registration fails, the subscription is removed from the
     * subscription map and its inbox subscription is dropped before the error is propagated.
     *
     * @param str                 subject to subscribe to
     * @param str2                queue group name, or null for none
     * @param messageHandler      handler for delivered messages
     * @param subscriptionOptions subscription options; may be null
     * @return the established subscription
     * @throws IOException           if NATS I/O fails or the server answers with an error
     * @throws TimeoutException      if the server does not answer within two seconds
     * @throws IllegalStateException if the connection has been closed
     */
    @Override // io.nats.stan.Connection
    public Subscription subscribe(String str, String str2, MessageHandler messageHandler, SubscriptionOptions subscriptionOptions) throws IOException, TimeoutException {
        SubscriptionImpl subscription;
        io.nats.client.Connection natsConnection;
        lock();
        try {
            natsConnection = getNatsConnection();
            if (natsConnection == null) {
                throw new IllegalStateException(ERR_CONNECTION_CLOSED);
            }
            subscription = createSubscription(str, str2, messageHandler, this, subscriptionOptions);
            this.subMap.put(subscription.inbox, subscription);
        } finally {
            // Released exactly once here; later failures must not unlock the connection again.
            unlock();
        }

        boolean established = false;
        subscription.wLock();
        try {
            subscription.inboxSub = natsConnection.subscribe(subscription.inbox, this);
            SubscriptionRequest request = createSubscriptionRequest(subscription);
            logger.trace("Sending SubscriptionRequest:\n{}", request);
            SubscriptionResponse response = SubscriptionResponse.parseFrom(natsConnection.request(this.subRequests, request.toByteArray(), 2L, TimeUnit.SECONDS).getData());
            logger.trace("Received SubscriptionResponse:\n{}", response);
            if (!response.getError().isEmpty()) {
                throw new IOException(response.getError());
            }
            subscription.setAckInbox(response.getAckInbox());
            established = true;
            return subscription;
        } catch (TimeoutException e) {
            // Message text kept as-is for compatibility with callers matching on it.
            TimeoutException timeout = new TimeoutException(ERR_TIMEOUT);
            timeout.initCause(e);
            throw timeout;
        } finally {
            subscription.wUnlock();
            if (!established) {
                discardFailedSubscription(subscription);
            }
        }
    }

    /** Removes a subscription whose setup failed from the map and drops its inbox subscription. */
    private void discardFailedSubscription(SubscriptionImpl subscription) {
        lock();
        try {
            this.subMap.remove(subscription.inbox);
        } finally {
            unlock();
        }
        if (subscription.inboxSub != null) {
            try {
                subscription.inboxSub.unsubscribe();
            } catch (Exception e) {
                logger.warn("stan: error unsubscribing inbox after failed subscription request");
                logger.debug("Full stack trace: ", e);
            }
        }
    }

    /**
     * Translates a subscription and its options into the SubscriptionRequest sent to the
     * server. A null queue name is sent as the empty string; the start sequence or start
     * time delta is only set for the matching start position; the durable name is only set
     * when present.
     *
     * @param subscriptionImpl the subscription to describe
     * @return the built request
     */
    protected SubscriptionRequest createSubscriptionRequest(SubscriptionImpl subscriptionImpl) {
        SubscriptionOptions subOpts = subscriptionImpl.getOptions();
        SubscriptionRequest.Builder request = SubscriptionRequest.newBuilder();
        String owner = subscriptionImpl.getConnection().getClientId();
        String queueGroup = subscriptionImpl.getQueue();
        if (queueGroup == null) {
            queueGroup = "";
        }

        request.setClientID(owner);
        request.setSubject(subscriptionImpl.getSubject());
        request.setQGroup(queueGroup);
        request.setInbox(subscriptionImpl.getInbox());
        request.setMaxInFlight(subOpts.getMaxInFlight());
        request.setAckWaitInSecs((int) subOpts.getAckWait().getSeconds());

        switch (subOpts.getStartAt()) {
            case SequenceStart:
                request.setStartSequence(subOpts.getStartSequence());
                break;
            case TimeDeltaStart:
                // Delta in nanoseconds from the requested start time to now.
                request.setStartTimeDelta(ChronoUnit.NANOS.between(subOpts.getStartTime(), Instant.now()));
                break;
        }
        request.setStartPosition(subOpts.getStartAt());

        if (subOpts.getDurableName() != null) {
            request.setDurableName(subOpts.getDurableName());
        }
        return request.m423build();
    }

    /**
     * Handles a publish acknowledgement: removes the pending entry for the acked GUID and
     * notifies its handler. When the server reports an error in the PubAck, the error is
     * logged and passed to the handler as an IOException instead of being reported as success.
     * A message that cannot be parsed as a PubAck is logged and dropped.
     *
     * @param message the NATS message received on the ack subject
     */
    protected void processAck(io.nats.client.Message message) {
        PubAck ack;
        try {
            ack = PubAck.parseFrom(message.getData());
        } catch (InvalidProtocolBufferException e) {
            logger.error("stan: error unmarshaling PubAck");
            logger.debug("Full stack trace: ", e);
            return;
        }

        AckClosure closure = removeAck(ack.getGuid());

        Exception error = null;
        if (!ack.getError().isEmpty()) {
            logger.error("stan: protobuf PubAck error: {}", ack.getError());
            error = new IOException(ack.getError());
        }

        if (closure != null && closure.ah != null) {
            closure.ah.onAck(ack.getGuid(), error);
        }
    }

    /**
     * Invoked by the ack timer when no acknowledgement arrived in time: drops the pending
     * entry and, if a handler was supplied, reports a TimeoutException to it.
     *
     * @param str        GUID of the published message
     * @param ackHandler handler to notify; may be null
     */
    protected void processAckTimeout(String str, AckHandler ackHandler) {
        removeAck(str);
        if (ackHandler == null) {
            return;
        }
        TimeoutException timedOut = new TimeoutException(ERR_TIMEOUT);
        ackHandler.onAck(str, timedOut);
    }

    /**
     * Removes the pending-ack entry for a GUID. When an entry existed, its timeout task is
     * cancelled and one element is taken from the pub-ack channel (if it holds any) to free
     * an in-flight slot.
     *
     * @param str GUID of the published message
     * @return the removed entry, or null if none was pending for that GUID
     */
    protected AckClosure removeAck(String str) {
        AckClosure ackClosure;
        Channel<PubAck> inFlight;
        lock();
        try {
            ackClosure = this.pubAckMap.remove(str);
            inFlight = this.pubAckChan;
        } finally {
            // Lock scope ends here, so a failure below can never trigger a second unlock.
            unlock();
        }

        if (ackClosure == null) {
            return null;
        }
        if (ackClosure.ackTask != null) {
            ackClosure.ackTask.cancel();
            ackClosure.ackTask = null;
        }
        if (inFlight.getCount() > 0) {
            inFlight.get();
        }
        return ackClosure;
    }

    /**
     * NATS callback for subscription inboxes (subscribe registers this object as the
     * handler); forwards every message to processMsg.
     *
     * @param message the raw NATS message
     */
    public void onMessage(io.nats.client.Message message) {
        processMsg(message);
    }

    /**
     * Factory hook that wraps a decoded MsgProto in a streaming Message.
     *
     * @param msgProto the decoded protobuf message
     * @return a new Message built from msgProto
     */
    protected Message createStanMessage(MsgProto msgProto) {
        return new Message(msgProto);
    }

    /**
     * Delivers a message received on a subscription inbox: decodes it, looks up the
     * subscription by the NATS subject, invokes the subscription's handler and, unless the
     * subscription uses manual acks, publishes an Ack to the subscription's ack inbox.
     *
     * <p>Messages that cannot be decoded, that belong to no known subscription, or that
     * arrive after the connection was closed are dropped. Exceptions thrown by the handler
     * propagate to the caller unchanged.
     *
     * @param message the raw NATS message
     */
    protected void processMsg(io.nats.client.Message message) {
        Message stanMessage;
        try {
            stanMessage = createStanMessage(MsgProto.parseFrom(message.getData()));
        } catch (InvalidProtocolBufferException e) {
            logger.error("stan: error unmarshaling msg");
            logger.debug("msg: {}", message);
            logger.debug("full stack trace:", e);
            // Nothing to deliver; continuing would dereference a null message below.
            return;
        }

        io.nats.client.Connection natsConnection;
        SubscriptionImpl subscription;
        lock();
        try {
            natsConnection = getNatsConnection();
            subscription = (SubscriptionImpl) this.subMap.get(message.getSubject());
        } finally {
            unlock();
        }
        if (subscription == null || natsConnection == null) {
            return;
        }
        stanMessage.setSubscription(subscription);

        MessageHandler handler;
        String ackInbox;
        boolean manualAcks;
        ConnectionImpl owner;
        subscription.rLock();
        try {
            handler = subscription.getMessageHandler();
            ackInbox = subscription.getAckInbox();
            manualAcks = subscription.getOptions().isManualAcks();
            owner = subscription.getConnection();
        } finally {
            // Released exactly once, before user code runs, so a throwing handler cannot
            // cause a second unlock.
            subscription.rUnlock();
        }

        if (handler != null && owner != null) {
            handler.onMessage(stanMessage);
        }
        if (manualAcks) {
            return;
        }
        try {
            natsConnection.publish(ackInbox, Ack.newBuilder().setSubject(stanMessage.getSubject()).setSequence(stanMessage.getSequence()).m44build().toByteArray());
        } catch (IOException e) {
            logger.error("Exception while publishing auto-ack: {}", e.getMessage());
            logger.debug("Stack trace: ", e);
        }
    }

    /** @return the client id this connection was created with */
    public String getClientId() {
        return clientId;
    }

    /** @return the underlying NATS connection, or null when none is set */
    @Override // io.nats.stan.Connection
    public io.nats.client.Connection getNatsConnection() {
        return nc;
    }

    /**
     * Replaces the underlying NATS connection reference.
     *
     * @param connection the NATS connection to use; null clears it
     */
    protected void setNatsConnection(io.nats.client.Connection connection) {
        nc = connection;
    }

    /** @return a new inbox subject obtained from the underlying NATS connection */
    public String newInbox() {
        return nc.newInbox();
    }

    /** Acquires the connection lock. */
    public void lock() {
        mu.lock();
    }

    /** Releases the connection lock. */
    public void unlock() {
        mu.unlock();
    }

    /** @return the NATS subscription receiving publish acks, or null before connect() */
    protected io.nats.client.Subscription getAckSubscription() {
        return ackSubscription;
    }

    /** @return the NATS subscription receiving heartbeats, or null before connect() */
    protected io.nats.client.Subscription getHbSubscription() {
        return hbSubscription;
    }

    /** @param channel the channel to use for tracking in-flight publishes */
    void setPubAckChan(Channel<PubAck> channel) {
        pubAckChan = channel;
    }

    /** @return the channel tracking in-flight publishes */
    Channel<PubAck> getPubAckChan() {
        return pubAckChan;
    }

    /** @param map the map to use for pending publish acks, keyed by GUID */
    void setPubAckMap(Map<String, AckClosure> map) {
        pubAckMap = map;
    }

    /** @return the map of pending publish acks, keyed by GUID */
    Map<String, AckClosure> getPubAckMap() {
        return pubAckMap;
    }

    /** @param map the map to use for subscriptions, keyed by inbox subject */
    void setSubMap(Map<String, Subscription> map) {
        subMap = map;
    }

    /** @return the map of subscriptions, keyed by inbox subject */
    Map<String, Subscription> getSubMap() {
        return subMap;
    }
}
