Loading sharding-core/src/main/java/io/shardingsphere/core/constant/ShardingPropertiesConstant.java +5 −0 Original line number Diff line number Diff line Loading @@ -69,6 +69,11 @@ public enum ShardingPropertiesConstant { PROXY_TRANSACTION_MODE("proxy.transaction.mode", "NONE", String.class), /** * Thread pool size of connect database for Sharding-Proxy. * * <p>Cannot change dynamically, change this value should restart proxy.</p> */ PROXY_MAX_WORKING_THREADS("proxy.max.working.threads", Runtime.getRuntime().availableProcessors() * 2 + "", int.class); private final String key; Loading sharding-core/src/main/java/io/shardingsphere/core/metadata/ShardingMetaData.java +17 −22 Original line number Diff line number Diff line Loading @@ -28,7 +28,6 @@ import io.shardingsphere.core.rule.ShardingRule; import io.shardingsphere.core.rule.TableRule; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import java.sql.Connection; Loading @@ -44,20 +43,19 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; /** * Abstract Sharding metadata. * Sharding metadata. * * @author panjuan * @author zhaojun */ @RequiredArgsConstructor @Getter @Setter @Slf4j public abstract class ShardingMetaData { private final ListeningExecutorService executorService; private Map<String, TableMetaData> tableMetaDataMap; private final Map<String, TableMetaData> tableMetaDataMap = new HashMap<>(); /** * Initialize sharding metadata. Loading @@ -65,20 +63,17 @@ public abstract class ShardingMetaData { * @param shardingRule sharding rule */ public void init(final ShardingRule shardingRule) { tableMetaDataMap = new HashMap<>(); try { Collection<TableRule> tableRules = getTableRules(shardingRule); for (TableRule each : tableRules) { for (TableRule each : getTableRules(shardingRule)) { refresh(each, shardingRule); } } catch (SQLException ex) { } catch (final SQLException ex) { throw new ShardingException(ex); } } private Collection<TableRule> getTableRules(final ShardingRule shardingRule) throws SQLException { Collection<TableRule> result = new LinkedList<>(); result.addAll(shardingRule.getTableRules()); Collection<TableRule> result = new LinkedList<>(shardingRule.getTableRules()); String defaultDataSourceName = shardingRule.getShardingDataSourceNames().getDefaultDataSourceName(); if (!Strings.isNullOrEmpty(defaultDataSourceName)) { Collection<String> defaultTableNames = getTableNamesFromDefaultDataSource(shardingRule.getMasterDataSourceName(defaultDataSourceName)); Loading @@ -101,26 +96,26 @@ public abstract class ShardingMetaData { /** * Refresh each tableMetaData by TableRule. * * @param each table rule * @param tableRule table rule * @param shardingRule sharding rule */ public void refresh(final TableRule each, final ShardingRule shardingRule) { refresh(each, shardingRule, Collections.<String, Connection>emptyMap()); public void refresh(final TableRule tableRule, final ShardingRule shardingRule) { refresh(tableRule, shardingRule, Collections.<String, Connection>emptyMap()); } /** * Refresh each tableMetaData by TableRule. * * @param each table rule * @param tableRule table rule * @param shardingRule sharding rule * @param connectionMap connection map passing from sharding connection */ public void refresh(final TableRule each, final ShardingRule shardingRule, final Map<String, Connection> connectionMap) { tableMetaDataMap.put(each.getLogicTable(), getFinalTableMetaData(each.getLogicTable(), each.getActualDataNodes(), shardingRule.getShardingDataSourceNames(), connectionMap)); public void refresh(final TableRule tableRule, final ShardingRule shardingRule, final Map<String, Connection> connectionMap) { tableMetaDataMap.put(tableRule.getLogicTable(), getFinalTableMetaData(tableRule.getLogicTable(), tableRule.getActualDataNodes(), shardingRule.getShardingDataSourceNames(), connectionMap)); } private TableMetaData getFinalTableMetaData(final String logicTableName, final List<DataNode> actualDataNodes, final ShardingDataSourceNames shardingDataSourceNames, final Map<String, Connection> connectionMap) { private TableMetaData getFinalTableMetaData( final String logicTableName, final List<DataNode> actualDataNodes, final ShardingDataSourceNames shardingDataSourceNames, final Map<String, Connection> connectionMap) { List<TableMetaData> actualTableMetaDataList = getAllActualTableMetaData(actualDataNodes, shardingDataSourceNames, connectionMap); for (int i = 0; i < actualTableMetaDataList.size(); i++) { if (actualTableMetaDataList.size() - 1 == i) { Loading @@ -133,17 +128,17 @@ public abstract class ShardingMetaData { return new TableMetaData(); } private List<TableMetaData> getAllActualTableMetaData(final List<DataNode> actualDataNodes, final ShardingDataSourceNames shardingDataSourceNames, final Map<String, Connection> connectionMap) { private List<TableMetaData> getAllActualTableMetaData(final List<DataNode> actualDataNodes, final ShardingDataSourceNames shardingDataSourceNames, final Map<String, Connection> connectionMap) { List<ListenableFuture<TableMetaData>> result = new ArrayList<>(); for (final DataNode each : actualDataNodes) { result.add(executorService.submit(new Callable<TableMetaData>() { @Override public TableMetaData call() throws Exception { return getTableMetaData(each, shardingDataSourceNames, connectionMap); } })); } try { return Futures.allAsList(result).get(); } catch (final InterruptedException | ExecutionException ex) { Loading @@ -168,7 +163,7 @@ public abstract class ShardingMetaData { } /** * Judge whether this databaseType is supported. * Judge whether this database type is supported. * * @return supported or not */ Loading sharding-jdbc/src/main/java/io/shardingsphere/core/jdbc/metadata/JDBCShardingMetaData.java +6 −9 Original line number Diff line number Diff line Loading @@ -54,19 +54,16 @@ public final class JDBCShardingMetaData extends ShardingMetaData { } @Override public TableMetaData getTableMetaData(final DataNode dataNode, final ShardingDataSourceNames shardingDataSourceNames, final Map<String, Connection> connectionMap) throws SQLException { public TableMetaData getTableMetaData(final DataNode dataNode, final ShardingDataSourceNames shardingDataSourceNames, final Map<String, Connection> connectionMap) throws SQLException { String dataSourceName = shardingDataSourceNames.getRawMasterDataSourceName(dataNode.getDataSourceName()); if (connectionMap.containsKey(dataSourceName)) { return ShardingMetaDataHandlerFactory.newInstance(dataNode.getTableName(), databaseType).getTableMetaData(connectionMap.get(dataSourceName)); } else { return ShardingMetaDataHandlerFactory.newInstance(dataSourceMap.get(dataSourceName), dataNode.getTableName(), databaseType).getTableMetaData(); } return ShardingMetaDataHandlerFactory.newInstance(dataSourceMap.get(dataSourceName), dataNode.getTableName(), databaseType).getTableMetaData(); } @Override public Collection<String> getTableNamesFromDefaultDataSource(final String defaultDataSourceName) throws SQLException { return ShardingMetaDataHandlerFactory.newInstance(dataSourceMap.get(defaultDataSourceName), "", databaseType).getTableNamesFromDefaultDataSource(); } } sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/ShardingProxyClient.java +2 −2 Original line number Diff line number Diff line Loading @@ -141,8 +141,8 @@ public final class ShardingProxyClient { for (int i = 0; i < MAX_CONNECTIONS; i++) { try { channels[i] = pool.acquire().get(CONNECT_TIMEOUT, TimeUnit.SECONDS); } catch (ExecutionException | TimeoutException e) { log.error(e.getMessage(), e); } catch (final ExecutionException | TimeoutException ex) { log.error(ex.getMessage(), ex); } } for (int i = 0; i < MAX_CONNECTIONS; i++) { Loading sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/SQLPacketsBackendHandler.java +12 −12 Original line number Diff line number Diff line Loading @@ -76,8 +76,6 @@ public final class SQLPacketsBackendHandler implements BackendHandler { private final DatabaseType databaseType; private final boolean showSQL; private MergedResult mergedResult; private int currentSequenceId; Loading @@ -88,22 +86,24 @@ public final class SQLPacketsBackendHandler implements BackendHandler { private boolean hasMoreResultValueFlag; public SQLPacketsBackendHandler(final CommandPacketRebuilder rebuilder, final DatabaseType databaseType, final boolean showSQL) { private final RuleRegistry ruleRegistry; public SQLPacketsBackendHandler(final CommandPacketRebuilder rebuilder, final DatabaseType databaseType) { this.rebuilder = rebuilder; this.databaseType = databaseType; this.showSQL = showSQL; isMerged = false; hasMoreResultValueFlag = true; ruleRegistry = RuleRegistry.getInstance(); } @Override public CommandResponsePackets execute() { return RuleRegistry.getInstance().isMasterSlaveOnly() ? executeForMasterSlave() : executeForSharding(); return ruleRegistry.isMasterSlaveOnly() ? executeForMasterSlave() : executeForSharding(); } private CommandResponsePackets executeForMasterSlave() { SQLStatement sqlStatement = new SQLJudgeEngine(rebuilder.sql()).judge(); String dataSourceName = new MasterSlaveRouter(RuleRegistry.getInstance().getMasterSlaveRule()).route(sqlStatement.getType()).iterator().next(); String dataSourceName = new MasterSlaveRouter(ruleRegistry.getMasterSlaveRule()).route(sqlStatement.getType()).iterator().next(); synchronizedFuture = new SynchronizedFuture<>(1); MySQLResultCache.getInstance().putFuture(rebuilder.connectionId(), synchronizedFuture); CommandPacket commandPacket = rebuilder.rebuild(rebuilder.sequenceId(), rebuilder.connectionId(), rebuilder.sql()); Loading @@ -119,8 +119,8 @@ public final class SQLPacketsBackendHandler implements BackendHandler { } private CommandResponsePackets executeForSharding() { StatementRoutingEngine routingEngine = new StatementRoutingEngine(RuleRegistry.getInstance().getShardingRule(), RuleRegistry.getInstance().getShardingMetaData(), databaseType, showSQL, RuleRegistry.getInstance().getShardingDataSourceMetaData()); StatementRoutingEngine routingEngine = new StatementRoutingEngine( ruleRegistry.getShardingRule(), ruleRegistry.getShardingMetaData(), databaseType, ruleRegistry.isShowSQL(), ruleRegistry.getShardingDataSourceMetaData()); SQLRouteResult routeResult = routingEngine.route(rebuilder.sql()); if (routeResult.getExecutionUnits().isEmpty()) { return new CommandResponsePackets(new OKPacket(1)); Loading Loading @@ -179,8 +179,8 @@ public final class SQLPacketsBackendHandler implements BackendHandler { channel = pool.acquire().get(CONNECT_TIMEOUT, TimeUnit.SECONDS); MySQLResultCache.getInstance().putConnection(channel.id().asShortText(), connectionId); channel.writeAndFlush(commandPacket); } catch (InterruptedException | ExecutionException | TimeoutException e) { log.error(e.getMessage(), e); } catch (final InterruptedException | ExecutionException | TimeoutException ex) { log.error(ex.getMessage(), ex); } finally { if (null != pool && null != channel) { pool.release(channel); Loading @@ -203,8 +203,8 @@ public final class SQLPacketsBackendHandler implements BackendHandler { private CommandResponsePackets mergeDQLorDAL(final SQLStatement sqlStatement, final List<CommandResponsePackets> packets, final List<QueryResult> queryResults) { try { mergedResult = MergeEngineFactory.newInstance(RuleRegistry.getInstance().getShardingRule(), queryResults, sqlStatement, RuleRegistry.getInstance().getShardingMetaData()).merge(); mergedResult = MergeEngineFactory.newInstance(ruleRegistry.getShardingRule(), queryResults, sqlStatement, ruleRegistry.getShardingMetaData()).merge(); isMerged = true; } catch (final SQLException ex) { return new CommandResponsePackets(new ErrPacket(1, ex)); Loading Loading
sharding-core/src/main/java/io/shardingsphere/core/constant/ShardingPropertiesConstant.java +5 −0 Original line number Diff line number Diff line Loading @@ -69,6 +69,11 @@ public enum ShardingPropertiesConstant { PROXY_TRANSACTION_MODE("proxy.transaction.mode", "NONE", String.class), /** * Thread pool size of connect database for Sharding-Proxy. * * <p>Cannot change dynamically, change this value should restart proxy.</p> */ PROXY_MAX_WORKING_THREADS("proxy.max.working.threads", Runtime.getRuntime().availableProcessors() * 2 + "", int.class); private final String key; Loading
sharding-core/src/main/java/io/shardingsphere/core/metadata/ShardingMetaData.java +17 −22 Original line number Diff line number Diff line Loading @@ -28,7 +28,6 @@ import io.shardingsphere.core.rule.ShardingRule; import io.shardingsphere.core.rule.TableRule; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import java.sql.Connection; Loading @@ -44,20 +43,19 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; /** * Abstract Sharding metadata. * Sharding metadata. * * @author panjuan * @author zhaojun */ @RequiredArgsConstructor @Getter @Setter @Slf4j public abstract class ShardingMetaData { private final ListeningExecutorService executorService; private Map<String, TableMetaData> tableMetaDataMap; private final Map<String, TableMetaData> tableMetaDataMap = new HashMap<>(); /** * Initialize sharding metadata. Loading @@ -65,20 +63,17 @@ public abstract class ShardingMetaData { * @param shardingRule sharding rule */ public void init(final ShardingRule shardingRule) { tableMetaDataMap = new HashMap<>(); try { Collection<TableRule> tableRules = getTableRules(shardingRule); for (TableRule each : tableRules) { for (TableRule each : getTableRules(shardingRule)) { refresh(each, shardingRule); } } catch (SQLException ex) { } catch (final SQLException ex) { throw new ShardingException(ex); } } private Collection<TableRule> getTableRules(final ShardingRule shardingRule) throws SQLException { Collection<TableRule> result = new LinkedList<>(); result.addAll(shardingRule.getTableRules()); Collection<TableRule> result = new LinkedList<>(shardingRule.getTableRules()); String defaultDataSourceName = shardingRule.getShardingDataSourceNames().getDefaultDataSourceName(); if (!Strings.isNullOrEmpty(defaultDataSourceName)) { Collection<String> defaultTableNames = getTableNamesFromDefaultDataSource(shardingRule.getMasterDataSourceName(defaultDataSourceName)); Loading @@ -101,26 +96,26 @@ public abstract class ShardingMetaData { /** * Refresh each tableMetaData by TableRule. * * @param each table rule * @param tableRule table rule * @param shardingRule sharding rule */ public void refresh(final TableRule each, final ShardingRule shardingRule) { refresh(each, shardingRule, Collections.<String, Connection>emptyMap()); public void refresh(final TableRule tableRule, final ShardingRule shardingRule) { refresh(tableRule, shardingRule, Collections.<String, Connection>emptyMap()); } /** * Refresh each tableMetaData by TableRule. * * @param each table rule * @param tableRule table rule * @param shardingRule sharding rule * @param connectionMap connection map passing from sharding connection */ public void refresh(final TableRule each, final ShardingRule shardingRule, final Map<String, Connection> connectionMap) { tableMetaDataMap.put(each.getLogicTable(), getFinalTableMetaData(each.getLogicTable(), each.getActualDataNodes(), shardingRule.getShardingDataSourceNames(), connectionMap)); public void refresh(final TableRule tableRule, final ShardingRule shardingRule, final Map<String, Connection> connectionMap) { tableMetaDataMap.put(tableRule.getLogicTable(), getFinalTableMetaData(tableRule.getLogicTable(), tableRule.getActualDataNodes(), shardingRule.getShardingDataSourceNames(), connectionMap)); } private TableMetaData getFinalTableMetaData(final String logicTableName, final List<DataNode> actualDataNodes, final ShardingDataSourceNames shardingDataSourceNames, final Map<String, Connection> connectionMap) { private TableMetaData getFinalTableMetaData( final String logicTableName, final List<DataNode> actualDataNodes, final ShardingDataSourceNames shardingDataSourceNames, final Map<String, Connection> connectionMap) { List<TableMetaData> actualTableMetaDataList = getAllActualTableMetaData(actualDataNodes, shardingDataSourceNames, connectionMap); for (int i = 0; i < actualTableMetaDataList.size(); i++) { if (actualTableMetaDataList.size() - 1 == i) { Loading @@ -133,17 +128,17 @@ public abstract class ShardingMetaData { return new TableMetaData(); } private List<TableMetaData> getAllActualTableMetaData(final List<DataNode> actualDataNodes, final ShardingDataSourceNames shardingDataSourceNames, final Map<String, Connection> connectionMap) { private List<TableMetaData> getAllActualTableMetaData(final List<DataNode> actualDataNodes, final ShardingDataSourceNames shardingDataSourceNames, final Map<String, Connection> connectionMap) { List<ListenableFuture<TableMetaData>> result = new ArrayList<>(); for (final DataNode each : actualDataNodes) { result.add(executorService.submit(new Callable<TableMetaData>() { @Override public TableMetaData call() throws Exception { return getTableMetaData(each, shardingDataSourceNames, connectionMap); } })); } try { return Futures.allAsList(result).get(); } catch (final InterruptedException | ExecutionException ex) { Loading @@ -168,7 +163,7 @@ public abstract class ShardingMetaData { } /** * Judge whether this databaseType is supported. * Judge whether this database type is supported. * * @return supported or not */ Loading
sharding-jdbc/src/main/java/io/shardingsphere/core/jdbc/metadata/JDBCShardingMetaData.java +6 −9 Original line number Diff line number Diff line Loading @@ -54,19 +54,16 @@ public final class JDBCShardingMetaData extends ShardingMetaData { } @Override public TableMetaData getTableMetaData(final DataNode dataNode, final ShardingDataSourceNames shardingDataSourceNames, final Map<String, Connection> connectionMap) throws SQLException { public TableMetaData getTableMetaData(final DataNode dataNode, final ShardingDataSourceNames shardingDataSourceNames, final Map<String, Connection> connectionMap) throws SQLException { String dataSourceName = shardingDataSourceNames.getRawMasterDataSourceName(dataNode.getDataSourceName()); if (connectionMap.containsKey(dataSourceName)) { return ShardingMetaDataHandlerFactory.newInstance(dataNode.getTableName(), databaseType).getTableMetaData(connectionMap.get(dataSourceName)); } else { return ShardingMetaDataHandlerFactory.newInstance(dataSourceMap.get(dataSourceName), dataNode.getTableName(), databaseType).getTableMetaData(); } return ShardingMetaDataHandlerFactory.newInstance(dataSourceMap.get(dataSourceName), dataNode.getTableName(), databaseType).getTableMetaData(); } @Override public Collection<String> getTableNamesFromDefaultDataSource(final String defaultDataSourceName) throws SQLException { return ShardingMetaDataHandlerFactory.newInstance(dataSourceMap.get(defaultDataSourceName), "", databaseType).getTableNamesFromDefaultDataSource(); } }
sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/ShardingProxyClient.java +2 −2 Original line number Diff line number Diff line Loading @@ -141,8 +141,8 @@ public final class ShardingProxyClient { for (int i = 0; i < MAX_CONNECTIONS; i++) { try { channels[i] = pool.acquire().get(CONNECT_TIMEOUT, TimeUnit.SECONDS); } catch (ExecutionException | TimeoutException e) { log.error(e.getMessage(), e); } catch (final ExecutionException | TimeoutException ex) { log.error(ex.getMessage(), ex); } } for (int i = 0; i < MAX_CONNECTIONS; i++) { Loading
sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/SQLPacketsBackendHandler.java +12 −12 Original line number Diff line number Diff line Loading @@ -76,8 +76,6 @@ public final class SQLPacketsBackendHandler implements BackendHandler { private final DatabaseType databaseType; private final boolean showSQL; private MergedResult mergedResult; private int currentSequenceId; Loading @@ -88,22 +86,24 @@ public final class SQLPacketsBackendHandler implements BackendHandler { private boolean hasMoreResultValueFlag; public SQLPacketsBackendHandler(final CommandPacketRebuilder rebuilder, final DatabaseType databaseType, final boolean showSQL) { private final RuleRegistry ruleRegistry; public SQLPacketsBackendHandler(final CommandPacketRebuilder rebuilder, final DatabaseType databaseType) { this.rebuilder = rebuilder; this.databaseType = databaseType; this.showSQL = showSQL; isMerged = false; hasMoreResultValueFlag = true; ruleRegistry = RuleRegistry.getInstance(); } @Override public CommandResponsePackets execute() { return RuleRegistry.getInstance().isMasterSlaveOnly() ? executeForMasterSlave() : executeForSharding(); return ruleRegistry.isMasterSlaveOnly() ? executeForMasterSlave() : executeForSharding(); } private CommandResponsePackets executeForMasterSlave() { SQLStatement sqlStatement = new SQLJudgeEngine(rebuilder.sql()).judge(); String dataSourceName = new MasterSlaveRouter(RuleRegistry.getInstance().getMasterSlaveRule()).route(sqlStatement.getType()).iterator().next(); String dataSourceName = new MasterSlaveRouter(ruleRegistry.getMasterSlaveRule()).route(sqlStatement.getType()).iterator().next(); synchronizedFuture = new SynchronizedFuture<>(1); MySQLResultCache.getInstance().putFuture(rebuilder.connectionId(), synchronizedFuture); CommandPacket commandPacket = rebuilder.rebuild(rebuilder.sequenceId(), rebuilder.connectionId(), rebuilder.sql()); Loading @@ -119,8 +119,8 @@ public final class SQLPacketsBackendHandler implements BackendHandler { } private CommandResponsePackets executeForSharding() { StatementRoutingEngine routingEngine = new StatementRoutingEngine(RuleRegistry.getInstance().getShardingRule(), RuleRegistry.getInstance().getShardingMetaData(), databaseType, showSQL, RuleRegistry.getInstance().getShardingDataSourceMetaData()); StatementRoutingEngine routingEngine = new StatementRoutingEngine( ruleRegistry.getShardingRule(), ruleRegistry.getShardingMetaData(), databaseType, ruleRegistry.isShowSQL(), ruleRegistry.getShardingDataSourceMetaData()); SQLRouteResult routeResult = routingEngine.route(rebuilder.sql()); if (routeResult.getExecutionUnits().isEmpty()) { return new CommandResponsePackets(new OKPacket(1)); Loading Loading @@ -179,8 +179,8 @@ public final class SQLPacketsBackendHandler implements BackendHandler { channel = pool.acquire().get(CONNECT_TIMEOUT, TimeUnit.SECONDS); MySQLResultCache.getInstance().putConnection(channel.id().asShortText(), connectionId); channel.writeAndFlush(commandPacket); } catch (InterruptedException | ExecutionException | TimeoutException e) { log.error(e.getMessage(), e); } catch (final InterruptedException | ExecutionException | TimeoutException ex) { log.error(ex.getMessage(), ex); } finally { if (null != pool && null != channel) { pool.release(channel); Loading @@ -203,8 +203,8 @@ public final class SQLPacketsBackendHandler implements BackendHandler { private CommandResponsePackets mergeDQLorDAL(final SQLStatement sqlStatement, final List<CommandResponsePackets> packets, final List<QueryResult> queryResults) { try { mergedResult = MergeEngineFactory.newInstance(RuleRegistry.getInstance().getShardingRule(), queryResults, sqlStatement, RuleRegistry.getInstance().getShardingMetaData()).merge(); mergedResult = MergeEngineFactory.newInstance(ruleRegistry.getShardingRule(), queryResults, sqlStatement, ruleRegistry.getShardingMetaData()).merge(); isMerged = true; } catch (final SQLException ex) { return new CommandResponsePackets(new ErrPacket(1, ex)); Loading