Commit b47922a4 authored by terrymanu's avatar terrymanu
Browse files

refactor merger

parent 1955249c
Loading
Loading
Loading
Loading
+2 −6
Original line number Diff line number Diff line
@@ -18,10 +18,10 @@
package com.dangdang.ddframe.rdb.sharding.merger;

import com.dangdang.ddframe.rdb.sharding.merger.pipeline.coupling.GroupByCouplingResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.pipeline.coupling.GroupByMemoryResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.pipeline.coupling.LimitCouplingResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.pipeline.reducer.IteratorReducerResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.pipeline.reducer.StreamingOrderByReducerResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.resultset.memory.MemorySortResultSet;
import com.dangdang.ddframe.rdb.sharding.parsing.parser.statement.SQLStatement;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
@@ -29,7 +29,6 @@ import lombok.extern.slf4j.Slf4j;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;

/**
@@ -73,10 +72,7 @@ public final class ResultSetFactory {
    }
    
    private static ResultSet buildMemoryResultSet(final ResultSetMergeContext resultSetMergeContext) throws SQLException {
        ResultSet result = new MemorySortResultSet(resultSetMergeContext.getShardingResultSets().getResultSets(), resultSetMergeContext.getSqlStatement().getGroupByItems());
        result = new GroupByCouplingResultSet(result, resultSetMergeContext);
        result = new MemorySortResultSet(Collections.singletonList(result), resultSetMergeContext.getSqlStatement().getOrderByItems());
        return result;
        return new GroupByMemoryResultSet(resultSetMergeContext);
    }
    
    private static ResultSet buildStreamResultSet(final ResultSetMergeContext resultSetMergeContext) throws SQLException {
+7 −4
Original line number Diff line number Diff line
@@ -40,6 +40,8 @@ public final class GroupByCouplingResultSet extends AbstractMemoryResultSet {
    
    private final List<OrderItem> groupByItems;
    
    private final List<OrderItem> orderByItems;
    
    private final List<AggregationSelectItem> aggregationColumns;
    
    private ResultSet resultSet;
@@ -49,6 +51,7 @@ public final class GroupByCouplingResultSet extends AbstractMemoryResultSet {
    public GroupByCouplingResultSet(final ResultSet resultSet, final ResultSetMergeContext resultSetMergeContext) throws SQLException {
        super(Collections.singletonList(resultSet));
        groupByItems = resultSetMergeContext.getSqlStatement().getGroupByItems();
        orderByItems = resultSetMergeContext.getSqlStatement().getOrderByItems();
        aggregationColumns = resultSetMergeContext.getSqlStatement().getAggregationSelectItems();
    }
    
@@ -63,10 +66,10 @@ public final class GroupByCouplingResultSet extends AbstractMemoryResultSet {
        if (!hasNext) {
            return Optional.absent();
        }
        GroupByResultSetRow result = new GroupByResultSetRow(resultSet, groupByItems, aggregationColumns);
        List<Object> groupByValues = result.getGroupValues();
        while (hasNext && (groupByItems.isEmpty() || groupByValues.equals(result.getGroupValues()))) {
            result.aggregate();
        GroupByResultSetRow result = new GroupByResultSetRow(resultSet, groupByItems, orderByItems, aggregationColumns);
        List<Comparable<?>> groupByValues = result.getGroupItemValues();
        while (hasNext && (groupByItems.isEmpty() || groupByValues.equals(result.getGroupItemValues()))) {
            result.aggregate(resultSet);
            hasNext = resultSet.next();
        }
        result.generateResult();
+88 −0
Original line number Diff line number Diff line
/*
 * Copyright 1999-2015 dangdang.com.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * </p>
 */

package com.dangdang.ddframe.rdb.sharding.merger.pipeline.coupling;

import com.dangdang.ddframe.rdb.sharding.merger.ResultSetMergeContext;
import com.dangdang.ddframe.rdb.sharding.merger.resultset.memory.AbstractMemoryResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.resultset.memory.row.GroupByResultSetRow;
import com.dangdang.ddframe.rdb.sharding.merger.resultset.memory.row.ResultSetRow;
import com.dangdang.ddframe.rdb.sharding.parsing.parser.context.OrderItem;
import com.dangdang.ddframe.rdb.sharding.parsing.parser.context.selectitem.AggregationSelectItem;
import com.google.common.base.Optional;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * 分组的内存结果集.
 * 
 * @author zhangliang
 */
public final class GroupByMemoryResultSet extends AbstractMemoryResultSet {
    
    private final List<OrderItem> groupByItems;
    
    private final List<OrderItem> orderByItems;
    
