Loading sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/BackendHandlerFactory.java +7 −5 Original line number Diff line number Diff line Loading @@ -43,32 +43,34 @@ public final class BackendHandlerFactory { * Create new instance of text protocol backend handler. * * @param connectionId connection ID of database connected * @param sequenceId sequence ID of SQL packet * @param sql SQL to be executed * @param backendConnection backend connection * @param databaseType database type * @param rebuilder rebuilder * @return instance of text protocol backend handler */ public static BackendHandler newTextProtocolInstance(final int connectionId, final String sql, public static BackendHandler newTextProtocolInstance(final int connectionId, final int sequenceId, final String sql, final BackendConnection backendConnection, final DatabaseType databaseType, final CommandPacketRebuilder rebuilder) { return RULE_REGISTRY.getBackendNIOConfig().isUseNIO() ? new NettyBackendHandler(connectionId, sql, rebuilder, databaseType) : new JDBCBackendHandler(sql, JDBCExecuteEngineFactory.createTextProtocolInstance(backendConnection)); ? new NettyBackendHandler(connectionId, sequenceId, sql, rebuilder, databaseType) : new JDBCBackendHandler(sql, JDBCExecuteEngineFactory.createTextProtocolInstance(backendConnection)); } /** * Create new instance of text protocol backend handler. * * @param sql SQL to be executed * @param connectionId connection ID of database connected * @param sequenceId sequence ID of SQL packet * @param sql SQL to be executed * @param parameters SQL parameters * @param backendConnection backend connection * @param databaseType database type * @param rebuilder rebuilder * @return instance of text protocol backend handler */ public static BackendHandler newBinaryProtocolInstance(final int connectionId, final String sql, final List<Object> parameters, public static BackendHandler newBinaryProtocolInstance(final int connectionId, final int sequenceId, final String sql, final List<Object> parameters, final BackendConnection backendConnection, final DatabaseType databaseType, final CommandPacketRebuilder rebuilder) { return RULE_REGISTRY.getBackendNIOConfig().isUseNIO() ? new NettyBackendHandler(connectionId, sql, rebuilder, databaseType) return RULE_REGISTRY.getBackendNIOConfig().isUseNIO() ? new NettyBackendHandler(connectionId, sequenceId, sql, rebuilder, databaseType) : new JDBCBackendHandler(sql, JDBCExecuteEngineFactory.createBinaryProtocolInstance(parameters, backendConnection)); } } sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/netty/NettyBackendHandler.java +3 −1 Original line number Diff line number Diff line Loading @@ -77,6 +77,8 @@ public final class NettyBackendHandler extends AbstractBackendHandler { private final int connectionId; private final int sequenceId; private final String sql; private final CommandPacketRebuilder rebuilder; Loading Loading @@ -157,7 +159,7 @@ public final class NettyBackendHandler extends AbstractBackendHandler { Channel channel = pool.acquire().get(RULE_REGISTRY.getBackendNIOConfig().getConnectionTimeoutSeconds(), TimeUnit.SECONDS); channelMap.get(dataSourceName).add(channel); ChannelRegistry.getInstance().putConnectionId(channel.id().asShortText(), connectionId); channel.writeAndFlush(rebuilder.rebuild(rebuilder.sequenceId(), connectionId, sql)); channel.writeAndFlush(rebuilder.rebuild(sequenceId, connectionId, sql)); } private CommandResponsePackets merge(final SQLStatement sqlStatement, final List<CommandResponsePackets> packets, final List<QueryResult> queryResults) { Loading sharding-proxy/src/main/java/io/shardingsphere/proxy/transport/common/packet/CommandPacketRebuilder.java +0 −7 Original line number Diff line number Diff line Loading @@ -26,13 +26,6 @@ import io.shardingsphere.proxy.transport.mysql.packet.command.CommandPacket; */ public interface CommandPacketRebuilder { /** * Get sequence id. * * @return sequence id */ int sequenceId(); /** * CommandPacket rebuild by params. * Loading sharding-proxy/src/main/java/io/shardingsphere/proxy/transport/mysql/packet/command/query/binary/execute/ComStmtExecutePacket.java +1 −6 Original line number Diff line number Diff line Loading @@ -90,7 +90,7 @@ public final class ComStmtExecutePacket implements QueryCommandPacket, CommandPa binaryStatement.setParameterTypes(getParameterTypes(payload, parametersCount)); } parameters = getParameters(payload, parametersCount); backendHandler = BackendHandlerFactory.newBinaryProtocolInstance(connectionId, binaryStatement.getSql(), parameters, backendConnection, DatabaseType.MySQL, this); backendHandler = BackendHandlerFactory.newBinaryProtocolInstance(connectionId, sequenceId, binaryStatement.getSql(), parameters, backendConnection, DatabaseType.MySQL, this); } private List<BinaryStatementParameterType> getParameterTypes(final MySQLPacketPayload payload, final int parametersCount) { Loading Loading @@ -147,11 +147,6 @@ public final class ComStmtExecutePacket implements QueryCommandPacket, CommandPa return new BinaryResultSetRowPacket(resultPacket.getSequenceId(), resultPacket.getColumnCount(), resultPacket.getData(), resultPacket.getColumnTypes()); } @Override public int sequenceId() { return getSequenceId(); } @Override public CommandPacket rebuild(final Object... params) { return new ComQueryPacket((int) params[0], (int) params[1], (String) params[2]); Loading sharding-proxy/src/main/java/io/shardingsphere/proxy/transport/mysql/packet/command/query/text/fieldlist/ComFieldListPacket.java +2 −6 Original line number Diff line number Diff line Loading @@ -64,7 +64,8 @@ public final class ComFieldListPacket implements CommandPacket, CommandPacketReb this.sequenceId = sequenceId; table = payload.readStringNul(); fieldWildcard = payload.readStringEOF(); backendHandler = BackendHandlerFactory.newTextProtocolInstance(connectionId, String.format(SQL, table, ShardingConstant.LOGIC_SCHEMA_NAME), backendConnection, DatabaseType.MySQL, this); backendHandler = BackendHandlerFactory.newTextProtocolInstance( connectionId, sequenceId, String.format(SQL, table, ShardingConstant.LOGIC_SCHEMA_NAME), backendConnection, DatabaseType.MySQL, this); } @Override Loading Loading @@ -94,11 +95,6 @@ public final class ComFieldListPacket implements CommandPacket, CommandPacketReb return result; } @Override public int sequenceId() { return getSequenceId(); } @Override public CommandPacket rebuild(final Object... params) { return new ComQueryPacket((int) params[0], (int) params[1], (String) params[2]); Loading Loading
sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/BackendHandlerFactory.java +7 −5 Original line number Diff line number Diff line Loading @@ -43,32 +43,34 @@ public final class BackendHandlerFactory { * Create new instance of text protocol backend handler. * * @param connectionId connection ID of database connected * @param sequenceId sequence ID of SQL packet * @param sql SQL to be executed * @param backendConnection backend connection * @param databaseType database type * @param rebuilder rebuilder * @return instance of text protocol backend handler */ public static BackendHandler newTextProtocolInstance(final int connectionId, final String sql, public static BackendHandler newTextProtocolInstance(final int connectionId, final int sequenceId, final String sql, final BackendConnection backendConnection, final DatabaseType databaseType, final CommandPacketRebuilder rebuilder) { return RULE_REGISTRY.getBackendNIOConfig().isUseNIO() ? new NettyBackendHandler(connectionId, sql, rebuilder, databaseType) : new JDBCBackendHandler(sql, JDBCExecuteEngineFactory.createTextProtocolInstance(backendConnection)); ? new NettyBackendHandler(connectionId, sequenceId, sql, rebuilder, databaseType) : new JDBCBackendHandler(sql, JDBCExecuteEngineFactory.createTextProtocolInstance(backendConnection)); } /** * Create new instance of text protocol backend handler. * * @param sql SQL to be executed * @param connectionId connection ID of database connected * @param sequenceId sequence ID of SQL packet * @param sql SQL to be executed * @param parameters SQL parameters * @param backendConnection backend connection * @param databaseType database type * @param rebuilder rebuilder * @return instance of text protocol backend handler */ public static BackendHandler newBinaryProtocolInstance(final int connectionId, final String sql, final List<Object> parameters, public static BackendHandler newBinaryProtocolInstance(final int connectionId, final int sequenceId, final String sql, final List<Object> parameters, final BackendConnection backendConnection, final DatabaseType databaseType, final CommandPacketRebuilder rebuilder) { return RULE_REGISTRY.getBackendNIOConfig().isUseNIO() ? new NettyBackendHandler(connectionId, sql, rebuilder, databaseType) return RULE_REGISTRY.getBackendNIOConfig().isUseNIO() ? new NettyBackendHandler(connectionId, sequenceId, sql, rebuilder, databaseType) : new JDBCBackendHandler(sql, JDBCExecuteEngineFactory.createBinaryProtocolInstance(parameters, backendConnection)); } }
sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/netty/NettyBackendHandler.java +3 −1 Original line number Diff line number Diff line Loading @@ -77,6 +77,8 @@ public final class NettyBackendHandler extends AbstractBackendHandler { private final int connectionId; private final int sequenceId; private final String sql; private final CommandPacketRebuilder rebuilder; Loading Loading @@ -157,7 +159,7 @@ public final class NettyBackendHandler extends AbstractBackendHandler { Channel channel = pool.acquire().get(RULE_REGISTRY.getBackendNIOConfig().getConnectionTimeoutSeconds(), TimeUnit.SECONDS); channelMap.get(dataSourceName).add(channel); ChannelRegistry.getInstance().putConnectionId(channel.id().asShortText(), connectionId); channel.writeAndFlush(rebuilder.rebuild(rebuilder.sequenceId(), connectionId, sql)); channel.writeAndFlush(rebuilder.rebuild(sequenceId, connectionId, sql)); } private CommandResponsePackets merge(final SQLStatement sqlStatement, final List<CommandResponsePackets> packets, final List<QueryResult> queryResults) { Loading
sharding-proxy/src/main/java/io/shardingsphere/proxy/transport/common/packet/CommandPacketRebuilder.java +0 −7 Original line number Diff line number Diff line Loading @@ -26,13 +26,6 @@ import io.shardingsphere.proxy.transport.mysql.packet.command.CommandPacket; */ public interface CommandPacketRebuilder { /** * Get sequence id. * * @return sequence id */ int sequenceId(); /** * CommandPacket rebuild by params. * Loading
sharding-proxy/src/main/java/io/shardingsphere/proxy/transport/mysql/packet/command/query/binary/execute/ComStmtExecutePacket.java +1 −6 Original line number Diff line number Diff line Loading @@ -90,7 +90,7 @@ public final class ComStmtExecutePacket implements QueryCommandPacket, CommandPa binaryStatement.setParameterTypes(getParameterTypes(payload, parametersCount)); } parameters = getParameters(payload, parametersCount); backendHandler = BackendHandlerFactory.newBinaryProtocolInstance(connectionId, binaryStatement.getSql(), parameters, backendConnection, DatabaseType.MySQL, this); backendHandler = BackendHandlerFactory.newBinaryProtocolInstance(connectionId, sequenceId, binaryStatement.getSql(), parameters, backendConnection, DatabaseType.MySQL, this); } private List<BinaryStatementParameterType> getParameterTypes(final MySQLPacketPayload payload, final int parametersCount) { Loading Loading @@ -147,11 +147,6 @@ public final class ComStmtExecutePacket implements QueryCommandPacket, CommandPa return new BinaryResultSetRowPacket(resultPacket.getSequenceId(), resultPacket.getColumnCount(), resultPacket.getData(), resultPacket.getColumnTypes()); } @Override public int sequenceId() { return getSequenceId(); } @Override public CommandPacket rebuild(final Object... params) { return new ComQueryPacket((int) params[0], (int) params[1], (String) params[2]); Loading
sharding-proxy/src/main/java/io/shardingsphere/proxy/transport/mysql/packet/command/query/text/fieldlist/ComFieldListPacket.java +2 −6 Original line number Diff line number Diff line Loading @@ -64,7 +64,8 @@ public final class ComFieldListPacket implements CommandPacket, CommandPacketReb this.sequenceId = sequenceId; table = payload.readStringNul(); fieldWildcard = payload.readStringEOF(); backendHandler = BackendHandlerFactory.newTextProtocolInstance(connectionId, String.format(SQL, table, ShardingConstant.LOGIC_SCHEMA_NAME), backendConnection, DatabaseType.MySQL, this); backendHandler = BackendHandlerFactory.newTextProtocolInstance( connectionId, sequenceId, String.format(SQL, table, ShardingConstant.LOGIC_SCHEMA_NAME), backendConnection, DatabaseType.MySQL, this); } @Override Loading Loading @@ -94,11 +95,6 @@ public final class ComFieldListPacket implements CommandPacket, CommandPacketReb return result; } @Override public int sequenceId() { return getSequenceId(); } @Override public CommandPacket rebuild(final Object... params) { return new ComQueryPacket((int) params[0], (int) params[1], (String) params[2]); Loading