package io.nats.vertx.impl;

import io.nats.client.Connection;
import io.nats.client.ConnectionListener;
import io.nats.client.JetStreamOptions;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.Subscription;
import io.nats.vertx.NatsClient;
import io.nats.vertx.NatsStream;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.streams.StreamBase;
import io.vertx.core.streams.WriteStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:io/nats/vertx/impl/NatsClientImpl.class */
/**
 * {@link NatsClient} implementation that drives a NATS {@link Connection} from Vert.x.
 *
 * <p>Connection-level operations (connect, publish, close, subscribe, JetStream lookup) are
 * scheduled with {@code vertx.runOnContext}; request/reply calls and subscription polling wait
 * on the NATS client and are therefore scheduled with {@code vertx.executeBlocking}.
 *
 * <p>Every failure is reported twice: on the promise/handler of the failing call and on the
 * handler registered through {@link #exceptionHandler(Handler)} (a no-op until one is set).
 */
public class NatsClientImpl implements NatsClient {
    /**
     * Timeout handed to {@code Subscription.nextMessage} while polling.
     * NOTE(review): presumably 1ns rather than zero because zero means "wait forever" in the
     * NATS client - confirm against the jnats documentation.
     */
    private static final Duration NO_WAIT = Duration.ofNanos(1);
    /** Period of the background flush timer started by {@link #connect()}, in milliseconds. */
    private static final long FLUSH_INTERVAL_MS = 1000L;
    /** Maximum time a single periodic flush may take. */
    private static final Duration FLUSH_TIMEOUT = Duration.ofSeconds(1L);
    /** Delay between two polling rounds of a subscription, in milliseconds. */
    private static final long POLL_DELAY_MS = 100L;
    /** Exception handler used until the caller installs one, and in place of {@code null}. */
    private static final Handler<Throwable> IGNORE_ERRORS = ignored -> {
    };

    private final Vertx vertx;
    private Connection connection;
    private final ContextInternal context;
    private final Options options;
    private Promise<Void> connectFuture;
    /** Id of the active periodic flush timer, or {@code null} when none is registered. */
    private volatile Long flushTimerId;
    private Handler<Throwable> exceptionHandler = IGNORE_ERRORS;

    /**
     * Creates a client bound to the current (or a newly created) Vert.x context.
     *
     * @param builder NATS options builder; a connection listener is added to it (any listener
     *                already configured keeps receiving events)
     * @param vertx   the Vert.x instance used for scheduling
     */
    public NatsClientImpl(Options.Builder builder, Vertx vertx) {
        this.vertx = vertx;
        // Vertx#getOrCreateContext is declared to return Context; the field needs the internal view.
        this.context = (ContextInternal) vertx.getOrCreateContext();
        this.options = wireConnectListener(builder, this.context);
    }

    /**
     * Installs a connection listener that completes the connect promise on the
     * {@code CONNECTED} event and forwards every event to a previously configured listener.
     *
     * @param builder         options builder to augment
     * @param contextInternal context that owns the connect promise
     * @return the options built after the listener has been installed
     */
    private Options wireConnectListener(Options.Builder builder, ContextInternal contextInternal) {
        // Build once to read any listener the caller already configured on the builder.
        ConnectionListener userListener = builder.build().getConnectionListener();
        Promise<Void> connected = contextInternal.promise();
        builder.connectionListener((conn, event) -> {
            if (event == ConnectionListener.Events.CONNECTED) {
                // tryComplete: a second CONNECTED event must not throw inside the NATS callback.
                connected.tryComplete();
            }
            if (userListener != null) {
                userListener.connectionEvent(conn, event);
            }
        });
        this.connectFuture = connected;
        return builder.build();
    }

