Commit 2d924265 authored by terrymanu's avatar terrymanu
Browse files

refactor merger 3nd version

parent c90f7797
Loading
Loading
Loading
Loading
+11 −11
Original line number Diff line number Diff line
@@ -17,7 +17,7 @@

package com.dangdang.ddframe.rdb.sharding.merger;

import com.dangdang.ddframe.rdb.sharding.merger.limit.LimitResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.decorator.LimitResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.stream.GroupByStreamResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.memory.GroupByMemoryResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.stream.IteratorStreamResultSet;
@@ -59,16 +59,8 @@ public final class ResultSetFactory {
    
    private static ResultSet buildResultSet(final ShardingResultSets shardingResultSets, final SQLStatement sqlStatement) throws SQLException {
        ResultSetMergeContext resultSetMergeContext = new ResultSetMergeContext(shardingResultSets, sqlStatement);
        ResultSet result;
        if (resultSetMergeContext.getSqlStatement().isGroupByAndOrderByDifferent()) {
            result = buildMemoryResultSet(resultSetMergeContext);
        } else {
            result = buildStreamResultSet(resultSetMergeContext);
        }
        if (null != resultSetMergeContext.getSqlStatement().getLimit()) {
            result = new LimitResultSet(result, resultSetMergeContext.getSqlStatement());
        }
        return result;
        ResultSet resultSet = resultSetMergeContext.getSqlStatement().isGroupByAndOrderByDifferent() ? buildMemoryResultSet(resultSetMergeContext) : buildStreamResultSet(resultSetMergeContext);
        return decorateResultSet(resultSetMergeContext, resultSet);
    }
    
    private static ResultSet buildMemoryResultSet(final ResultSetMergeContext resultSetMergeContext) throws SQLException {
@@ -87,4 +79,12 @@ public final class ResultSetFactory {
        }
        return result;
    }
    
    private static ResultSet decorateResultSet(final ResultSetMergeContext resultSetMergeContext, final ResultSet resultSet) throws SQLException {
        ResultSet result = resultSet;
        if (null != resultSetMergeContext.getSqlStatement().getLimit()) {
            result = new LimitResultSet(result, resultSetMergeContext.getSqlStatement());
        }
        return result;
    }
}
+1 −1
Original line number Diff line number Diff line
@@ -15,7 +15,7 @@
 * </p>
 */

package com.dangdang.ddframe.rdb.sharding.merger.limit;
package com.dangdang.ddframe.rdb.sharding.merger.decorator;

import com.dangdang.ddframe.rdb.sharding.merger.stream.AbstractStreamResultSet;
import com.dangdang.ddframe.rdb.sharding.parsing.parser.context.limit.Limit;
+0 −68
Original line number Diff line number Diff line
/*
 * Copyright 1999-2015 dangdang.com.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * </p>
 */

package com.dangdang.ddframe.rdb.sharding.merger.memory;

import com.dangdang.ddframe.rdb.sharding.merger.memory.row.OrderByResultSetRow;
import com.dangdang.ddframe.rdb.sharding.merger.memory.row.ResultSetRow;
import com.dangdang.ddframe.rdb.sharding.parsing.parser.context.OrderItem;
import com.google.common.base.Optional;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

/**
 * 基于内存排序的结果集抽象类.
 * 
 * @author gaohongtao
 * @author zhangliang
 */
public final class OrderByMemoryResultSet extends AbstractMemoryResultSet {
    
    private final List<OrderItem> orderByItems;
    
    private Iterator<OrderByResultSetRow> orderByResultSetRows;
    
    public OrderByMemoryResultSet(final List<ResultSet> resultSets, final List<OrderItem> orderByItems) throws SQLException {
        super(resultSets);
        this.orderByItems = orderByItems;
    }
    
    @Override
    protected void initRows(final List<ResultSet> resultSets) throws SQLException {
        List<OrderByResultSetRow> orderByResultSetRows = new LinkedList<>();
        for (ResultSet each : resultSets) {
            while (each.next()) {
                orderByResultSetRows.add(new OrderByResultSetRow(each, orderByItems));
            }
        }
        Collections.sort(orderByResultSetRows);
        this.orderByResultSetRows = orderByResultSetRows.iterator();
    }
    
    @Override
    protected Optional<? extends ResultSetRow> nextRow() throws SQLException {
        if (orderByResultSetRows.hasNext()) {
            return Optional.of(orderByResultSetRows.next());
        }
        return Optional.absent();
    }
}
+7 −9
Original line number Diff line number Diff line
@@ -17,19 +17,18 @@

