Commit c6c78cb3 authored by haocao's avatar haocao
Browse files

Merge remote-tracking branch 'origin/1.5.0.M3-SNAPSHOT' into 1.5.0.M3-SNAPSHOT

parents 04f0f2ce b424971d
Loading
Loading
Loading
Loading
+0 −17
Original line number Diff line number Diff line
@@ -18,19 +18,15 @@
package com.dangdang.ddframe.rdb.sharding.jdbc.adapter;

import com.dangdang.ddframe.rdb.sharding.jdbc.unsupported.AbstractUnsupportedOperationResultSet;
import com.dangdang.ddframe.rdb.sharding.util.SQLUtil;
import com.google.common.base.Preconditions;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * 代理结果集适配器.
@@ -43,24 +39,11 @@ public abstract class AbstractResultSetAdapter extends AbstractUnsupportedOperat
    @Getter
    private final List<ResultSet> resultSets;

    @Getter
    private final Map<String, Integer> columnLabelIndexMap;

    private boolean closed;
    
    public AbstractResultSetAdapter(final List<ResultSet> resultSets) throws SQLException {
        Preconditions.checkArgument(!resultSets.isEmpty());
        this.resultSets = resultSets;
        columnLabelIndexMap = generateColumnLabelIndexMap();
    }
    
    private Map<String, Integer> generateColumnLabelIndexMap() throws SQLException {
        ResultSetMetaData resultSetMetaData = resultSets.get(0).getMetaData();
        Map<String, Integer> result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
            result.put(SQLUtil.getExactlyValue(resultSetMetaData.getColumnLabel(i)), i);
        }
        return result;
    }
    
    @Override
+6 −8
Original line number Diff line number Diff line
@@ -19,10 +19,9 @@ package com.dangdang.ddframe.rdb.sharding.merger;

import com.dangdang.ddframe.rdb.sharding.merger.pipeline.coupling.GroupByCouplingResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.pipeline.coupling.LimitCouplingResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.pipeline.coupling.MemoryOrderByCouplingResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.pipeline.reducer.IteratorReducerResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.pipeline.reducer.MemoryOrderByReducerResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.pipeline.reducer.StreamingOrderByReducerResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.resultset.memory.MemorySortResultSet;
import com.dangdang.ddframe.rdb.sharding.parsing.parser.statement.SQLStatement;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
@@ -30,6 +29,7 @@ import lombok.extern.slf4j.Slf4j;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;

