package io.nats.streaming;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.NUID;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.streaming.Options;
import io.nats.streaming.SubscriptionOptions;
import io.nats.streaming.protobuf.Ack;
import io.nats.streaming.protobuf.CloseRequest;
import io.nats.streaming.protobuf.CloseResponse;
import io.nats.streaming.protobuf.ConnectRequest;
import io.nats.streaming.protobuf.ConnectResponse;
import io.nats.streaming.protobuf.MsgProto;
import io.nats.streaming.protobuf.PubAck;
import io.nats.streaming.protobuf.PubMsg;
import io.nats.streaming.protobuf.SubscriptionRequest;
import io.nats.streaming.protobuf.SubscriptionResponse;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/nats/streaming/StreamingConnectionImpl.class */
public class StreamingConnectionImpl implements StreamingConnection, io.nats.client.MessageHandler {
    static final String ERR_MANUAL_ACK = "stan: cannot manually ack in auto-ack mode";
    static final String INBOX_PREFIX = "_INBOX.";
    private final ReadWriteLock mu;
    private String clientId;
    private String clusterId;
    String pubPrefix;
    String subRequests;
    String unsubRequests;
    String subCloseRequests;
    String closeRequests;
    String ackSubject;
    String hbSubject;
    Map<String, Subscription> subMap;
    Map<String, AckClosure> pubAckMap;
    private BlockingQueue<PubAck> pubAckChan;
    Options opts;
    Connection nc;
    Dispatcher dispatcher;
    NUID nuid;
    final Timer ackTimer;
    boolean ncOwned;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/nats/streaming/StreamingConnectionImpl$AckClosure.class */
    public class AckClosure {
        TimerTask ackTask;
        AckHandler ah;
        BlockingQueue<String> ch;

        AckClosure(AckHandler ackHandler, BlockingQueue<String> blockingQueue) {
            this.ah = ackHandler;
            this.ch = blockingQueue;
        }
    }