    /**
     * Opens the NATS connection on the Vert.x context and (re)starts the periodic flush timer.
     *
     * @return a future completed when the connection listener observes {@code CONNECTED},
     *         or failed when {@code Nats.connect} throws
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Void> connect() {
        this.vertx.runOnContext(ignored -> {
            try {
                this.connection = Nats.connect(this.options);
            } catch (Exception e) {
                handleException(this.connectFuture, e);
            }
        });
        // Never keep more than one flush timer alive, even if connect() is called repeatedly.
        cancelFlushTimer();
        this.flushTimerId = this.vertx.setPeriodic(FLUSH_INTERVAL_MS, timerId -> {
            flushIfConnected();
        });
        return this.connectFuture.future();
    }

    /** Flushes the connection when it exists and reports {@code CONNECTED}; errors go to the exception handler. */
    private void flushIfConnected() {
        try {
            Connection current = this.connection;
            if (current != null && current.getStatus() == Connection.Status.CONNECTED) {
                current.flush(FLUSH_TIMEOUT);
            }
        } catch (Exception e) {
            this.exceptionHandler.handle(e);
        }
    }

    /** Cancels the periodic flush timer if one is registered. */
    private void cancelFlushTimer() {
        Long timerId = this.flushTimerId;
        if (timerId != null) {
            this.vertx.cancelTimer(timerId);
            this.flushTimerId = null;
        }
    }

    /**
     * Creates a JetStream wrapper with default options.
     *
     * @return a future holding the stream, or failed when the JetStream context cannot be created
     */
    @Override // io.nats.vertx.NatsClient
    public Future<NatsStream> jetStream() {
        Promise<NatsStream> promise = this.context.promise();
        this.vertx.runOnContext(ignored -> {
            try {
                promise.complete(new NatsStreamImpl(this.connection.jetStream(), this.connection, this.context, this.vertx));
            } catch (Exception e) {
                handleException(promise, e);
            }
        });
        return promise.future();
    }

    /**
     * Creates a JetStream wrapper with the given options.
     *
     * @param jetStreamOptions options passed to {@code Connection.jetStream}
     * @return a future holding the stream, or failed when the JetStream context cannot be created
     */
    @Override // io.nats.vertx.NatsClient
    public Future<NatsStream> jetStream(JetStreamOptions jetStreamOptions) {
        Promise<NatsStream> promise = this.context.promise();
        this.vertx.runOnContext(ignored -> {
            try {
                promise.complete(new NatsStreamImpl(this.connection.jetStream(jetStreamOptions), this.connection, this.context, this.vertx));
            } catch (Exception e) {
                handleException(promise, e);
            }
        });
        return promise.future();
    }

    /**
     * Registers the handler that receives every exception raised by this client.
     * The assignment itself is performed on the Vert.x context.
     *
     * @param handler the new handler; {@code null} restores the no-op handler
     * @return this client
     */
    public WriteStream<Message> exceptionHandler(Handler<Throwable> handler) {
        // Never store null: every error path calls exceptionHandler.handle unconditionally.
        Handler<Throwable> effective = handler != null ? handler : IGNORE_ERRORS;
        this.vertx.runOnContext(ignored -> {
            this.exceptionHandler = effective;
        });
        return this;
    }

    /**
     * Publishes a message on the Vert.x context.
     *
     * @param message the message to publish
     * @return a future completed after {@code Connection.publish} returns, or failed with its exception
     */
    public Future<Void> write(Message message) {
        return publishOnContext(() -> this.connection.publish(message));
    }

    /**
     * Publishes a message on the Vert.x context and reports the outcome to {@code handler}.
     *
     * @param message the message to publish
     * @param handler receives the outcome exactly once; may be {@code null}
     */
    public void write(Message message, Handler<AsyncResult<Void>> handler) {
        Promise<Void> promise = this.context.promise();
        this.vertx.runOnContext(ignored -> {
            try {
                this.connection.publish(message);
                promise.complete();
            } catch (Exception e) {
                handleExceptionWithHandler(handler, promise, e);
                return;
            }
            // Outside the try: an exception thrown by the handler must not be mistaken for a publish failure.
            notifyHandler(handler, promise.future());
        });
    }

