Commit 2cc1f26f authored by terrymanu's avatar terrymanu
Browse files

remove connectionId from CommandPacketRebuilder

parent fe0b1ff3
Loading
Loading
Loading
Loading
+11 −6
Original line number Diff line number Diff line
@@ -41,29 +41,34 @@ public final class BackendHandlerFactory {
    
    /**
     * Create new instance of text protocol backend handler.
     * 
     * @param connectionId connection ID of database connected
     * @param sql SQL to be executed
     * @param backendConnection backend connection
     * @param databaseType database type
     * @param rebuilder rebuilder
     * @return instance of text protocol backend handler
     */
    public static BackendHandler newTextProtocolInstance(final String sql, final BackendConnection backendConnection, final DatabaseType databaseType, final CommandPacketRebuilder rebuilder) {
    public static BackendHandler newTextProtocolInstance(final int connectionId, final String sql, 
                                                         final BackendConnection backendConnection, final DatabaseType databaseType, final CommandPacketRebuilder rebuilder) {
        return RULE_REGISTRY.getBackendNIOConfig().isUseNIO()
                ? new NettyBackendHandler(sql, rebuilder, databaseType) : new JDBCBackendHandler(sql, JDBCExecuteEngineFactory.createTextProtocolInstance(backendConnection));
                ? new NettyBackendHandler(connectionId, sql, rebuilder, databaseType) : new JDBCBackendHandler(sql, JDBCExecuteEngineFactory.createTextProtocolInstance(backendConnection));
    }
    
    /**
     * Create new instance of text protocol backend handler.
     * 
     * @param sql SQL to be executed
     * @param connectionId connection ID of database connected
     * @param parameters SQL parameters
     * @param backendConnection backend connection
     * @param databaseType database type
     * @param rebuilder rebuilder
     * @return instance of text protocol backend handler
     */
    public static BackendHandler newBinaryProtocolInstance(
            final String sql, final List<Object> parameters, final BackendConnection backendConnection, final DatabaseType databaseType, final CommandPacketRebuilder rebuilder) {
        return RULE_REGISTRY.getBackendNIOConfig().isUseNIO()
                ? new NettyBackendHandler(sql, rebuilder, databaseType) : new JDBCBackendHandler(sql, JDBCExecuteEngineFactory.createBinaryProtocolInstance(parameters, backendConnection));
    public static BackendHandler newBinaryProtocolInstance(final int connectionId, final String sql, final List<Object> parameters, 
                                                           final BackendConnection backendConnection, final DatabaseType databaseType, final CommandPacketRebuilder rebuilder) {
        return RULE_REGISTRY.getBackendNIOConfig().isUseNIO() ? new NettyBackendHandler(connectionId, sql, rebuilder, databaseType)
                : new JDBCBackendHandler(sql, JDBCExecuteEngineFactory.createBinaryProtocolInstance(parameters, backendConnection));
    }
}
+8 −6
Original line number Diff line number Diff line
@@ -75,6 +75,8 @@ public final class NettyBackendHandler extends AbstractBackendHandler {
    
    private static final RuleRegistry RULE_REGISTRY = RuleRegistry.getInstance();
    
    private final int connectionId;
    
    private final String sql;
    
    private final CommandPacketRebuilder rebuilder;
@@ -99,10 +101,10 @@ public final class NettyBackendHandler extends AbstractBackendHandler {
    private CommandResponsePackets executeForMasterSlave() throws InterruptedException, ExecutionException, TimeoutException {
        String dataSourceName = new MasterSlaveRouter(RULE_REGISTRY.getMasterSlaveRule(), RULE_REGISTRY.isShowSQL()).route(sql).iterator().next();
        synchronizedFuture = new SynchronizedFuture(1);
        MySQLResultCache.getInstance().putFuture(rebuilder.connectionId(), synchronizedFuture);
        MySQLResultCache.getInstance().putFuture(connectionId, synchronizedFuture);
        executeCommand(dataSourceName, sql);
        List<QueryResult> queryResults = synchronizedFuture.get(RULE_REGISTRY.getBackendNIOConfig().getConnectionTimeoutSeconds(), TimeUnit.SECONDS);
        MySQLResultCache.getInstance().deleteFuture(rebuilder.connectionId());
        MySQLResultCache.getInstance().deleteFuture(connectionId);
        List<CommandResponsePackets> packets = new LinkedList<>();
        for (QueryResult each : queryResults) {
            packets.add(((MySQLQueryResult) each).getCommandResponsePackets());
@@ -118,12 +120,12 @@ public final class NettyBackendHandler extends AbstractBackendHandler {
            return new CommandResponsePackets(new OKPacket(1));
        }
        synchronizedFuture = new SynchronizedFuture(routeResult.getExecutionUnits().size());
        MySQLResultCache.getInstance().putFuture(rebuilder.connectionId(), synchronizedFuture);
        MySQLResultCache.getInstance().putFuture(connectionId, synchronizedFuture);
        for (SQLExecutionUnit each : routeResult.getExecutionUnits()) {
            executeCommand(each.getDataSource(), each.getSqlUnit().getSql());
        }
        List<QueryResult> queryResults = synchronizedFuture.get(RULE_REGISTRY.getBackendNIOConfig().getConnectionTimeoutSeconds(), TimeUnit.SECONDS);
        MySQLResultCache.getInstance().deleteFuture(rebuilder.connectionId());
        MySQLResultCache.getInstance().deleteFuture(connectionId);
        
        List<CommandResponsePackets> packets = Lists.newArrayListWithCapacity(queryResults.size());
        for (QueryResult each : queryResults) {
@@ -154,8 +156,8 @@ public final class NettyBackendHandler extends AbstractBackendHandler {
        SimpleChannelPool pool = ShardingProxyClient.getInstance().getPoolMap().get(dataSourceName);
        Channel channel = pool.acquire().get(RULE_REGISTRY.getBackendNIOConfig().getConnectionTimeoutSeconds(), TimeUnit.SECONDS);
        channelMap.get(dataSourceName).add(channel);
        ChannelRegistry.getInstance().putConnectionId(channel.id().asShortText(), rebuilder.connectionId());
        channel.writeAndFlush(rebuilder.rebuild(rebuilder.sequenceId(), rebuilder.connectionId(), sql));
        ChannelRegistry.getInstance().putConnectionId(channel.id().asShortText(), connectionId);
        channel.writeAndFlush(rebuilder.rebuild(rebuilder.sequenceId(), connectionId, sql));
    }
    
    private CommandResponsePackets merge(final SQLStatement sqlStatement, final List<CommandResponsePackets> packets, final List<QueryResult> queryResults) {
+0 −7
Original line number Diff line number Diff line
@@ -26,13 +26,6 @@ import io.shardingsphere.proxy.transport.mysql.packet.command.CommandPacket;
 */
public interface CommandPacketRebuilder {
    
    /**
     * Get connection id.
     * 
     * @return connection id
     */
    int connectionId();
    
    /**
     * Get sequence id.
     * 
+1 −9
Original line number Diff line number Diff line
@@ -60,8 +60,6 @@ public final class ComStmtExecutePacket implements QueryCommandPacket, CommandPa
    @Getter
    private final int sequenceId;
    
    private final int connectionId;
    
    private final int statementId;
    
    private final BinaryStatement binaryStatement;
@@ -78,7 +76,6 @@ public final class ComStmtExecutePacket implements QueryCommandPacket, CommandPa
    
    public ComStmtExecutePacket(final int sequenceId, final int connectionId, final MySQLPacketPayload payload, final BackendConnection backendConnection) {
        this.sequenceId = sequenceId;
        this.connectionId = connectionId;
        statementId = payload.readInt4();
        binaryStatement = BinaryStatementRegistry.getInstance().getBinaryStatement(statementId);
        flags = payload.readInt1();
@@ -93,7 +90,7 @@ public final class ComStmtExecutePacket implements QueryCommandPacket, CommandPa
            binaryStatement.setParameterTypes(getParameterTypes(payload, parametersCount));
        }
        parameters = getParameters(payload, parametersCount);
        backendHandler = BackendHandlerFactory.newBinaryProtocolInstance(binaryStatement.getSql(), parameters, backendConnection, DatabaseType.MySQL, this);
        backendHandler = BackendHandlerFactory.newBinaryProtocolInstance(connectionId, binaryStatement.getSql(), parameters, backendConnection, DatabaseType.MySQL, this);
    }
    
    private List<BinaryStatementParameterType> getParameterTypes(final MySQLPacketPayload payload, final int parametersCount) {
@@ -150,11 +147,6 @@ public final class ComStmtExecutePacket implements QueryCommandPacket, CommandPa
        return new BinaryResultSetRowPacket(resultPacket.getSequenceId(), resultPacket.getColumnCount(), resultPacket.getData(), resultPacket.getColumnTypes());
    }
    
    @Override
    public int connectionId() {
        return connectionId;
    }
    
    @Override
    public int sequenceId() {
        return getSequenceId();
+1 −9
Original line number Diff line number Diff line
@@ -54,8 +54,6 @@ public final class ComFieldListPacket implements CommandPacket, CommandPacketReb
    @Getter
    private final int sequenceId;
    
    private final int connectionId;
    
    private final String table;
    
    private final String fieldWildcard;
@@ -64,10 +62,9 @@ public final class ComFieldListPacket implements CommandPacket, CommandPacketReb
    
    public ComFieldListPacket(final int sequenceId, final int connectionId, final MySQLPacketPayload payload, final BackendConnection backendConnection) {
        this.sequenceId = sequenceId;
        this.connectionId = connectionId;
        table = payload.readStringNul();
        fieldWildcard = payload.readStringEOF();
        backendHandler = BackendHandlerFactory.newTextProtocolInstance(String.format(SQL, table, ShardingConstant.LOGIC_SCHEMA_NAME), backendConnection, DatabaseType.MySQL, this);
        backendHandler = BackendHandlerFactory.newTextProtocolInstance(connectionId, String.format(SQL, table, ShardingConstant.LOGIC_SCHEMA_NAME), backendConnection, DatabaseType.MySQL, this);
    }
    
    @Override
@@ -97,11 +94,6 @@ public final class ComFieldListPacket implements CommandPacket, CommandPacketReb
        return result;
    }
    
    @Override
    public int connectionId() {
        return connectionId;
    }
    
    @Override
    public int sequenceId() {
        return getSequenceId();
Loading