    StreamingConnectionImpl(String str, String str2) {
        this(str, str2, null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamingConnectionImpl(String str, String str2, Options options) {
        this.mu = new ReentrantReadWriteLock();
        this.ackTimer = new Timer("jnats-streaming ack timeout thread", true);
        this.ncOwned = false;
        this.clusterId = str;
        this.clientId = str2;
        this.nuid = new NUID();
        if (options == null) {
            this.opts = new Options.Builder().build();
            return;
        }
        this.opts = options;
        if (this.opts.getNatsConn() != null) {
            setNatsConnection(this.opts.getNatsConn());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamingConnectionImpl connect() throws IOException, InterruptedException {
        Connection natsConnection = getNatsConnection();
        if (natsConnection == null) {
            natsConnection = createNatsConnection();
            setNatsConnection(natsConnection);
            this.ncOwned = true;
        } else if (natsConnection.getStatus() != Connection.Status.CONNECTED) {
            throw new IOException("stan: invalid connection");
        }
        try {
            try {
                this.hbSubject = newInbox();
                this.ackSubject = String.format("%s.%s", "_STAN.acks", this.nuid.next());
                this.dispatcher = natsConnection.createDispatcher(message -> {
                    String subject = message.getSubject();
                    if (this.hbSubject.equals(subject)) {
                        processHeartBeat(message);
                    } else if (this.ackSubject.equals(subject)) {
                        processAck(message);
                    } else {
                        processMsg(message);
                    }
                });
                this.dispatcher.subscribe(this.hbSubject);
                this.dispatcher.subscribe(this.ackSubject);
                this.dispatcher.setPendingLimits(-1L, -1L);
                io.nats.client.Message request = natsConnection.request(String.format("%s.%s", this.opts.getDiscoverPrefix(), this.clusterId), ConnectRequest.newBuilder().setClientID(this.clientId).setHeartbeatInbox(this.hbSubject).m117build().toByteArray(), this.opts.getConnectTimeout());
                if (request == null) {
                    throw new IOException("stan: connect request timeout");
                }
                ConnectResponse parseFrom = ConnectResponse.parseFrom(request.getData());
                if (!parseFrom.getError().isEmpty()) {
                    throw new IOException(parseFrom.getError());
                }
                this.pubPrefix = parseFrom.getPubPrefix();
                this.subRequests = parseFrom.getSubRequests();
                this.unsubRequests = parseFrom.getUnsubRequests();
                this.subCloseRequests = parseFrom.getSubCloseRequests();
                this.closeRequests = parseFrom.getCloseRequests();
                this.pubAckMap = new HashMap();
                this.subMap = new HashMap();
                this.pubAckChan = new LinkedBlockingQueue(this.opts.getMaxPubAcksInFlight());
                if (0 != 0) {
                    try {
                        close();
                    } catch (Exception e) {
                    }
                }
                return this;
            } catch (IOException e2) {
                throw e2;
            }
        } catch (Throwable th) {
            if (0 != 0) {
                try {
                    close();
                } catch (Exception e3) {
                }
            }
            throw th;
        }
    }

    Connection createNatsConnection() throws IOException, InterruptedException {
        Connection connection = null;
        if (getNatsConnection() == null) {
            connection = this.opts.getNatsUrl() != null ? Nats.connect(new Options.Builder().connectionName(this.clientId).server(this.opts.getNatsUrl()).build()) : Nats.connect();
            this.ncOwned = true;
        }
        return connection;
    }

    @Override // io.nats.streaming.StreamingConnection, java.lang.AutoCloseable
    public void close() throws IOException, InterruptedException {
        lock();
        try {
            if (getNatsConnection() == null) {
                return;
            }
            Connection natsConnection = getNatsConnection();
            try {
                setNatsConnection(null);
                for (AckClosure ackClosure : this.pubAckMap.values()) {
                    ackClosure.ackTask.cancel();
                    if (!ackClosure.ch.isEmpty()) {
                        ackClosure.ch.take();
                    }
                }
                this.ackTimer.cancel();
                if (this.dispatcher != null && this.dispatcher.isActive()) {
                    this.dispatcher.unsubscribe(this.ackSubject);
                    this.dispatcher.unsubscribe(this.hbSubject);
                }
                io.nats.client.Message request = natsConnection.request(this.closeRequests, CloseRequest.newBuilder().setClientID(this.clientId).m57build().toByteArray(), this.opts.getConnectTimeout());
                if (request == null) {
                    throw new IOException("stan: close request timeout");
                }
                if (request.getData() != null) {
                    CloseResponse parseFrom = CloseResponse.parseFrom(request.getData());
                    if (!parseFrom.getError().isEmpty()) {
                        throw new IOException(parseFrom.getError());
                    }
                }
                unlock();
            } finally {
                if (this.ncOwned) {
                    try {
                        natsConnection.close();
                    } catch (Exception e) {
                    }
                }
            }
        } finally {
            unlock();
        }
    }

    AckClosure createAckClosure(AckHandler ackHandler, BlockingQueue<String> blockingQueue) {
        return new AckClosure(ackHandler, blockingQueue);
    }

    private SubscriptionImpl createSubscription(String str, String str2, MessageHandler messageHandler, StreamingConnectionImpl streamingConnectionImpl, SubscriptionOptions subscriptionOptions) {
        return new SubscriptionImpl(str, str2, messageHandler, streamingConnectionImpl, subscriptionOptions);
    }

    void processHeartBeat(io.nats.client.Message message) {
        rLock();
        Connection connection = this.nc;
        rUnlock();
        if (connection != null) {
            connection.publish(message.getReplyTo(), (byte[]) null);
        }
    }

    BlockingQueue<String> createErrorChannel() {
        return new LinkedBlockingQueue();
    }

    @Override // io.nats.streaming.StreamingConnection
    public void publish(String str, byte[] bArr) throws IOException, InterruptedException, TimeoutException {
        BlockingQueue<String> createErrorChannel = createErrorChannel();
        publish(str, bArr, null, createErrorChannel);
        if (createErrorChannel.isEmpty()) {
            return;
        }
        String take = createErrorChannel.take();
        if (!take.isEmpty()) {
            throw new IOException(take);
        }
    }

    @Override // io.nats.streaming.StreamingConnection
    public String publish(String str, byte[] bArr, AckHandler ackHandler) throws IOException, InterruptedException, TimeoutException {
        return publish(str, bArr, ackHandler, null);
    }

    private String publish(String str, byte[] bArr, AckHandler ackHandler, BlockingQueue<String> blockingQueue) throws IOException, InterruptedException, TimeoutException {
        AckClosure createAckClosure = createAckClosure(ackHandler, blockingQueue);
        lock();
        try {
            if (getNatsConnection() == null) {
                throw new IllegalStateException("stan: connection closed");
            }
            String str2 = this.pubPrefix + "." + str;
            String nextGlobal = NUID.nextGlobal();
            PubMsg.Builder subject = PubMsg.newBuilder().setClientID(this.clientId).setGuid(nextGlobal).setSubject(str);
            if (bArr != null) {
                subject = subject.setData(ByteString.copyFrom(bArr));
            }
            byte[] byteArray = subject.m238build().toByteArray();
            this.pubAckMap.put(nextGlobal, createAckClosure);
            String str3 = this.ackSubject;
            Duration ackTimeout = this.opts.getAckTimeout();
            BlockingQueue<PubAck> blockingQueue2 = this.pubAckChan;
            unlock();
            try {
                blockingQueue2.put(PubAck.getDefaultInstance());
            } catch (InterruptedException e) {
            }
            this.nc.publish(str2, str3, byteArray);
            lock();
            try {
                createAckClosure.ackTask = createAckTimerTask(nextGlobal);
                this.ackTimer.schedule(createAckClosure.ackTask, ackTimeout.toMillis());
                unlock();
                return nextGlobal;
            } finally {
            }
        } finally {
        }
    }

    @Override // io.nats.streaming.StreamingConnection
    public Subscription subscribe(String str, MessageHandler messageHandler) throws IOException, InterruptedException, TimeoutException {
        return subscribe(str, messageHandler, (SubscriptionOptions) null);
    }

    @Override // io.nats.streaming.StreamingConnection
    public Subscription subscribe(String str, MessageHandler messageHandler, SubscriptionOptions subscriptionOptions) throws IOException, InterruptedException, TimeoutException {
        return subscribe(str, null, messageHandler, subscriptionOptions);
    }

    @Override // io.nats.streaming.StreamingConnection
    public Subscription subscribe(String str, String str2, MessageHandler messageHandler) throws IOException, InterruptedException, TimeoutException {
        return subscribe(str, str2, messageHandler, null);
    }

    @Override // io.nats.streaming.StreamingConnection
    public Subscription subscribe(String str, String str2, MessageHandler messageHandler, SubscriptionOptions subscriptionOptions) throws IOException, InterruptedException, TimeoutException {
        if (subscriptionOptions == null) {
            subscriptionOptions = new SubscriptionOptions.Builder().build();
        }
        lock();
        try {
            if (getNatsConnection() == null) {
                throw new IllegalStateException("stan: connection closed");
            }
            SubscriptionImpl createSubscription = createSubscription(str, str2, messageHandler, this, subscriptionOptions);
            this.subMap.put(createSubscription.inbox, createSubscription);
            Connection natsConnection = getNatsConnection();
            unlock();
            createSubscription.wLock();
            try {
                this.dispatcher.subscribe(createSubscription.inbox);
                io.nats.client.Message request = natsConnection.request(this.subRequests, createSubscriptionRequest(createSubscription).toByteArray(), subscriptionOptions.getSubscriptionTimeout());
                if (request == null) {
                    this.dispatcher.unsubscribe(createSubscription.inbox);
                    throw new IOException("stan: subscribe request timeout");
                }
                try {
                    SubscriptionResponse parseFrom = SubscriptionResponse.parseFrom(request.getData());
                    if (!parseFrom.getError().isEmpty()) {
                        this.dispatcher.unsubscribe(createSubscription.inbox);
                        throw new IOException(parseFrom.getError());
                    }
                    createSubscription.setAckInbox(parseFrom.getAckInbox());
                    createSubscription.wUnlock();
                    return createSubscription;
                } catch (InvalidProtocolBufferException e) {
                    this.dispatcher.unsubscribe(createSubscription.inbox);
                    throw e;
                }
            } catch (Throwable th) {
                createSubscription.wUnlock();
                throw th;
            }
        } catch (Throwable th2) {
            unlock();
            throw th2;
        }
    }

    SubscriptionRequest createSubscriptionRequest(SubscriptionImpl subscriptionImpl) {
        SubscriptionOptions options = subscriptionImpl.getOptions();
        SubscriptionRequest.Builder newBuilder = SubscriptionRequest.newBuilder();
        String clientId = subscriptionImpl.getConnection().getClientId();
        String queue = subscriptionImpl.getQueue();
        newBuilder.setClientID(clientId).setSubject(subscriptionImpl.getSubject()).setQGroup(queue == null ? "" : queue).setInbox(subscriptionImpl.getInbox()).setMaxInFlight(options.getMaxInFlight()).setAckWaitInSecs((int) options.getAckWait().getSeconds());
        switch (options.getStartAt()) {
            case SequenceStart:
                newBuilder.setStartSequence(options.getStartSequence());
                break;
            case TimeDeltaStart:
                newBuilder.setStartTimeDelta(ChronoUnit.NANOS.between(options.getStartTime(), Instant.now()));
                break;
        }
        newBuilder.setStartPosition(options.getStartAt());
        if (options.getDurableName() != null) {
            newBuilder.setDurableName(options.getDurableName());
        }
        return newBuilder.m270build();
    }

    void processAck(io.nats.client.Message message) {
        IOException iOException = null;
        try {
            PubAck parseFrom = PubAck.parseFrom(message.getData());
            AckClosure removeAck = removeAck(parseFrom.getGuid());
            if (removeAck != null) {
                String error = parseFrom.getError();
                if (removeAck.ah != null) {
                    if (!error.isEmpty()) {
                        iOException = new IOException(error);
                    }
                    removeAck.ah.onAck(parseFrom.getGuid(), iOException);
                } else if (removeAck.ch != null) {
                    try {
                        removeAck.ch.put(error);
                    } catch (InterruptedException e) {
                    }
                }
            }
        } catch (InvalidProtocolBufferException e2) {
            System.err.println("Protocol error: " + e2.getStackTrace());
        }
    }

    TimerTask createAckTimerTask(final String str) {
        return new TimerTask() { // from class: io.nats.streaming.StreamingConnectionImpl.1
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                try {
                    StreamingConnectionImpl.this.processAckTimeout(str);
                } catch (Exception e) {
                    cancel();
                }
            }
        };
    }

    void processAckTimeout(String str) {
        AckClosure removeAck = removeAck(str);
        if (removeAck == null || removeAck.ah == null) {
            return;
        }
        removeAck.ah.onAck(str, new TimeoutException("stan: publish ack timeout"));
    }

    AckClosure removeAck(String str) {
        TimerTask timerTask = null;
        lock();
        try {
            AckClosure ackClosure = this.pubAckMap.get(str);
            if (ackClosure != null) {
                timerTask = ackClosure.ackTask;
                this.pubAckMap.remove(str);
            }
            BlockingQueue<PubAck> blockingQueue = this.pubAckChan;
            unlock();
            if (timerTask != null) {
                timerTask.cancel();
            }
            if (ackClosure != null && blockingQueue.size() > 0) {
                try {
                    blockingQueue.take();
                } catch (InterruptedException e) {
                }
            }
            return ackClosure;
        } catch (Throwable th) {
            unlock();
            throw th;
        }
    }

    public void onMessage(io.nats.client.Message message) {
        processMsg(message);
    }

    Message createStanMessage(MsgProto msgProto) {
        return new Message(msgProto);
    }

    void processMsg(io.nats.client.Message message) {
        Message message2 = null;
        try {
            message2 = createStanMessage(MsgProto.parseFrom(message.getData()));
        } catch (InvalidProtocolBufferException e) {
        }
        lock();
        try {
            Connection natsConnection = getNatsConnection();
            boolean z = natsConnection == null;
            SubscriptionImpl subscriptionImpl = (SubscriptionImpl) this.subMap.get(message.getSubject());
            unlock();
            if (subscriptionImpl == null || z) {
                return;
            }
            message2.setSubscription(subscriptionImpl);
            subscriptionImpl.rLock();
            try {
                MessageHandler messageHandler = subscriptionImpl.getMessageHandler();
                String ackInbox = subscriptionImpl.getAckInbox();
                boolean isManualAcks = subscriptionImpl.getOptions().isManualAcks();
                StreamingConnectionImpl connection = subscriptionImpl.getConnection();
                subscriptionImpl.rUnlock();
                if (messageHandler != null && connection != null) {
                    messageHandler.onMessage(message2);
                }
                if (isManualAcks) {
                    return;
                }
                natsConnection.publish(ackInbox, Ack.newBuilder().setSubject(message2.getSubject()).setSequence(message2.getSequence()).m27build().toByteArray());
            } catch (Throwable th) {
                subscriptionImpl.rUnlock();
                throw th;
            }
        } catch (Throwable th2) {
            unlock();
            throw th2;
        }
    }

    public String getClientId() {
        return this.clientId;
    }

    @Override // io.nats.streaming.StreamingConnection
    public Connection getNatsConnection() {
        return this.nc;
    }

    private void setNatsConnection(Connection connection) {
        this.nc = connection;
    }

    public String newInbox() {
        return INBOX_PREFIX + this.nuid.next();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void lock() {
        this.mu.writeLock().lock();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void unlock() {
        this.mu.writeLock().unlock();
    }

    private void rLock() {
        this.mu.readLock().lock();
    }

    private void rUnlock() {
        this.mu.readLock().unlock();
    }
}