    private final List<AggregationSelectItem> aggregationColumns;
    
    private final Map<List<Comparable<?>>, GroupByResultSetRow> dataMap;
    
    private Iterator<GroupByResultSetRow> data;
    
    public GroupByMemoryResultSet(final ResultSetMergeContext resultSetMergeContext) throws SQLException {
        super(resultSetMergeContext.getShardingResultSets().getResultSets());
        groupByItems = resultSetMergeContext.getSqlStatement().getGroupByItems();
        orderByItems = resultSetMergeContext.getSqlStatement().getOrderByItems();
        aggregationColumns = resultSetMergeContext.getSqlStatement().getAggregationSelectItems();
        dataMap = new HashMap<>(1024);
    }
    
    @Override
    protected void initRows(final List<ResultSet> resultSets) throws SQLException {
        for (ResultSet each : resultSets) {
            while (each.next()) {
                GroupByResultSetRow groupByResultSetRow = new GroupByResultSetRow(each, groupByItems, orderByItems, aggregationColumns);
                if (!dataMap.containsKey(groupByResultSetRow.getGroupItemValues())) {
                    dataMap.put(groupByResultSetRow.getGroupItemValues(), groupByResultSetRow);
                }
                dataMap.get(groupByResultSetRow.getGroupItemValues()).aggregate(each);
            }
        }
        for (GroupByResultSetRow each : dataMap.values()) {
            each.generateResult();
        }
        List<GroupByResultSetRow> data = new ArrayList<>(dataMap.values());
        Collections.sort(data);
        this.data = data.iterator();
    }
    
    @Override
    protected Optional<? extends ResultSetRow> nextRow() throws SQLException {
        if (data.hasNext()) {
            return Optional.of(data.next());
        }
        return Optional.absent();
    }
}
+58 −9
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@

package com.dangdang.ddframe.rdb.sharding.merger.resultset.memory.row;

import com.dangdang.ddframe.rdb.sharding.constant.OrderType;
import com.dangdang.ddframe.rdb.sharding.merger.resultset.memory.row.aggregation.AggregationUnit;
import com.dangdang.ddframe.rdb.sharding.merger.resultset.memory.row.aggregation.AggregationUnitFactory;
import com.dangdang.ddframe.rdb.sharding.parsing.parser.context.OrderItem;
@@ -39,18 +40,28 @@ import java.util.Map.Entry;
 * @author gaohongtao
 * @author zhangliang
 */
public final class GroupByResultSetRow extends AbstractResultSetRow {
public final class GroupByResultSetRow extends AbstractResultSetRow implements Comparable<GroupByResultSetRow> {
    
    private final ResultSet resultSet;
    
    private final List<OrderItem> groupByItems;
    
    private final List<OrderItem> orderByItems;
    
    private final List<Comparable<?>> groupItemValues;
    
    private final List<Comparable<?>> orderItemValues;
    
    private final Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap;
    