package com.dangdang.ddframe.rdb.sharding.merger;

import com.dangdang.ddframe.rdb.sharding.merger.pipeline.coupling.AggregationResultSetTest;
import com.dangdang.ddframe.rdb.sharding.merger.pipeline.coupling.NullableAggregationResultSetTest;
import com.dangdang.ddframe.rdb.sharding.merger.pipeline.coupling.OrderByResultSetTest;
import com.dangdang.ddframe.rdb.sharding.merger.memory.MemoryResultSetTest;
import com.dangdang.ddframe.rdb.sharding.merger.memory.row.GroupByResultSetRowTest;
import com.dangdang.ddframe.rdb.sharding.merger.memory.row.OrderByResultSetRowTest;
import com.dangdang.ddframe.rdb.sharding.merger.memory.row.ResultSetRowTest;
import com.dangdang.ddframe.rdb.sharding.merger.memory.row.aggregation.AccumulationAggregationUnitTest;
import com.dangdang.ddframe.rdb.sharding.merger.memory.row.aggregation.AggregationUnitFactoryTest;
import com.dangdang.ddframe.rdb.sharding.merger.memory.row.aggregation.AverageAggregationUnitTest;
import com.dangdang.ddframe.rdb.sharding.merger.memory.row.aggregation.ComparableAggregationUnitTest;
import com.dangdang.ddframe.rdb.sharding.merger.pipeline.coupling.AggregationResultSetTest;
import com.dangdang.ddframe.rdb.sharding.merger.pipeline.coupling.NullableAggregationResultSetTest;
import com.dangdang.ddframe.rdb.sharding.merger.pipeline.coupling.OrderByResultSetTest;
import com.dangdang.ddframe.rdb.sharding.merger.pipeline.reducer.IteratorResultSetTest;
import com.dangdang.ddframe.rdb.sharding.merger.memory.MemoryOrderByResultSetTest;
import com.dangdang.ddframe.rdb.sharding.merger.memory.MemoryResultSetTest;
import com.dangdang.ddframe.rdb.sharding.merger.memory.row.GroupByResultSetRowTest;
import com.dangdang.ddframe.rdb.sharding.merger.memory.row.OrderByResultSetRowTest;
import com.dangdang.ddframe.rdb.sharding.merger.memory.row.ResultSetRowTest;
import com.dangdang.ddframe.rdb.sharding.merger.util.ResultSetUtilTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -40,7 +39,6 @@ import org.junit.runners.Suite;
    ResultSetMergeContextTest.class, 
    IteratorResultSetTest.class, 
    OrderByResultSetTest.class, 
    MemoryOrderByResultSetTest.class, 
    AggregationResultSetTest.class, 
    NullableAggregationResultSetTest.class, 
    ResultSetRowTest.class, 
+0 −141
Original line number Diff line number Diff line
/*
 * Copyright 1999-2015 dangdang.com.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * </p>
 */

package com.dangdang.ddframe.rdb.sharding.merger.memory;

import com.dangdang.ddframe.rdb.sharding.constant.OrderType;
import com.dangdang.ddframe.rdb.sharding.merger.fixture.MockResultSet;
import com.dangdang.ddframe.rdb.sharding.parsing.parser.context.OrderItem;
import com.google.common.base.Optional;
import org.junit.Test;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.hamcrest.core.IsNull.nullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

public class MemoryOrderByResultSetTest {
    
    @Test
    public void assertSort() throws SQLException {
        OrderByMemoryResultSet rs = new OrderByMemoryResultSet(
                Arrays.<ResultSet>asList(new MockResultSet<>(1, 3, 5, 6, 6), new MockResultSet<>(8, 6, 4, 2)), Collections.singletonList(new OrderItem(1, OrderType.ASC)));
        List<Integer> actualList = new ArrayList<>();
        while (rs.next()) {
            actualList.add(rs.getInt(1));
        }
        assertThat(actualList, is(Arrays.asList(1, 2, 3, 4, 5, 6, 6, 6, 8)));
        rs.close();
        assertTrue(rs.isClosed());
        rs = new OrderByMemoryResultSet(
                Arrays.<ResultSet>asList(new MockResultSet<>(1, 3, 5, 6, 6), new MockResultSet<>(8, 6, 4, 2)), Collections.singletonList(new OrderItem(1, OrderType.DESC)));
        actualList.clear();
        while (rs.next()) {
            actualList.add(rs.getInt("nAmE"));
        }
        assertThat(actualList, is(Arrays.asList(8, 6, 6, 6, 5, 4, 3, 2, 1)));
    }
    
