package org.apache.shardingsphere.proxy.frontend.netty;

import com.google.common.hash.Hashing;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.AttributeKey;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.sql.SQLException;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import lombok.Generated;
import org.apache.shardingsphere.authority.model.ShardingSpherePrivileges;
import org.apache.shardingsphere.authority.rule.AuthorityRule;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCLoginFailedException;
import org.apache.shardingsphere.data.pipeline.cdc.exception.EmptyCDCLoginRequestBodyException;
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
import org.apache.shardingsphere.data.pipeline.cdc.handler.CDCBackendHandler;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.DropStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StopStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
import org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
import org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
import org.apache.shardingsphere.infra.exception.dialect.exception.syntax.database.UnknownDatabaseException;
import org.apache.shardingsphere.infra.exception.kernel.metadata.rule.MissingRequiredRuleException;
import org.apache.shardingsphere.infra.exception.mysql.exception.AccessDeniedException;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.frontend.protocol.FrontDatabaseProtocolTypeFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.class */
/**
 * Netty inbound handler for CDC requests on a proxy frontend channel.
 *
 * <p>On activation the handler writes a server greeting. Every inbound message is cast to
 * {@link CDCRequest}; until a {@link CDCConnectionContext} has been stored on the channel (which
 * only happens after a successful login) each request is handled as a login attempt. Afterwards
 * requests are dispatched by {@link CDCRequest.Type} to the matching {@code process*} method,
 * which validates the request, checks privileges where applicable and delegates to
 * {@link CDCBackendHandler}.
 */