    public GroupByResultSetRow(final ResultSet resultSet, final List<OrderItem> groupByItems, final List<AggregationSelectItem> aggregationSelectItems) throws SQLException {
    public GroupByResultSetRow(
            final ResultSet resultSet, final List<OrderItem> groupByItems, final List<OrderItem> orderByItems, final List<AggregationSelectItem> aggregationSelectItems) throws SQLException {
        super(resultSet);
        this.resultSet = resultSet;
        this.groupByItems = groupByItems;
        this.orderByItems = orderByItems;
        groupItemValues = getGroupItemValues();
        orderItemValues = getOrderItemValues();
        aggregationUnitMap = Maps.toMap(aggregationSelectItems, new Function<AggregationSelectItem, AggregationUnit>() {
            
            @Override
@@ -66,10 +77,22 @@ public final class GroupByResultSetRow extends AbstractResultSetRow {
     * @return 分组值集合
     * @throws SQLException SQL异常
     */
    public List<Object> getGroupValues() throws SQLException {
        List<Object> result = new ArrayList<>(groupByItems.size());
    public List<Comparable<?>> getGroupItemValues() throws SQLException {
        List<Comparable<?>> result = new ArrayList<>(groupByItems.size());
        for (OrderItem each : groupByItems) {
            result.add(resultSet.getObject(each.getIndex()));
            Object value = resultSet.getObject(each.getIndex());
            Preconditions.checkState(value instanceof Comparable, "Group by value must implements Comparable");
            result.add((Comparable<?>) value);
        }
        return result;
    }
    
    private List<Comparable<?>> getOrderItemValues() {
        List<Comparable<?>> result = new ArrayList<>(orderByItems.size());
        for (OrderItem each : orderByItems) {
            Object value = getCell(each.getIndex());
            Preconditions.checkState(value instanceof Comparable, "Order by value must implements Comparable");
            result.add((Comparable<?>) value);
        }
        return result;
    }
@@ -77,16 +100,17 @@ public final class GroupByResultSetRow extends AbstractResultSetRow {
    /**
     * 处理聚合函数结果集.
     * 
     * @param resultSet 结果集
     * @throws SQLException SQL异常
     */
    public void aggregate() throws SQLException {
    public void aggregate(final ResultSet resultSet) throws SQLException {
        for (Entry<AggregationSelectItem, AggregationUnit> entry : aggregationUnitMap.entrySet()) {
            entry.getValue().merge(
                    getAggregationValues(entry.getKey().getDerivedAggregationSelectItems().isEmpty() ? Collections.singletonList(entry.getKey()) : entry.getKey().getDerivedAggregationSelectItems()));
            entry.getValue().merge(getAggregationValues(resultSet, 
                    entry.getKey().getDerivedAggregationSelectItems().isEmpty() ? Collections.singletonList(entry.getKey()) : entry.getKey().getDerivedAggregationSelectItems()));
        }
    }
    
    private List<Comparable<?>> getAggregationValues(final List<AggregationSelectItem> aggregationSelectItems) throws SQLException {
    private List<Comparable<?>> getAggregationValues(final ResultSet resultSet, final List<AggregationSelectItem> aggregationSelectItems) throws SQLException {
        List<Comparable<?>> result = new ArrayList<>(aggregationSelectItems.size());
        for (AggregationSelectItem each : aggregationSelectItems) {
            Object value = resultSet.getObject(each.getIndex());
@@ -104,4 +128,29 @@ public final class GroupByResultSetRow extends AbstractResultSetRow {
            setCell(each.getIndex(), aggregationUnitMap.get(each).getResult());
        }
    }
    
    @Override
    public int compareTo(final GroupByResultSetRow o) {
        if (!orderByItems.isEmpty()) {
            for (int i = 0; i < orderByItems.size(); i++) {
                int result = compareTo(orderItemValues.get(i), o.orderItemValues.get(i), orderByItems.get(i).getType());
                if (0 != result) {
                    return result;
                }
            }
        } else {
            for (int i = 0; i < groupByItems.size(); i++) {
                int result = compareTo(groupItemValues.get(i), o.groupItemValues.get(i), groupByItems.get(i).getType());
                if (0 != result) {
                    return result;
                }
            }
        }
        return 0;
    }
    
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static int compareTo(final Comparable thisValue, final Comparable otherValue, final OrderType type) {
        return OrderType.ASC == type ? thisValue.compareTo(otherValue) : -thisValue.compareTo(otherValue);
    }
}
+7 −7
Original line number Diff line number Diff line
@@ -34,19 +34,19 @@ import java.util.List;
 */
public final class OrderByResultSetRow extends AbstractResultSetRow implements Comparable<OrderByResultSetRow> {
    
    private final List<OrderItem> orderItems;
    private final List<OrderItem> orderByItems;
    
    private final List<Comparable<?>> orderValues;
    
    public OrderByResultSetRow(final ResultSet resultSet, final List<OrderItem> orderItems) throws SQLException {
    public OrderByResultSetRow(final ResultSet resultSet, final List<OrderItem> orderByItems) throws SQLException {
        super(resultSet);
        this.orderItems = orderItems;
        this.orderByItems = orderByItems;
        orderValues = getOrderValues();
    }
    
    private List<Comparable<?>> getOrderValues() {
        List<Comparable<?>> result = new ArrayList<>(orderItems.size());
        for (OrderItem each : orderItems) {
        List<Comparable<?>> result = new ArrayList<>(orderByItems.size());
        for (OrderItem each : orderByItems) {
            Object value = getCell(each.getIndex());
            Preconditions.checkState(value instanceof Comparable, "Order by value must implements Comparable");
            result.add((Comparable<?>) value);
@@ -56,8 +56,8 @@ public final class OrderByResultSetRow extends AbstractResultSetRow implements C
    
    @Override
    public int compareTo(final OrderByResultSetRow o) {
        for (int i = 0; i < orderItems.size(); i++) {
            OrderItem thisOrderBy = orderItems.get(i);
        for (int i = 0; i < orderByItems.size(); i++) {
            OrderItem thisOrderBy = orderByItems.get(i);
            int result = compareTo(orderValues.get(i), o.orderValues.get(i), thisOrderBy.getType());
            if (0 != result) {
                return result;
Loading