Commit ad1f42b3 authored by terrymanu's avatar terrymanu
Browse files

refactor ShardingExecuteEngine

parent 4f414c18
Loading
Loading
Loading
Loading
+4 −16
Original line number Diff line number Diff line
@@ -66,13 +66,7 @@ public final class ShardingExecuteEngine implements AutoCloseable {
     * @throws SQLException throw if execute failure
     */
    public <I, O> List<O> execute(final Collection<I> inputs, final ShardingExecuteCallback<I, O> callback) throws SQLException {
        if (inputs.isEmpty()) {
            return Collections.emptyList();
        }
        Iterator<I> inputIterator = inputs.iterator();
        I firstInput = inputIterator.next();
        Collection<ListenableFuture<O>> restFutures = asyncExecute(Lists.newArrayList(inputIterator), callback);
        return getResults(syncExecute(firstInput, callback), restFutures);
        return execute(inputs, null, callback);
    }
    
    /**
@@ -93,7 +87,7 @@ public final class ShardingExecuteEngine implements AutoCloseable {
        Iterator<I> inputIterator = inputs.iterator();
        I firstInput = inputIterator.next();
        Collection<ListenableFuture<O>> restFutures = asyncExecute(Lists.newArrayList(inputIterator), callback);
        return getResults(syncExecute(firstInput, firstCallback), restFutures);
        return getResults(syncExecute(firstInput, null == firstCallback ? callback : firstCallback), restFutures);
    }
    
    private <I, O> Collection<ListenableFuture<O>> asyncExecute(final Collection<I> inputs, final ShardingExecuteCallback<I, O> callback) {
@@ -138,13 +132,7 @@ public final class ShardingExecuteEngine implements AutoCloseable {
     * @throws SQLException throw if execute failure
     */
    public <I, O> List<O> groupExecute(final Map<String, Collection<I>> inputs, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
        if (inputs.isEmpty()) {
            return Collections.emptyList();
        }
        String firstKey = inputs.keySet().iterator().next();
        Collection<I> firstInputs = inputs.remove(firstKey);
        Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(inputs, callback);
        return getGroupResults(syncGroupExecute(firstKey, firstInputs, callback), restResultFutures);
        return groupExecute(inputs, null, callback);
    }
    
    /**
@@ -166,7 +154,7 @@ public final class ShardingExecuteEngine implements AutoCloseable {
        String firstKey = inputs.keySet().iterator().next();
        Collection<I> firstInputs = inputs.remove(firstKey);
        Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(inputs, callback);
        return getGroupResults(syncGroupExecute(firstKey, firstInputs, firstCallback), restResultFutures);
        return getGroupResults(syncGroupExecute(firstKey, firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures);
    }
    
    private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final Map<String, Collection<I>> inputs, final ShardingGroupExecuteCallback<I, O> callback) {