Loading sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/api/ShardingDataSource.java +9 −10 Original line number Diff line number Diff line Loading @@ -31,7 +31,7 @@ import com.dangdang.ddframe.rdb.sharding.jdbc.ShardingContext; import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractDataSourceAdapter; import com.dangdang.ddframe.rdb.sharding.metrics.MetricsContext; import com.dangdang.ddframe.rdb.sharding.router.SQLRouteEngine; import com.dangdang.ddframe.rdb.sharding.threadlocal.ThreadLocalObjectRepository; import com.dangdang.ddframe.rdb.sharding.metrics.ThreadLocalObjectContainer; import com.google.common.base.Preconditions; /** Loading @@ -41,7 +41,7 @@ import com.google.common.base.Preconditions; */ public class ShardingDataSource extends AbstractDataSourceAdapter { private final ThreadLocalObjectRepository threadLocalObjectRepository = new ThreadLocalObjectRepository(); private final ThreadLocalObjectContainer threadLocalObjectContainer = new ThreadLocalObjectContainer(); private final ShardingContext context; Loading @@ -53,27 +53,26 @@ public class ShardingDataSource extends AbstractDataSourceAdapter { Preconditions.checkNotNull(shardingRule); Preconditions.checkNotNull(props); ShardingConfiguration configuration = new ShardingConfiguration(props); initThreadLocalObjectRepository(configuration); initThreadLocalObjectContainer(configuration); DatabaseType type; try { type = DatabaseType.valueFrom(ShardingConnection.getDatabaseMetaDataFromDataSource(shardingRule.getDataSourceRule().getDataSources()).getDatabaseProductName()); } catch (final SQLException e) { throw new ShardingJdbcException("Can not get database product name", e); } catch (final SQLException ex) { throw new ShardingJdbcException("Can not get database product name", ex); } context = new ShardingContext(shardingRule, new SQLRouteEngine(shardingRule, type), new ExecutorEngine(configuration)); } private void initThreadLocalObjectRepository(final ShardingConfiguration configuration) { boolean enableMetrics = configuration.getConfig(ShardingConfigurationConstant.METRICS_ENABLE, boolean.class); if (enableMetrics) { threadLocalObjectRepository.addItem(new MetricsContext(configuration.getConfig(ShardingConfigurationConstant.METRICS_SECOND_PERIOD, long.class), private void initThreadLocalObjectContainer(final ShardingConfiguration configuration) { if (configuration.getConfig(ShardingConfigurationConstant.METRICS_ENABLE, boolean.class)) { threadLocalObjectContainer.initItem(new MetricsContext(configuration.getConfig(ShardingConfigurationConstant.METRICS_SECOND_PERIOD, long.class), configuration.getConfig(ShardingConfigurationConstant.METRICS_PACKAGE_NAME, String.class))); } } @Override public ShardingConnection getConnection() throws SQLException { threadLocalObjectRepository.open(); threadLocalObjectContainer.build(); return new ShardingConnection(context); } Loading sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/api/config/ShardingConfigurationConstant.java +2 −2 Original line number Diff line number Diff line Loading @@ -57,7 +57,7 @@ public enum ShardingConfigurationConstant { /** * 最大工作现成数量. */ PARALLEL_EXECUTOR_WORKER_MAX_SIZE("parallelExecutor.worker.maxSize", defaultMaxThread()), PARALLEL_EXECUTOR_WORKER_MAX_SIZE("parallelExecutor.worker.maxSize", defaultMaxThreads()), /** * 工作线程空闲时超时时间. Loading @@ -73,7 +73,7 @@ public enum ShardingConfigurationConstant { private final String defaultValue; private static String defaultMaxThread() { private static String defaultMaxThreads() { return String.valueOf(Runtime.getRuntime().availableProcessors() * 2); } } sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingConnection.java +15 −16 Original line number Diff line number Diff line Loading @@ -41,13 +41,14 @@ import lombok.RequiredArgsConstructor; /** * 支持分片的数据库连接. * * @author zhangliang,gaohongtao * @author zhangliang * @author gaohongtao */ @RequiredArgsConstructor public final class ShardingConnection extends AbstractConnectionAdapter { @Getter(AccessLevel.PACKAGE) private final ShardingContext context; private final ShardingContext shardingContext; private Map<String, Connection> connectionMap = new HashMap<>(); Loading @@ -61,9 +62,9 @@ public final class ShardingConnection extends AbstractConnectionAdapter { if (connectionMap.containsKey(dataSourceName)) { return connectionMap.get(dataSourceName); } Context context = MetricsContext.start("ShardingConnection-getConnection", dataSourceName); Connection connection = this.context.getShardingRule().getDataSourceRule().getDataSource(dataSourceName).getConnection(); MetricsContext.stop(context); Context metricsContext = MetricsContext.start("ShardingConnection-getConnection", dataSourceName); Connection connection = shardingContext.getShardingRule().getDataSourceRule().getDataSource(dataSourceName).getConnection(); MetricsContext.stop(metricsContext); replayMethodsInvovation(connection); connectionMap.put(dataSourceName, connection); return connection; Loading @@ -72,31 +73,29 @@ public final class ShardingConnection extends AbstractConnectionAdapter { @Override public DatabaseMetaData getMetaData() throws SQLException { if (connectionMap.isEmpty()) { return getDatabaseMetaDataFromDataSource(context.getShardingRule().getDataSourceRule().getDataSources()); } else { return getDatabaseMetaDataFromConnection(connectionMap.values()); return getDatabaseMetaDataFromDataSource(shardingContext.getShardingRule().getDataSourceRule().getDataSources()); } return getDatabaseMetaDataFromConnection(connectionMap.values()); } public static DatabaseMetaData getDatabaseMetaDataFromDataSource(final Collection<DataSource> dataSources) { Collection<Connection> connectionCollection = null; Collection<Connection> connections = null; try { connectionCollection = Collections2.transform(dataSources, new Function<DataSource, Connection>() { connections = Collections2.transform(dataSources, new Function<DataSource, Connection>() { @Override public Connection apply(final DataSource input) { try { return input.getConnection(); } catch (final SQLException e) { throw new ShardingJdbcException(e); } catch (final SQLException ex) { throw new ShardingJdbcException(ex); } } }); return getDatabaseMetaDataFromConnection(connectionCollection); return getDatabaseMetaDataFromConnection(connections); } finally { if (null != connectionCollection) { for (Connection each : connectionCollection) { if (null != connections) { for (Connection each : connections) { try { each.close(); } catch (final SQLException ignored) { Loading sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingPreparedStatement.java +6 −6 Original line number Diff line number Diff line Loading @@ -90,7 +90,7 @@ public final class ShardingPreparedStatement extends AbstractPreparedStatementAd @Override public ResultSet executeQuery() throws SQLException { hasExecuted = true; setCurrentResultSet(ResultSetFactory.getResultSet(new PreparedStatementExecutor(getShardingConnection().getContext().getExecutorEngine(), setCurrentResultSet(ResultSetFactory.getResultSet(new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), getRoutedPreparedStatements()).executeQuery(), getMergeContext())); return getCurrentResultSet(); } Loading @@ -98,13 +98,13 @@ public final class ShardingPreparedStatement extends AbstractPreparedStatementAd @Override public int executeUpdate() throws SQLException { hasExecuted = true; return new PreparedStatementExecutor(getShardingConnection().getContext().getExecutorEngine(), getRoutedPreparedStatements()).executeUpdate(); return new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), getRoutedPreparedStatements()).executeUpdate(); } @Override public boolean execute() throws SQLException { hasExecuted = true; return new PreparedStatementExecutor(getShardingConnection().getContext().getExecutorEngine(), getRoutedPreparedStatements()).execute(); return new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), getRoutedPreparedStatements()).execute(); } @Override Loading @@ -126,7 +126,7 @@ public final class ShardingPreparedStatement extends AbstractPreparedStatementAd for (List<Object> each : batchParameters) { List<PreparedStatement> routePreparedStatements = routeSQL(each); cachedRoutedPreparedStatements.addAll(routePreparedStatements); result[i++] = new PreparedStatementExecutor(getShardingConnection().getContext().getExecutorEngine(), routePreparedStatements).executeUpdate(); result[i++] = new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), routePreparedStatements).executeUpdate(); } return result; } Loading @@ -153,9 +153,9 @@ public final class ShardingPreparedStatement extends AbstractPreparedStatementAd private List<PreparedStatement> routeSQL(final List<Object> parameters) throws SQLException { List<PreparedStatement> result = new ArrayList<>(); SQLRouteResult sqlRouteResult = getShardingConnection().getContext().getSqlRouteEngine().route(sql, parameters); SQLRouteResult sqlRouteResult = getShardingConnection().getShardingContext().getSqlRouteEngine().route(sql, parameters); MergeContext mergeContext = sqlRouteResult.getMergeContext(); mergeContext.setExecutorEngine(getShardingConnection().getContext().getExecutorEngine()); mergeContext.setExecutorEngine(getShardingConnection().getShardingContext().getExecutorEngine()); setMergeContext(mergeContext); for (SQLExecutionUnit each : sqlRouteResult.getExecutionUnits()) { PreparedStatement preparedStatement = generatePrepareStatement(getShardingConnection().getConnection(each.getDataSource()), each.getSql()); Loading sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingStatement.java +4 −5 Original line number Diff line number Diff line Loading @@ -78,8 +78,7 @@ public class ShardingStatement extends AbstractStatementAdapter { this(shardingConnection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT); } public ShardingStatement(final ShardingConnection shardingConnection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException { public ShardingStatement(final ShardingConnection shardingConnection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException { super(Statement.class); this.shardingConnection = shardingConnection; this.resultSetType = resultSetType; Loading Loading @@ -142,10 +141,10 @@ public class ShardingStatement extends AbstractStatementAdapter { } private StatementExecutor generateExecutor(final String sql) throws SQLException { StatementExecutor result = new StatementExecutor(shardingConnection.getContext().getExecutorEngine()); SQLRouteResult sqlRouteResult = shardingConnection.getContext().getSqlRouteEngine().route(sql, Collections.emptyList()); StatementExecutor result = new StatementExecutor(shardingConnection.getShardingContext().getExecutorEngine()); SQLRouteResult sqlRouteResult = shardingConnection.getShardingContext().getSqlRouteEngine().route(sql, Collections.emptyList()); mergeContext = sqlRouteResult.getMergeContext(); mergeContext.setExecutorEngine(shardingConnection.getContext().getExecutorEngine()); mergeContext.setExecutorEngine(shardingConnection.getShardingContext().getExecutorEngine()); for (SQLExecutionUnit each : sqlRouteResult.getExecutionUnits()) { result.addStatement(each.getSql(), generateStatement(each.getSql(), each.getDataSource())); } Loading Loading
sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/api/ShardingDataSource.java +9 −10 Original line number Diff line number Diff line Loading @@ -31,7 +31,7 @@ import com.dangdang.ddframe.rdb.sharding.jdbc.ShardingContext; import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractDataSourceAdapter; import com.dangdang.ddframe.rdb.sharding.metrics.MetricsContext; import com.dangdang.ddframe.rdb.sharding.router.SQLRouteEngine; import com.dangdang.ddframe.rdb.sharding.threadlocal.ThreadLocalObjectRepository; import com.dangdang.ddframe.rdb.sharding.metrics.ThreadLocalObjectContainer; import com.google.common.base.Preconditions; /** Loading @@ -41,7 +41,7 @@ import com.google.common.base.Preconditions; */ public class ShardingDataSource extends AbstractDataSourceAdapter { private final ThreadLocalObjectRepository threadLocalObjectRepository = new ThreadLocalObjectRepository(); private final ThreadLocalObjectContainer threadLocalObjectContainer = new ThreadLocalObjectContainer(); private final ShardingContext context; Loading @@ -53,27 +53,26 @@ public class ShardingDataSource extends AbstractDataSourceAdapter { Preconditions.checkNotNull(shardingRule); Preconditions.checkNotNull(props); ShardingConfiguration configuration = new ShardingConfiguration(props); initThreadLocalObjectRepository(configuration); initThreadLocalObjectContainer(configuration); DatabaseType type; try { type = DatabaseType.valueFrom(ShardingConnection.getDatabaseMetaDataFromDataSource(shardingRule.getDataSourceRule().getDataSources()).getDatabaseProductName()); } catch (final SQLException e) { throw new ShardingJdbcException("Can not get database product name", e); } catch (final SQLException ex) { throw new ShardingJdbcException("Can not get database product name", ex); } context = new ShardingContext(shardingRule, new SQLRouteEngine(shardingRule, type), new ExecutorEngine(configuration)); } private void initThreadLocalObjectRepository(final ShardingConfiguration configuration) { boolean enableMetrics = configuration.getConfig(ShardingConfigurationConstant.METRICS_ENABLE, boolean.class); if (enableMetrics) { threadLocalObjectRepository.addItem(new MetricsContext(configuration.getConfig(ShardingConfigurationConstant.METRICS_SECOND_PERIOD, long.class), private void initThreadLocalObjectContainer(final ShardingConfiguration configuration) { if (configuration.getConfig(ShardingConfigurationConstant.METRICS_ENABLE, boolean.class)) { threadLocalObjectContainer.initItem(new MetricsContext(configuration.getConfig(ShardingConfigurationConstant.METRICS_SECOND_PERIOD, long.class), configuration.getConfig(ShardingConfigurationConstant.METRICS_PACKAGE_NAME, String.class))); } } @Override public ShardingConnection getConnection() throws SQLException { threadLocalObjectRepository.open(); threadLocalObjectContainer.build(); return new ShardingConnection(context); } Loading
sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/api/config/ShardingConfigurationConstant.java +2 −2 Original line number Diff line number Diff line Loading @@ -57,7 +57,7 @@ public enum ShardingConfigurationConstant { /** * 最大工作现成数量. */ PARALLEL_EXECUTOR_WORKER_MAX_SIZE("parallelExecutor.worker.maxSize", defaultMaxThread()), PARALLEL_EXECUTOR_WORKER_MAX_SIZE("parallelExecutor.worker.maxSize", defaultMaxThreads()), /** * 工作线程空闲时超时时间. Loading @@ -73,7 +73,7 @@ public enum ShardingConfigurationConstant { private final String defaultValue; private static String defaultMaxThread() { private static String defaultMaxThreads() { return String.valueOf(Runtime.getRuntime().availableProcessors() * 2); } }
sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingConnection.java +15 −16 Original line number Diff line number Diff line Loading @@ -41,13 +41,14 @@ import lombok.RequiredArgsConstructor; /** * 支持分片的数据库连接. * * @author zhangliang,gaohongtao * @author zhangliang * @author gaohongtao */ @RequiredArgsConstructor public final class ShardingConnection extends AbstractConnectionAdapter { @Getter(AccessLevel.PACKAGE) private final ShardingContext context; private final ShardingContext shardingContext; private Map<String, Connection> connectionMap = new HashMap<>(); Loading @@ -61,9 +62,9 @@ public final class ShardingConnection extends AbstractConnectionAdapter { if (connectionMap.containsKey(dataSourceName)) { return connectionMap.get(dataSourceName); } Context context = MetricsContext.start("ShardingConnection-getConnection", dataSourceName); Connection connection = this.context.getShardingRule().getDataSourceRule().getDataSource(dataSourceName).getConnection(); MetricsContext.stop(context); Context metricsContext = MetricsContext.start("ShardingConnection-getConnection", dataSourceName); Connection connection = shardingContext.getShardingRule().getDataSourceRule().getDataSource(dataSourceName).getConnection(); MetricsContext.stop(metricsContext); replayMethodsInvovation(connection); connectionMap.put(dataSourceName, connection); return connection; Loading @@ -72,31 +73,29 @@ public final class ShardingConnection extends AbstractConnectionAdapter { @Override public DatabaseMetaData getMetaData() throws SQLException { if (connectionMap.isEmpty()) { return getDatabaseMetaDataFromDataSource(context.getShardingRule().getDataSourceRule().getDataSources()); } else { return getDatabaseMetaDataFromConnection(connectionMap.values()); return getDatabaseMetaDataFromDataSource(shardingContext.getShardingRule().getDataSourceRule().getDataSources()); } return getDatabaseMetaDataFromConnection(connectionMap.values()); } public static DatabaseMetaData getDatabaseMetaDataFromDataSource(final Collection<DataSource> dataSources) { Collection<Connection> connectionCollection = null; Collection<Connection> connections = null; try { connectionCollection = Collections2.transform(dataSources, new Function<DataSource, Connection>() { connections = Collections2.transform(dataSources, new Function<DataSource, Connection>() { @Override public Connection apply(final DataSource input) { try { return input.getConnection(); } catch (final SQLException e) { throw new ShardingJdbcException(e); } catch (final SQLException ex) { throw new ShardingJdbcException(ex); } } }); return getDatabaseMetaDataFromConnection(connectionCollection); return getDatabaseMetaDataFromConnection(connections); } finally { if (null != connectionCollection) { for (Connection each : connectionCollection) { if (null != connections) { for (Connection each : connections) { try { each.close(); } catch (final SQLException ignored) { Loading
sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingPreparedStatement.java +6 −6 Original line number Diff line number Diff line Loading @@ -90,7 +90,7 @@ public final class ShardingPreparedStatement extends AbstractPreparedStatementAd @Override public ResultSet executeQuery() throws SQLException { hasExecuted = true; setCurrentResultSet(ResultSetFactory.getResultSet(new PreparedStatementExecutor(getShardingConnection().getContext().getExecutorEngine(), setCurrentResultSet(ResultSetFactory.getResultSet(new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), getRoutedPreparedStatements()).executeQuery(), getMergeContext())); return getCurrentResultSet(); } Loading @@ -98,13 +98,13 @@ public final class ShardingPreparedStatement extends AbstractPreparedStatementAd @Override public int executeUpdate() throws SQLException { hasExecuted = true; return new PreparedStatementExecutor(getShardingConnection().getContext().getExecutorEngine(), getRoutedPreparedStatements()).executeUpdate(); return new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), getRoutedPreparedStatements()).executeUpdate(); } @Override public boolean execute() throws SQLException { hasExecuted = true; return new PreparedStatementExecutor(getShardingConnection().getContext().getExecutorEngine(), getRoutedPreparedStatements()).execute(); return new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), getRoutedPreparedStatements()).execute(); } @Override Loading @@ -126,7 +126,7 @@ public final class ShardingPreparedStatement extends AbstractPreparedStatementAd for (List<Object> each : batchParameters) { List<PreparedStatement> routePreparedStatements = routeSQL(each); cachedRoutedPreparedStatements.addAll(routePreparedStatements); result[i++] = new PreparedStatementExecutor(getShardingConnection().getContext().getExecutorEngine(), routePreparedStatements).executeUpdate(); result[i++] = new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), routePreparedStatements).executeUpdate(); } return result; } Loading @@ -153,9 +153,9 @@ public final class ShardingPreparedStatement extends AbstractPreparedStatementAd private List<PreparedStatement> routeSQL(final List<Object> parameters) throws SQLException { List<PreparedStatement> result = new ArrayList<>(); SQLRouteResult sqlRouteResult = getShardingConnection().getContext().getSqlRouteEngine().route(sql, parameters); SQLRouteResult sqlRouteResult = getShardingConnection().getShardingContext().getSqlRouteEngine().route(sql, parameters); MergeContext mergeContext = sqlRouteResult.getMergeContext(); mergeContext.setExecutorEngine(getShardingConnection().getContext().getExecutorEngine()); mergeContext.setExecutorEngine(getShardingConnection().getShardingContext().getExecutorEngine()); setMergeContext(mergeContext); for (SQLExecutionUnit each : sqlRouteResult.getExecutionUnits()) { PreparedStatement preparedStatement = generatePrepareStatement(getShardingConnection().getConnection(each.getDataSource()), each.getSql()); Loading
sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingStatement.java +4 −5 Original line number Diff line number Diff line Loading @@ -78,8 +78,7 @@ public class ShardingStatement extends AbstractStatementAdapter { this(shardingConnection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT); } public ShardingStatement(final ShardingConnection shardingConnection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException { public ShardingStatement(final ShardingConnection shardingConnection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException { super(Statement.class); this.shardingConnection = shardingConnection; this.resultSetType = resultSetType; Loading Loading @@ -142,10 +141,10 @@ public class ShardingStatement extends AbstractStatementAdapter { } private StatementExecutor generateExecutor(final String sql) throws SQLException { StatementExecutor result = new StatementExecutor(shardingConnection.getContext().getExecutorEngine()); SQLRouteResult sqlRouteResult = shardingConnection.getContext().getSqlRouteEngine().route(sql, Collections.emptyList()); StatementExecutor result = new StatementExecutor(shardingConnection.getShardingContext().getExecutorEngine()); SQLRouteResult sqlRouteResult = shardingConnection.getShardingContext().getSqlRouteEngine().route(sql, Collections.emptyList()); mergeContext = sqlRouteResult.getMergeContext(); mergeContext.setExecutorEngine(shardingConnection.getContext().getExecutorEngine()); mergeContext.setExecutorEngine(shardingConnection.getShardingContext().getExecutorEngine()); for (SQLExecutionUnit each : sqlRouteResult.getExecutionUnits()) { result.addStatement(each.getSql(), generateStatement(each.getSql(), each.getDataSource())); } Loading