Loading sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/JDBCBackendHandler.java +5 −16 Original line number Diff line number Diff line Loading @@ -41,6 +41,7 @@ import io.shardingsphere.proxy.transport.mysql.packet.command.reponse.QueryRespo import io.shardingsphere.proxy.transport.mysql.packet.generic.ErrPacket; import io.shardingsphere.proxy.transport.mysql.packet.generic.OKPacket; import io.shardingsphere.transaction.xa.AtomikosUserTransaction; import lombok.RequiredArgsConstructor; import javax.transaction.Status; import javax.transaction.SystemException; Loading @@ -54,29 +55,21 @@ import java.util.List; * @author zhaojun * @author zhangliang */ @RequiredArgsConstructor public final class JDBCBackendHandler implements BackendHandler { private final String sql; private final RuleRegistry ruleRegistry; private final BackendConnection backendConnection; private final JDBCExecuteEngine executeEngine; private final RuleRegistry ruleRegistry = RuleRegistry.getInstance(); private ExecuteResponse executeResponse; private MergedResult mergedResult; private int currentSequenceId; public JDBCBackendHandler(final String sql, final JDBCExecuteEngine executeEngine) { this.sql = sql; this.executeEngine = executeEngine; ruleRegistry = RuleRegistry.getInstance(); backendConnection = executeEngine.getBackendConnection(); } @Override public CommandResponsePackets execute() { try { Loading Loading @@ -141,11 +134,7 @@ public final class JDBCBackendHandler implements BackendHandler { @Override public boolean next() throws SQLException { if (null == mergedResult || !mergedResult.next()) { backendConnection.close(); return false; } return true; return null != mergedResult && mergedResult.next(); } @Override Loading sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/execute/JDBCExecuteEngine.java +0 −3 Original line number Diff line number Diff line Loading @@ -19,7 +19,6 @@ package io.shardingsphere.proxy.backend.common.jdbc.execute; import io.shardingsphere.core.merger.QueryResult; import io.shardingsphere.proxy.backend.common.SQLExecuteEngine; import io.shardingsphere.proxy.backend.common.jdbc.BackendConnection; import io.shardingsphere.proxy.backend.common.jdbc.execute.response.unit.ExecuteQueryResponseUnit; import io.shardingsphere.proxy.backend.common.jdbc.execute.response.unit.ExecuteResponseUnit; import io.shardingsphere.proxy.backend.common.jdbc.execute.response.unit.ExecuteUpdateResponseUnit; Loading Loading @@ -57,8 +56,6 @@ public abstract class JDBCExecuteEngine implements SQLExecuteEngine { private final List<QueryResult> queryResults = new LinkedList<>(); private final BackendConnection backendConnection = new BackendConnection(); private final ExecutorService executorService = ExecutorContext.getInstance().getExecutorService(); private final JDBCExecutorWrapper jdbcExecutorWrapper; Loading sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/execute/JDBCExecuteEngineFactory.java +9 −4 Original line number Diff line number Diff line Loading @@ -18,6 +18,7 @@ package io.shardingsphere.proxy.backend.common.jdbc.execute; import io.shardingsphere.proxy.backend.common.ProxyMode; import io.shardingsphere.proxy.backend.common.jdbc.BackendConnection; import io.shardingsphere.proxy.backend.common.jdbc.execute.memory.ConnectionStrictlyExecuteEngine; import io.shardingsphere.proxy.backend.common.jdbc.execute.stream.MemoryStrictlyExecuteEngine; import io.shardingsphere.proxy.backend.common.jdbc.wrapper.JDBCExecutorWrapper; Loading @@ -41,21 +42,25 @@ public final class JDBCExecuteEngineFactory { /** * Create instance for text protocol. * * @param backendConnection backend connection * @return instance for text protocol */ public static JDBCExecuteEngine createTextProtocolInstance() { public static JDBCExecuteEngine createTextProtocolInstance(final BackendConnection backendConnection) { JDBCExecutorWrapper jdbcExecutorWrapper = new StatementExecutorWrapper(); return ProxyMode.MEMORY_STRICTLY == RuleRegistry.getInstance().getProxyMode() ? new MemoryStrictlyExecuteEngine(jdbcExecutorWrapper) : new ConnectionStrictlyExecuteEngine(jdbcExecutorWrapper); return ProxyMode.MEMORY_STRICTLY == RuleRegistry.getInstance().getProxyMode() ? new MemoryStrictlyExecuteEngine(backendConnection, jdbcExecutorWrapper) : new ConnectionStrictlyExecuteEngine(backendConnection, jdbcExecutorWrapper); } /** * Create instance for statement protocol. * * @param preparedStatementParameters parameters of prepared statement * @param backendConnection backend connection * @return instance for statement protocol */ public static JDBCExecuteEngine createStatementProtocolInstance(final List<PreparedStatementParameter> preparedStatementParameters) { public static JDBCExecuteEngine createStatementProtocolInstance(final List<PreparedStatementParameter> preparedStatementParameters, final BackendConnection backendConnection) { JDBCExecutorWrapper jdbcExecutorWrapper = new PreparedStatementExecutorWrapper(preparedStatementParameters); return ProxyMode.MEMORY_STRICTLY == RuleRegistry.getInstance().getProxyMode() ? new MemoryStrictlyExecuteEngine(jdbcExecutorWrapper) : new ConnectionStrictlyExecuteEngine(jdbcExecutorWrapper); return ProxyMode.MEMORY_STRICTLY == RuleRegistry.getInstance().getProxyMode() ? new MemoryStrictlyExecuteEngine(backendConnection, jdbcExecutorWrapper) : new ConnectionStrictlyExecuteEngine(backendConnection, jdbcExecutorWrapper); } } sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/execute/memory/ConnectionStrictlyExecuteEngine.java +7 −3 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ import io.shardingsphere.core.exception.ShardingException; import io.shardingsphere.core.merger.QueryResult; import io.shardingsphere.core.routing.SQLRouteResult; import io.shardingsphere.core.routing.SQLUnit; import io.shardingsphere.proxy.backend.common.jdbc.BackendConnection; import io.shardingsphere.proxy.backend.common.jdbc.execute.JDBCExecuteEngine; import io.shardingsphere.proxy.backend.common.jdbc.execute.response.ExecuteQueryResponse; import io.shardingsphere.proxy.backend.common.jdbc.execute.response.ExecuteResponse; Loading Loading @@ -51,8 +52,11 @@ import java.util.concurrent.Future; */ public final class ConnectionStrictlyExecuteEngine extends JDBCExecuteEngine { public ConnectionStrictlyExecuteEngine(final JDBCExecutorWrapper jdbcExecutorWrapper) { private final BackendConnection backendConnection; public ConnectionStrictlyExecuteEngine(final BackendConnection backendConnection, final JDBCExecutorWrapper jdbcExecutorWrapper) { super(jdbcExecutorWrapper); this.backendConnection = backendConnection; } @Override Loading @@ -68,7 +72,7 @@ public final class ConnectionStrictlyExecuteEngine extends JDBCExecuteEngine { private List<Future<Collection<ExecuteResponseUnit>>> asyncExecute(final boolean isReturnGeneratedKeys, final Map<String, Collection<SQLUnit>> sqlUnitGroups) throws SQLException { List<Future<Collection<ExecuteResponseUnit>>> result = new LinkedList<>(); for (Entry<String, Collection<SQLUnit>> entry : sqlUnitGroups.entrySet()) { final Connection connection = getBackendConnection().getConnection(entry.getKey()); final Connection connection = backendConnection.getConnection(entry.getKey()); final Collection<SQLUnit> sqlUnits = entry.getValue(); result.add(getExecutorService().submit(new Callable<Collection<ExecuteResponseUnit>>() { Loading @@ -89,7 +93,7 @@ public final class ConnectionStrictlyExecuteEngine extends JDBCExecuteEngine { private Collection<ExecuteResponseUnit> syncExecute(final boolean isReturnGeneratedKeys, final String dataSourceName, final Collection<SQLUnit> sqlUnits) throws SQLException { Collection<ExecuteResponseUnit> result = new LinkedList<>(); boolean hasMetaData = false; Connection connection = getBackendConnection().getConnection(dataSourceName); Connection connection = backendConnection.getConnection(dataSourceName); for (SQLUnit each : sqlUnits) { String actualSQL = each.getSql(); Statement statement = getJdbcExecutorWrapper().createStatement(connection, actualSQL, isReturnGeneratedKeys); Loading sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/execute/stream/MemoryStrictlyExecuteEngine.java +7 −3 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ import io.shardingsphere.core.exception.ShardingException; import io.shardingsphere.core.merger.QueryResult; import io.shardingsphere.core.routing.SQLExecutionUnit; import io.shardingsphere.core.routing.SQLRouteResult; import io.shardingsphere.proxy.backend.common.jdbc.BackendConnection; import io.shardingsphere.proxy.backend.common.jdbc.execute.JDBCExecuteEngine; import io.shardingsphere.proxy.backend.common.jdbc.execute.response.ExecuteQueryResponse; import io.shardingsphere.proxy.backend.common.jdbc.execute.response.ExecuteResponse; Loading Loading @@ -52,8 +53,11 @@ public final class MemoryStrictlyExecuteEngine extends JDBCExecuteEngine { private static final Integer FETCH_ONE_ROW_A_TIME = Integer.MIN_VALUE; public MemoryStrictlyExecuteEngine(final JDBCExecutorWrapper jdbcExecutorWrapper) { private final BackendConnection backendConnection; public MemoryStrictlyExecuteEngine(final BackendConnection backendConnection, final JDBCExecutorWrapper jdbcExecutorWrapper) { super(jdbcExecutorWrapper); this.backendConnection = backendConnection; } @Override Loading @@ -75,7 +79,7 @@ public final class MemoryStrictlyExecuteEngine extends JDBCExecuteEngine { @Override public ExecuteResponseUnit call() throws SQLException { Statement statement = getJdbcExecutorWrapper().createStatement(getBackendConnection().getConnection(dataSourceName), actualSQL, isReturnGeneratedKeys); Statement statement = getJdbcExecutorWrapper().createStatement(backendConnection.getConnection(dataSourceName), actualSQL, isReturnGeneratedKeys); return executeWithoutMetadata(statement, actualSQL, isReturnGeneratedKeys); } })); Loading @@ -85,7 +89,7 @@ public final class MemoryStrictlyExecuteEngine extends JDBCExecuteEngine { private ExecuteResponseUnit syncExecute(final boolean isReturnGeneratedKeys, final SQLExecutionUnit sqlExecutionUnit) throws SQLException { Statement statement = getJdbcExecutorWrapper().createStatement( getBackendConnection().getConnection(sqlExecutionUnit.getDataSource()), sqlExecutionUnit.getSqlUnit().getSql(), isReturnGeneratedKeys); backendConnection.getConnection(sqlExecutionUnit.getDataSource()), sqlExecutionUnit.getSqlUnit().getSql(), isReturnGeneratedKeys); return executeWithMetadata(statement, sqlExecutionUnit.getSqlUnit().getSql(), isReturnGeneratedKeys); } Loading Loading
sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/JDBCBackendHandler.java +5 −16 Original line number Diff line number Diff line Loading @@ -41,6 +41,7 @@ import io.shardingsphere.proxy.transport.mysql.packet.command.reponse.QueryRespo import io.shardingsphere.proxy.transport.mysql.packet.generic.ErrPacket; import io.shardingsphere.proxy.transport.mysql.packet.generic.OKPacket; import io.shardingsphere.transaction.xa.AtomikosUserTransaction; import lombok.RequiredArgsConstructor; import javax.transaction.Status; import javax.transaction.SystemException; Loading @@ -54,29 +55,21 @@ import java.util.List; * @author zhaojun * @author zhangliang */ @RequiredArgsConstructor public final class JDBCBackendHandler implements BackendHandler { private final String sql; private final RuleRegistry ruleRegistry; private final BackendConnection backendConnection; private final JDBCExecuteEngine executeEngine; private final RuleRegistry ruleRegistry = RuleRegistry.getInstance(); private ExecuteResponse executeResponse; private MergedResult mergedResult; private int currentSequenceId; public JDBCBackendHandler(final String sql, final JDBCExecuteEngine executeEngine) { this.sql = sql; this.executeEngine = executeEngine; ruleRegistry = RuleRegistry.getInstance(); backendConnection = executeEngine.getBackendConnection(); } @Override public CommandResponsePackets execute() { try { Loading Loading @@ -141,11 +134,7 @@ public final class JDBCBackendHandler implements BackendHandler { @Override public boolean next() throws SQLException { if (null == mergedResult || !mergedResult.next()) { backendConnection.close(); return false; } return true; return null != mergedResult && mergedResult.next(); } @Override Loading
sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/execute/JDBCExecuteEngine.java +0 −3 Original line number Diff line number Diff line Loading @@ -19,7 +19,6 @@ package io.shardingsphere.proxy.backend.common.jdbc.execute; import io.shardingsphere.core.merger.QueryResult; import io.shardingsphere.proxy.backend.common.SQLExecuteEngine; import io.shardingsphere.proxy.backend.common.jdbc.BackendConnection; import io.shardingsphere.proxy.backend.common.jdbc.execute.response.unit.ExecuteQueryResponseUnit; import io.shardingsphere.proxy.backend.common.jdbc.execute.response.unit.ExecuteResponseUnit; import io.shardingsphere.proxy.backend.common.jdbc.execute.response.unit.ExecuteUpdateResponseUnit; Loading Loading @@ -57,8 +56,6 @@ public abstract class JDBCExecuteEngine implements SQLExecuteEngine { private final List<QueryResult> queryResults = new LinkedList<>(); private final BackendConnection backendConnection = new BackendConnection(); private final ExecutorService executorService = ExecutorContext.getInstance().getExecutorService(); private final JDBCExecutorWrapper jdbcExecutorWrapper; Loading
sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/execute/JDBCExecuteEngineFactory.java +9 −4 Original line number Diff line number Diff line Loading @@ -18,6 +18,7 @@ package io.shardingsphere.proxy.backend.common.jdbc.execute; import io.shardingsphere.proxy.backend.common.ProxyMode; import io.shardingsphere.proxy.backend.common.jdbc.BackendConnection; import io.shardingsphere.proxy.backend.common.jdbc.execute.memory.ConnectionStrictlyExecuteEngine; import io.shardingsphere.proxy.backend.common.jdbc.execute.stream.MemoryStrictlyExecuteEngine; import io.shardingsphere.proxy.backend.common.jdbc.wrapper.JDBCExecutorWrapper; Loading @@ -41,21 +42,25 @@ public final class JDBCExecuteEngineFactory { /** * Create instance for text protocol. * * @param backendConnection backend connection * @return instance for text protocol */ public static JDBCExecuteEngine createTextProtocolInstance() { public static JDBCExecuteEngine createTextProtocolInstance(final BackendConnection backendConnection) { JDBCExecutorWrapper jdbcExecutorWrapper = new StatementExecutorWrapper(); return ProxyMode.MEMORY_STRICTLY == RuleRegistry.getInstance().getProxyMode() ? new MemoryStrictlyExecuteEngine(jdbcExecutorWrapper) : new ConnectionStrictlyExecuteEngine(jdbcExecutorWrapper); return ProxyMode.MEMORY_STRICTLY == RuleRegistry.getInstance().getProxyMode() ? new MemoryStrictlyExecuteEngine(backendConnection, jdbcExecutorWrapper) : new ConnectionStrictlyExecuteEngine(backendConnection, jdbcExecutorWrapper); } /** * Create instance for statement protocol. * * @param preparedStatementParameters parameters of prepared statement * @param backendConnection backend connection * @return instance for statement protocol */ public static JDBCExecuteEngine createStatementProtocolInstance(final List<PreparedStatementParameter> preparedStatementParameters) { public static JDBCExecuteEngine createStatementProtocolInstance(final List<PreparedStatementParameter> preparedStatementParameters, final BackendConnection backendConnection) { JDBCExecutorWrapper jdbcExecutorWrapper = new PreparedStatementExecutorWrapper(preparedStatementParameters); return ProxyMode.MEMORY_STRICTLY == RuleRegistry.getInstance().getProxyMode() ? new MemoryStrictlyExecuteEngine(jdbcExecutorWrapper) : new ConnectionStrictlyExecuteEngine(jdbcExecutorWrapper); return ProxyMode.MEMORY_STRICTLY == RuleRegistry.getInstance().getProxyMode() ? new MemoryStrictlyExecuteEngine(backendConnection, jdbcExecutorWrapper) : new ConnectionStrictlyExecuteEngine(backendConnection, jdbcExecutorWrapper); } }
sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/execute/memory/ConnectionStrictlyExecuteEngine.java +7 −3 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ import io.shardingsphere.core.exception.ShardingException; import io.shardingsphere.core.merger.QueryResult; import io.shardingsphere.core.routing.SQLRouteResult; import io.shardingsphere.core.routing.SQLUnit; import io.shardingsphere.proxy.backend.common.jdbc.BackendConnection; import io.shardingsphere.proxy.backend.common.jdbc.execute.JDBCExecuteEngine; import io.shardingsphere.proxy.backend.common.jdbc.execute.response.ExecuteQueryResponse; import io.shardingsphere.proxy.backend.common.jdbc.execute.response.ExecuteResponse; Loading Loading @@ -51,8 +52,11 @@ import java.util.concurrent.Future; */ public final class ConnectionStrictlyExecuteEngine extends JDBCExecuteEngine { public ConnectionStrictlyExecuteEngine(final JDBCExecutorWrapper jdbcExecutorWrapper) { private final BackendConnection backendConnection; public ConnectionStrictlyExecuteEngine(final BackendConnection backendConnection, final JDBCExecutorWrapper jdbcExecutorWrapper) { super(jdbcExecutorWrapper); this.backendConnection = backendConnection; } @Override Loading @@ -68,7 +72,7 @@ public final class ConnectionStrictlyExecuteEngine extends JDBCExecuteEngine { private List<Future<Collection<ExecuteResponseUnit>>> asyncExecute(final boolean isReturnGeneratedKeys, final Map<String, Collection<SQLUnit>> sqlUnitGroups) throws SQLException { List<Future<Collection<ExecuteResponseUnit>>> result = new LinkedList<>(); for (Entry<String, Collection<SQLUnit>> entry : sqlUnitGroups.entrySet()) { final Connection connection = getBackendConnection().getConnection(entry.getKey()); final Connection connection = backendConnection.getConnection(entry.getKey()); final Collection<SQLUnit> sqlUnits = entry.getValue(); result.add(getExecutorService().submit(new Callable<Collection<ExecuteResponseUnit>>() { Loading @@ -89,7 +93,7 @@ public final class ConnectionStrictlyExecuteEngine extends JDBCExecuteEngine { private Collection<ExecuteResponseUnit> syncExecute(final boolean isReturnGeneratedKeys, final String dataSourceName, final Collection<SQLUnit> sqlUnits) throws SQLException { Collection<ExecuteResponseUnit> result = new LinkedList<>(); boolean hasMetaData = false; Connection connection = getBackendConnection().getConnection(dataSourceName); Connection connection = backendConnection.getConnection(dataSourceName); for (SQLUnit each : sqlUnits) { String actualSQL = each.getSql(); Statement statement = getJdbcExecutorWrapper().createStatement(connection, actualSQL, isReturnGeneratedKeys); Loading
sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/execute/stream/MemoryStrictlyExecuteEngine.java +7 −3 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ import io.shardingsphere.core.exception.ShardingException; import io.shardingsphere.core.merger.QueryResult; import io.shardingsphere.core.routing.SQLExecutionUnit; import io.shardingsphere.core.routing.SQLRouteResult; import io.shardingsphere.proxy.backend.common.jdbc.BackendConnection; import io.shardingsphere.proxy.backend.common.jdbc.execute.JDBCExecuteEngine; import io.shardingsphere.proxy.backend.common.jdbc.execute.response.ExecuteQueryResponse; import io.shardingsphere.proxy.backend.common.jdbc.execute.response.ExecuteResponse; Loading Loading @@ -52,8 +53,11 @@ public final class MemoryStrictlyExecuteEngine extends JDBCExecuteEngine { private static final Integer FETCH_ONE_ROW_A_TIME = Integer.MIN_VALUE; public MemoryStrictlyExecuteEngine(final JDBCExecutorWrapper jdbcExecutorWrapper) { private final BackendConnection backendConnection; public MemoryStrictlyExecuteEngine(final BackendConnection backendConnection, final JDBCExecutorWrapper jdbcExecutorWrapper) { super(jdbcExecutorWrapper); this.backendConnection = backendConnection; } @Override Loading @@ -75,7 +79,7 @@ public final class MemoryStrictlyExecuteEngine extends JDBCExecuteEngine { @Override public ExecuteResponseUnit call() throws SQLException { Statement statement = getJdbcExecutorWrapper().createStatement(getBackendConnection().getConnection(dataSourceName), actualSQL, isReturnGeneratedKeys); Statement statement = getJdbcExecutorWrapper().createStatement(backendConnection.getConnection(dataSourceName), actualSQL, isReturnGeneratedKeys); return executeWithoutMetadata(statement, actualSQL, isReturnGeneratedKeys); } })); Loading @@ -85,7 +89,7 @@ public final class MemoryStrictlyExecuteEngine extends JDBCExecuteEngine { private ExecuteResponseUnit syncExecute(final boolean isReturnGeneratedKeys, final SQLExecutionUnit sqlExecutionUnit) throws SQLException { Statement statement = getJdbcExecutorWrapper().createStatement( getBackendConnection().getConnection(sqlExecutionUnit.getDataSource()), sqlExecutionUnit.getSqlUnit().getSql(), isReturnGeneratedKeys); backendConnection.getConnection(sqlExecutionUnit.getDataSource()), sqlExecutionUnit.getSqlUnit().getSql(), isReturnGeneratedKeys); return executeWithMetadata(statement, sqlExecutionUnit.getSqlUnit().getSql(), isReturnGeneratedKeys); } Loading