    /**
     * Stops the periodic flush timer and closes the connection on the Vert.x context.
     *
     * @param handler receives the outcome of the close exactly once; may be {@code null}
     */
    public void end(Handler<AsyncResult<Void>> handler) {
        cancelFlushTimer();
        Promise<Void> promise = this.context.promise();
        this.vertx.runOnContext(ignored -> {
            try {
                this.connection.close();
                promise.complete();
            } catch (Exception e) {
                handleExceptionWithHandler(handler, promise, e);
                return;
            }
            notifyHandler(handler, promise.future());
        });
    }

    /** No-op: this client has no write queue limit. Returns this client. */
    public WriteStream<Message> setWriteQueueMaxSize(int i) {
        return this;
    }

    /** Always {@code false}: this client never reports back-pressure. */
    public boolean writeQueueFull() {
        return false;
    }

    /** No-op: the drain handler is ignored because the write queue is never full. Returns this client. */
    @Override // io.nats.vertx.NatsClient
    public NatsClient drainHandler(Handler<Void> handler) {
        return this;
    }

    /** Same as {@link #write(Message, Handler)}. */
    @Override // io.nats.vertx.NatsClient
    public void publish(Message message, Handler<AsyncResult<Void>> handler) {
        write(message, handler);
    }

    /** Same as {@link #write(Message)}. */
    @Override // io.nats.vertx.NatsClient
    public Future<Void> publish(Message message) {
        return write(message);
    }

    /**
     * Publishes a UTF-8 encoded string body.
     *
     * @param str  first argument of {@code Connection.publish} (the subject in the NATS client API)
     * @param str2 second argument of {@code Connection.publish} (the reply-to subject in the NATS client API)
     * @param str3 body text, encoded as UTF-8
     * @return a future completed after the publish call returns, or failed with its exception
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Void> publish(String str, String str2, String str3) {
        return publish(str, str2, str3.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Publishes a binary body.
     *
     * @param str  first argument of {@code Connection.publish} (the subject in the NATS client API)
     * @param str2 second argument of {@code Connection.publish} (the reply-to subject in the NATS client API)
     * @param bArr message body
     * @return a future completed after the publish call returns, or failed with its exception
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Void> publish(String str, String str2, byte[] bArr) {
        return publishOnContext(() -> this.connection.publish(str, str2, bArr));
    }

    /**
     * Publishes a UTF-8 encoded string body.
     *
     * @param str  first argument of {@code Connection.publish} (the subject in the NATS client API)
     * @param str2 body text, encoded as UTF-8
     * @return a future completed after the publish call returns, or failed with its exception
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Void> publish(String str, String str2) {
        return publish(str, str2.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Publishes a binary body.
     *
     * @param str  first argument of {@code Connection.publish} (the subject in the NATS client API)
     * @param bArr message body
     * @return a future completed after the publish call returns, or failed with its exception
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Void> publish(String str, byte[] bArr) {
        return publishOnContext(() -> this.connection.publish(str, bArr));
    }

    /** Runs a publish action on the Vert.x context and maps its outcome onto a future. */
    private Future<Void> publishOnContext(Runnable publishAction) {
        Promise<Void> promise = this.context.promise();
        this.vertx.runOnContext(ignored -> {
            try {
                publishAction.run();
                promise.complete();
            } catch (Exception e) {
                handleException(promise, e);
            }
        });
        return promise.future();
    }

    /** Fails the promise (if not already settled) and forwards the exception to the exception handler. */
    private void handleException(Promise<?> promise, Exception exc) {
        // tryFail: failing an already-settled promise would throw and hide the real error.
        promise.tryFail(exc);
        this.exceptionHandler.handle(exc);
    }

    /** Fails the promise, reports the failed future to the caller's handler, then to the exception handler. */
    private <T> void handleExceptionWithHandler(Handler<AsyncResult<T>> handler, Promise<T> promise, Exception exc) {
        promise.tryFail(exc);
        notifyHandler(handler, promise.future());
        this.exceptionHandler.handle(exc);
    }

