package io.vertx.redis.client.impl;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.pool.PoolConnector;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.StreamBase;
import io.vertx.redis.client.RedisConnection;
import io.vertx.redis.client.RedisOptions;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.Response;
import io.vertx.redis.client.ResponseType;
import io.vertx.redis.client.impl.types.ErrorType;
import io.vertx.redis.client.impl.types.Multi;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/* loaded from: input_file:io/vertx/redis/client/impl/RedisStandaloneConnection.class */
public class RedisStandaloneConnection implements RedisConnection, ParserHandler {
    private static final String BASE_ADDRESS = "io.vertx.redis";
    private static final Logger LOG = LoggerFactory.getLogger(RedisStandaloneConnection.class);
    private static final ErrorType CONNECTION_CLOSED = ErrorType.create("CONNECTION_CLOSED");
    private final PoolConnector.Listener listener;
    private final VertxInternal vertx;
    private final ContextInternal context;
    private final EventBus eventBus;
    private final NetSocket netSocket;
    private final ArrayQueue waiting;
    private Handler<Throwable> onException;
    private Handler<Void> onEnd;
    private Handler<Response> onMessage;
    private Runnable onEvict;
    private boolean isValid = true;

    public RedisStandaloneConnection(Vertx vertx, ContextInternal contextInternal, PoolConnector.Listener listener, NetSocket netSocket, RedisOptions redisOptions) {
        this.vertx = (VertxInternal) vertx;
        this.context = contextInternal;
        this.listener = listener;
        this.eventBus = vertx.eventBus();
        this.netSocket = netSocket;
        this.waiting = new ArrayQueue(redisOptions.getMaxWaitingHandlers());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void forceClose() {
        this.listener.onRemove();
        if (this.onEvict != null) {
            this.onEvict.run();
            this.onEvict = null;
        }
        this.netSocket.close();
    }

    public boolean isValid() {
        return this.isValid;
    }

    @Override // io.vertx.redis.client.RedisConnection
    public void close() {
    }

    @Override // io.vertx.redis.client.RedisConnection
    public boolean pendingQueueFull() {
        return this.waiting.isFull();
    }

    @Override // io.vertx.redis.client.RedisConnection
    public RedisConnection exceptionHandler(Handler<Throwable> handler) {
        this.onException = handler;
        return this;
    }

    @Override // io.vertx.redis.client.RedisConnection
    public RedisConnection endHandler(Handler<Void> handler) {
        this.onEnd = handler;
        return this;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RedisConnection evictHandler(Runnable runnable) {
        this.onEvict = runnable;
        return this;
    }

    @Override // io.vertx.redis.client.RedisConnection
    public RedisConnection handler(Handler<Response> handler) {
        this.onMessage = handler;
        return this;
    }

    @Override // io.vertx.redis.client.RedisConnection
    /* renamed from: pause */
    public RedisConnection mo6pause() {
        this.netSocket.pause();
        return this;
    }

    @Override // io.vertx.redis.client.RedisConnection
    /* renamed from: resume */
    public RedisConnection mo5resume() {
        this.netSocket.resume();
        return this;
    }

    @Override // io.vertx.redis.client.RedisConnection
    /* renamed from: fetch */
    public RedisConnection mo4fetch(long j) {
        return this;
    }

    @Override // io.vertx.redis.client.RedisConnection
    public Future<Response> send(Request request) {
        PromiseInternal promise = this.vertx.promise();
        boolean isVoid = request.command().isVoid();
        if (!isVoid && this.waiting.isFull()) {
            promise.fail("Redis waiting Queue is full");
            return promise.future();
        }
        Buffer encode = ((RequestImpl) request).encode();
        this.context.execute(r10 -> {
            if (!isVoid) {
                if (this.waiting.isFull()) {
                    promise.fail("Redis waiting Queue is full");
                    return;
                }
                this.waiting.offer(promise);
            }
            this.netSocket.write(encode, asyncResult -> {
                if (asyncResult.failed()) {
                    fatal(asyncResult.cause());
                } else if (isVoid) {
                    promise.complete();
                }
            });
        });
        return promise.future();
    }

    @Override // io.vertx.redis.client.RedisConnection
    public Future<List<Response>> batch(List<Request> list) {
        PromiseInternal promise = this.vertx.promise();
        if (list.isEmpty()) {
            LOG.debug("Empty batch");
            promise.complete(Collections.emptyList());
        } else {
            if (this.waiting.freeSlots() < list.size()) {
                promise.fail("Redis waiting Queue is full");
                return promise.future();
            }
            ArrayList arrayList = new ArrayList(list.size());
            ArrayList arrayList2 = new ArrayList(list.size());
            AtomicInteger atomicInteger = new AtomicInteger(list.size());
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            Buffer buffer = Buffer.buffer();
            for (int i = 0; i < list.size(); i++) {
                int i2 = i;
                ((RequestImpl) list.get(i2)).encode(buffer);
                arrayList.add(i2, this.vertx.promise(asyncResult -> {
                    if (!atomicBoolean.get() && asyncResult.failed()) {
                        atomicBoolean.set(true);
                        promise.fail(asyncResult.cause());
                    } else {
                        arrayList2.add(i2, asyncResult.result());
                        if (atomicInteger.decrementAndGet() == 0) {
                            promise.complete(arrayList2);
                        }
                    }
                }));
            }
            this.context.execute(r8 -> {
                if (this.waiting.freeSlots() < arrayList.size()) {
                    promise.fail("Redis waiting Queue is full");
                    return;
                }
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    this.waiting.offer((Promise) it.next());
                }
                this.netSocket.write(buffer, asyncResult2 -> {
                    if (asyncResult2.failed()) {
                        fatal(asyncResult2.cause());
                    }
                });
            });
        }
        return promise.future();
    }

    @Override // io.vertx.redis.client.impl.ParserHandler
    public void handle(Response response) {
        if ((response == null || response.type() != ResponseType.PUSH) && !this.waiting.isEmpty()) {
            this.context.execute(r6 -> {
                Promise promise = (Promise) this.waiting.poll();
                if (promise == null) {
                    LOG.error("No handler waiting for message: " + response);
                    return;
                }
                if (response == null) {
                    try {
                        promise.complete();
                        return;
                    } catch (RuntimeException e) {
                        fail(e);
                        return;
                    }
                }
                if (response.type() == ResponseType.ERROR) {
                    try {
                        promise.fail((ErrorType) response);
                        return;
                    } catch (RuntimeException e2) {
                        fail(e2);
                        return;
                    }
                }
                try {
                    promise.complete(response);
                } catch (RuntimeException e3) {
                    fail(e3);
                }
            });
            return;
        }
        if (this.onMessage != null) {
            this.context.execute(response, this.onMessage);
            return;
        }
        if (response instanceof Multi) {
            if (response.size() == 3 && "message".equals(response.get(0).toString())) {
                this.eventBus.send("io.vertx.redis." + response.get(1).toString(), new JsonObject().put("status", "OK").put("value", new JsonObject().put("channel", response.get(1).toString()).put("message", response.get(2).toString())));
                return;
            } else if (response.size() == 4 && "pmessage".equals(response.get(0).toString())) {
                this.eventBus.send("io.vertx.redis." + response.get(1).toString(), new JsonObject().put("status", "OK").put("value", new JsonObject().put("pattern", response.get(1).toString()).put("channel", response.get(2).toString()).put("message", response.get(3).toString())));
                return;
            }
        }
        LOG.warn("No handler waiting for message: " + response);
    }

    public void end(Void r5) {
        cleanupQueue(CONNECTION_CLOSED);
        evict();
        if (this.onEnd != null) {
            this.context.execute(r5, this.onEnd);
        }
    }

    @Override // io.vertx.redis.client.impl.ParserHandler
    public void fail(Throwable th) {
        evict();
        if (this.onException != null) {
            this.context.execute(th, this.onException);
        }
        this.isValid = false;
    }

    @Override // io.vertx.redis.client.impl.ParserHandler
    public void fatal(Throwable th) {
        cleanupQueue(th);
        evict();
        if (this.onException != null) {
            this.context.execute(th, this.onException);
        }
        this.isValid = false;
    }

    private void evict() {
        this.listener.onRemove();
        if (this.onEvict != null) {
            this.onEvict.run();
        }
    }

    private void cleanupQueue(Throwable th) {
        this.context.execute(r6 -> {
            while (true) {
                Promise promise = (Promise) this.waiting.poll();
                if (promise == null) {
                    return;
                }
                if (th != null) {
                    try {
                        promise.fail(th);
                    } catch (RuntimeException e) {
                        LOG.warn("Exception during cleanup", e);
                    }
                }
            }
        });
    }

    @Override // io.vertx.redis.client.RedisConnection
    /* renamed from: endHandler */
    public /* bridge */ /* synthetic */ ReadStream mo3endHandler(Handler handler) {
        return endHandler((Handler<Void>) handler);
    }

    @Override // io.vertx.redis.client.RedisConnection
    /* renamed from: handler */
    public /* bridge */ /* synthetic */ ReadStream mo7handler(Handler handler) {
        return handler((Handler<Response>) handler);
    }

    @Override // io.vertx.redis.client.RedisConnection
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ ReadStream mo8exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    @Override // io.vertx.redis.client.RedisConnection
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ StreamBase mo9exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