    @Test
    public void assertSortMultiColumn() throws SQLException {
        Map<String, Object> rs1 = new LinkedHashMap<>();
        Calendar cal = Calendar.getInstance();
        cal.set(2016, Calendar.JANUARY, 11);
        
        rs1.put("name", "name");
        rs1.put("time", cal.getTime());
        rs1.put("id", 11);
        
        Map<String, Object> rs2 = new LinkedHashMap<>();
        cal.set(2016, Calendar.JANUARY, 9);
        rs2.put("name", "dbc");
        rs2.put("time", cal.getTime());
        rs2.put("id", 12);
        
        Map<String, Object> rs3 = new LinkedHashMap<>();
        cal.set(2016, Calendar.JANUARY, 8);
        rs3.put("name", "dbc");
        rs3.put("time", cal.getTime());
        rs3.put("id", 13);
        OrderItem orderItem1 = new OrderItem("name", OrderType.ASC, Optional.<String>absent());
        orderItem1.setIndex(1);
        OrderItem orderItem2 = new OrderItem("time", OrderType.DESC, Optional.<String>absent());
        orderItem2.setIndex(2);
        OrderByMemoryResultSet rs = new OrderByMemoryResultSet(Collections.<ResultSet>singletonList(new MockResultSet<>(Arrays.asList(rs1, rs2, rs3))), Arrays.asList(orderItem1, orderItem2));
        List<Map<String, Object>> actualList = new ArrayList<>();
        while (rs.next()) {
            Map<String, Object> map = new TreeMap<>();
            map.put("name", rs.getObject("name"));
            map.put("time", rs.getObject("time"));
            map.put("id", rs.getObject("id"));
            actualList.add(map);
        }
        assertThat(actualList, is(Arrays.asList(rs2, rs3, rs1)));
    }
    
    @Test
    public void assertFindColumnSuccess() throws SQLException {
        OrderByMemoryResultSet rs = new OrderByMemoryResultSet(
                Arrays.<ResultSet>asList(new MockResultSet<>(1, 3, 5, 6, 6), new MockResultSet<>(8, 6, 4, 2)), Collections.singletonList(new OrderItem(1, OrderType.ASC)));
        assertThat(rs.findColumn("name"), is(1));
    }
    
    @Test(expected = SQLException.class)
    public void assertFindColumnError() throws SQLException {
        OrderByMemoryResultSet rs = new OrderByMemoryResultSet(
                Arrays.<ResultSet>asList(new MockResultSet<>(1, 3, 5, 6, 6), new MockResultSet<>(8, 6, 4, 2)), Collections.singletonList(new OrderItem(1, OrderType.ASC)));
        rs.findColumn("unknown");
    }
    
    @Test
    public void assertNullValue() throws SQLException {
        Map<String, Object> rs1 = new TreeMap<>();
        rs1.put("name", "name");
        rs1.put("time", null);
        OrderByMemoryResultSet rs = new OrderByMemoryResultSet(
                Collections.<ResultSet>singletonList(new MockResultSet<>(Collections.singletonList(rs1))), Collections.singletonList(new OrderItem(1, OrderType.ASC)));
        assertTrue(rs.next());
        assertThat(rs.getObject(2), nullValue());
        assertTrue(rs.wasNull());
    }
    
    @Test
    public void assertOthers() throws SQLException {
        OrderByMemoryResultSet rs = new OrderByMemoryResultSet(
                Arrays.<ResultSet>asList(new MockResultSet<>(1, 3, 5, 6, 6), new MockResultSet<>(8, 6, 4, 2)), Collections.singletonList(new OrderItem(1, OrderType.ASC)));
        assertTrue(rs.next());
        assertThat(rs.getFetchDirection(), is(ResultSet.FETCH_FORWARD));
        assertThat(rs.getFetchSize(), is(9));
        assertThat(rs.getType(), is(ResultSet.TYPE_FORWARD_ONLY));
        assertThat(rs.getConcurrency(), is(ResultSet.CONCUR_READ_ONLY));
        rs.clearWarnings();
        assertThat(rs.getWarnings(), nullValue());
        assertThat(rs.getMetaData(), instanceOf(MockResultSet.MockResultSetMetaData.class));
    }
}