/**
@@ -64,9 +64,8 @@ public final class ResultSetFactory {
    }
    
    private static ResultSet buildReducer(final ResultSetMergeContext resultSetMergeContext) throws SQLException {
        if (resultSetMergeContext.isNeedMemorySortForGroupBy()) {
            resultSetMergeContext.setGroupByKeysToCurrentOrderByKeys();
            return new MemoryOrderByReducerResultSet(resultSetMergeContext);
        if (resultSetMergeContext.getSqlStatement().isGroupByAndOrderByDifferent()) {
            return new MemorySortResultSet(resultSetMergeContext.getShardingResultSets().getResultSets(), resultSetMergeContext.getSqlStatement().getGroupByList());
        }
        if (!resultSetMergeContext.getSqlStatement().getGroupByList().isEmpty() || !resultSetMergeContext.getSqlStatement().getOrderByList().isEmpty()) {
            return new StreamingOrderByReducerResultSet(resultSetMergeContext);
@@ -79,9 +78,8 @@ public final class ResultSetFactory {
        if (!resultSetMergeContext.getSqlStatement().getGroupByList().isEmpty() || !resultSetMergeContext.getSqlStatement().getAggregationSelectItems().isEmpty()) {
            result = new GroupByCouplingResultSet(result, resultSetMergeContext);
        }
        if (resultSetMergeContext.isNeedMemorySortForOrderBy()) {
            resultSetMergeContext.setOrderByKeysToCurrentOrderByKeys();
            result = new MemoryOrderByCouplingResultSet(result, resultSetMergeContext);
        if (resultSetMergeContext.getSqlStatement().isGroupByAndOrderByDifferent()) {
            result = new MemorySortResultSet(Collections.singletonList(result), resultSetMergeContext.getSqlStatement().getOrderByList());
        }
        if (null != resultSetMergeContext.getSqlStatement().getLimit()) {
            result = new LimitCouplingResultSet(result, resultSetMergeContext.getSqlStatement());
+4 −41
Original line number Diff line number Diff line
@@ -17,14 +17,14 @@

package com.dangdang.ddframe.rdb.sharding.merger;

import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractResultSetAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.util.ResultSetUtil;
import com.dangdang.ddframe.rdb.sharding.parsing.parser.context.OrderItem;
import com.dangdang.ddframe.rdb.sharding.parsing.parser.context.selectitem.AggregationSelectItem;
import com.dangdang.ddframe.rdb.sharding.parsing.parser.statement.SQLStatement;
import com.google.common.base.Preconditions;
import lombok.Getter;

import java.util.LinkedList;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;

@@ -40,13 +40,10 @@ public final class ResultSetMergeContext {
    
    private final SQLStatement sqlStatement;
    
    private final List<OrderItem> currentOrderByKeys;
    
    public ResultSetMergeContext(final ShardingResultSets shardingResultSets, final SQLStatement sqlStatement) {
    public ResultSetMergeContext(final ShardingResultSets shardingResultSets, final SQLStatement sqlStatement) throws SQLException {
        this.shardingResultSets = shardingResultSets;
        this.sqlStatement = sqlStatement;
        currentOrderByKeys = new LinkedList<>(sqlStatement.getOrderByList());
        Map<String, Integer> columnLabelIndexMap = ((AbstractResultSetAdapter) shardingResultSets.getResultSets().get(0)).getColumnLabelIndexMap();
        Map<String, Integer> columnLabelIndexMap = ResultSetUtil.getColumnLabelIndexMap(shardingResultSets.getResultSets().get(0));
        setIndexForAggregationItem(columnLabelIndexMap);
        setIndexForOrderItem(columnLabelIndexMap, sqlStatement.getOrderByList());
        setIndexForOrderItem(columnLabelIndexMap, sqlStatement.getGroupByList());
@@ -74,38 +71,4 @@ public final class ResultSetMergeContext {
            }
        }
    }
    
    /**
     * 判断分组归并是否需要内存排序.
     *
     * @return 分组归并是否需要内存排序
     */
    public boolean isNeedMemorySortForGroupBy() {
        return !sqlStatement.getGroupByList().isEmpty() && !sqlStatement.getOrderByList().equals(sqlStatement.getGroupByList());
    }
    
    /**
     * 将分组顺序设置为排序序列.
     */
    public void setGroupByKeysToCurrentOrderByKeys() {
        currentOrderByKeys.clear();
        currentOrderByKeys.addAll(sqlStatement.getGroupByList());
    }
    
    /**
     * 判断排序归并是否需要内存排序.
     *
     * @return 排序归并是否需要内存排序
     */
    public boolean isNeedMemorySortForOrderBy() {
        return !sqlStatement.getOrderByList().isEmpty() && !currentOrderByKeys.equals(sqlStatement.getOrderByList());
    }
    
    /**
     * 将排序顺序设置为排序序列.
     */
    public void setOrderByKeysToCurrentOrderByKeys() {
        currentOrderByKeys.clear();
        currentOrderByKeys.addAll(sqlStatement.getOrderByList());
    }
}
+0 −36
Original line number Diff line number Diff line
/*
 * Copyright 1999-2015 dangdang.com.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * </p>
 */

package com.dangdang.ddframe.rdb.sharding.merger.pipeline.reducer;

import com.dangdang.ddframe.rdb.sharding.merger.ResultSetMergeContext;
import com.dangdang.ddframe.rdb.sharding.merger.resultset.memory.AbstractMemoryOrderByResultSet;

import java.sql.SQLException;

/**
 * 根据排序列内存排序的聚集结果集.
 *
 * @author gaohongtao
 * @author zhangliang
 */
public final class MemoryOrderByReducerResultSet extends AbstractMemoryOrderByResultSet {
    
    public MemoryOrderByReducerResultSet(final ResultSetMergeContext resultSetMergeContext) throws SQLException {
        super(resultSetMergeContext.getShardingResultSets().getResultSets(), resultSetMergeContext.getCurrentOrderByKeys());
    }
}
+1 −1
Original line number Diff line number Diff line
@@ -46,7 +46,7 @@ public final class StreamingOrderByReducerResultSet extends AbstractDelegateResu
    public StreamingOrderByReducerResultSet(final ResultSetMergeContext resultSetMergeContext) throws SQLException {
        super(resultSetMergeContext.getShardingResultSets().getResultSets());
        priorityQueue = new PriorityQueue<>(getResultSets().size());
        orderItems = resultSetMergeContext.getCurrentOrderByKeys();
        orderItems = resultSetMergeContext.getSqlStatement().getOrderByList();
    }
    
    @Override
Loading