/*
 * Decompiled with CFR 0.152.
 */
package io.nats.vertx.impl;

import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.JetStream;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.PublishOptions;
import io.nats.client.PullSubscribeOptions;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.api.PublishAck;
import io.nats.vertx.NatsStream;
import io.nats.vertx.NatsVertxMessage;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.streams.WriteStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

public class NatsStreamImpl
implements NatsStream {
    /** JetStream context that performs the publish and subscribe calls. */
    private final JetStream jetStream;
    /** Vert.x instance that supplies contexts and is exposed through wrapped messages. */
    private final Vertx vertx;
    /** Dispatchers created for dispatcher-based subscriptions, keyed by subject. */
    private final ConcurrentHashMap<String, Dispatcher> dispatcherMap = new ConcurrentHashMap<>();
    /** Polled subscriptions, keyed by subject; poll loops check this map to decide when to stop. */
    private final ConcurrentHashMap<String, JetStreamSubscription> subscriptionMap = new ConcurrentHashMap<>();
    /** Connection used to create and close dispatchers. */
    private final Connection connection;
    /** Handler notified of failures; replaceable at runtime through {@code exceptionHandler(Handler)}. */
    private final AtomicReference<Handler<Throwable>> exceptionHandler = new AtomicReference<>();

    /**
     * Creates a stream that publishes and subscribes through the given JetStream context.
     *
     * @param jetStream        JetStream context used for publish and subscribe calls
     * @param connection       connection used to create and close dispatchers
     * @param vertx            Vert.x instance used to obtain contexts
     * @param exceptionHandler initial handler stored for failure notification
     */
    public NatsStreamImpl(final JetStream jetStream, final Connection connection, final Vertx vertx, final Handler<Throwable> exceptionHandler) {
        this.vertx = vertx;
        this.jetStream = jetStream;
        this.connection = connection;
        this.exceptionHandler.set(exceptionHandler);
    }

    /**
     * Obtains a context from {@code vertx.getOrCreateContext()} and exposes it as a
     * {@link ContextInternal}.
     *
     * @return the context, cast to the internal context type
     */
    private ContextInternal context() {
        final ContextInternal current = (ContextInternal) vertx.getOrCreateContext();
        return current;
    }

    /**
     * Replaces the handler that is notified when an operation fails.
     *
     * @param handler the new failure handler
     * @return this stream, for chaining
     */
    public WriteStream<Message> exceptionHandler(final Handler<Throwable> handler) {
        exceptionHandler.set(handler);
        return this;
    }

    /**
     * Publishes the message through {@code doPublish} on the calling thread and reports the
     * outcome as a future.
     *
     * @param data message to publish
     * @return future tied to the promise handed to {@code doPublish}
     */
    public Future<Void> write(final Message data) {
        final Promise<Void> outcome = context().promise();
        doPublish(data, outcome);
        return outcome.future();
    }

    /**
     * Publishes the message through {@code doPublish} on the calling thread, then passes the
     * resulting future to the handler.
     *
     * @param data    message to publish
     * @param handler receives the future tied to the promise handed to {@code doPublish}
     */
    public void write(final Message data, final Handler<AsyncResult<Void>> handler) {
        final Promise<Void> outcome = context().promise();
        doPublish(data, outcome);
        handler.handle(outcome.future());
    }

    /**
     * Publishes a message and translates the publish acknowledgement into the promise outcome.
     * <p>
     * The promise fails when the acknowledgement is flagged as a duplicate or carries an error,
     * and completes otherwise. Any exception raised along the way is routed to
     * {@code handleException}.
     *
     * @param data    message to publish
     * @param promise promise that receives the outcome
     */
    private void doPublish(Message data, Promise<Void> promise) {
        try {
            final PublishAck ack = jetStream.publish(data);
            if (ack.isDuplicate()) {
                promise.fail("Duplicate message " + ack);
                return;
            }
            if (ack.hasError()) {
                promise.fail(ack.getError() + " " + ack);
                return;
            }
            promise.complete();
        } catch (Exception failure) {
            handleException(promise, failure);
        }
    }

    /**
     * Ends the stream. There is nothing to flush, so the handler is given a successfully
     * completed result.
     * <p>
     * The promise is completed before the handler is invoked; otherwise the handler would be
     * handed an {@link AsyncResult} that is neither succeeded nor failed and never changes.
     *
     * @param handler receives the completed result; ignored when {@code null}
     */
    public void end(Handler<AsyncResult<Void>> handler) {
        final Promise<Void> promise = this.context().promise();
        promise.complete();
        if (handler != null) {
            handler.handle(promise.future());
        }
    }

    /**
     * Accepts a write-queue size limit without using it.
     *
     * @param maxSize ignored
     * @return this stream, for chaining
     */
    public WriteStream<Message> setWriteQueueMaxSize(final int maxSize) {
        return this;
    }

    /**
     * Reports whether the write queue is full.
     *
     * @return always {@code false}
     */
    public boolean writeQueueFull() {
        final boolean full = false;
        return full;
    }

    /**
     * Accepts a drain handler without storing or invoking it.
     *
     * @param handler ignored
     * @return this stream, for chaining
     */
    @Override
    public NatsStream drainHandler(final Handler<Void> handler) {
        return this;
    }

    /**
     * Publishes a message by handing the publish call to {@code executeBlocking}.
     *
     * @param data message to publish
     * @return future of the promise that the blocking task resolves
     */
    @Override
    public Future<PublishAck> publish(final Message data) {
        final Promise<PublishAck> ackPromise = context().promise();
        context().executeBlocking(blocking -> lambda$publish$0(data, ackPromise, blocking), false);
        return ackPromise.future();
    }

    /**
     * Publishes a text payload, encoded as UTF-8, to the given subject.
     *
     * @param subject subject to publish to
     * @param message text payload
     * @return result of delegating to {@code publish(String, byte[])}
     */
    @Override
    public Future<PublishAck> publish(final String subject, final String message) {
        final byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        return publish(subject, payload);
    }

    /**
     * Publishes a binary payload to the given subject by handing the publish call to
     * {@code executeBlocking}.
     *
     * @param subject subject to publish to
     * @param message payload bytes
     * @return future of the promise that the blocking task resolves
     */
    @Override
    public Future<PublishAck> publish(final String subject, final byte[] message) {
        final Promise<PublishAck> ackPromise = context().promise();
        context().executeBlocking(blocking -> lambda$publish$1(subject, message, ackPromise, blocking), false);
        return ackPromise.future();
    }

    /**
     * Publishes a message by handing the publish call to {@code executeBlocking} and notifies
     * the handler once the outcome is known.
     * <p>
     * The handler is registered with {@code onComplete} rather than invoked directly: the
     * blocking task has only been scheduled at this point, so calling the handler immediately
     * would hand it a result that is still pending.
     *
     * @param data    message to publish
     * @param handler notified with the acknowledgement or the failure
     */
    @Override
    public void publish(Message data, Handler<AsyncResult<PublishAck>> handler) {
        final Promise<PublishAck> promise = this.context().promise();
        this.context().executeBlocking(arg_0 -> this.lambda$publish$2(data, promise, arg_0), false);
        promise.future().onComplete(handler);
    }

    /**
     * Publishes a message with explicit publish options by handing the publish call to
     * {@code executeBlocking}.
     *
     * @param data    message to publish
     * @param options publish options passed through to JetStream
     * @return future of the promise that the blocking task resolves
     */
    @Override
    public Future<PublishAck> publish(final Message data, final PublishOptions options) {
        final Promise<PublishAck> ackPromise = context().promise();
        context().executeBlocking(blocking -> lambda$publish$3(data, options, ackPromise, blocking), false);
        return ackPromise.future();
    }

    /**
     * Subscribes to a subject through a dispatcher and forwards each message to the handler
     * wrapped as a {@link NatsVertxMessage}.
     * <p>
     * The subscription itself is established by a task handed to {@code executeBlocking}.
     *
     * @param subject subject to subscribe to
     * @param handler receives each wrapped message
     * @param autoAck passed through to the JetStream subscribe call
     * @param so      push subscribe options passed through to JetStream
     * @return future of the promise that the blocking task resolves
     */
    @Override
    public Future<Void> subscribe(String subject, Handler<NatsVertxMessage> handler, boolean autoAck, PushSubscribeOptions so) {
        final Promise<Void> promise = this.context().promise();
        // Adapts a raw NATS message to the NatsVertxMessage view expected by the caller's handler.
        final Handler<Message> handlerWrapper = event -> handler.handle(new NatsVertxMessage() {

            @Override
            public Message message() {
                return event;
            }

            @Override
            public Vertx vertx() {
                return NatsStreamImpl.this.vertx;
            }
        });
        this.context().executeBlocking(arg_0 -> this.lambda$subscribe$6(subject, handlerWrapper, autoAck, so, promise, arg_0), false);
        return promise.future();
    }

    /**
     * Subscribes to a subject as part of a queue group through a dispatcher and forwards each
     * message to the handler wrapped as a {@link NatsVertxMessage}.
     * <p>
     * The subscription itself is established by a task handed to {@code executeBlocking}.
     *
     * @param subject subject to subscribe to
     * @param queue   queue name passed through to the JetStream subscribe call
     * @param handler receives each wrapped message
     * @param autoAck passed through to the JetStream subscribe call
     * @param so      push subscribe options passed through to JetStream
     * @return future of the promise that the blocking task resolves
     */
    @Override
    public Future<Void> subscribe(String subject, String queue, Handler<NatsVertxMessage> handler, boolean autoAck, PushSubscribeOptions so) {
        final Promise<Void> promise = this.context().promise();
        // Adapts a raw NATS message to the NatsVertxMessage view expected by the caller's handler.
        final Handler<Message> handlerWrapper = event -> handler.handle(new NatsVertxMessage() {

            @Override
            public Message message() {
                return event;
            }

            @Override
            public Vertx vertx() {
                return NatsStreamImpl.this.vertx;
            }
        });
        this.context().executeBlocking(arg_0 -> this.lambda$subscribe$9(subject, queue, handlerWrapper, autoAck, so, promise, arg_0), false);
        return promise.future();
    }

    /**
     * Starts a pull-style subscription whose setup and polling are handed to
     * {@code executeBlocking}.
     *
     * @param subject   subject to subscribe to
     * @param handler   receives each wrapped message
     * @param so        pull subscribe options passed to the blocking task
     * @param batchSize batch size passed to the blocking task
     * @return future of the promise that the blocking task resolves
     */
    @Override
    public Future<Void> subscribe(final String subject, final Handler<NatsVertxMessage> handler, final PullSubscribeOptions so, final int batchSize) {
        final Promise<Void> subscribed = context().promise();
        context().executeBlocking(blocking -> lambda$subscribe$11(so, subject, batchSize, handler, subscribed, blocking), false);
        return subscribed.future();
    }

    /**
     * Starts a subscription with default options whose setup and polling are handed to
     * {@code executeBlocking}.
     *
     * @param subject subject to subscribe to
     * @param handler receives each wrapped message
     * @return future of the promise that the blocking task resolves
     */
    @Override
    public Future<Void> subscribe(final String subject, final Handler<NatsVertxMessage> handler) {
        final Promise<Void> subscribed = context().promise();
        context().executeBlocking(blocking -> lambda$subscribe$13(subject, handler, subscribed, blocking), false);
        return subscribed.future();
    }

    /**
     * Starts a batch-fetching subscription whose setup and polling are handed to
     * {@code executeBlocking}.
     *
     * @param subject       subject to subscribe to
     * @param handler       receives each non-empty batch of wrapped messages
     * @param batchSize     batch size passed to the blocking task
     * @param batchDuration fetch duration passed to the blocking task
     * @param so            pull subscribe options passed to the blocking task
     * @return future of the promise that the blocking task resolves
     */
    @Override
    public Future<Void> subscribeBatch(final String subject, final Handler<List<NatsVertxMessage>> handler, final int batchSize, final Duration batchDuration, final PullSubscribeOptions so) {
        final Promise<Void> subscribed = context().promise();
        context().executeBlocking(blocking -> lambda$subscribeBatch$16(subject, so, batchSize, batchDuration, handler, subscribed, blocking), false);
        return subscribed.future();
    }

    /**
     * Removes the subscription registered for a subject; the work is handed to
     * {@code executeBlocking}.
     *
     * @param subject subject whose subscription should be removed
     * @return future of the promise that the blocking task resolves
     */
    @Override
    public Future<Void> unsubscribe(final String subject) {
        final Promise<Void> unsubscribed = context().promise();
        context().executeBlocking(blocking -> lambda$unsubscribe$17(subject, unsubscribed, blocking), false);
        return unsubscribed.future();
    }

    /**
     * Fails the promise with the exception and notifies the registered exception handler.
     * <p>
     * {@code tryFail} is used so that a promise that has already been resolved does not throw
     * and prevent the handler from being notified. The handler reference may be {@code null}
     * (none supplied at construction, or cleared later), in which case only the promise is
     * failed.
     *
     * @param promise promise to fail
     * @param e       the failure
     */
    private void handleException(Promise<?> promise, Exception e) {
        promise.tryFail(e);
        final Handler<Throwable> handler = this.exceptionHandler.get();
        if (handler != null) {
            handler.handle(e);
        }
    }

    /**
     * Blocking task behind {@code unsubscribe}.
     * <p>
     * A dispatcher registered for the subject is removed from the map and closed. Otherwise a
     * polled subscription registered for the subject is unsubscribed and removed. The promise
     * completes in both cases; previously the polled-subscription branch never resolved it, so
     * the caller's future never finished. If neither exists the promise fails.
     *
     * @param subject subject whose subscription should be removed
     * @param promise promise resolved with the outcome
     * @param event   promise supplied by {@code executeBlocking}; not used
     */
    private /* synthetic */ void lambda$unsubscribe$17(String subject, Promise promise, Promise event) {
        try {
            final Dispatcher dispatcher = this.dispatcherMap.get(subject);
            if (dispatcher != null) {
                this.dispatcherMap.remove(subject);
                this.connection.closeDispatcher(dispatcher);
                promise.complete();
                return;
            }
            final JetStreamSubscription subscription = this.subscriptionMap.get(subject);
            if (subscription == null) {
                promise.fail("Subscription not found for unsubscribe op: " + subject);
                return;
            }
            subscription.unsubscribe();
            this.subscriptionMap.remove(subject);
            promise.complete();
        }
        catch (Exception e) {
            this.handleException(promise, e);
        }
    }

    /**
     * Blocking task behind {@code subscribeBatch}.
     * <p>
     * Creates the subscription, registers it under the subject, and starts a second blocking
     * task that repeatedly fetches batches and passes every non-empty batch, wrapped as
     * {@link NatsVertxMessage}s, to the handler. The loop ends when a fetch yields nothing and
     * the subject is no longer present in {@code subscriptionMap}.
     * <p>
     * The promise is resolved right after the poll task is scheduled, so the poll task uses
     * {@code tryFail}: a plain {@code fail} on the already-completed promise would throw and
     * the exception handler would never be notified.
     *
     * @param subject       subject to subscribe to
     * @param so            pull subscribe options
     * @param batchSize     maximum number of messages per fetch
     * @param batchDuration duration passed to each fetch
     * @param handler       receives each non-empty batch
     * @param promise       promise resolved once the subscription is set up
     * @param evt           promise supplied by {@code executeBlocking}; not used
     */
    private /* synthetic */ void lambda$subscribeBatch$16(String subject, PullSubscribeOptions so, int batchSize, Duration batchDuration, Handler handler, Promise promise, Promise evt) {
        try {
            final JetStreamSubscription subscription = this.jetStream.subscribe(subject, so);
            this.subscriptionMap.put(subject, subscription);
            this.context().executeBlocking(event -> {
                try {
                    while (true) {
                        final List<Message> messages = subscription.fetch(batchSize, batchDuration);
                        if (messages != null && !messages.isEmpty()) {
                            final List<NatsVertxMessage> natsVertxMessages = messages.stream()
                                    .<NatsVertxMessage>map(m -> new NatsVertxMessage() {

                                        @Override
                                        public Message message() {
                                            return m;
                                        }

                                        @Override
                                        public Vertx vertx() {
                                            return NatsStreamImpl.this.vertx;
                                        }
                                    })
                                    .collect(Collectors.toList());
                            handler.handle(natsVertxMessages);
                            continue;
                        }
                        // Nothing fetched: stop only once the subject has been unregistered.
                        if (!this.subscriptionMap.containsKey(subject)) break;
                    }
                }
                catch (Exception e) {
                    // The promise is normally completed already; never throw from here.
                    promise.tryFail(e);
                    final Handler<Throwable> onError = this.exceptionHandler.get();
                    if (onError != null) {
                        onError.handle(e);
                    }
                }
            }, false);
            // tryComplete: the poll task may already have failed the promise.
            promise.tryComplete();
        }
        catch (Exception e) {
            this.handleException(promise, e);
        }
    }

    /**
     * Blocking task behind {@code subscribe(subject, handler)}.
     * <p>
     * Creates the subscription, registers it under the subject, and starts a second blocking
     * task that polls for the next message with a 10 ms timeout and passes each message,
     * wrapped as a {@link NatsVertxMessage}, to the handler. The loop ends when a poll returns
     * nothing and the subject is no longer present in {@code subscriptionMap}.
     * <p>
     * The promise is resolved right after the poll task is scheduled, so the poll task uses
     * {@code tryFail}: a plain {@code fail} on the already-completed promise would throw and
     * the exception handler would never be notified.
     *
     * @param subject subject to subscribe to
     * @param handler receives each wrapped message
     * @param promise promise resolved once the subscription is set up
     * @param evt     promise supplied by {@code executeBlocking}; not used
     */
    private /* synthetic */ void lambda$subscribe$13(String subject, Handler handler, Promise promise, Promise evt) {
        try {
            final JetStreamSubscription subscription = this.jetStream.subscribe(subject);
            this.subscriptionMap.put(subject, subscription);
            this.context().executeBlocking(event -> {
                final Duration pollTimeout = Duration.ofMillis(10L);
                try {
                    while (true) {
                        final Message message = subscription.nextMessage(pollTimeout);
                        if (message != null) {
                            handler.handle(new NatsVertxMessage() {

                                @Override
                                public Message message() {
                                    return message;
                                }

                                @Override
                                public Vertx vertx() {
                                    return NatsStreamImpl.this.vertx;
                                }
                            });
                            continue;
                        }
                        // Poll timed out: stop only once the subject has been unregistered.
                        if (!this.subscriptionMap.containsKey(subject)) break;
                    }
                }
                catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // Preserve the interrupt for whoever owns this thread.
                        Thread.currentThread().interrupt();
                    }
                    // The promise is normally completed already; never throw from here.
                    promise.tryFail(e);
                    final Handler<Throwable> onError = this.exceptionHandler.get();
                    if (onError != null) {
                        onError.handle(e);
                    }
                }
            }, false);
            // tryComplete: the poll task may already have failed the promise.
            promise.tryComplete();
        }
        catch (Exception e) {
            this.handleException(promise, e);
        }
    }

    /**
     * Blocking task behind the pull-style {@code subscribe}.
     * <p>
     * Creates the subscription (with the options when given, otherwise with defaults),
     * registers it under the subject, and starts a second blocking task that polls for the
     * next message with a 10 ms timeout and passes each message, wrapped as a
     * {@link NatsVertxMessage}, to the handler. A new pull of {@code batchSize} is requested
     * whenever the iteration counter is a multiple of {@code batchSize}; the counter advances
     * on every iteration, including ones where the poll returned nothing. The loop ends when a
     * poll returns nothing and the subject is no longer present in {@code subscriptionMap}.
     * <p>
     * A non-positive {@code batchSize} is rejected before subscribing, because the modulo in
     * the poll loop would otherwise fail after the promise had already been completed. The
     * poll task uses {@code tryFail} for the same reason: a plain {@code fail} on the
     * already-completed promise would throw and the exception handler would never be notified.
     *
     * @param so        pull subscribe options, or {@code null} for a default subscription
     * @param subject   subject to subscribe to
     * @param batchSize number of messages requested per pull; must be positive
     * @param handler   receives each wrapped message
     * @param promise   promise resolved once the subscription is set up
     * @param evt       promise supplied by {@code executeBlocking}; not used
     */
    private /* synthetic */ void lambda$subscribe$11(PullSubscribeOptions so, String subject, int batchSize, Handler handler, Promise promise, Promise evt) {
        try {
            if (batchSize <= 0) {
                throw new IllegalArgumentException("batchSize must be greater than 0 but was " + batchSize);
            }
            final JetStreamSubscription subscription = so != null ? this.jetStream.subscribe(subject, so) : this.jetStream.subscribe(subject);
            this.subscriptionMap.put(subject, subscription);
            this.context().executeBlocking(event -> {
                final Duration pollTimeout = Duration.ofMillis(10L);
                try {
                    int messageCount = 0;
                    while (true) {
                        if (messageCount % batchSize == 0) {
                            subscription.pull(batchSize);
                        }
                        ++messageCount;
                        final Message message = subscription.nextMessage(pollTimeout);
                        if (message != null) {
                            handler.handle(new NatsVertxMessage() {

                                @Override
                                public Message message() {
                                    return message;
                                }

                                @Override
                                public Vertx vertx() {
                                    return NatsStreamImpl.this.vertx;
                                }
                            });
                            continue;
                        }
                        // Poll timed out: stop only once the subject has been unregistered.
                        if (!this.subscriptionMap.containsKey(subject)) break;
                    }
                }
                catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // Preserve the interrupt for whoever owns this thread.
                        Thread.currentThread().interrupt();
                    }
                    // The promise is normally completed already; never throw from here.
                    promise.tryFail(e);
                    final Handler<Throwable> onError = this.exceptionHandler.get();
                    if (onError != null) {
                        onError.handle(e);
                    }
                }
            }, false);
            // tryComplete: the poll task may already have failed the promise.
            promise.tryComplete();
        }
        catch (Exception e) {
            this.handleException(promise, e);
        }
    }

    /**
     * Blocking task behind the queue-group {@code subscribe}.
     * <p>
     * Creates a dispatcher, subscribes it to the subject and queue, registers the dispatcher
     * under the subject and completes the promise. If the subscribe call fails, the dispatcher
     * that was just created is closed again so it does not stay registered on the connection.
     *
     * @param subject        subject to subscribe to
     * @param queue          queue name passed to the JetStream subscribe call
     * @param handlerWrapper receives each raw message delivered by the dispatcher
     * @param autoAck        passed through to the JetStream subscribe call
     * @param so             push subscribe options
     * @param promise        promise resolved with the outcome
     * @param event          promise supplied by {@code executeBlocking}; not used
     */
    private /* synthetic */ void lambda$subscribe$9(String subject, String queue, Handler handlerWrapper, boolean autoAck, PushSubscribeOptions so, Promise promise, Promise event) {
        try {
            final Dispatcher dispatcher = this.connection.createDispatcher();
            try {
                this.jetStream.subscribe(subject, queue, dispatcher, msg -> handlerWrapper.handle((Object)msg), autoAck, so);
            }
            catch (Exception subscribeFailure) {
                // Release the dispatcher; it was never registered in dispatcherMap.
                try {
                    this.connection.closeDispatcher(dispatcher);
                }
                catch (RuntimeException closeFailure) {
                    subscribeFailure.addSuppressed(closeFailure);
                }
                throw subscribeFailure;
            }
            this.dispatcherMap.put(subject, dispatcher);
            promise.complete();
        }
        catch (Exception e) {
            this.handleException(promise, e);
        }
    }

    /**
     * Blocking task behind the dispatcher-based {@code subscribe} without a queue.
     * <p>
     * Creates a dispatcher, subscribes it to the subject, registers the dispatcher under the
     * subject and completes the promise. If the subscribe call fails, the dispatcher that was
     * just created is closed again so it does not stay registered on the connection.
     *
     * @param subject        subject to subscribe to
     * @param handlerWrapper receives each raw message delivered by the dispatcher
     * @param autoAck        passed through to the JetStream subscribe call
     * @param so             push subscribe options
     * @param promise        promise resolved with the outcome
     * @param event          promise supplied by {@code executeBlocking}; not used
     */
    private /* synthetic */ void lambda$subscribe$6(String subject, Handler handlerWrapper, boolean autoAck, PushSubscribeOptions so, Promise promise, Promise event) {
        try {
            final Dispatcher dispatcher = this.connection.createDispatcher();
            try {
                this.jetStream.subscribe(subject, dispatcher, msg -> handlerWrapper.handle((Object)msg), autoAck, so);
            }
            catch (Exception subscribeFailure) {
                // Release the dispatcher; it was never registered in dispatcherMap.
                try {
                    this.connection.closeDispatcher(dispatcher);
                }
                catch (RuntimeException closeFailure) {
                    subscribeFailure.addSuppressed(closeFailure);
                }
                throw subscribeFailure;
            }
            this.dispatcherMap.put(subject, dispatcher);
            promise.complete();
        }
        catch (Exception e) {
            this.handleException(promise, e);
        }
    }

    /**
     * Blocking task behind {@code publish(Message, PublishOptions)}: publishes with the given
     * options and completes the promise with the acknowledgement, or routes the exception to
     * {@code handleException}.
     *
     * @param data    message to publish
     * @param options publish options
     * @param promise promise resolved with the outcome
     * @param event   promise supplied by {@code executeBlocking}; not used
     */
    private /* synthetic */ void lambda$publish$3(Message data, PublishOptions options, Promise promise, Promise event) {
        try {
            promise.complete(jetStream.publish(data, options));
        } catch (Exception failure) {
            handleException(promise, failure);
        }
    }

    /**
     * Blocking task behind {@code publish(Message, Handler)}: publishes the message and
     * completes the promise with the acknowledgement, or routes the exception to
     * {@code handleException}.
     *
     * @param data    message to publish
     * @param promise promise resolved with the outcome
     * @param event   promise supplied by {@code executeBlocking}; not used
     */
    private /* synthetic */ void lambda$publish$2(Message data, Promise promise, Promise event) {
        try {
            promise.complete(jetStream.publish(data));
        } catch (Exception failure) {
            handleException(promise, failure);
        }
    }

    /**
     * Blocking task behind {@code publish(String, byte[])}: publishes the payload to the
     * subject and completes the promise with the acknowledgement, or routes the exception to
     * {@code handleException}.
     *
     * @param subject subject to publish to
     * @param message payload bytes
     * @param promise promise resolved with the outcome
     * @param event   promise supplied by {@code executeBlocking}; not used
     */
    private /* synthetic */ void lambda$publish$1(String subject, byte[] message, Promise promise, Promise event) {
        try {
            promise.complete(jetStream.publish(subject, message));
        } catch (Exception failure) {
            handleException(promise, failure);
        }
    }

    /**
     * Blocking task behind {@code publish(Message)}: publishes the message and completes the
     * promise with the acknowledgement, or routes the exception to {@code handleException}.
     *
     * @param data    message to publish
     * @param promise promise resolved with the outcome
     * @param event   promise supplied by {@code executeBlocking}; not used
     */
    private /* synthetic */ void lambda$publish$0(Message data, Promise promise, Promise event) {
        try {
            promise.complete(jetStream.publish(data));
        } catch (Exception failure) {
            handleException(promise, failure);
        }
    }
}

