Commit 81abe770 authored by terrymanu's avatar terrymanu
Browse files

add sync exec for ShardingExecuteEngine

parent ef8fb160
Loading
Loading
Loading
Loading
+12 −4
Original line number Diff line number Diff line
@@ -70,7 +70,7 @@ public final class ShardingExecuteEngine implements AutoCloseable {
        Iterator<I> inputIterator = inputs.iterator();
        I firstInput = inputIterator.next();
        Collection<ListenableFuture<O>> restFutures = asyncExecute(Lists.newArrayList(inputIterator), callback);
        return getResults(callback.execute(firstInput), restFutures);
        return getResults(syncExecute(firstInput, callback), restFutures);
    }
    
    /**
@@ -91,7 +91,7 @@ public final class ShardingExecuteEngine implements AutoCloseable {
        Iterator<I> inputIterator = inputs.iterator();
        I firstInput = inputIterator.next();
        Collection<ListenableFuture<O>> restFutures = asyncExecute(Lists.newArrayList(inputIterator), callback);
        return getResults(firstCallback.execute(firstInput), restFutures);
        return getResults(syncExecute(firstInput, firstCallback), restFutures);
    }
    
    private <I, O> Collection<ListenableFuture<O>> asyncExecute(final Collection<I> inputs, final ShardingExecuteCallback<I, O> callback) {
@@ -108,6 +108,10 @@ public final class ShardingExecuteEngine implements AutoCloseable {
        return result;
    }
    
    private <I, O> O syncExecute(final I input, final ShardingExecuteCallback<I, O> callback) throws Exception {
        return callback.execute(input);
    }
    
    private <O> List<O> getResults(final O firstResult, final Collection<ListenableFuture<O>> restFutures) throws ExecutionException, InterruptedException {
        List<O> result = new LinkedList<>();
        result.add(firstResult);
@@ -134,7 +138,7 @@ public final class ShardingExecuteEngine implements AutoCloseable {
        String firstKey = inputs.keySet().iterator().next();
        Collection<I> firstInputs = inputs.remove(firstKey);
        Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(inputs, callback);
        return getGroupResults(callback.execute(firstKey, firstInputs), restResultFutures);
        return getGroupResults(syncGroupExecute(firstKey, firstInputs, callback), restResultFutures);
    }
    
    /**
@@ -156,7 +160,7 @@ public final class ShardingExecuteEngine implements AutoCloseable {
        String firstKey = inputs.keySet().iterator().next();
        Collection<I> firstInputs = inputs.remove(firstKey);
        Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(inputs, callback);
        return getGroupResults(firstCallback.execute(firstKey, firstInputs), restResultFutures);
        return getGroupResults(syncGroupExecute(firstKey, firstInputs, firstCallback), restResultFutures);
    }
    
    private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final Map<String, Collection<I>> inputs, final ShardingGroupExecuteCallback<I, O> callback) {
@@ -173,6 +177,10 @@ public final class ShardingExecuteEngine implements AutoCloseable {
        return result;
    }
    
    private <I, O> Collection<O> syncGroupExecute(final String dataSourceName, final Collection<I> inputs, final ShardingGroupExecuteCallback<I, O> callback) throws Exception {
        return callback.execute(dataSourceName, inputs);
    }
    
    private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<ListenableFuture<Collection<O>>> restFutures) throws ExecutionException, InterruptedException {
        List<O> result = new LinkedList<>();
        result.addAll(firstResults);