Loading sharding-core/src/main/java/io/shardingsphere/core/executor/sql/SQLExecuteTemplate.java +17 −2 Original line number Diff line number Diff line Loading @@ -57,11 +57,26 @@ public final class SQLExecuteTemplate { * @throws SQLException SQL exception */ public <T> List<T> execute(final Collection<? extends StatementExecuteUnit> executeUnits, final SQLExecuteCallback<T> executeCallback) throws SQLException { return execute(executeUnits, null, executeCallback); } /** * Execute. * * @param executeUnits execute units * @param firstExecuteCallback first execute callback * @param executeCallback execute callback * @param <T> class type of return value * @return execute result * @throws SQLException SQL exception */ public <T> List<T> execute( final Collection<? extends StatementExecuteUnit> executeUnits, final SQLExecuteCallback<T> firstExecuteCallback, final SQLExecuteCallback<T> executeCallback) throws SQLException { OverallExecutionEvent event = new OverallExecutionEvent(executeUnits.size() > 1); ShardingEventBusInstance.getInstance().post(event); try { List<T> result = ConnectionMode.MEMORY_STRICTLY == connectionMode ? executeEngine.execute(new LinkedList<>(executeUnits), executeCallback) : executeEngine.groupExecute(getExecuteUnitGroups(executeUnits), executeCallback); List<T> result = ConnectionMode.MEMORY_STRICTLY == connectionMode ? executeEngine.execute(new LinkedList<>(executeUnits), firstExecuteCallback, executeCallback) : executeEngine.groupExecute(getExecuteUnitGroups(executeUnits), firstExecuteCallback, executeCallback); event.setExecuteSuccess(); return result; // CHECKSTYLE:OFF Loading sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/execute/JDBCExecuteEngine.java +0 −4 Original line number Diff line number Diff line Loading @@ -65,7 +65,6 @@ public abstract class JDBCExecuteEngine implements SQLExecuteEngine { protected final ExecuteResponseUnit executeWithMetadata(final Statement statement, final String sql, final boolean isReturnGeneratedKeys) throws SQLException { backendConnection.add(statement); setFetchSize(statement); if (!jdbcExecutorWrapper.executeSQL(statement, sql, isReturnGeneratedKeys)) { return new ExecuteUpdateResponseUnit(new OKPacket(1, statement.getUpdateCount(), isReturnGeneratedKeys ? getGeneratedKey(statement) : 0)); } Loading @@ -80,7 +79,6 @@ public abstract class JDBCExecuteEngine implements SQLExecuteEngine { protected final ExecuteResponseUnit executeWithoutMetadata(final Statement statement, final String sql, final boolean isReturnGeneratedKeys) throws SQLException { backendConnection.add(statement); setFetchSize(statement); if (!jdbcExecutorWrapper.executeSQL(statement, sql, isReturnGeneratedKeys)) { return new ExecuteUpdateResponseUnit(new OKPacket(1, statement.getUpdateCount(), isReturnGeneratedKeys ? getGeneratedKey(statement) : 0)); } Loading @@ -89,8 +87,6 @@ public abstract class JDBCExecuteEngine implements SQLExecuteEngine { return new ExecuteQueryResponseUnit(null, createQueryResult(resultSet)); } protected abstract void setFetchSize(Statement statement) throws SQLException; private long getGeneratedKey(final Statement statement) throws SQLException { ResultSet resultSet = statement.getGeneratedKeys(); return resultSet.next() ? resultSet.getLong(1) : 0L; Loading sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/execute/memory/ConnectionStrictlyExecuteEngine.java +8 −9 Original line number Diff line number Diff line Loading @@ -17,8 +17,10 @@ package io.shardingsphere.proxy.backend.jdbc.execute.memory; import io.shardingsphere.core.constant.ConnectionMode; import io.shardingsphere.core.constant.SQLType; import io.shardingsphere.core.executor.sql.SQLExecuteCallback; import io.shardingsphere.core.executor.sql.SQLExecuteTemplate; import io.shardingsphere.core.executor.sql.StatementExecuteUnit; import io.shardingsphere.core.executor.sql.result.MemoryQueryResult; import io.shardingsphere.core.executor.sql.threadlocal.ExecutorDataMap; Loading @@ -41,9 +43,7 @@ import io.shardingsphere.proxy.backend.jdbc.wrapper.JDBCExecutorWrapper; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; import java.util.Map.Entry; Loading @@ -56,21 +56,24 @@ import java.util.Map.Entry; */ public final class ConnectionStrictlyExecuteEngine extends JDBCExecuteEngine { private final SQLExecuteTemplate sqlExecuteTemplate; public ConnectionStrictlyExecuteEngine(final BackendConnection backendConnection, final JDBCExecutorWrapper jdbcExecutorWrapper) { super(backendConnection, jdbcExecutorWrapper); sqlExecuteTemplate = new SQLExecuteTemplate(BackendExecutorContext.getInstance().getExecuteEngine(), ConnectionMode.CONNECTION_STRICTLY); } @Override public ExecuteResponse execute(final SQLRouteResult routeResult, final boolean isReturnGeneratedKeys) throws SQLException { Map<String, Collection<SQLUnit>> sqlUnitGroups = routeResult.getSQLUnitGroups(); Map<String, Collection<StatementExecuteUnit>> sqlUnitStatements = new HashMap<>(sqlUnitGroups.size(), 1); Collection<StatementExecuteUnit> executeUnits = new LinkedList<>(); for (Entry<String, Collection<SQLUnit>> entry : sqlUnitGroups.entrySet()) { sqlUnitStatements.put(entry.getKey(), createSQLUnitStatement(entry.getKey(), entry.getValue(), isReturnGeneratedKeys)); executeUnits.addAll(createSQLUnitStatement(entry.getKey(), entry.getValue(), isReturnGeneratedKeys)); } SQLType sqlType = routeResult.getSqlStatement().getType(); boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown(); Map<String, Object> dataMap = ExecutorDataMap.getDataMap(); Collection<ExecuteResponseUnit> executeResponseUnits = BackendExecutorContext.getInstance().getExecuteEngine().groupExecute(sqlUnitStatements, Collection<ExecuteResponseUnit> executeResponseUnits = sqlExecuteTemplate.execute(executeUnits, new FirstConnectionStrictlySQLExecuteCallback(sqlType, isExceptionThrown, dataMap, isReturnGeneratedKeys), new ConnectionStrictlySQLExecuteCallback(sqlType, isExceptionThrown, dataMap, isReturnGeneratedKeys)); return getExecuteQueryResponse(executeResponseUnits); Loading Loading @@ -103,10 +106,6 @@ public final class ConnectionStrictlyExecuteEngine extends JDBCExecuteEngine { return new ExecuteUpdateResponse(executeResponseUnits); } @Override protected void setFetchSize(final Statement statement) { } @Override protected QueryResult createQueryResult(final ResultSet resultSet) throws SQLException { return new MemoryQueryResult(resultSet); Loading sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/execute/stream/MemoryStrictlyExecuteEngine.java +8 −6 Original line number Diff line number Diff line Loading @@ -17,8 +17,10 @@ package io.shardingsphere.proxy.backend.jdbc.execute.stream; import io.shardingsphere.core.constant.ConnectionMode; import io.shardingsphere.core.constant.SQLType; import io.shardingsphere.core.executor.sql.SQLExecuteCallback; import io.shardingsphere.core.executor.sql.SQLExecuteTemplate; import io.shardingsphere.core.executor.sql.StatementExecuteUnit; import io.shardingsphere.core.executor.sql.result.StreamQueryResult; import io.shardingsphere.core.executor.sql.threadlocal.ExecutorDataMap; Loading Loading @@ -54,8 +56,11 @@ public final class MemoryStrictlyExecuteEngine extends JDBCExecuteEngine { private static final Integer FETCH_ONE_ROW_A_TIME = Integer.MIN_VALUE; private final SQLExecuteTemplate sqlExecuteTemplate; public MemoryStrictlyExecuteEngine(final BackendConnection backendConnection, final JDBCExecutorWrapper jdbcExecutorWrapper) { super(backendConnection, jdbcExecutorWrapper); sqlExecuteTemplate = new SQLExecuteTemplate(BackendExecutorContext.getInstance().getExecuteEngine(), ConnectionMode.MEMORY_STRICTLY); } @Override Loading @@ -68,7 +73,7 @@ public final class MemoryStrictlyExecuteEngine extends JDBCExecuteEngine { SQLType sqlType = routeResult.getSqlStatement().getType(); boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown(); Map<String, Object> dataMap = ExecutorDataMap.getDataMap(); Collection<ExecuteResponseUnit> executeResponseUnits = BackendExecutorContext.getInstance().getExecuteEngine().execute(statementExecuteUnits, Collection<ExecuteResponseUnit> executeResponseUnits = sqlExecuteTemplate.execute(statementExecuteUnits, new FirstMemoryStrictlySQLExecuteCallback(sqlType, isExceptionThrown, dataMap, isReturnGeneratedKeys), new MemoryStrictlySQLExecuteCallback(sqlType, isExceptionThrown, dataMap, isReturnGeneratedKeys)); ExecuteResponseUnit firstExecuteResponseUnit = executeResponseUnits.iterator().next(); Loading @@ -88,11 +93,6 @@ public final class MemoryStrictlyExecuteEngine extends JDBCExecuteEngine { return new ExecuteUpdateResponse(executeResponseUnits); } @Override protected void setFetchSize(final Statement statement) throws SQLException { statement.setFetchSize(FETCH_ONE_ROW_A_TIME); } @Override protected QueryResult createQueryResult(final ResultSet resultSet) { return new StreamQueryResult(resultSet); Loading @@ -109,6 +109,7 @@ public final class MemoryStrictlyExecuteEngine extends JDBCExecuteEngine { @Override protected ExecuteResponseUnit executeSQL(final StatementExecuteUnit executeUnit) throws SQLException { executeUnit.getStatement().setFetchSize(FETCH_ONE_ROW_A_TIME); return executeWithMetadata(executeUnit.getStatement(), executeUnit.getSqlExecutionUnit().getSqlUnit().getSql(), isReturnGeneratedKeys); } } Loading @@ -124,6 +125,7 @@ public final class MemoryStrictlyExecuteEngine extends JDBCExecuteEngine { @Override protected ExecuteResponseUnit executeSQL(final StatementExecuteUnit executeUnit) throws SQLException { executeUnit.getStatement().setFetchSize(FETCH_ONE_ROW_A_TIME); return executeWithoutMetadata(executeUnit.getStatement(), executeUnit.getSqlExecutionUnit().getSqlUnit().getSql(), isReturnGeneratedKeys); } } Loading Loading
sharding-core/src/main/java/io/shardingsphere/core/executor/sql/SQLExecuteTemplate.java +17 −2 Original line number Diff line number Diff line Loading @@ -57,11 +57,26 @@ public final class SQLExecuteTemplate { * @throws SQLException SQL exception */ public <T> List<T> execute(final Collection<? extends StatementExecuteUnit> executeUnits, final SQLExecuteCallback<T> executeCallback) throws SQLException { return execute(executeUnits, null, executeCallback); } /** * Execute. * * @param executeUnits execute units * @param firstExecuteCallback first execute callback * @param executeCallback execute callback * @param <T> class type of return value * @return execute result * @throws SQLException SQL exception */ public <T> List<T> execute( final Collection<? extends StatementExecuteUnit> executeUnits, final SQLExecuteCallback<T> firstExecuteCallback, final SQLExecuteCallback<T> executeCallback) throws SQLException { OverallExecutionEvent event = new OverallExecutionEvent(executeUnits.size() > 1); ShardingEventBusInstance.getInstance().post(event); try { List<T> result = ConnectionMode.MEMORY_STRICTLY == connectionMode ? executeEngine.execute(new LinkedList<>(executeUnits), executeCallback) : executeEngine.groupExecute(getExecuteUnitGroups(executeUnits), executeCallback); List<T> result = ConnectionMode.MEMORY_STRICTLY == connectionMode ? executeEngine.execute(new LinkedList<>(executeUnits), firstExecuteCallback, executeCallback) : executeEngine.groupExecute(getExecuteUnitGroups(executeUnits), firstExecuteCallback, executeCallback); event.setExecuteSuccess(); return result; // CHECKSTYLE:OFF Loading
sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/execute/JDBCExecuteEngine.java +0 −4 Original line number Diff line number Diff line Loading @@ -65,7 +65,6 @@ public abstract class JDBCExecuteEngine implements SQLExecuteEngine { protected final ExecuteResponseUnit executeWithMetadata(final Statement statement, final String sql, final boolean isReturnGeneratedKeys) throws SQLException { backendConnection.add(statement); setFetchSize(statement); if (!jdbcExecutorWrapper.executeSQL(statement, sql, isReturnGeneratedKeys)) { return new ExecuteUpdateResponseUnit(new OKPacket(1, statement.getUpdateCount(), isReturnGeneratedKeys ? getGeneratedKey(statement) : 0)); } Loading @@ -80,7 +79,6 @@ public abstract class JDBCExecuteEngine implements SQLExecuteEngine { protected final ExecuteResponseUnit executeWithoutMetadata(final Statement statement, final String sql, final boolean isReturnGeneratedKeys) throws SQLException { backendConnection.add(statement); setFetchSize(statement); if (!jdbcExecutorWrapper.executeSQL(statement, sql, isReturnGeneratedKeys)) { return new ExecuteUpdateResponseUnit(new OKPacket(1, statement.getUpdateCount(), isReturnGeneratedKeys ? getGeneratedKey(statement) : 0)); } Loading @@ -89,8 +87,6 @@ public abstract class JDBCExecuteEngine implements SQLExecuteEngine { return new ExecuteQueryResponseUnit(null, createQueryResult(resultSet)); } protected abstract void setFetchSize(Statement statement) throws SQLException; private long getGeneratedKey(final Statement statement) throws SQLException { ResultSet resultSet = statement.getGeneratedKeys(); return resultSet.next() ? resultSet.getLong(1) : 0L; Loading
sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/execute/memory/ConnectionStrictlyExecuteEngine.java +8 −9 Original line number Diff line number Diff line Loading @@ -17,8 +17,10 @@ package io.shardingsphere.proxy.backend.jdbc.execute.memory; import io.shardingsphere.core.constant.ConnectionMode; import io.shardingsphere.core.constant.SQLType; import io.shardingsphere.core.executor.sql.SQLExecuteCallback; import io.shardingsphere.core.executor.sql.SQLExecuteTemplate; import io.shardingsphere.core.executor.sql.StatementExecuteUnit; import io.shardingsphere.core.executor.sql.result.MemoryQueryResult; import io.shardingsphere.core.executor.sql.threadlocal.ExecutorDataMap; Loading @@ -41,9 +43,7 @@ import io.shardingsphere.proxy.backend.jdbc.wrapper.JDBCExecutorWrapper; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; import java.util.Map.Entry; Loading @@ -56,21 +56,24 @@ import java.util.Map.Entry; */ public final class ConnectionStrictlyExecuteEngine extends JDBCExecuteEngine { private final SQLExecuteTemplate sqlExecuteTemplate; public ConnectionStrictlyExecuteEngine(final BackendConnection backendConnection, final JDBCExecutorWrapper jdbcExecutorWrapper) { super(backendConnection, jdbcExecutorWrapper); sqlExecuteTemplate = new SQLExecuteTemplate(BackendExecutorContext.getInstance().getExecuteEngine(), ConnectionMode.CONNECTION_STRICTLY); } @Override public ExecuteResponse execute(final SQLRouteResult routeResult, final boolean isReturnGeneratedKeys) throws SQLException { Map<String, Collection<SQLUnit>> sqlUnitGroups = routeResult.getSQLUnitGroups(); Map<String, Collection<StatementExecuteUnit>> sqlUnitStatements = new HashMap<>(sqlUnitGroups.size(), 1); Collection<StatementExecuteUnit> executeUnits = new LinkedList<>(); for (Entry<String, Collection<SQLUnit>> entry : sqlUnitGroups.entrySet()) { sqlUnitStatements.put(entry.getKey(), createSQLUnitStatement(entry.getKey(), entry.getValue(), isReturnGeneratedKeys)); executeUnits.addAll(createSQLUnitStatement(entry.getKey(), entry.getValue(), isReturnGeneratedKeys)); } SQLType sqlType = routeResult.getSqlStatement().getType(); boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown(); Map<String, Object> dataMap = ExecutorDataMap.getDataMap(); Collection<ExecuteResponseUnit> executeResponseUnits = BackendExecutorContext.getInstance().getExecuteEngine().groupExecute(sqlUnitStatements, Collection<ExecuteResponseUnit> executeResponseUnits = sqlExecuteTemplate.execute(executeUnits, new FirstConnectionStrictlySQLExecuteCallback(sqlType, isExceptionThrown, dataMap, isReturnGeneratedKeys), new ConnectionStrictlySQLExecuteCallback(sqlType, isExceptionThrown, dataMap, isReturnGeneratedKeys)); return getExecuteQueryResponse(executeResponseUnits); Loading Loading @@ -103,10 +106,6 @@ public final class ConnectionStrictlyExecuteEngine extends JDBCExecuteEngine { return new ExecuteUpdateResponse(executeResponseUnits); } @Override protected void setFetchSize(final Statement statement) { } @Override protected QueryResult createQueryResult(final ResultSet resultSet) throws SQLException { return new MemoryQueryResult(resultSet); Loading
sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/jdbc/execute/stream/MemoryStrictlyExecuteEngine.java +8 −6 Original line number Diff line number Diff line Loading @@ -17,8 +17,10 @@ package io.shardingsphere.proxy.backend.jdbc.execute.stream; import io.shardingsphere.core.constant.ConnectionMode; import io.shardingsphere.core.constant.SQLType; import io.shardingsphere.core.executor.sql.SQLExecuteCallback; import io.shardingsphere.core.executor.sql.SQLExecuteTemplate; import io.shardingsphere.core.executor.sql.StatementExecuteUnit; import io.shardingsphere.core.executor.sql.result.StreamQueryResult; import io.shardingsphere.core.executor.sql.threadlocal.ExecutorDataMap; Loading Loading @@ -54,8 +56,11 @@ public final class MemoryStrictlyExecuteEngine extends JDBCExecuteEngine { private static final Integer FETCH_ONE_ROW_A_TIME = Integer.MIN_VALUE; private final SQLExecuteTemplate sqlExecuteTemplate; public MemoryStrictlyExecuteEngine(final BackendConnection backendConnection, final JDBCExecutorWrapper jdbcExecutorWrapper) { super(backendConnection, jdbcExecutorWrapper); sqlExecuteTemplate = new SQLExecuteTemplate(BackendExecutorContext.getInstance().getExecuteEngine(), ConnectionMode.MEMORY_STRICTLY); } @Override Loading @@ -68,7 +73,7 @@ public final class MemoryStrictlyExecuteEngine extends JDBCExecuteEngine { SQLType sqlType = routeResult.getSqlStatement().getType(); boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown(); Map<String, Object> dataMap = ExecutorDataMap.getDataMap(); Collection<ExecuteResponseUnit> executeResponseUnits = BackendExecutorContext.getInstance().getExecuteEngine().execute(statementExecuteUnits, Collection<ExecuteResponseUnit> executeResponseUnits = sqlExecuteTemplate.execute(statementExecuteUnits, new FirstMemoryStrictlySQLExecuteCallback(sqlType, isExceptionThrown, dataMap, isReturnGeneratedKeys), new MemoryStrictlySQLExecuteCallback(sqlType, isExceptionThrown, dataMap, isReturnGeneratedKeys)); ExecuteResponseUnit firstExecuteResponseUnit = executeResponseUnits.iterator().next(); Loading @@ -88,11 +93,6 @@ public final class MemoryStrictlyExecuteEngine extends JDBCExecuteEngine { return new ExecuteUpdateResponse(executeResponseUnits); } @Override protected void setFetchSize(final Statement statement) throws SQLException { statement.setFetchSize(FETCH_ONE_ROW_A_TIME); } @Override protected QueryResult createQueryResult(final ResultSet resultSet) { return new StreamQueryResult(resultSet); Loading @@ -109,6 +109,7 @@ public final class MemoryStrictlyExecuteEngine extends JDBCExecuteEngine { @Override protected ExecuteResponseUnit executeSQL(final StatementExecuteUnit executeUnit) throws SQLException { executeUnit.getStatement().setFetchSize(FETCH_ONE_ROW_A_TIME); return executeWithMetadata(executeUnit.getStatement(), executeUnit.getSqlExecutionUnit().getSqlUnit().getSql(), isReturnGeneratedKeys); } } Loading @@ -124,6 +125,7 @@ public final class MemoryStrictlyExecuteEngine extends JDBCExecuteEngine { @Override protected ExecuteResponseUnit executeSQL(final StatementExecuteUnit executeUnit) throws SQLException { executeUnit.getStatement().setFetchSize(FETCH_ONE_ROW_A_TIME); return executeWithoutMetadata(executeUnit.getStatement(), executeUnit.getSqlExecutionUnit().getSqlUnit().getSql(), isReturnGeneratedKeys); } } Loading