Commit d054acce authored by terrymanu's avatar terrymanu
Browse files

pull up channelRead & channelInactive to ResponseHandler

parent c0fd00bc
Loading
Loading
Loading
Loading
+1 −23
Original line number Diff line number Diff line
@@ -24,27 +24,5 @@ package io.shardingsphere.proxy.backend.netty.client.response;
 */
public enum AuthType {
    
    /**
     * Database UN_AUTH.
     * 
     */
    UN_AUTH,
    
    /**
     * Database AUTHING.
     * 
     */
    AUTHING,
    
    /**
     * Database AUTH_SUCCESS.
     *
     */
    AUTH_SUCCESS,
    
    /**
     * Database AUTH_FAILED.
     *
     */
    AUTH_FAILED
    UN_AUTH, AUTHING, AUTH_FIN
}
+35 −0
Original line number Diff line number Diff line
@@ -29,8 +29,37 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
 */
public abstract class ResponseHandler extends ChannelInboundHandlerAdapter {
    
    private AuthType authType;
    
    @Override
    public void channelRead(final ChannelHandlerContext context, final Object message) {
        ByteBuf byteBuf = (ByteBuf) message;
        int header = getHeader(byteBuf);
        switch (authType) {
            case UN_AUTH:
                auth(context, byteBuf);
                authType = AuthType.AUTHING;
                break;
            case AUTHING:
                authing(context, byteBuf, header);
                authType = AuthType.AUTH_FIN;
                break;
            case AUTH_FIN:
                authSuccess(context, byteBuf, header);
                break;
            default:
                throw new UnsupportedOperationException(authType.name());
        }
    }
    
    protected abstract int getHeader(ByteBuf byteBuf);
    
    protected abstract void auth(ChannelHandlerContext context, ByteBuf byteBuf);
    
    protected abstract void authing(ChannelHandlerContext context, ByteBuf byteBuf, int header);
    
    protected abstract void authSuccess(ChannelHandlerContext context, ByteBuf byteBuf, int header);
    
    protected abstract void eofPacket(ChannelHandlerContext context, ByteBuf byteBuf);
    
    protected abstract void okPacket(ChannelHandlerContext context, ByteBuf byteBuf);
@@ -38,4 +67,10 @@ public abstract class ResponseHandler extends ChannelInboundHandlerAdapter {
    protected abstract void errPacket(ChannelHandlerContext context, ByteBuf byteBuf);
    
    protected abstract void commonPacket(ChannelHandlerContext context, ByteBuf byteBuf);
    
    @Override
    public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
        //TODO delete connection map
        super.channelInactive(ctx);
    }
}
+5 −37
Original line number Diff line number Diff line
@@ -22,7 +22,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.shardingsphere.core.exception.ShardingException;
import io.shardingsphere.core.metadata.datasource.DataSourceMetaData;
import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.proxy.backend.netty.client.response.AuthType;
import io.shardingsphere.proxy.backend.netty.client.response.ResponseHandler;
import io.shardingsphere.proxy.backend.netty.future.FutureRegistry;
import io.shardingsphere.proxy.config.RuleRegistry;
@@ -61,38 +60,14 @@ public final class MySQLResponseHandler extends ResponseHandler {
    
    private final Map<Integer, MySQLQueryResult> resultMap;
    
    private AuthType authType;
    
    public MySQLResponseHandler(final String dataSourceName) {
        dataSourceParameter = RuleRegistry.getInstance().getDataSourceConfigurationMap().get(dataSourceName);
        dataSourceMetaData = RuleRegistry.getInstance().getMetaData().getDataSource().getActualDataSourceMetaData(dataSourceName);
        resultMap = new HashMap<>();
        authType = AuthType.UN_AUTH;
    }
    
    @Override
    public void channelRead(final ChannelHandlerContext context, final Object message) {
        ByteBuf byteBuf = (ByteBuf) message;
        int header = getHeader(byteBuf);
        switch (authType) {
            case UN_AUTH:
                auth(context, byteBuf);
                break;
            case AUTHING:
                authing(context, byteBuf, header);
                break;
            case AUTH_SUCCESS:
                authSuccess(context, byteBuf, header);
                break;
            case AUTH_FAILED:
                log.error("mysql auth failed, cannot handle channel read message");
                break;
            default:
                throw new UnsupportedOperationException(authType.name());
        }
    }
    
    private int getHeader(final ByteBuf byteBuf) {
    protected int getHeader(final ByteBuf byteBuf) {
        MySQLPacketPayload payload = new MySQLPacketPayload(byteBuf);
        payload.getByteBuf().markReaderIndex();
        payload.readInt1();
@@ -112,7 +87,6 @@ public final class MySQLResponseHandler extends ResponseHandler {
            ChannelRegistry.getInstance().putConnectionId(context.channel().id().asShortText(), handshakePacket.getConnectionId());
            context.writeAndFlush(handshakeResponse41Packet);
        }
        authType = AuthType.AUTHING;
    }
    
    private byte[] securePasswordAuthentication(final byte[] password, final byte[] authPluginData) {
@@ -133,17 +107,17 @@ public final class MySQLResponseHandler extends ResponseHandler {
        }
    }
    
    private void authing(final ChannelHandlerContext context, final ByteBuf byteBuf, final int header) {
    @Override
    protected void authing(final ChannelHandlerContext context, final ByteBuf byteBuf, final int header) {
        if (OKPacket.HEADER == header) {
            okPacket(context, byteBuf);
            authType = AuthType.AUTH_SUCCESS;
        } else {
            errPacket(context, byteBuf);
            authType = AuthType.AUTH_FAILED;
        }
    }
    
    private void authSuccess(final ChannelHandlerContext context, final ByteBuf byteBuf, final int header) {
    @Override
    protected void authSuccess(final ChannelHandlerContext context, final ByteBuf byteBuf, final int header) {
        switch (header) {
            case EofPacket.HEADER:
                eofPacket(context, byteBuf);
@@ -221,10 +195,4 @@ public final class MySQLResponseHandler extends ResponseHandler {
            mysqlQueryResult.addTextResultSetRow(new TextResultSetRowPacket(payload, mysqlQueryResult.getColumnCount()));
        }
    }
    
    @Override
    public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
        //TODO delete connection map
        super.channelInactive(ctx);
    }
}