Commit 869efcd5 authored by terrymanu's avatar terrymanu
Browse files

refactor StreamingOrderByReducerResultSet

parent f1caee0c
Loading
Loading
Loading
Loading
+15 −15
Original line number Diff line number Diff line
@@ -39,48 +39,48 @@ import java.util.Queue;
@Slf4j
public final class StreamingOrderByReducerResultSet extends AbstractDelegateResultSet {
    
    private final Queue<ResultSetOrderByWrapper> delegateResultSetQueue;
    private final Queue<ComparableResultSet> priorityQueue;
    
    private final List<OrderItem> orderItems;
    
    public StreamingOrderByReducerResultSet(final ResultSetMergeContext resultSetMergeContext) throws SQLException {
        super(resultSetMergeContext.getShardingResultSets().getResultSets());
        delegateResultSetQueue = new PriorityQueue<>(getResultSets().size());
        priorityQueue = new PriorityQueue<>(getResultSets().size());
        orderItems = resultSetMergeContext.getCurrentOrderByKeys();
    }
    
    @Override
    protected boolean firstNext() throws SQLException {
        for (ResultSet each : getResultSets()) {
            ResultSetOrderByWrapper wrapper = new ResultSetOrderByWrapper(each);
            if (wrapper.next()) {
                delegateResultSetQueue.offer(wrapper);
            ComparableResultSet comparableResultSet = new ComparableResultSet(each);
            if (comparableResultSet.next()) {
                priorityQueue.offer(comparableResultSet);
            }
        }
        return doNext();
        return hasNext();
    }
    
    @Override
    protected boolean afterFirstNext() throws SQLException {
        ResultSetOrderByWrapper firstResultSet = delegateResultSetQueue.poll();
        ComparableResultSet firstResultSet = priorityQueue.poll();
        setDelegate(firstResultSet.resultSet);
        if (firstResultSet.next()) {
            delegateResultSetQueue.offer(firstResultSet);
            priorityQueue.offer(firstResultSet);
        }
        return doNext();
        return hasNext();
    }
    
    private boolean doNext() {
        if (delegateResultSetQueue.isEmpty()) {
    private boolean hasNext() {
        if (priorityQueue.isEmpty()) {
            return false;
        }
        setDelegate(delegateResultSetQueue.peek().resultSet);
        log.trace("Chosen order by value: {}, current result set hashcode: {}", delegateResultSetQueue.peek().row, getDelegate().hashCode());
        setDelegate(priorityQueue.peek().resultSet);
        log.trace("Chosen order by value: {}", priorityQueue.peek().row);
        return true;
    }
    
    @RequiredArgsConstructor
    private class ResultSetOrderByWrapper implements Comparable<ResultSetOrderByWrapper> {
    private class ComparableResultSet implements Comparable<ComparableResultSet> {
        
        private final ResultSet resultSet;
        
@@ -95,7 +95,7 @@ public final class StreamingOrderByReducerResultSet extends AbstractDelegateResu
        }
        
        @Override
        public int compareTo(final ResultSetOrderByWrapper o) {
        public int compareTo(final ComparableResultSet o) {
            return row.compareTo(o.row);
        }
    }