    /** Invokes a caller-supplied result handler; exceptions it throws go to the exception handler. */
    private <T> void notifyHandler(Handler<AsyncResult<T>> handler, Future<T> outcome) {
        if (handler == null) {
            return;
        }
        try {
            handler.handle(outcome);
        } catch (Exception e) {
            this.exceptionHandler.handle(e);
        }
    }

    /**
     * Sends a request and waits for the reply inside {@code executeBlocking}.
     *
     * @param message the request message
     * @param handler receives the reply or the failure exactly once; may be {@code null}
     */
    @Override // io.nats.vertx.NatsClient
    public void request(Message message, Handler<AsyncResult<Message>> handler) {
        Promise<Message> promise = this.context.promise();
        this.vertx.executeBlocking(ignored -> {
            try {
                promise.complete(this.connection.request(message).get());
            } catch (Exception e) {
                handleExceptionWithHandler(handler, promise, e);
                return;
            }
            notifyHandler(handler, promise.future());
        });
    }

    /**
     * Sends a request and waits for the reply inside {@code executeBlocking}.
     *
     * @param message the request message
     * @return a future holding the reply, or failed with the exception raised while waiting
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Message> request(Message message) {
        return this.vertx.executeBlocking(promise -> {
            try {
                promise.complete(this.connection.request(message).get());
            } catch (Exception e) {
                handleException(promise, e);
            }
        });
    }

    /**
     * Sends a UTF-8 encoded request body; see {@link #request(String, byte[])}.
     *
     * @param str  first argument of {@code Connection.request} (the subject in the NATS client API)
     * @param str2 request body text, encoded as UTF-8
     * @return a future holding the reply
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Message> request(String str, String str2) {
        return request(str, str2.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Sends a request and waits for the reply inside {@code executeBlocking}.
     *
     * @param str  first argument of {@code Connection.request} (the subject in the NATS client API)
     * @param bArr request body
     * @return a future holding the reply, or failed with the exception raised while waiting
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Message> request(String str, byte[] bArr) {
        return this.vertx.executeBlocking(promise -> {
            try {
                promise.complete(this.connection.request(str, bArr).get());
            } catch (Exception e) {
                handleException(promise, e);
            }
        });
    }

    /**
     * Sends a request and waits at most {@code duration} for the reply inside {@code executeBlocking}.
     *
     * @param message  the request message
     * @param handler  receives the reply or the failure (including a timeout) exactly once; may be {@code null}
     * @param duration maximum time to wait for the reply
     */
    @Override // io.nats.vertx.NatsClient
    public void request(Message message, Handler<AsyncResult<Message>> handler, Duration duration) {
        Promise<Message> promise = this.context.promise();
        this.vertx.executeBlocking(ignored -> {
            try {
                promise.complete(this.connection.request(message).get(duration.toNanos(), TimeUnit.NANOSECONDS));
            } catch (Exception e) {
                handleExceptionWithHandler(handler, promise, e);
                return;
            }
            notifyHandler(handler, promise.future());
        });
    }

