package io.nats.vertx.impl;

import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.JetStream;
import io.nats.client.Message;
import io.nats.client.PublishOptions;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.api.PublishAck;
import io.nats.vertx.NatsStream;
import io.nats.vertx.NatsVertxMessage;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.streams.StreamBase;
import io.vertx.core.streams.WriteStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/* loaded from: input_file:io/nats/vertx/impl/NatsStreamImpl.class */
/**
 * {@link NatsStream} implementation that publishes to, and subscribes on, a NATS
 * {@link JetStream} context and reports the outcome through Vert.x futures.
 *
 * <p>Error reporting is uniform across the class: when an operation throws, the
 * promise backing the returned future is failed and the handler installed through
 * {@link #exceptionHandler(Handler)} is notified. Operational failures are not
 * rethrown to the caller.
 */
public class NatsStreamImpl implements NatsStream {
    /** Handler used while no exception handler is installed (or after it was cleared with {@code null}). */
    private static final Handler<Throwable> NO_OP_EXCEPTION_HANDLER = th -> {
    };
    // The two durations below are not referenced anywhere in this class.
    private static final Duration noWait = Duration.ofNanos(1);
    private static final Duration drainWait = Duration.ofMillis(50);

    private final JetStream jetStream;
    private final ContextInternal context;
    private final Vertx vertx;
    private final Connection connection;
    /** Dispatcher created for each subscribed subject, keyed by subject. */
    private final Map<String, Dispatcher> subscriptionMap = new HashMap<>();
    // volatile: written by whichever thread calls exceptionHandler(...), read where failures are reported.
    private volatile Handler<Throwable> exceptionHandler = NO_OP_EXCEPTION_HANDLER;

    /**
     * Creates a stream bound to the given JetStream context.
     *
     * @param jetStream       JetStream context used for publish and subscribe calls
     * @param connection      connection used to create and close dispatchers
     * @param contextInternal Vert.x context used to create the promises returned to callers
     * @param vertx           Vert.x instance used to schedule subscribe/unsubscribe work
     */
    public NatsStreamImpl(JetStream jetStream, Connection connection, ContextInternal contextInternal, Vertx vertx) {
        this.connection = connection;
        this.jetStream = jetStream;
        this.context = contextInternal;
        this.vertx = vertx;
    }

    /**
     * Installs the handler notified when an operation of this stream fails.
     *
     * <p>The handler is in effect as soon as this method returns.
     *
     * @param handler the handler to notify, or {@code null} to stop receiving notifications
     * @return this stream
     */
    public WriteStream<Message> exceptionHandler(Handler<Throwable> handler) {
        // A null handler is replaced by a no-op so failure paths never dereference null.
        this.exceptionHandler = handler != null ? handler : NO_OP_EXCEPTION_HANDLER;
        return this;
    }

    /**
     * Publishes the message to JetStream.
     *
     * @param message the message to publish
     * @return a future that succeeds once the publish is acknowledged; it fails when the
     *         acknowledgement is flagged as a duplicate or carries an error, or when
     *         publishing throws
     */
    public Future<Void> write(Message message) {
        PromiseInternal<Void> promise = this.context.promise();
        doPublish(message, promise);
        return promise.future();
    }

    /**
     * Callback flavour of {@link #write(Message)}.
     *
     * @param message the message to publish
     * @param handler receives the outcome of the publish
     */
    public void write(Message message, Handler<AsyncResult<Void>> handler) {
        PromiseInternal<Void> promise = this.context.promise();
        doPublish(message, promise);
        handler.handle(promise.future());
    }

    /** Publishes the message and translates the returned acknowledgement into the promise outcome. */
    private void doPublish(Message message, Promise<Void> promise) {
        try {
            PublishAck ack = this.jetStream.publish(message);
            if (ack.isDuplicate()) {
                promise.fail("Duplicate message " + ack);
            } else if (ack.hasError()) {
                promise.fail(ack.getError() + " " + ack);
            } else {
                promise.complete();
            }
        } catch (Exception e) {
            handleException(promise, e);
        }
    }

    /**
     * Ends the stream. This implementation performs no work of its own and reports
     * success to the handler.
     *
     * @param handler receives the (successful) completion
     */
    public void end(Handler<AsyncResult<Void>> handler) {
        PromiseInternal<Void> promise = this.context.promise();
        // Complete before handing the result over; otherwise the handler would observe
        // a result that is neither succeeded nor failed.
        promise.complete();
        handler.handle(promise.future());
    }

