Loading sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/BackendHandler.java +21 −0 Original line number Diff line number Diff line Loading @@ -17,8 +17,11 @@ package io.shardingsphere.proxy.backend.common; import io.shardingsphere.proxy.transport.common.packet.DatabaseProtocolPacket; import io.shardingsphere.proxy.transport.mysql.packet.command.CommandResponsePackets; import java.sql.SQLException; /** * Backend handler. * Loading @@ -32,4 +35,22 @@ public interface BackendHandler { * @return result packets to be sent */ CommandResponsePackets execute(); /** * Has more Result value. * * @return has more result value * @throws SQLException sql exception */ boolean hasMoreResultValue() throws SQLException; /** * Get result value. * * @return database protocol packet */ DatabaseProtocolPacket getResultValue(); @Deprecated boolean isHasMoreResultValueFlag(); } sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/SQLPacketsBackendHandler.java +4 −13 Original line number Diff line number Diff line Loading @@ -207,17 +207,12 @@ public final class SQLPacketsBackendHandler implements BackendHandler { sqlStatement, RuleRegistry.getInstance().getShardingMetaData()).merge(); isMerged = true; } catch (final SQLException ex) { return new CommandResponsePackets(new ErrPacket(1, ex.getErrorCode(), ex.getSQLState(), ex.getMessage())); return new CommandResponsePackets(new ErrPacket(1, ex)); } return packets.get(0); } /** * Has more Result value. * * @return has more result value * @throws SQLException sql exception */ @Override public boolean hasMoreResultValue() throws SQLException { if (!isMerged || !hasMoreResultValueFlag) { return false; Loading @@ -228,11 +223,7 @@ public final class SQLPacketsBackendHandler implements BackendHandler { return true; } /** * Get result value. * * @return database protocol packet */ @Override public DatabaseProtocolPacket getResultValue() { if (!hasMoreResultValueFlag) { return new EofPacket(++currentSequenceId, 0, StatusFlag.SERVER_STATUS_AUTOCOMMIT.getValue()); Loading @@ -244,7 +235,7 @@ public final class SQLPacketsBackendHandler implements BackendHandler { } return new TextResultSetRowPacket(++currentSequenceId, data); } catch (final SQLException ex) { return new ErrPacket(1, ex.getErrorCode(), ex.getSQLState(), ex.getMessage()); return new ErrPacket(1, ex); } } } sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/ExecuteBackendHandler.java→sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/JDBCBackendHandler.java +28 −36 Original line number Diff line number Diff line Loading @@ -15,9 +15,8 @@ * </p> */ package io.shardingsphere.proxy.backend.common; package io.shardingsphere.proxy.backend.common.jdbc; import io.shardingsphere.core.constant.DatabaseType; import io.shardingsphere.core.constant.SQLType; import io.shardingsphere.core.constant.TransactionType; import io.shardingsphere.core.exception.ShardingException; Loading @@ -30,6 +29,10 @@ import io.shardingsphere.core.routing.SQLExecutionUnit; import io.shardingsphere.core.routing.SQLRouteResult; import io.shardingsphere.core.routing.SQLUnit; import io.shardingsphere.core.routing.router.masterslave.MasterSlaveRouter; import io.shardingsphere.proxy.backend.common.BackendHandler; import io.shardingsphere.proxy.backend.common.ProxyConnectionHolder; import io.shardingsphere.proxy.backend.common.ProxyMode; import io.shardingsphere.proxy.backend.common.ResultList; import io.shardingsphere.proxy.backend.resource.BaseJDBCResource; import io.shardingsphere.proxy.config.RuleRegistry; import io.shardingsphere.proxy.metadata.ProxyShardingRefreshHandler; Loading Loading @@ -62,13 +65,13 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; /** * Abstract ExecuteBackendHandler for SQL or PrepareStatement. * Backend handler via JDBC to connect databases. * * @author zhaojun */ @Getter @Slf4j public abstract class ExecuteBackendHandler implements BackendHandler { public abstract class JDBCBackendHandler implements BackendHandler { private final String sql; Loading @@ -83,30 +86,28 @@ public abstract class ExecuteBackendHandler implements BackendHandler { private boolean hasMoreResultValueFlag; private final DatabaseType databaseType; private final BaseJDBCResource jdbcResource; private final boolean showSQL; private final RuleRegistry ruleRegistry; @Setter private BaseJDBCResource jdbcResource; private final List<ResultList> resultLists = new CopyOnWriteArrayList<>(); private final List<ResultList> resultLists; public ExecuteBackendHandler(final String sql, final DatabaseType databaseType, final boolean showSQL) { public JDBCBackendHandler(final String sql, final BaseJDBCResource jdbcResource) { this.sql = sql; isMerged = false; hasMoreResultValueFlag = true; this.databaseType = databaseType; this.showSQL = showSQL; this.jdbcResource = jdbcResource; ruleRegistry = RuleRegistry.getInstance(); resultLists = new CopyOnWriteArrayList<>(); } @Override public CommandResponsePackets execute() { try { return doExecuteInternal(RuleRegistry.getInstance().isMasterSlaveOnly() ? doMasterSlaveRoute() : doSqlShardingRoute()); return doExecuteInternal(ruleRegistry.isMasterSlaveOnly() ? doMasterSlaveRoute() : doSqlShardingRoute()); } catch (final Exception ex) { log.error("ExecuteBackendHandler", ex); return new CommandResponsePackets(new ErrPacket(1, 0, "", "" + ex.getMessage())); return new CommandResponsePackets(new ErrPacket(1, new SQLException(ex))); } } Loading @@ -117,7 +118,7 @@ public abstract class ExecuteBackendHandler implements BackendHandler { if (isXaDDL(routeResult)) { throw new SQLException("DDL command can't not execute in xa transaction mode."); } ExecutorService executorService = RuleRegistry.getInstance().getExecutorService(); ExecutorService executorService = ruleRegistry.getExecutorService(); List<Future<CommandResponsePackets>> futureList = new ArrayList<>(1024); for (SQLExecutionUnit each : routeResult.getExecutionUnits()) { Statement statement = prepareResource(each.getDataSource(), each.getSqlUnit().getSql(), routeResult.getSqlStatement()); Loading @@ -125,21 +126,21 @@ public abstract class ExecuteBackendHandler implements BackendHandler { } List<CommandResponsePackets> packets = buildCommandResponsePackets(futureList); CommandResponsePackets result = merge(routeResult.getSqlStatement(), packets); if (!RuleRegistry.getInstance().isMasterSlaveOnly()) { if (!ruleRegistry.isMasterSlaveOnly()) { ProxyShardingRefreshHandler.build(routeResult).execute(); } return result; } private boolean isXaDDL(final SQLRouteResult routeResult) throws SystemException { return TransactionType.XA.equals(RuleRegistry.getInstance().getTransactionType()) return TransactionType.XA.equals(ruleRegistry.getTransactionType()) && SQLType.DDL.equals(routeResult.getSqlStatement().getType()) && Status.STATUS_NO_TRANSACTION != AtomikosUserTransaction.getInstance().getStatus(); } private SQLRouteResult doMasterSlaveRoute() { SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge(); SQLRouteResult result = new SQLRouteResult(sqlStatement, null); String dataSourceName = new MasterSlaveRouter(RuleRegistry.getInstance().getMasterSlaveRule()).route(sqlStatement.getType()).iterator().next(); String dataSourceName = new MasterSlaveRouter(ruleRegistry.getMasterSlaveRule()).route(sqlStatement.getType()).iterator().next(); SQLUnit sqlUnit = new SQLUnit(sql, Collections.<List<Object>>emptyList()); result.getExecutionUnits().add(new SQLExecutionUnit(dataSourceName, sqlUnit)); return result; Loading @@ -147,9 +148,9 @@ public abstract class ExecuteBackendHandler implements BackendHandler { protected abstract SQLRouteResult doSqlShardingRoute(); protected abstract Statement prepareResource(String dataSourceName, String unitSql, SQLStatement sqlStatement) throws SQLException; protected abstract Statement prepareResource(String dataSourceName, String unitSQL, SQLStatement sqlStatement) throws SQLException; protected abstract Callable<CommandResponsePackets> newSubmitTask(Statement statement, SQLStatement sqlStatement, String unitSql); protected abstract Callable<CommandResponsePackets> newSubmitTask(Statement statement, SQLStatement sqlStatement, String unitSQL); private List<CommandResponsePackets> buildCommandResponsePackets(final List<Future<CommandResponsePackets>> futureList) { List<CommandResponsePackets> result = new ArrayList<>(); Loading Loading @@ -201,10 +202,10 @@ public abstract class ExecuteBackendHandler implements BackendHandler { queryResults.add(newQueryResult(packets.get(i), i)); } try { mergedResult = MergeEngineFactory.newInstance(RuleRegistry.getInstance().getShardingRule(), queryResults, sqlStatement, RuleRegistry.getInstance().getShardingMetaData()).merge(); mergedResult = MergeEngineFactory.newInstance(ruleRegistry.getShardingRule(), queryResults, sqlStatement, ruleRegistry.getShardingMetaData()).merge(); isMerged = true; } catch (final SQLException ex) { return new CommandResponsePackets(new ErrPacket(1, ex.getErrorCode(), ex.getSQLState(), ex.getMessage())); return new CommandResponsePackets(new ErrPacket(1, ex)); } return buildPackets(packets); } Loading @@ -226,12 +227,7 @@ public abstract class ExecuteBackendHandler implements BackendHandler { return result; } /** * Has more Result value. * * @return has more result value * @throws SQLException sql exception */ @Override public boolean hasMoreResultValue() throws SQLException { if (!isMerged || !hasMoreResultValueFlag) { jdbcResource.clear(); Loading @@ -243,11 +239,7 @@ public abstract class ExecuteBackendHandler implements BackendHandler { return true; } /** * Get result value. * * @return database protocol packet */ @Override public DatabaseProtocolPacket getResultValue() { if (!hasMoreResultValueFlag) { return new EofPacket(++currentSequenceId, 0, StatusFlag.SERVER_STATUS_AUTOCOMMIT.getValue()); Loading @@ -259,7 +251,7 @@ public abstract class ExecuteBackendHandler implements BackendHandler { } return newDatabaseProtocolPacket(++currentSequenceId, data); } catch (final SQLException ex) { return new ErrPacket(1, ex.getErrorCode(), ex.getSQLState(), ex.getMessage()); return new ErrPacket(1, ex); } } Loading @@ -267,7 +259,7 @@ public abstract class ExecuteBackendHandler implements BackendHandler { protected Connection getConnection(final DataSource dataSource) throws SQLException { Connection result; if (ProxyMode.CONNECTION_STRICTLY == RuleRegistry.getInstance().getProxyMode()) { if (ProxyMode.CONNECTION_STRICTLY == ruleRegistry.getProxyMode()) { result = ProxyConnectionHolder.getConnection(dataSource); if (null == result) { result = dataSource.getConnection(); Loading sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/ExecuteWorker.java→sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/JDBCExecuteWorker.java +13 −12 Original line number Diff line number Diff line Loading @@ -15,7 +15,7 @@ * </p> */ package io.shardingsphere.proxy.backend.common; package io.shardingsphere.proxy.backend.common.jdbc; import io.shardingsphere.core.parsing.parser.sql.SQLStatement; import io.shardingsphere.core.routing.router.masterslave.MasterVisitedManager; Loading @@ -42,16 +42,16 @@ import java.util.List; import java.util.concurrent.Callable; /** * Abstract ExecuteWorker class, include SQL and PreparedStatement implement. * Execute worker via JDBC to connect databases. * * @author zhaojun */ @AllArgsConstructor @Getter @Slf4j public abstract class ExecuteWorker implements Callable<CommandResponsePackets> { public abstract class JDBCExecuteWorker implements Callable<CommandResponsePackets> { private final ExecuteBackendHandler executeBackendHandler; private final JDBCBackendHandler executeBackendHandler; private final SQLStatement sqlStatement; Loading @@ -61,7 +61,7 @@ public abstract class ExecuteWorker implements Callable<CommandResponsePackets> return execute(); } catch (SQLException ex) { log.error("ExecuteWorker", ex); return new CommandResponsePackets(new ErrPacket(1, ex.getErrorCode(), ex.getSQLState(), ex.getMessage())); return new CommandResponsePackets(new ErrPacket(1, ex)); } finally { MasterVisitedManager.clear(); } Loading @@ -79,12 +79,13 @@ public abstract class ExecuteWorker implements Callable<CommandResponsePackets> } private CommandResponsePackets executeQuery() throws SQLException { if (ProxyMode.MEMORY_STRICTLY == RuleRegistry.getInstance().getProxyMode()) { switch (RuleRegistry.getInstance().getProxyMode()) { case MEMORY_STRICTLY: return executeQueryWithStreamResultSet(); } else if (ProxyMode.CONNECTION_STRICTLY == RuleRegistry.getInstance().getProxyMode()) { case CONNECTION_STRICTLY: return executeQueryWithNonStreamResultSet(); } else { return new CommandResponsePackets(new ErrPacket(1, 0, "", "Invalid proxy.mode")); default: throw new UnsupportedOperationException(RuleRegistry.getInstance().getProxyMode().name()); } } Loading sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/StatementExecuteBackendHandler.java→sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/statement/JDBCStatementBackendHandler.java +31 −23 Original line number Diff line number Diff line Loading @@ -15,7 +15,7 @@ * </p> */ package io.shardingsphere.proxy.backend.common; package io.shardingsphere.proxy.backend.common.jdbc.statement; import io.shardingsphere.core.constant.DatabaseType; import io.shardingsphere.core.merger.QueryResult; Loading @@ -23,6 +23,8 @@ import io.shardingsphere.core.parsing.parser.sql.SQLStatement; import io.shardingsphere.core.parsing.parser.sql.dml.insert.InsertStatement; import io.shardingsphere.core.routing.PreparedStatementRoutingEngine; import io.shardingsphere.core.routing.SQLRouteResult; import io.shardingsphere.proxy.backend.common.ProxyMode; import io.shardingsphere.proxy.backend.common.jdbc.JDBCBackendHandler; import io.shardingsphere.proxy.backend.mysql.MySQLPacketStatementExecuteQueryResult; import io.shardingsphere.proxy.backend.resource.ProxyJDBCResourceFactory; import io.shardingsphere.proxy.backend.resource.ProxyPrepareJDBCResource; Loading @@ -46,31 +48,37 @@ import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; /** * Statement execute backend handler. * Statement protocol backend handler via JDBC to connect databases. * * @author zhangyonglun * @author zhaojun */ public final class StatementExecuteBackendHandler extends ExecuteBackendHandler implements BackendHandler { public final class JDBCStatementBackendHandler extends JDBCBackendHandler { private final List<PreparedStatementParameter> preparedStatementParameters; private final DatabaseType databaseType; private final boolean showSQL; @Getter private final List<ColumnType> columnTypes; public StatementExecuteBackendHandler(final List<PreparedStatementParameter> preparedStatementParameters, final int statementId, final DatabaseType databaseType, final boolean showSQL) { super(PreparedStatementRegistry.getInstance().getSQL(statementId), databaseType, showSQL); super.setJdbcResource(ProxyJDBCResourceFactory.newPrepareResource()); private final RuleRegistry ruleRegistry; public JDBCStatementBackendHandler(final List<PreparedStatementParameter> preparedStatementParameters, final int statementId, final DatabaseType databaseType, final boolean showSQL) { super(PreparedStatementRegistry.getInstance().getSQL(statementId), ProxyJDBCResourceFactory.newPrepareResource()); this.preparedStatementParameters = preparedStatementParameters; this.databaseType = databaseType; this.showSQL = showSQL; columnTypes = new CopyOnWriteArrayList<>(); ruleRegistry = RuleRegistry.getInstance(); } @Override protected SQLRouteResult doSqlShardingRoute() { PreparedStatementRoutingEngine routingEngine = new PreparedStatementRoutingEngine(getSql(), RuleRegistry.getInstance().getShardingRule(), RuleRegistry.getInstance().getShardingMetaData(), getDatabaseType(), isShowSQL(), RuleRegistry.getInstance().getShardingDataSourceMetaData()); PreparedStatementRoutingEngine routingEngine = new PreparedStatementRoutingEngine( getSql(), ruleRegistry.getShardingRule(), ruleRegistry.getShardingMetaData(), databaseType, showSQL, ruleRegistry.getShardingDataSourceMetaData()); return routingEngine.route(getComStmtExecuteParameters()); } Loading @@ -83,33 +91,33 @@ public final class StatementExecuteBackendHandler extends ExecuteBackendHandler } @Override protected Callable<CommandResponsePackets> newSubmitTask(final Statement statement, final SQLStatement sqlStatement, final String unitSql) { return new StatementExecuteWorker(this, sqlStatement, (PreparedStatement) statement); protected Callable<CommandResponsePackets> newSubmitTask(final Statement statement, final SQLStatement sqlStatement, final String unitSQL) { return new JDBCStatementExecuteWorker(this, sqlStatement, (PreparedStatement) statement); } @Override protected PreparedStatement prepareResource(final String dataSourceName, final String unitSql, final SQLStatement sqlStatement) throws SQLException { DataSource dataSource = RuleRegistry.getInstance().getDataSourceMap().get(dataSourceName); protected PreparedStatement prepareResource(final String dataSourceName, final String unitSQL, final SQLStatement sqlStatement) throws SQLException { DataSource dataSource = ruleRegistry.getDataSourceMap().get(dataSourceName); Connection connection = getConnection(dataSource); PreparedStatement statement = sqlStatement instanceof InsertStatement ? connection.prepareStatement(unitSql, Statement.RETURN_GENERATED_KEYS) : connection.prepareStatement(unitSql); PreparedStatement result = sqlStatement instanceof InsertStatement ? connection.prepareStatement(unitSQL, Statement.RETURN_GENERATED_KEYS) : connection.prepareStatement(unitSQL); for (int i = 0; i < preparedStatementParameters.size(); i++) { statement.setObject(i + 1, preparedStatementParameters.get(i).getValue()); result.setObject(i + 1, preparedStatementParameters.get(i).getValue()); } ProxyPrepareJDBCResource prepareProxyJDBCResource = (ProxyPrepareJDBCResource) getJdbcResource(); prepareProxyJDBCResource.addConnection(connection); prepareProxyJDBCResource.addPrepareStatement(statement); return statement; prepareProxyJDBCResource.addPrepareStatement(result); return result; } @Override protected QueryResult newQueryResult(final CommandResponsePackets packet, final int index) { MySQLPacketStatementExecuteQueryResult mySQLPacketStatementExecuteQueryResult = new MySQLPacketStatementExecuteQueryResult(packet, columnTypes); if (ProxyMode.MEMORY_STRICTLY == RuleRegistry.getInstance().getProxyMode()) { mySQLPacketStatementExecuteQueryResult.setResultSet(getJdbcResource().getResultSets().get(index)); MySQLPacketStatementExecuteQueryResult result = new MySQLPacketStatementExecuteQueryResult(packet, columnTypes); if (ProxyMode.MEMORY_STRICTLY == ruleRegistry.getProxyMode()) { result.setResultSet(getJdbcResource().getResultSets().get(index)); } else { mySQLPacketStatementExecuteQueryResult.setResultList(getResultLists().get(index)); result.setResultList(getResultLists().get(index)); } return mySQLPacketStatementExecuteQueryResult; return result; } @Override Loading Loading
sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/BackendHandler.java +21 −0 Original line number Diff line number Diff line Loading @@ -17,8 +17,11 @@ package io.shardingsphere.proxy.backend.common; import io.shardingsphere.proxy.transport.common.packet.DatabaseProtocolPacket; import io.shardingsphere.proxy.transport.mysql.packet.command.CommandResponsePackets; import java.sql.SQLException; /** * Backend handler. * Loading @@ -32,4 +35,22 @@ public interface BackendHandler { * @return result packets to be sent */ CommandResponsePackets execute(); /** * Has more Result value. * * @return has more result value * @throws SQLException sql exception */ boolean hasMoreResultValue() throws SQLException; /** * Get result value. * * @return database protocol packet */ DatabaseProtocolPacket getResultValue(); @Deprecated boolean isHasMoreResultValueFlag(); }
sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/SQLPacketsBackendHandler.java +4 −13 Original line number Diff line number Diff line Loading @@ -207,17 +207,12 @@ public final class SQLPacketsBackendHandler implements BackendHandler { sqlStatement, RuleRegistry.getInstance().getShardingMetaData()).merge(); isMerged = true; } catch (final SQLException ex) { return new CommandResponsePackets(new ErrPacket(1, ex.getErrorCode(), ex.getSQLState(), ex.getMessage())); return new CommandResponsePackets(new ErrPacket(1, ex)); } return packets.get(0); } /** * Has more Result value. * * @return has more result value * @throws SQLException sql exception */ @Override public boolean hasMoreResultValue() throws SQLException { if (!isMerged || !hasMoreResultValueFlag) { return false; Loading @@ -228,11 +223,7 @@ public final class SQLPacketsBackendHandler implements BackendHandler { return true; } /** * Get result value. * * @return database protocol packet */ @Override public DatabaseProtocolPacket getResultValue() { if (!hasMoreResultValueFlag) { return new EofPacket(++currentSequenceId, 0, StatusFlag.SERVER_STATUS_AUTOCOMMIT.getValue()); Loading @@ -244,7 +235,7 @@ public final class SQLPacketsBackendHandler implements BackendHandler { } return new TextResultSetRowPacket(++currentSequenceId, data); } catch (final SQLException ex) { return new ErrPacket(1, ex.getErrorCode(), ex.getSQLState(), ex.getMessage()); return new ErrPacket(1, ex); } } }
sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/ExecuteBackendHandler.java→sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/JDBCBackendHandler.java +28 −36 Original line number Diff line number Diff line Loading @@ -15,9 +15,8 @@ * </p> */ package io.shardingsphere.proxy.backend.common; package io.shardingsphere.proxy.backend.common.jdbc; import io.shardingsphere.core.constant.DatabaseType; import io.shardingsphere.core.constant.SQLType; import io.shardingsphere.core.constant.TransactionType; import io.shardingsphere.core.exception.ShardingException; Loading @@ -30,6 +29,10 @@ import io.shardingsphere.core.routing.SQLExecutionUnit; import io.shardingsphere.core.routing.SQLRouteResult; import io.shardingsphere.core.routing.SQLUnit; import io.shardingsphere.core.routing.router.masterslave.MasterSlaveRouter; import io.shardingsphere.proxy.backend.common.BackendHandler; import io.shardingsphere.proxy.backend.common.ProxyConnectionHolder; import io.shardingsphere.proxy.backend.common.ProxyMode; import io.shardingsphere.proxy.backend.common.ResultList; import io.shardingsphere.proxy.backend.resource.BaseJDBCResource; import io.shardingsphere.proxy.config.RuleRegistry; import io.shardingsphere.proxy.metadata.ProxyShardingRefreshHandler; Loading Loading @@ -62,13 +65,13 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; /** * Abstract ExecuteBackendHandler for SQL or PrepareStatement. * Backend handler via JDBC to connect databases. * * @author zhaojun */ @Getter @Slf4j public abstract class ExecuteBackendHandler implements BackendHandler { public abstract class JDBCBackendHandler implements BackendHandler { private final String sql; Loading @@ -83,30 +86,28 @@ public abstract class ExecuteBackendHandler implements BackendHandler { private boolean hasMoreResultValueFlag; private final DatabaseType databaseType; private final BaseJDBCResource jdbcResource; private final boolean showSQL; private final RuleRegistry ruleRegistry; @Setter private BaseJDBCResource jdbcResource; private final List<ResultList> resultLists = new CopyOnWriteArrayList<>(); private final List<ResultList> resultLists; public ExecuteBackendHandler(final String sql, final DatabaseType databaseType, final boolean showSQL) { public JDBCBackendHandler(final String sql, final BaseJDBCResource jdbcResource) { this.sql = sql; isMerged = false; hasMoreResultValueFlag = true; this.databaseType = databaseType; this.showSQL = showSQL; this.jdbcResource = jdbcResource; ruleRegistry = RuleRegistry.getInstance(); resultLists = new CopyOnWriteArrayList<>(); } @Override public CommandResponsePackets execute() { try { return doExecuteInternal(RuleRegistry.getInstance().isMasterSlaveOnly() ? doMasterSlaveRoute() : doSqlShardingRoute()); return doExecuteInternal(ruleRegistry.isMasterSlaveOnly() ? doMasterSlaveRoute() : doSqlShardingRoute()); } catch (final Exception ex) { log.error("ExecuteBackendHandler", ex); return new CommandResponsePackets(new ErrPacket(1, 0, "", "" + ex.getMessage())); return new CommandResponsePackets(new ErrPacket(1, new SQLException(ex))); } } Loading @@ -117,7 +118,7 @@ public abstract class ExecuteBackendHandler implements BackendHandler { if (isXaDDL(routeResult)) { throw new SQLException("DDL command can't not execute in xa transaction mode."); } ExecutorService executorService = RuleRegistry.getInstance().getExecutorService(); ExecutorService executorService = ruleRegistry.getExecutorService(); List<Future<CommandResponsePackets>> futureList = new ArrayList<>(1024); for (SQLExecutionUnit each : routeResult.getExecutionUnits()) { Statement statement = prepareResource(each.getDataSource(), each.getSqlUnit().getSql(), routeResult.getSqlStatement()); Loading @@ -125,21 +126,21 @@ public abstract class ExecuteBackendHandler implements BackendHandler { } List<CommandResponsePackets> packets = buildCommandResponsePackets(futureList); CommandResponsePackets result = merge(routeResult.getSqlStatement(), packets); if (!RuleRegistry.getInstance().isMasterSlaveOnly()) { if (!ruleRegistry.isMasterSlaveOnly()) { ProxyShardingRefreshHandler.build(routeResult).execute(); } return result; } private boolean isXaDDL(final SQLRouteResult routeResult) throws SystemException { return TransactionType.XA.equals(RuleRegistry.getInstance().getTransactionType()) return TransactionType.XA.equals(ruleRegistry.getTransactionType()) && SQLType.DDL.equals(routeResult.getSqlStatement().getType()) && Status.STATUS_NO_TRANSACTION != AtomikosUserTransaction.getInstance().getStatus(); } private SQLRouteResult doMasterSlaveRoute() { SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge(); SQLRouteResult result = new SQLRouteResult(sqlStatement, null); String dataSourceName = new MasterSlaveRouter(RuleRegistry.getInstance().getMasterSlaveRule()).route(sqlStatement.getType()).iterator().next(); String dataSourceName = new MasterSlaveRouter(ruleRegistry.getMasterSlaveRule()).route(sqlStatement.getType()).iterator().next(); SQLUnit sqlUnit = new SQLUnit(sql, Collections.<List<Object>>emptyList()); result.getExecutionUnits().add(new SQLExecutionUnit(dataSourceName, sqlUnit)); return result; Loading @@ -147,9 +148,9 @@ public abstract class ExecuteBackendHandler implements BackendHandler { protected abstract SQLRouteResult doSqlShardingRoute(); protected abstract Statement prepareResource(String dataSourceName, String unitSql, SQLStatement sqlStatement) throws SQLException; protected abstract Statement prepareResource(String dataSourceName, String unitSQL, SQLStatement sqlStatement) throws SQLException; protected abstract Callable<CommandResponsePackets> newSubmitTask(Statement statement, SQLStatement sqlStatement, String unitSql); protected abstract Callable<CommandResponsePackets> newSubmitTask(Statement statement, SQLStatement sqlStatement, String unitSQL); private List<CommandResponsePackets> buildCommandResponsePackets(final List<Future<CommandResponsePackets>> futureList) { List<CommandResponsePackets> result = new ArrayList<>(); Loading Loading @@ -201,10 +202,10 @@ public abstract class ExecuteBackendHandler implements BackendHandler { queryResults.add(newQueryResult(packets.get(i), i)); } try { mergedResult = MergeEngineFactory.newInstance(RuleRegistry.getInstance().getShardingRule(), queryResults, sqlStatement, RuleRegistry.getInstance().getShardingMetaData()).merge(); mergedResult = MergeEngineFactory.newInstance(ruleRegistry.getShardingRule(), queryResults, sqlStatement, ruleRegistry.getShardingMetaData()).merge(); isMerged = true; } catch (final SQLException ex) { return new CommandResponsePackets(new ErrPacket(1, ex.getErrorCode(), ex.getSQLState(), ex.getMessage())); return new CommandResponsePackets(new ErrPacket(1, ex)); } return buildPackets(packets); } Loading @@ -226,12 +227,7 @@ public abstract class ExecuteBackendHandler implements BackendHandler { return result; } /** * Has more Result value. * * @return has more result value * @throws SQLException sql exception */ @Override public boolean hasMoreResultValue() throws SQLException { if (!isMerged || !hasMoreResultValueFlag) { jdbcResource.clear(); Loading @@ -243,11 +239,7 @@ public abstract class ExecuteBackendHandler implements BackendHandler { return true; } /** * Get result value. * * @return database protocol packet */ @Override public DatabaseProtocolPacket getResultValue() { if (!hasMoreResultValueFlag) { return new EofPacket(++currentSequenceId, 0, StatusFlag.SERVER_STATUS_AUTOCOMMIT.getValue()); Loading @@ -259,7 +251,7 @@ public abstract class ExecuteBackendHandler implements BackendHandler { } return newDatabaseProtocolPacket(++currentSequenceId, data); } catch (final SQLException ex) { return new ErrPacket(1, ex.getErrorCode(), ex.getSQLState(), ex.getMessage()); return new ErrPacket(1, ex); } } Loading @@ -267,7 +259,7 @@ public abstract class ExecuteBackendHandler implements BackendHandler { protected Connection getConnection(final DataSource dataSource) throws SQLException { Connection result; if (ProxyMode.CONNECTION_STRICTLY == RuleRegistry.getInstance().getProxyMode()) { if (ProxyMode.CONNECTION_STRICTLY == ruleRegistry.getProxyMode()) { result = ProxyConnectionHolder.getConnection(dataSource); if (null == result) { result = dataSource.getConnection(); Loading
sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/ExecuteWorker.java→sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/JDBCExecuteWorker.java +13 −12 Original line number Diff line number Diff line Loading @@ -15,7 +15,7 @@ * </p> */ package io.shardingsphere.proxy.backend.common; package io.shardingsphere.proxy.backend.common.jdbc; import io.shardingsphere.core.parsing.parser.sql.SQLStatement; import io.shardingsphere.core.routing.router.masterslave.MasterVisitedManager; Loading @@ -42,16 +42,16 @@ import java.util.List; import java.util.concurrent.Callable; /** * Abstract ExecuteWorker class, include SQL and PreparedStatement implement. * Execute worker via JDBC to connect databases. * * @author zhaojun */ @AllArgsConstructor @Getter @Slf4j public abstract class ExecuteWorker implements Callable<CommandResponsePackets> { public abstract class JDBCExecuteWorker implements Callable<CommandResponsePackets> { private final ExecuteBackendHandler executeBackendHandler; private final JDBCBackendHandler executeBackendHandler; private final SQLStatement sqlStatement; Loading @@ -61,7 +61,7 @@ public abstract class ExecuteWorker implements Callable<CommandResponsePackets> return execute(); } catch (SQLException ex) { log.error("ExecuteWorker", ex); return new CommandResponsePackets(new ErrPacket(1, ex.getErrorCode(), ex.getSQLState(), ex.getMessage())); return new CommandResponsePackets(new ErrPacket(1, ex)); } finally { MasterVisitedManager.clear(); } Loading @@ -79,12 +79,13 @@ public abstract class ExecuteWorker implements Callable<CommandResponsePackets> } private CommandResponsePackets executeQuery() throws SQLException { if (ProxyMode.MEMORY_STRICTLY == RuleRegistry.getInstance().getProxyMode()) { switch (RuleRegistry.getInstance().getProxyMode()) { case MEMORY_STRICTLY: return executeQueryWithStreamResultSet(); } else if (ProxyMode.CONNECTION_STRICTLY == RuleRegistry.getInstance().getProxyMode()) { case CONNECTION_STRICTLY: return executeQueryWithNonStreamResultSet(); } else { return new CommandResponsePackets(new ErrPacket(1, 0, "", "Invalid proxy.mode")); default: throw new UnsupportedOperationException(RuleRegistry.getInstance().getProxyMode().name()); } } Loading
sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/StatementExecuteBackendHandler.java→sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/statement/JDBCStatementBackendHandler.java +31 −23 Original line number Diff line number Diff line Loading @@ -15,7 +15,7 @@ * </p> */ package io.shardingsphere.proxy.backend.common; package io.shardingsphere.proxy.backend.common.jdbc.statement; import io.shardingsphere.core.constant.DatabaseType; import io.shardingsphere.core.merger.QueryResult; Loading @@ -23,6 +23,8 @@ import io.shardingsphere.core.parsing.parser.sql.SQLStatement; import io.shardingsphere.core.parsing.parser.sql.dml.insert.InsertStatement; import io.shardingsphere.core.routing.PreparedStatementRoutingEngine; import io.shardingsphere.core.routing.SQLRouteResult; import io.shardingsphere.proxy.backend.common.ProxyMode; import io.shardingsphere.proxy.backend.common.jdbc.JDBCBackendHandler; import io.shardingsphere.proxy.backend.mysql.MySQLPacketStatementExecuteQueryResult; import io.shardingsphere.proxy.backend.resource.ProxyJDBCResourceFactory; import io.shardingsphere.proxy.backend.resource.ProxyPrepareJDBCResource; Loading @@ -46,31 +48,37 @@ import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; /** * Statement execute backend handler. * Statement protocol backend handler via JDBC to connect databases. * * @author zhangyonglun * @author zhaojun */ public final class StatementExecuteBackendHandler extends ExecuteBackendHandler implements BackendHandler { public final class JDBCStatementBackendHandler extends JDBCBackendHandler { private final List<PreparedStatementParameter> preparedStatementParameters; private final DatabaseType databaseType; private final boolean showSQL; @Getter private final List<ColumnType> columnTypes; public StatementExecuteBackendHandler(final List<PreparedStatementParameter> preparedStatementParameters, final int statementId, final DatabaseType databaseType, final boolean showSQL) { super(PreparedStatementRegistry.getInstance().getSQL(statementId), databaseType, showSQL); super.setJdbcResource(ProxyJDBCResourceFactory.newPrepareResource()); private final RuleRegistry ruleRegistry; public JDBCStatementBackendHandler(final List<PreparedStatementParameter> preparedStatementParameters, final int statementId, final DatabaseType databaseType, final boolean showSQL) { super(PreparedStatementRegistry.getInstance().getSQL(statementId), ProxyJDBCResourceFactory.newPrepareResource()); this.preparedStatementParameters = preparedStatementParameters; this.databaseType = databaseType; this.showSQL = showSQL; columnTypes = new CopyOnWriteArrayList<>(); ruleRegistry = RuleRegistry.getInstance(); } @Override protected SQLRouteResult doSqlShardingRoute() { PreparedStatementRoutingEngine routingEngine = new PreparedStatementRoutingEngine(getSql(), RuleRegistry.getInstance().getShardingRule(), RuleRegistry.getInstance().getShardingMetaData(), getDatabaseType(), isShowSQL(), RuleRegistry.getInstance().getShardingDataSourceMetaData()); PreparedStatementRoutingEngine routingEngine = new PreparedStatementRoutingEngine( getSql(), ruleRegistry.getShardingRule(), ruleRegistry.getShardingMetaData(), databaseType, showSQL, ruleRegistry.getShardingDataSourceMetaData()); return routingEngine.route(getComStmtExecuteParameters()); } Loading @@ -83,33 +91,33 @@ public final class StatementExecuteBackendHandler extends ExecuteBackendHandler } @Override protected Callable<CommandResponsePackets> newSubmitTask(final Statement statement, final SQLStatement sqlStatement, final String unitSql) { return new StatementExecuteWorker(this, sqlStatement, (PreparedStatement) statement); protected Callable<CommandResponsePackets> newSubmitTask(final Statement statement, final SQLStatement sqlStatement, final String unitSQL) { return new JDBCStatementExecuteWorker(this, sqlStatement, (PreparedStatement) statement); } @Override protected PreparedStatement prepareResource(final String dataSourceName, final String unitSql, final SQLStatement sqlStatement) throws SQLException { DataSource dataSource = RuleRegistry.getInstance().getDataSourceMap().get(dataSourceName); protected PreparedStatement prepareResource(final String dataSourceName, final String unitSQL, final SQLStatement sqlStatement) throws SQLException { DataSource dataSource = ruleRegistry.getDataSourceMap().get(dataSourceName); Connection connection = getConnection(dataSource); PreparedStatement statement = sqlStatement instanceof InsertStatement ? connection.prepareStatement(unitSql, Statement.RETURN_GENERATED_KEYS) : connection.prepareStatement(unitSql); PreparedStatement result = sqlStatement instanceof InsertStatement ? connection.prepareStatement(unitSQL, Statement.RETURN_GENERATED_KEYS) : connection.prepareStatement(unitSQL); for (int i = 0; i < preparedStatementParameters.size(); i++) { statement.setObject(i + 1, preparedStatementParameters.get(i).getValue()); result.setObject(i + 1, preparedStatementParameters.get(i).getValue()); } ProxyPrepareJDBCResource prepareProxyJDBCResource = (ProxyPrepareJDBCResource) getJdbcResource(); prepareProxyJDBCResource.addConnection(connection); prepareProxyJDBCResource.addPrepareStatement(statement); return statement; prepareProxyJDBCResource.addPrepareStatement(result); return result; } @Override protected QueryResult newQueryResult(final CommandResponsePackets packet, final int index) { MySQLPacketStatementExecuteQueryResult mySQLPacketStatementExecuteQueryResult = new MySQLPacketStatementExecuteQueryResult(packet, columnTypes); if (ProxyMode.MEMORY_STRICTLY == RuleRegistry.getInstance().getProxyMode()) { mySQLPacketStatementExecuteQueryResult.setResultSet(getJdbcResource().getResultSets().get(index)); MySQLPacketStatementExecuteQueryResult result = new MySQLPacketStatementExecuteQueryResult(packet, columnTypes); if (ProxyMode.MEMORY_STRICTLY == ruleRegistry.getProxyMode()) { result.setResultSet(getJdbcResource().getResultSets().get(index)); } else { mySQLPacketStatementExecuteQueryResult.setResultList(getResultLists().get(index)); result.setResultList(getResultLists().get(index)); } return mySQLPacketStatementExecuteQueryResult; return result; } @Override Loading