    /**
     * Sends a request and waits at most {@code duration} for the reply inside {@code executeBlocking}.
     *
     * @param message  the request message
     * @param duration maximum time to wait for the reply
     * @return a future holding the reply, or failed with the exception raised while waiting
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Message> request(Message message, Duration duration) {
        return this.vertx.executeBlocking(promise -> {
            try {
                promise.complete(this.connection.request(message).get(duration.toNanos(), TimeUnit.NANOSECONDS));
            } catch (Exception e) {
                handleException(promise, e);
            }
        });
    }

    /**
     * Sends a UTF-8 encoded request body; see {@link #request(String, byte[], Duration)}.
     *
     * @param str      first argument of {@code Connection.request} (the subject in the NATS client API)
     * @param str2     request body text, encoded as UTF-8
     * @param duration maximum time to wait for the reply
     * @return a future holding the reply
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Message> request(String str, String str2, Duration duration) {
        return request(str, str2.getBytes(StandardCharsets.UTF_8), duration);
    }

    /**
     * Sends a request and waits at most {@code duration} for the reply inside {@code executeBlocking}.
     *
     * @param str      first argument of {@code Connection.request} (the subject in the NATS client API)
     * @param bArr     request body
     * @param duration maximum time to wait for the reply
     * @return a future holding the reply, or failed with the exception raised while waiting
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Message> request(String str, byte[] bArr, Duration duration) {
        return this.vertx.executeBlocking(promise -> {
            try {
                promise.complete(this.connection.request(str, bArr).get(duration.toNanos(), TimeUnit.NANOSECONDS));
            } catch (Exception e) {
                handleException(promise, e);
            }
        });
    }

    /**
     * Subscribes and starts delivering received messages to {@code handler}.
     *
     * @param str     argument of {@code Connection.subscribe} (the subject in the NATS client API)
     * @param handler invoked for every received message, from the polling task
     * @return a future completed once the subscription exists and polling has been scheduled
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Void> subscribe(String str, Handler<Message> handler) {
        Promise<Void> promise = this.context.promise();
        this.vertx.runOnContext(ignored -> {
            try {
                schedulePolling(handler, this.connection.subscribe(str));
                promise.complete();
            } catch (Exception e) {
                handleException(promise, e);
            }
        });
        return promise.future();
    }

    /** Schedules one polling round for the subscription via {@code executeBlocking}. */
    private void schedulePolling(Handler<Message> handler, Subscription subscription) {
        this.vertx.executeBlocking(ignored -> {
            drainSubscription(handler, subscription);
        });
    }

    /**
     * Delivers every message currently available on the subscription to {@code handler},
     * then schedules the next polling round after {@code POLL_DELAY_MS}.
     *
     * <p>An exception thrown by the handler is reported and polling continues; an exception
     * thrown by the subscription itself is reported and ends the polling for good.
     */
    private void drainSubscription(Handler<Message> handler, Subscription subscription) {
        try {
            for (Message next = subscription.nextMessage(NO_WAIT); next != null; next = subscription.nextMessage(NO_WAIT)) {
                try {
                    handler.handle(next);
                } catch (Exception e) {
                    this.exceptionHandler.handle(e);
                }
            }
            this.vertx.setTimer(POLL_DELAY_MS, timerId -> {
                schedulePolling(handler, subscription);
            });
        } catch (Exception e2) {
            this.exceptionHandler.handle(e2);
        }
    }

    /**
     * Subscribes with a second subscription argument and starts delivering received messages.
     *
     * @param str     first argument of {@code Connection.subscribe} (the subject in the NATS client API)
     * @param str2    second argument of {@code Connection.subscribe} (the queue name in the NATS client API)
     * @param handler invoked for every received message, from the polling task
     * @return a future completed once the subscription exists and polling has been scheduled
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Void> subscribe(String str, String str2, Handler<Message> handler) {
        Promise<Void> promise = this.context.promise();
        this.vertx.runOnContext(ignored -> {
            try {
                schedulePolling(handler, this.connection.subscribe(str, str2));
                promise.complete();
            } catch (Exception e) {
                handleException(promise, e);
            }
        });
        return promise.future();
    }

    /** Returns the underlying NATS connection; {@code null} until the connect task has assigned it. */
    @Override // io.nats.vertx.NatsClient
    public Connection getConnection() {
        return this.connection;
    }

    // The three methods below are compiler bridge methods as emitted by the decompiler; they only
    // delegate to the typed overloads above and are kept untouched.

    @Override // io.nats.vertx.NatsClient
    /* renamed from: drainHandler */
    public /* bridge */ /* synthetic */ WriteStream mo0drainHandler(Handler handler) {
        return drainHandler((Handler<Void>) handler);
    }

    public /* bridge */ /* synthetic */ void write(Object obj, Handler handler) {
        write((Message) obj, (Handler<AsyncResult<Void>>) handler);
    }

    /* renamed from: exceptionHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ StreamBase m3exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