    /**
     * No-op: this implementation keeps no write queue.
     *
     * @param i ignored
     * @return this stream
     */
    public WriteStream<Message> setWriteQueueMaxSize(int i) {
        return this;
    }

    /**
     * @return always {@code false}
     */
    public boolean writeQueueFull() {
        return false;
    }

    /**
     * No-op: the handler is not stored and is never invoked by this class.
     *
     * @param handler ignored
     * @return this stream
     */
    @Override
    public NatsStream drainHandler(Handler<Void> handler) {
        return this;
    }

    /**
     * Publishes the message and exposes the acknowledgement returned by JetStream.
     *
     * @param message the message to publish
     * @return a future completed with the acknowledgement, or failed when publishing throws
     */
    @Override
    public Future<PublishAck> publish(Message message) {
        PromiseInternal<PublishAck> promise = this.context.promise();
        try {
            promise.complete(this.jetStream.publish(message));
        } catch (Exception e) {
            handleException(promise, e);
        }
        return promise.future();
    }

    /**
     * Publishes a string payload, encoded as UTF-8, on the given subject.
     *
     * @param str  the subject
     * @param str2 the payload
     * @return a future completed with the acknowledgement, or failed when publishing throws
     */
    @Override
    public Future<PublishAck> publish(String str, String str2) {
        return publish(str, str2.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Publishes a binary payload on the given subject.
     *
     * @param str  the subject
     * @param bArr the payload
     * @return a future completed with the acknowledgement, or failed when publishing throws
     */
    @Override
    public Future<PublishAck> publish(String str, byte[] bArr) {
        PromiseInternal<PublishAck> promise = this.context.promise();
        try {
            promise.complete(this.jetStream.publish(str, bArr));
        } catch (Exception e) {
            handleException(promise, e);
        }
        return promise.future();
    }

    /**
     * Callback flavour of {@link #publish(Message)}.
     *
     * @param message the message to publish
     * @param handler receives the acknowledgement or the failure
     */
    @Override
    public void publish(Message message, Handler<AsyncResult<PublishAck>> handler) {
        PromiseInternal<PublishAck> promise = this.context.promise();
        try {
            promise.complete(this.jetStream.publish(message));
        } catch (Exception e) {
            handleException(promise, e);
        }
        handler.handle(promise.future());
    }

    /**
     * Subscribes to a subject through a newly created dispatcher. The work is scheduled
     * with {@code vertx.runOnContext}; the returned future reports its outcome.
     *
     * @param str                  the subject; also the key under which the dispatcher is
     *                             remembered for {@link #unsubscribe(String)}
     * @param handler              receives every delivered message wrapped as a {@link NatsVertxMessage}
     * @param z                    forwarded unchanged to {@code JetStream.subscribe}
     * @param pushSubscribeOptions forwarded unchanged to {@code JetStream.subscribe}
     * @return a future that succeeds once the subscription is registered, or fails when
     *         creating it throws
     */
    @Override
    public Future<Void> subscribe(String str, Handler<NatsVertxMessage> handler, boolean z, PushSubscribeOptions pushSubscribeOptions) {
        PromiseInternal<Void> promise = this.context.promise();
        this.vertx.runOnContext(ignored -> {
            Dispatcher dispatcher = null;
            try {
                dispatcher = this.connection.createDispatcher();
                this.jetStream.subscribe(str, dispatcher, msg -> handler.handle(toVertxMessage(msg)), z, pushSubscribeOptions);
                this.subscriptionMap.put(str, dispatcher);
                promise.complete();
            } catch (Exception e) {
                // The dispatcher was never registered, so nothing else would ever close it.
                releaseDispatcher(dispatcher, e);
                handleException(promise, e);
            }
        });
        return promise.future();
    }

    /**
     * Queue-group variant of {@link #subscribe(String, Handler, boolean, PushSubscribeOptions)}.
     *
     * @param str                  the subject; also the key under which the dispatcher is
     *                             remembered for {@link #unsubscribe(String)}
     * @param str2                 second string argument forwarded to {@code JetStream.subscribe}
     *                             (presumably the queue group name; confirm against the NATS API)
     * @param handler              receives every delivered message wrapped as a {@link NatsVertxMessage}
     * @param z                    forwarded unchanged to {@code JetStream.subscribe}
     * @param pushSubscribeOptions forwarded unchanged to {@code JetStream.subscribe}
     * @return a future that succeeds once the subscription is registered, or fails when
     *         creating it throws
     */
    @Override
    public Future<Void> subscribe(String str, String str2, Handler<NatsVertxMessage> handler, boolean z, PushSubscribeOptions pushSubscribeOptions) {
        PromiseInternal<Void> promise = this.context.promise();
        this.vertx.runOnContext(ignored -> {
            Dispatcher dispatcher = null;
            try {
                dispatcher = this.connection.createDispatcher();
                this.jetStream.subscribe(str, str2, dispatcher, msg -> handler.handle(toVertxMessage(msg)), z, pushSubscribeOptions);
                this.subscriptionMap.put(str, dispatcher);
                promise.complete();
            } catch (Exception e) {
                // The dispatcher was never registered, so nothing else would ever close it.
                releaseDispatcher(dispatcher, e);
                handleException(promise, e);
            }
        });
        return promise.future();
    }

    /**
     * Closes the dispatcher registered for the subject and forgets it.
     *
     * @param str the subject passed to {@code subscribe}
     * @return a future that succeeds once the dispatcher is closed; it fails when no
     *         subscription is registered for the subject or when closing throws
     */
    @Override
    public Future<Void> unsubscribe(String str) {
        PromiseInternal<Void> promise = this.context.promise();
        this.vertx.runOnContext(ignored -> {
            try {
                // remove (not get): a closed dispatcher must not stay registered.
                Dispatcher dispatcher = this.subscriptionMap.remove(str);
                if (dispatcher == null) {
                    promise.fail("Subscription not found for unsubscribe op: " + str);
                } else {
                    this.connection.closeDispatcher(dispatcher);
                    promise.complete();
                }
            } catch (Exception e) {
                handleException(promise, e);
            }
        });
        return promise.future();
    }

    /**
     * Publishes the message with explicit publish options.
     *
     * @param message        the message to publish
     * @param publishOptions forwarded unchanged to {@code JetStream.publish}
     * @return a future completed with the acknowledgement, or failed when publishing throws
     */
    @Override
    public Future<PublishAck> publish(Message message, PublishOptions publishOptions) {
        PromiseInternal<PublishAck> promise = this.context.promise();
        try {
            promise.complete(this.jetStream.publish(message, publishOptions));
        } catch (Exception e) {
            handleException(promise, e);
        }
        return promise.future();
    }

    /** Wraps a delivered NATS message together with this stream's Vert.x instance. */
    private NatsVertxMessage toVertxMessage(Message message) {
        return new NatsVertxMessage() {
            @Override
            public Message message() {
                return message;
            }

            @Override
            public Vertx vertx() {
                return NatsStreamImpl.this.vertx;
            }
        };
    }

    /** Closes a dispatcher whose subscription failed; a close failure is attached to {@code cause} as suppressed. */
    private void releaseDispatcher(Dispatcher dispatcher, Exception cause) {
        if (dispatcher == null) {
            return;
        }
        try {
            this.connection.closeDispatcher(dispatcher);
        } catch (RuntimeException closeFailure) {
            cause.addSuppressed(closeFailure);
        }
    }

    /**
     * Reports a failed operation: fails the promise so the caller's future carries the
     * cause, then notifies the installed exception handler. Nothing is rethrown, so the
     * caller still receives the failed future (or callback).
     */
    private void handleException(Promise<?> promise, Exception exc) {
        promise.fail(exc);
        this.exceptionHandler.handle(exc);
    }

    // The three methods below are the bridge/synthetic methods emitted by the decompiler; kept as found.

    @Override // io.nats.vertx.NatsStream
    /* renamed from: drainHandler */
    public /* bridge */ /* synthetic */ WriteStream mo1drainHandler(Handler handler) {
        return drainHandler((Handler<Void>) handler);
    }

    public /* bridge */ /* synthetic */ void write(Object obj, Handler handler) {
        write((Message) obj, (Handler<AsyncResult<Void>>) handler);
    }

    /* renamed from: exceptionHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ StreamBase m5exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
