package org.apache.geode.redis.internal.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.DecoderException;
import java.io.IOException;
import java.math.BigInteger;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.geode.ForcedDisconnectException;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.LowMemoryException;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionInvocationTargetException;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.redis.internal.ParameterRequirements.RedisParametersMismatchException;
import org.apache.geode.redis.internal.RedisCommandType;
import org.apache.geode.redis.internal.RedisConstants;
import org.apache.geode.redis.internal.RegionProvider;
import org.apache.geode.redis.internal.data.RedisDataTypeMismatchException;
import org.apache.geode.redis.internal.executor.CommandFunction;
import org.apache.geode.redis.internal.executor.RedisResponse;
import org.apache.geode.redis.internal.executor.UnknownExecutor;
import org.apache.geode.redis.internal.pubsub.PubSub;
import org.apache.geode.redis.internal.statistics.RedisStats;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/geode/redis/internal/netty/ExecutionHandlerContext.class */
/**
 * Per-connection Netty inbound handler for the Redis server.
 *
 * <p>Incoming {@code Command} objects are placed on a bounded queue by {@code channelRead} and
 * executed one at a time by a task that the constructor submits to an executor service. Responses
 * and error replies are written back to the connection's channel.
 */
public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LogService.getLogger();
    // Sentinel compared by identity: taking it from the queue ends the queue-draining loop.
    private static final Command TERMINATE_COMMAND = new Command();
    private final Client client;
    private final Channel channel;
    private final RegionProvider regionProvider;
    private final PubSub pubsub;
    // Obtained from the channel in the constructor; used to encode responses.
    private final ByteBufAllocator byteBufAllocator;
    // When null, the connection starts out authenticated (see the constructor).
    private final byte[] authPassword;
    private final Supplier<Boolean> allowUnsupportedSupplier;
    private final Runnable shutdownInvoker;
    private final RedisStats redisStats;
    private final EventLoopGroup subscriberGroup;
    // Cursor values held per connection and exposed through the getters/setters below.
    private BigInteger scanCursor;
    private BigInteger sscanCursor;
    private int hscanCursor;
    // Flipped to true exactly once by channelInactive(); channelRead() drops commands afterwards.
    private final AtomicBoolean channelInactive = new AtomicBoolean();
    // Queue capacity, read from the "geode.redis.commandQueueSize" system property (default 1000).
    // Must be declared before commandQueue because that field's initializer reads it.
    private final int MAX_QUEUED_COMMANDS = Integer.getInteger("geode.redis.commandQueueSize", 1000).intValue();
    // Bounded hand-off between channelRead() (producer) and processCommandQueue() (consumer).
    private final LinkedBlockingQueue<Command> commandQueue = new LinkedBlockingQueue<>(this.MAX_QUEUED_COMMANDS);
    private final int serverPort;
    // Created lazily by getOrCreateEventLoopLatch(); awaited by eventLoopReady().
    private CountDownLatch eventLoopSwitched;
    private boolean isAuthenticated;

    /**
     * Builds the context for one client connection, records the new client in the statistics, and
     * submits the task that drains this connection's command queue.
     *
     * @param channel the connection's channel; also supplies the buffer allocator
     * @param regionProvider returned by {@link #getRegionProvider()}
     * @param pubSub returned by {@link #getPubSub()}
     * @param supplier queried by {@link #allowUnsupportedCommands()}
     * @param runnable run by {@link #shutdown()}
     * @param redisStats statistics sink for client and command counters
     * @param executorService executor that runs the queue-draining task
     * @param eventLoopGroup returned by {@link #getSubscriberGroup()}
     * @param bArr the auth password; when {@code null} the connection starts out authenticated
     * @param i returned by {@link #getServerPort()}
     */
    public ExecutionHandlerContext(Channel channel, RegionProvider regionProvider, PubSub pubSub, Supplier<Boolean> supplier, Runnable runnable, RedisStats redisStats, ExecutorService executorService, EventLoopGroup eventLoopGroup, byte[] bArr, int i) {
        // Connection-level collaborators.
        this.channel = channel;
        this.client = new Client(channel);
        this.byteBufAllocator = channel.alloc();
        this.subscriberGroup = eventLoopGroup;

        // Server-level collaborators and configuration.
        this.regionProvider = regionProvider;
        this.pubsub = pubSub;
        this.redisStats = redisStats;
        this.allowUnsupportedSupplier = supplier;
        this.shutdownInvoker = runnable;
        this.serverPort = i;

        // No password configured means there is nothing to authenticate against.
        this.authPassword = bArr;
        this.isAuthenticated = (bArr == null);

        // All cursors start at zero.
        this.scanCursor = BigInteger.ZERO;
        this.sscanCursor = BigInteger.ZERO;
        this.hscanCursor = 0;

        redisStats.addClient();
        executorService.submit(this::processCommandQueue);
    }

    /**
     * Encodes the response with this connection's allocator and writes and flushes it to the
     * channel. When the write completes, the response's {@code afterWrite()} hook is invoked and
     * the outcome is logged at debug level.
     *
     * @param redisResponse the response to send
     * @return the future of the write, with the completion listener already attached
     */
    public ChannelFuture writeToChannel(RedisResponse redisResponse) {
        return this.channel.writeAndFlush(redisResponse.encode(this.byteBufAllocator), this.channel.newPromise()).addListener(channelFuture -> {
            redisResponse.afterWrite();
            // remoteAddress() returns null when the channel is not connected; String.valueOf
            // avoids a NullPointerException inside the listener in that case.
            logResponse(redisResponse, String.valueOf(this.channel.remoteAddress()), channelFuture.cause());
        });
    }

    /**
     * Drains the command queue until the terminate sentinel is taken. Each command is executed and
     * counted as processed; anything thrown during execution is routed to
     * {@link #exceptionCaught(ChannelHandlerContext, Throwable)} using the command's own context,
     * and the loop then continues with the next command.
     */
    private void processCommandQueue() {
        Command next = takeCommandFromQueue();
        while (next != TERMINATE_COMMAND) {
            try {
                executeCommand(next);
                // Only reached when execution did not throw.
                redisStats.incCommandsProcessed();
            } catch (Throwable failure) {
                exceptionCaught(next.getChannelHandlerContext(), failure);
            }
            next = takeCommandFromQueue();
        }
    }

    /**
     * Blocks until a command is available on the queue.
     *
     * @return the next queued command, or the terminate sentinel if the waiting thread was
     *         interrupted
     */
    private Command takeCommandFromQueue() {
        try {
            return this.commandQueue.take();
        } catch (InterruptedException e) {
            logger.info("Command queue thread interrupted");
            // Restore the interrupt flag so the owner of this thread can still observe it.
            Thread.currentThread().interrupt();
            return TERMINATE_COMMAND;
        }
    }

    /**
     * Receives a decoded message from the pipeline, tags it with the handler context it arrived
     * on, and enqueues it for execution. Messages arriving after the connection was marked
     * inactive are dropped.
     *
     * @param channelHandlerContext the context the message arrived on
     * @param obj the inbound message; cast to {@code Command}
     * @throws Exception if the blocking enqueue is interrupted
     */
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        Command incoming = (Command) obj;
        incoming.setChannelHandlerContext(channelHandlerContext);
        if (!channelInactive.get()) {
            // put() blocks while the bounded queue is full.
            commandQueue.put(incoming);
        }
    }

    /**
     * Translates a failure into an error reply and sends it to the client. Nothing is written when
     * the translation yields no reply.
     *
     * @param channelHandlerContext the context associated with the failure
     * @param th the failure to report
     */
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        RedisResponse reply = getExceptionResponse(channelHandlerContext, th);
        if (reply == null) {
            return;
        }
        writeToChannel(reply);
    }

    /**
     * @return the event loop group that was passed to the constructor
     */
    public EventLoopGroup getSubscriberGroup() {
        return subscriberGroup;
    }

    /**
     * Moves this connection's channel to the given event loop group and reports the outcome.
     *
     * <p>If the channel is already served by {@code eventLoopGroup}, the callback is invoked with
     * {@code true} right away. Otherwise the channel is deregistered and, once that completes,
     * registered with the new group; the callback then receives {@code true} on success (or if the
     * channel was found registered again in the meantime) and {@code false} if registration failed.
     *
     * @param eventLoopGroup the group that should serve the channel
     * @param consumer callback receiving whether the channel ended up registered
     */
    public synchronized void changeChannelEventLoopGroup(EventLoopGroup eventLoopGroup, Consumer<Boolean> consumer) {
        // channel.eventLoop() is a single loop, so comparing it with a multi-loop group never
        // matches; also compare against the loop's parent group to detect "already registered".
        boolean alreadyInGroup = eventLoopGroup.equals(this.channel.eventLoop())
                || eventLoopGroup.equals(this.channel.eventLoop().parent());
        if (alreadyInGroup) {
            consumer.accept(true);
            return;
        }
        this.channel.deregister().addListener(channelFuture -> {
            boolean registered = true;
            synchronized (this.channel) {
                if (!this.channel.isRegistered()) {
                    try {
                        eventLoopGroup.register(this.channel).sync();
                    } catch (Exception e) {
                        // Pass the exception as well so the stack trace is not lost.
                        logger.warn("Unable to register new EventLoopGroup: {}", e.getMessage(), e);
                        registered = false;
                    }
                }
            }
            consumer.accept(Boolean.valueOf(registered));
        });
    }

    /**
     * Maps a failure to the error reply that should be sent to the client.
     *
     * <p>An {@link IOException}, or a failure indicating that a participating server departed,
     * closes the connection and yields no reply. A {@code FunctionException} that is not a
     * {@code FunctionInvocationTargetException} is first replaced by its initial cause when one is
     * available. Failures that match none of the known categories are logged at error level and
     * answered with the generic server error message.
     *
     * @param channelHandlerContext the context used to close the connection and to label the log
     * @param th the failure to translate
     * @return the reply to send, or {@code null} when the connection was closed instead
     */
    private RedisResponse getExceptionResponse(ChannelHandlerContext channelHandlerContext, Throwable th) {
        if (th instanceof IOException) {
            channelInactive(channelHandlerContext);
            return null;
        }

        // Unwrap function failures so the checks below see the underlying problem.
        Throwable failure = th;
        if (th instanceof FunctionException && !(th instanceof FunctionInvocationTargetException)) {
            Throwable root = CommandFunction.getInitialCause((FunctionException) th);
            if (root != null) {
                failure = root;
            }
        }

        // The order of these checks is significant and mirrors the exception categories one by one.
        if (failure instanceof NumberFormatException || failure instanceof ArithmeticException) {
            return RedisResponse.error(failure.getMessage());
        }
        if (failure instanceof RedisDataTypeMismatchException) {
            return RedisResponse.wrongType(failure.getMessage());
        }
        if (failure instanceof LowMemoryException) {
            return RedisResponse.oom(RedisConstants.ERROR_OOM_COMMAND_NOT_ALLOWED);
        }
        if (failure instanceof DecoderException && failure.getCause() instanceof RedisCommandParserException) {
            return RedisResponse.error(RedisConstants.PARSING_EXCEPTION_MESSAGE);
        }
        if (failure instanceof InterruptedException || failure instanceof CacheClosedException) {
            return RedisResponse.error(RedisConstants.SERVER_ERROR_SHUTDOWN);
        }
        if (failure instanceof IllegalStateException || failure instanceof RedisParametersMismatchException) {
            return RedisResponse.error(failure.getMessage());
        }
        if (failure instanceof FunctionInvocationTargetException
                || failure instanceof DistributedSystemDisconnectedException
                || failure instanceof ForcedDisconnectException) {
            logger.warn("Closing client connection because one of the servers doing this operation departed.");
            channelInactive(channelHandlerContext);
            return null;
        }

        if (logger.isErrorEnabled()) {
            logger.error("GeodeRedisServer-Unexpected error handler for " + channelHandlerContext.channel(), failure);
        }
        return RedisResponse.error(RedisConstants.SERVER_ERROR_MESSAGE);
    }

    /**
     * Marks the connection inactive and tears it down. Only the first call has any effect: it
     * discards queued commands, enqueues the terminate sentinel for the queue-draining task,
     * removes the client from the statistics, and closes the channel and the handler context.
     *
     * @param channelHandlerContext the context whose channel is being closed
     */
    public void channelInactive(ChannelHandlerContext channelHandlerContext) {
        boolean firstCall = channelInactive.compareAndSet(false, true);
        if (!firstCall) {
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("GeodeRedisServer-Connection closing with " + channelHandlerContext.channel().remoteAddress());
        }
        // Drop pending work, then wake the consumer with the sentinel so it can exit.
        commandQueue.clear();
        commandQueue.offer(TERMINATE_COMMAND);
        redisStats.removeClient();
        channelHandlerContext.channel().close();
        channelHandlerContext.close();
    }

    /**
     * Executes a single command and writes exactly one reply for it.
     *
     * <p>Checks are applied in this order:
     * <ol>
     * <li>unknown commands are executed directly and their reply is written;</li>
     * <li>unauthenticated connections get the reply of
     * {@link #handleUnAuthenticatedCommand(Command)};</li>
     * <li>unsupported commands are answered by {@code UnknownExecutor} unless unsupported commands
     * are allowed;</li>
     * <li>while the client has subscriptions, commands not allowed in that state are rejected with
     * an error and are not executed;</li>
     * <li>otherwise the command is executed with timing recorded in the statistics, and a
     * {@code QUIT} command closes the connection afterwards.</li>
     * </ol>
     *
     * <p>Any exception is logged at warn level and rethrown to the caller.
     *
     * @param command the command to execute
     */
    private void executeCommand(Command command) {
        try {
            if (logger.isDebugEnabled()) {
                // Pass the address object itself: it may be null when the channel is not
                // connected, and the logger formats null safely.
                logger.debug("Executing Redis command: {} - {}", command, this.channel.remoteAddress());
            }
            if (command.isUnknown()) {
                writeToChannel(command.execute(this));
                return;
            }
            if (!isAuthenticated()) {
                writeToChannel(handleUnAuthenticatedCommand(command));
                return;
            }
            if (command.isUnsupported() && !allowUnsupportedCommands()) {
                writeToChannel(new UnknownExecutor().executeCommand(command, this));
                return;
            }
            if (!getPubSub().findSubscriptionNames(getClient()).isEmpty() && !command.getCommandType().isAllowedWhileSubscribed()) {
                writeToChannel(RedisResponse.error("only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context"));
                // Stop here: a rejected command must not also be executed, otherwise the client
                // would receive two replies for one request.
                return;
            }
            long startCommand = this.redisStats.startCommand();
            try {
                writeToChannel(command.execute(this));
            } finally {
                // Record the end of the command once, whether execution succeeded or threw.
                this.redisStats.endCommand(command.getCommandType(), startCommand);
            }
            if (command.isOfType(RedisCommandType.QUIT)) {
                channelInactive(command.getChannelHandlerContext());
            }
        } catch (Exception e) {
            logger.warn("Execution of Redis command {} failed: {}", command, e);
            throw e;
        }
    }

    /**
     * @return the current value of the supplier passed to the constructor; consulted by
     *         {@code executeCommand} before running an unsupported command
     */
    public boolean allowUnsupportedCommands() {
        // Auto-unboxing replaces the explicit booleanValue() call.
        return allowUnsupportedSupplier.get();
    }

    /**
     * Produces the reply for a command received before the connection is authenticated: an
     * {@code AUTH} command is executed, anything else is answered with the not-authenticated
     * error.
     *
     * @param command the command received on the unauthenticated connection
     * @return the reply to send
     */
    private RedisResponse handleUnAuthenticatedCommand(Command command) {
        if (command.isOfType(RedisCommandType.AUTH)) {
            return command.execute(this);
        }
        return RedisResponse.customError(RedisConstants.ERROR_NOT_AUTH);
    }

    /**
     * Logs a hex dump of the response at debug level, together with the outcome of the write.
     * Does nothing when debug logging is disabled or the response is {@code null}.
     *
     * @param redisResponse the response that was written; may be {@code null}
     * @param str extra text appended to the log line (callers pass the remote address)
     * @param th the cause of a failed write, or {@code null} if the write succeeded
     */
    private void logResponse(RedisResponse redisResponse, String str, Throwable th) {
        if (!logger.isDebugEnabled() || redisResponse == null) {
            return;
        }
        // Re-encode into a heap buffer (preferDirect = false) so that array() is available.
        ByteBuf encoded = redisResponse.encode(new UnpooledByteBufAllocator(false));
        try {
            String hex = Command.getHexEncodedString(encoded.array(), encoded.readableBytes());
            if (th == null) {
                logger.debug("Redis command returned: {} - {}", hex, str);
            } else {
                logger.debug("Redis command FAILED to return: {} - {}", hex, str, th);
            }
        } finally {
            // This buffer is never handed to the channel, so it must be released here.
            encoded.release();
        }
    }

    /**
     * @return the allocator obtained from the connection's channel in the constructor
     */
    public ByteBufAllocator getByteBufAllocator() {
        return byteBufAllocator;
    }

    /**
     * @return the region provider passed to the constructor
     */
    public RegionProvider getRegionProvider() {
        return regionProvider;
    }

    /**
     * @return the auth password passed to the constructor (the same array, not a copy), or
     *         {@code null} if none was given
     */
    public byte[] getAuthPassword() {
        return authPassword;
    }

    /**
     * @return {@code true} if no password was configured or
     *         {@link #setAuthenticationVerified()} has been called
     */
    public boolean isAuthenticated() {
        return isAuthenticated;
    }

    /**
     * Marks this connection as authenticated.
     */
    public void setAuthenticationVerified() {
        isAuthenticated = true;
    }

    /**
     * @return the server port value passed to the constructor
     */
    public int getServerPort() {
        return serverPort;
    }

    /**
     * @return the client object created for this connection's channel
     */
    public Client getClient() {
        return client;
    }

    /**
     * @return the id of the client returned by {@link #getClient()}
     */
    public UUID getClientUUID() {
        Client current = getClient();
        return current.getId();
    }

    /**
     * Runs the shutdown callback passed to the constructor, on the calling thread.
     */
    public void shutdown() {
        shutdownInvoker.run();
    }

    /**
     * @return the pub/sub instance passed to the constructor
     */
    public PubSub getPubSub() {
        return pubsub;
    }

    /**
     * @return the statistics object passed to the constructor
     */
    public RedisStats getRedisStats() {
        return redisStats;
    }

    /**
     * @return the scan cursor stored on this connection; zero until
     *         {@link #setScanCursor(BigInteger)} is called
     */
    public BigInteger getScanCursor() {
        return scanCursor;
    }

    /**
     * Stores the scan cursor for this connection.
     *
     * @param bigInteger the new cursor value
     */
    public void setScanCursor(BigInteger bigInteger) {
        scanCursor = bigInteger;
    }

    /**
     * @return the sscan cursor stored on this connection; zero until
     *         {@link #setSscanCursor(BigInteger)} is called
     */
    public BigInteger getSscanCursor() {
        return sscanCursor;
    }

    /**
     * Stores the sscan cursor for this connection.
     *
     * @param bigInteger the new cursor value
     */
    public void setSscanCursor(BigInteger bigInteger) {
        sscanCursor = bigInteger;
    }

    /**
     * @return the hscan cursor stored on this connection; zero until
     *         {@link #setHscanCursor(int)} is called
     */
    public int getHscanCursor() {
        return hscanCursor;
    }

    /**
     * Stores the hscan cursor for this connection.
     *
     * @param i the new cursor value
     */
    public void setHscanCursor(int i) {
        hscanCursor = i;
    }

    /**
     * Returns the latch that {@link #eventLoopReady()} waits on, creating it with a count of one
     * on first use.
     *
     * @return the existing latch, or a newly created one
     */
    public CountDownLatch getOrCreateEventLoopLatch() {
        if (eventLoopSwitched == null) {
            eventLoopSwitched = new CountDownLatch(1);
        }
        return eventLoopSwitched;
    }

    /**
     * Blocks until the latch created by {@link #getOrCreateEventLoopLatch()} has been counted
     * down. Returns immediately when no latch has been created. If the wait is interrupted, the
     * interruption is logged, the thread's interrupt flag is restored, and the method returns.
     */
    public void eventLoopReady() {
        if (this.eventLoopSwitched == null) {
            return;
        }
        try {
            this.eventLoopSwitched.await();
        } catch (InterruptedException e) {
            logger.info("Event loop interrupted", e);
            // Restore the interrupt flag so callers further up the stack can react to it.
            Thread.currentThread().interrupt();
        }
    }
}
