Loading sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/ResultSetFactory.java +4 −4 Original line number Diff line number Diff line Loading @@ -73,20 +73,20 @@ public final class ResultSetFactory { } private static ResultSet buildMemoryResultSet(final ResultSetMergeContext resultSetMergeContext) throws SQLException { ResultSet result = new MemorySortResultSet(resultSetMergeContext.getShardingResultSets().getResultSets(), resultSetMergeContext.getSqlStatement().getGroupByList()); ResultSet result = new MemorySortResultSet(resultSetMergeContext.getShardingResultSets().getResultSets(), resultSetMergeContext.getSqlStatement().getGroupByItems()); result = new GroupByCouplingResultSet(result, resultSetMergeContext); result = new MemorySortResultSet(Collections.singletonList(result), resultSetMergeContext.getSqlStatement().getOrderByList()); result = new MemorySortResultSet(Collections.singletonList(result), resultSetMergeContext.getSqlStatement().getOrderByItems()); return result; } private static ResultSet buildStreamResultSet(final ResultSetMergeContext resultSetMergeContext) throws SQLException { ResultSet result; if (resultSetMergeContext.getSqlStatement().getGroupByList().isEmpty() && resultSetMergeContext.getSqlStatement().getOrderByList().isEmpty()) { if (resultSetMergeContext.getSqlStatement().getGroupByItems().isEmpty() && resultSetMergeContext.getSqlStatement().getOrderByItems().isEmpty()) { result = new IteratorReducerResultSet(resultSetMergeContext); } else { result = new StreamingOrderByReducerResultSet(resultSetMergeContext); } if (!resultSetMergeContext.getSqlStatement().getGroupByList().isEmpty() || !resultSetMergeContext.getSqlStatement().getAggregationSelectItems().isEmpty()) { if (!resultSetMergeContext.getSqlStatement().getGroupByItems().isEmpty() || !resultSetMergeContext.getSqlStatement().getAggregationSelectItems().isEmpty()) { result = new GroupByCouplingResultSet(result, resultSetMergeContext); } return result; Loading sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/ResultSetMergeContext.java +2 −2 Original line number Diff line number Diff line Loading @@ -45,8 +45,8 @@ public final class ResultSetMergeContext { this.sqlStatement = sqlStatement; Map<String, Integer> columnLabelIndexMap = ResultSetUtil.getColumnLabelIndexMap(shardingResultSets.getResultSets().get(0)); setIndexForAggregationItem(columnLabelIndexMap); setIndexForOrderItem(columnLabelIndexMap, sqlStatement.getOrderByList()); setIndexForOrderItem(columnLabelIndexMap, sqlStatement.getGroupByList()); setIndexForOrderItem(columnLabelIndexMap, sqlStatement.getOrderByItems()); setIndexForOrderItem(columnLabelIndexMap, sqlStatement.getGroupByItems()); } private void setIndexForAggregationItem(final Map<String, Integer> columnLabelIndexMap) { Loading sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/pipeline/coupling/GroupByCouplingResultSet.java +4 −4 Original line number Diff line number Diff line Loading @@ -38,7 +38,7 @@ import java.util.List; */ public final class GroupByCouplingResultSet extends AbstractMemoryResultSet { private final List<OrderItem> groupByList; private final List<OrderItem> groupByItems; private final List<AggregationSelectItem> aggregationColumns; Loading @@ -48,7 +48,7 @@ public final class GroupByCouplingResultSet extends AbstractMemoryResultSet { public GroupByCouplingResultSet(final ResultSet resultSet, final ResultSetMergeContext resultSetMergeContext) throws SQLException { super(Collections.singletonList(resultSet)); groupByList = resultSetMergeContext.getSqlStatement().getGroupByList(); groupByItems = resultSetMergeContext.getSqlStatement().getGroupByItems(); aggregationColumns = resultSetMergeContext.getSqlStatement().getAggregationSelectItems(); } Loading @@ -63,9 +63,9 @@ public final class GroupByCouplingResultSet extends AbstractMemoryResultSet { if (!hasNext) { return Optional.absent(); } GroupByResultSetRow result = new GroupByResultSetRow(resultSet, groupByList, aggregationColumns); GroupByResultSetRow result = new GroupByResultSetRow(resultSet, groupByItems, aggregationColumns); List<Object> groupByValues = result.getGroupValues(); while (hasNext && (groupByList.isEmpty() || groupByValues.equals(result.getGroupValues()))) { while (hasNext && (groupByItems.isEmpty() || groupByValues.equals(result.getGroupValues()))) { result.aggregate(); hasNext = resultSet.next(); } Loading sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/pipeline/reducer/StreamingOrderByReducerResultSet.java +1 −1 Original line number Diff line number Diff line Loading @@ -46,7 +46,7 @@ public final class StreamingOrderByReducerResultSet extends AbstractDelegateResu public StreamingOrderByReducerResultSet(final ResultSetMergeContext resultSetMergeContext) throws SQLException { super(resultSetMergeContext.getShardingResultSets().getResultSets()); priorityQueue = new PriorityQueue<>(getResultSets().size()); orderItems = resultSetMergeContext.getSqlStatement().getOrderByList(); orderItems = resultSetMergeContext.getSqlStatement().getOrderByItems(); } @Override Loading sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/resultset/memory/MemorySortResultSet.java +4 −4 Original line number Diff line number Diff line Loading @@ -37,13 +37,13 @@ import java.util.List; */ public final class MemorySortResultSet extends AbstractMemoryResultSet { private final List<OrderItem> orderByList; private final List<OrderItem> orderByItems; private Iterator<OrderByResultSetRow> orderByResultSetRows; public MemorySortResultSet(final List<ResultSet> resultSets, final List<OrderItem> orderByList) throws SQLException { public MemorySortResultSet(final List<ResultSet> resultSets, final List<OrderItem> orderByItems) throws SQLException { super(resultSets); this.orderByList = orderByList; this.orderByItems = orderByItems; } @Override Loading @@ -51,7 +51,7 @@ public final class MemorySortResultSet extends AbstractMemoryResultSet { List<OrderByResultSetRow> orderByResultSetRows = new LinkedList<>(); for (ResultSet each : resultSets) { while (each.next()) { orderByResultSetRows.add(new OrderByResultSetRow(each, orderByList)); orderByResultSetRows.add(new OrderByResultSetRow(each, orderByItems)); } } Collections.sort(orderByResultSetRows); Loading Loading
sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/ResultSetFactory.java +4 −4 Original line number Diff line number Diff line Loading @@ -73,20 +73,20 @@ public final class ResultSetFactory { } private static ResultSet buildMemoryResultSet(final ResultSetMergeContext resultSetMergeContext) throws SQLException { ResultSet result = new MemorySortResultSet(resultSetMergeContext.getShardingResultSets().getResultSets(), resultSetMergeContext.getSqlStatement().getGroupByList()); ResultSet result = new MemorySortResultSet(resultSetMergeContext.getShardingResultSets().getResultSets(), resultSetMergeContext.getSqlStatement().getGroupByItems()); result = new GroupByCouplingResultSet(result, resultSetMergeContext); result = new MemorySortResultSet(Collections.singletonList(result), resultSetMergeContext.getSqlStatement().getOrderByList()); result = new MemorySortResultSet(Collections.singletonList(result), resultSetMergeContext.getSqlStatement().getOrderByItems()); return result; } private static ResultSet buildStreamResultSet(final ResultSetMergeContext resultSetMergeContext) throws SQLException { ResultSet result; if (resultSetMergeContext.getSqlStatement().getGroupByList().isEmpty() && resultSetMergeContext.getSqlStatement().getOrderByList().isEmpty()) { if (resultSetMergeContext.getSqlStatement().getGroupByItems().isEmpty() && resultSetMergeContext.getSqlStatement().getOrderByItems().isEmpty()) { result = new IteratorReducerResultSet(resultSetMergeContext); } else { result = new StreamingOrderByReducerResultSet(resultSetMergeContext); } if (!resultSetMergeContext.getSqlStatement().getGroupByList().isEmpty() || !resultSetMergeContext.getSqlStatement().getAggregationSelectItems().isEmpty()) { if (!resultSetMergeContext.getSqlStatement().getGroupByItems().isEmpty() || !resultSetMergeContext.getSqlStatement().getAggregationSelectItems().isEmpty()) { result = new GroupByCouplingResultSet(result, resultSetMergeContext); } return result; Loading
sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/ResultSetMergeContext.java +2 −2 Original line number Diff line number Diff line Loading @@ -45,8 +45,8 @@ public final class ResultSetMergeContext { this.sqlStatement = sqlStatement; Map<String, Integer> columnLabelIndexMap = ResultSetUtil.getColumnLabelIndexMap(shardingResultSets.getResultSets().get(0)); setIndexForAggregationItem(columnLabelIndexMap); setIndexForOrderItem(columnLabelIndexMap, sqlStatement.getOrderByList()); setIndexForOrderItem(columnLabelIndexMap, sqlStatement.getGroupByList()); setIndexForOrderItem(columnLabelIndexMap, sqlStatement.getOrderByItems()); setIndexForOrderItem(columnLabelIndexMap, sqlStatement.getGroupByItems()); } private void setIndexForAggregationItem(final Map<String, Integer> columnLabelIndexMap) { Loading
sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/pipeline/coupling/GroupByCouplingResultSet.java +4 −4 Original line number Diff line number Diff line Loading @@ -38,7 +38,7 @@ import java.util.List; */ public final class GroupByCouplingResultSet extends AbstractMemoryResultSet { private final List<OrderItem> groupByList; private final List<OrderItem> groupByItems; private final List<AggregationSelectItem> aggregationColumns; Loading @@ -48,7 +48,7 @@ public final class GroupByCouplingResultSet extends AbstractMemoryResultSet { public GroupByCouplingResultSet(final ResultSet resultSet, final ResultSetMergeContext resultSetMergeContext) throws SQLException { super(Collections.singletonList(resultSet)); groupByList = resultSetMergeContext.getSqlStatement().getGroupByList(); groupByItems = resultSetMergeContext.getSqlStatement().getGroupByItems(); aggregationColumns = resultSetMergeContext.getSqlStatement().getAggregationSelectItems(); } Loading @@ -63,9 +63,9 @@ public final class GroupByCouplingResultSet extends AbstractMemoryResultSet { if (!hasNext) { return Optional.absent(); } GroupByResultSetRow result = new GroupByResultSetRow(resultSet, groupByList, aggregationColumns); GroupByResultSetRow result = new GroupByResultSetRow(resultSet, groupByItems, aggregationColumns); List<Object> groupByValues = result.getGroupValues(); while (hasNext && (groupByList.isEmpty() || groupByValues.equals(result.getGroupValues()))) { while (hasNext && (groupByItems.isEmpty() || groupByValues.equals(result.getGroupValues()))) { result.aggregate(); hasNext = resultSet.next(); } Loading
sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/pipeline/reducer/StreamingOrderByReducerResultSet.java +1 −1 Original line number Diff line number Diff line Loading @@ -46,7 +46,7 @@ public final class StreamingOrderByReducerResultSet extends AbstractDelegateResu public StreamingOrderByReducerResultSet(final ResultSetMergeContext resultSetMergeContext) throws SQLException { super(resultSetMergeContext.getShardingResultSets().getResultSets()); priorityQueue = new PriorityQueue<>(getResultSets().size()); orderItems = resultSetMergeContext.getSqlStatement().getOrderByList(); orderItems = resultSetMergeContext.getSqlStatement().getOrderByItems(); } @Override Loading
sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/resultset/memory/MemorySortResultSet.java +4 −4 Original line number Diff line number Diff line Loading @@ -37,13 +37,13 @@ import java.util.List; */ public final class MemorySortResultSet extends AbstractMemoryResultSet { private final List<OrderItem> orderByList; private final List<OrderItem> orderByItems; private Iterator<OrderByResultSetRow> orderByResultSetRows; public MemorySortResultSet(final List<ResultSet> resultSets, final List<OrderItem> orderByList) throws SQLException { public MemorySortResultSet(final List<ResultSet> resultSets, final List<OrderItem> orderByItems) throws SQLException { super(resultSets); this.orderByList = orderByList; this.orderByItems = orderByItems; } @Override Loading @@ -51,7 +51,7 @@ public final class MemorySortResultSet extends AbstractMemoryResultSet { List<OrderByResultSetRow> orderByResultSetRows = new LinkedList<>(); for (ResultSet each : resultSets) { while (each.next()) { orderByResultSetRows.add(new OrderByResultSetRow(each, orderByList)); orderByResultSetRows.add(new OrderByResultSetRow(each, orderByItems)); } } Collections.sort(orderByResultSetRows); Loading