public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(CDCChannelInboundHandler.class);

    /** Channel attribute under which the per-connection context is stored after a successful login. */
    private static final AttributeKey<CDCConnectionContext> CONNECTION_CONTEXT_KEY = AttributeKey.valueOf("connection.context");

    /** Server version reported in the greeting. */
    private static final String SERVER_VERSION = "5.5.0";

    /** Protocol version reported in the greeting. */
    private static final String PROTOCOL_VERSION = "1";

    private final CDCBackendHandler backendHandler = new CDCBackendHandler();

    /**
     * Writes the server greeting (server version and protocol version) as soon as the channel becomes active.
     *
     * @param channelHandlerContext context of the newly active channel
     */
    @Override
    public void channelActive(ChannelHandlerContext channelHandlerContext) {
        ServerGreetingResult greeting = ServerGreetingResult.newBuilder()
                .setServerVersion(SERVER_VERSION)
                .setProtocolVersion(PROTOCOL_VERSION)
                .build();
        channelHandlerContext.writeAndFlush(CDCResponse.newBuilder().setServerGreetingResult(greeting).setStatus(CDCResponse.Status.SUCCEED).build());
    }

    /**
     * Stops streaming for the job recorded in the connection context (if any) and clears the context from the channel.
     *
     * @param channelHandlerContext context of the channel that became inactive
     */
    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) {
        CDCConnectionContext connectionContext = channelHandlerContext.channel().attr(CONNECTION_CONTEXT_KEY).get();
        if (null != connectionContext && null != connectionContext.getJobId()) {
            this.backendHandler.stopStreaming(connectionContext.getJobId(), channelHandlerContext.channel().id());
        }
        channelHandlerContext.channel().attr(CONNECTION_CONTEXT_KEY).set(null);
    }

    /**
     * Logs the error and writes a failed response to the client.
     *
     * <p>A {@link CDCExceptionWrapper} is translated to a {@link SQLException} so that the response carries the
     * originating request id, SQL state and message. Any other throwable is reported with an empty request id and
     * the general-error SQL state. When no connection context is stored on the channel, the channel is closed
     * once the response has been written.
     *
     * @param channelHandlerContext context of the channel on which the error occurred
     * @param th the caught error
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        log.error("caught CDC resolution error", th);
        ChannelFuture responseFuture;
        if (th instanceof CDCExceptionWrapper) {
            CDCExceptionWrapper wrapper = (CDCExceptionWrapper) th;
            SQLException sqlException = SQLExceptionTransformEngine.toSQLException(wrapper.getCause(), FrontDatabaseProtocolTypeFactory.getDatabaseType());
            responseFuture = channelHandlerContext.writeAndFlush(CDCResponseUtils.failed(wrapper.getRequestId(), sqlException.getSQLState(), sqlException.getMessage()));
        } else {
            responseFuture = channelHandlerContext.writeAndFlush(CDCResponseUtils.failed("", XOpenSQLState.GENERAL_ERROR.getValue(), String.valueOf(th.getMessage())));
        }
        if (null == channelHandlerContext.channel().attr(CONNECTION_CONTEXT_KEY).get()) {
            responseFuture.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * Dispatches an inbound {@link CDCRequest}.
     *
     * <p>If no connection context is stored on the channel yet, or the request carries a login body, the request
     * is handled as a login. Otherwise it is routed by request type; unsupported types are logged and ignored.
     *
     * @param channelHandlerContext context of the channel that received the message
     * @param obj the inbound message, cast to {@link CDCRequest}
     */
    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        CDCConnectionContext connectionContext = channelHandlerContext.channel().attr(CONNECTION_CONTEXT_KEY).get();
        CDCRequest request = (CDCRequest) obj;
        if (null == connectionContext || request.hasLoginRequestBody()) {
            processLogin(channelHandlerContext, request);
            return;
        }
        switch (request.getType()) {
            case STREAM_DATA:
                processStreamDataRequest(channelHandlerContext, request, connectionContext);
                break;
            case ACK_STREAMING:
                processAckStreamingRequest(request);
                break;
            case STOP_STREAMING:
                processStopStreamingRequest(channelHandlerContext, request, connectionContext);
                break;
            case START_STREAMING:
                processStartStreamingRequest(channelHandlerContext, request, connectionContext);
                break;
            case DROP_STREAMING:
                processDropStreamingRequest(channelHandlerContext, request, connectionContext);
                break;
            default:
                log.warn("can't handle this type of request {}", request);
                break;
        }
    }

    /**
     * Authenticates the client and, on success, stores a new connection context on the channel and replies with success.
     *
     * @param channelHandlerContext context of the channel performing the login
     * @param cDCRequest the login request
     * @throws CDCExceptionWrapper wrapping {@link EmptyCDCLoginRequestBodyException} when the basic login body is
     *         missing, or {@link CDCLoginFailedException} when the user is unknown or the password digest does not match
     */
    private void processLogin(ChannelHandlerContext channelHandlerContext, CDCRequest cDCRequest) {
        ShardingSpherePreconditions.checkState(cDCRequest.hasLoginRequestBody() && cDCRequest.getLoginRequestBody().hasBasicBody(),
                () -> new CDCExceptionWrapper(cDCRequest.getRequestId(), new EmptyCDCLoginRequestBodyException()));
        LoginRequestBody.BasicBody basicBody = cDCRequest.getLoginRequestBody().getBasicBody();
        AuthorityRule authorityRule = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(AuthorityRule.class);
        Optional<ShardingSphereUser> user = authorityRule.findUser(new Grantee(basicBody.getUsername(), getHostAddress(channelHandlerContext)));
        if (!user.isPresent() || !isPasswordDigestMatched(user.get(), basicBody.getPassword())) {
            throw new CDCExceptionWrapper(cDCRequest.getRequestId(), new CDCLoginFailedException());
        }
        channelHandlerContext.channel().attr(CONNECTION_CONTEXT_KEY).set(new CDCConnectionContext(user.get()));
        channelHandlerContext.writeAndFlush(CDCResponseUtils.succeed(cDCRequest.getRequestId()));
    }

    /**
     * Compares the digest submitted by the client with the upper-case hex SHA-256 digest of the user's configured password.
     *
     * <p>The password is encoded as UTF-8 and upper-cased with {@link Locale#ROOT} so the result does not depend on
     * the JVM's default charset or locale; clients are expected to hash the UTF-8 bytes of the password as well.
     * The comparison uses {@link MessageDigest#isEqual(byte[], byte[])} rather than {@code String.equals} so that
     * it does not return early at the first differing character.
     *
     * @param user the configured user
     * @param submittedDigest the digest sent in the login body
     * @return {@code true} when both digests are equal
     */
    private static boolean isPasswordDigestMatched(ShardingSphereUser user, String submittedDigest) {
        String expectedDigest = Hashing.sha256().hashBytes(user.getPassword().getBytes(StandardCharsets.UTF_8)).toString().toUpperCase(Locale.ROOT);
        return MessageDigest.isEqual(expectedDigest.getBytes(StandardCharsets.UTF_8), submittedDigest.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Verifies that the grantee has privileges on the given database.
     *
     * @param requestId id of the request being authorised; attached to any raised error
     * @param grantee the grantee of the logged-in user
     * @param databaseName name of the database the request targets
     * @throws CDCExceptionWrapper wrapping {@link MissingRequiredRuleException} when no authority rule is configured,
     *         {@link AccessDeniedException} when no privileges are found for the grantee, or
     *         {@link UnknownDatabaseException} when the privileges do not cover the database
     */
    private void checkPrivileges(String requestId, Grantee grantee, String databaseName) {
        AuthorityRule authorityRule = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData()
                .findSingleRule(AuthorityRule.class)
                .orElseThrow(() -> new CDCExceptionWrapper(requestId, new MissingRequiredRuleException("authority")));
        ShardingSpherePrivileges privileges = authorityRule.findPrivileges(grantee)
                .orElseThrow(() -> new CDCExceptionWrapper(requestId, new AccessDeniedException(grantee.getUsername(), grantee.getHostname(), false)));
        ShardingSpherePreconditions.checkState(privileges.hasPrivileges(databaseName),
                () -> new CDCExceptionWrapper(requestId, new UnknownDatabaseException(databaseName)));
    }

    /**
     * Returns the remote peer's host address, or the remote address's string form when it is not an {@link InetSocketAddress}.
     *
     * @param channelHandlerContext context of the channel whose peer address is wanted
     * @return host address of the remote peer
     */
    private String getHostAddress(ChannelHandlerContext channelHandlerContext) {
        SocketAddress remoteAddress = channelHandlerContext.channel().remoteAddress();
        if (remoteAddress instanceof InetSocketAddress) {
            return ((InetSocketAddress) remoteAddress).getAddress().getHostAddress();
        }
        return remoteAddress.toString();
    }

    /**
     * Builds the error raised for a structurally invalid request.
     *
     * @param requestId id of the offending request
     * @param message description of what is missing
     * @return wrapper carrying a {@link PipelineInvalidParameterException}
     */
    private static CDCExceptionWrapper invalidParameter(String requestId, String message) {
        return new CDCExceptionWrapper(requestId, new PipelineInvalidParameterException(message));
    }

    /**
     * Validates a stream-data request, checks privileges on its database and writes the backend handler's response.
     *
     * @param channelHandlerContext context of the requesting channel
     * @param cDCRequest the stream-data request
     * @param cDCConnectionContext context of the logged-in connection
     * @throws CDCExceptionWrapper when the body, database or source table list is empty, when the privilege check
     *         fails, or when the backend handler raises a {@link PipelineSQLException}
     */
    private void processStreamDataRequest(ChannelHandlerContext channelHandlerContext, CDCRequest cDCRequest, CDCConnectionContext cDCConnectionContext) {
        String requestId = cDCRequest.getRequestId();
        if (!cDCRequest.hasStreamDataRequestBody()) {
            throw invalidParameter(requestId, "Stream data request body is empty");
        }
        StreamDataRequestBody requestBody = cDCRequest.getStreamDataRequestBody();
        if (requestBody.getDatabase().isEmpty()) {
            throw invalidParameter(requestId, "Database is empty");
        }
        if (requestBody.getSourceSchemaTableList().isEmpty()) {
            throw invalidParameter(requestId, "Source schema table is empty");
        }
        checkPrivileges(requestId, cDCConnectionContext.getCurrentUser().getGrantee(), requestBody.getDatabase());
        try {
            channelHandlerContext.writeAndFlush(this.backendHandler.streamData(requestId, requestBody, cDCConnectionContext, channelHandlerContext.channel()));
        } catch (PipelineSQLException e) {
            throw new CDCExceptionWrapper(requestId, e);
        }
    }

    /**
     * Validates an ack request and forwards it to the backend handler. No response is written here.
     *
     * @param cDCRequest the ack request
     * @throws CDCExceptionWrapper when the body or the ack id is empty
     */
    private void processAckStreamingRequest(CDCRequest cDCRequest) {
        if (!cDCRequest.hasAckStreamingRequestBody()) {
            throw invalidParameter(cDCRequest.getRequestId(), "Ack request body is empty");
        }
        AckStreamingRequestBody requestBody = cDCRequest.getAckStreamingRequestBody();
        if (requestBody.getAckId().isEmpty()) {
            throw invalidParameter(cDCRequest.getRequestId(), "Ack request is empty");
        }
        this.backendHandler.processAck(requestBody);
    }

    /**
     * Validates a start-streaming request, checks privileges on the job's database, starts streaming and replies with success.
     *
     * @param channelHandlerContext context of the requesting channel
     * @param cDCRequest the start-streaming request
     * @param cDCConnectionContext context of the logged-in connection
     * @throws CDCExceptionWrapper when the body or streaming id is empty, or when the privilege check fails
     */
    private void processStartStreamingRequest(ChannelHandlerContext channelHandlerContext, CDCRequest cDCRequest, CDCConnectionContext cDCConnectionContext) {
        if (!cDCRequest.hasStartStreamingRequestBody()) {
            throw invalidParameter(cDCRequest.getRequestId(), "Start streaming request body is empty");
        }
        StartStreamingRequestBody requestBody = cDCRequest.getStartStreamingRequestBody();
        if (requestBody.getStreamingId().isEmpty()) {
            throw invalidParameter(cDCRequest.getRequestId(), "Streaming id is empty");
        }
        String databaseName = this.backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
        checkPrivileges(cDCRequest.getRequestId(), cDCConnectionContext.getCurrentUser().getGrantee(), databaseName);
        this.backendHandler.startStreaming(requestBody.getStreamingId(), cDCConnectionContext, channelHandlerContext.channel());
        channelHandlerContext.writeAndFlush(CDCResponseUtils.succeed(cDCRequest.getRequestId()));
    }

    /**
     * Validates a stop-streaming request, checks privileges on the job's database, stops streaming for this channel,
     * clears the job id from the connection context and replies with success.
     *
     * @param channelHandlerContext context of the requesting channel
     * @param cDCRequest the stop-streaming request
     * @param cDCConnectionContext context of the logged-in connection
     * @throws CDCExceptionWrapper when the body or streaming id is empty, or when the privilege check fails
     */
    private void processStopStreamingRequest(ChannelHandlerContext channelHandlerContext, CDCRequest cDCRequest, CDCConnectionContext cDCConnectionContext) {
        // Validate like the other handlers so a malformed request is answered with its request id
        // instead of failing inside the job lookup.
        if (!cDCRequest.hasStopStreamingRequestBody()) {
            throw invalidParameter(cDCRequest.getRequestId(), "Stop streaming request body is empty");
        }
        StopStreamingRequestBody requestBody = cDCRequest.getStopStreamingRequestBody();
        if (requestBody.getStreamingId().isEmpty()) {
            throw invalidParameter(cDCRequest.getRequestId(), "Streaming id is empty");
        }
        String databaseName = this.backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
        checkPrivileges(cDCRequest.getRequestId(), cDCConnectionContext.getCurrentUser().getGrantee(), databaseName);
        this.backendHandler.stopStreaming(requestBody.getStreamingId(), channelHandlerContext.channel().id());
        cDCConnectionContext.setJobId(null);
        channelHandlerContext.writeAndFlush(CDCResponseUtils.succeed(cDCRequest.getRequestId()));
    }

    /**
     * Validates a drop-streaming request, checks privileges on the job's database, drops the streaming job,
     * clears the job id from the connection context and replies with success.
     *
     * @param channelHandlerContext context of the requesting channel
     * @param cDCRequest the drop-streaming request
     * @param cDCConnectionContext context of the logged-in connection
     * @throws CDCExceptionWrapper when the body or streaming id is empty, or when the privilege check fails
     */
    private void processDropStreamingRequest(ChannelHandlerContext channelHandlerContext, CDCRequest cDCRequest, CDCConnectionContext cDCConnectionContext) {
        // Validate like the other handlers so a malformed request is answered with its request id
        // instead of failing inside the job lookup.
        if (!cDCRequest.hasDropStreamingRequestBody()) {
            throw invalidParameter(cDCRequest.getRequestId(), "Drop streaming request body is empty");
        }
        DropStreamingRequestBody requestBody = cDCRequest.getDropStreamingRequestBody();
        if (requestBody.getStreamingId().isEmpty()) {
            throw invalidParameter(cDCRequest.getRequestId(), "Streaming id is empty");
        }
        String databaseName = this.backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
        checkPrivileges(cDCRequest.getRequestId(), cDCConnectionContext.getCurrentUser().getGrantee(), databaseName);
        this.backendHandler.dropStreaming(requestBody.getStreamingId());
        cDCConnectionContext.setJobId(null);
        channelHandlerContext.writeAndFlush(CDCResponseUtils.succeed(cDCRequest.getRequestId()));
    }
}
