Commit 59c84067 authored by terrymanu's avatar terrymanu
Browse files

add ShardingGroupExecuteCallback

parent 3994ce1c
Loading
Loading
Loading
Loading
+2 −10
Original line number Diff line number Diff line
@@ -115,7 +115,7 @@ public final class ShardingExecuteEngine implements AutoCloseable {
        String firstKey = inputs.keySet().iterator().next();
        Collection<I> firstInputs = inputs.remove(firstKey);
        Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(inputs, callback);
        return getGroupResults(doGroupExecute(firstKey, firstInputs, callback), restResultFutures);
        return getGroupResults(callback.execute(firstKey, firstInputs), restResultFutures);
    }
    
    private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final Map<String, Collection<I>> inputs, final ShardingGroupExecuteCallback<I, O> callback) {
@@ -125,21 +125,13 @@ public final class ShardingExecuteEngine implements AutoCloseable {
                
                @Override
                public Collection<O> call() throws Exception {
                    return doGroupExecute(entry.getKey(), entry.getValue(), callback);
                    return callback.execute(entry.getKey(), entry.getValue());
                }
            }));
        }
        return result;
    }
    
    private <I, O> Collection<O> doGroupExecute(final String key, final Collection<I> input, final ShardingGroupExecuteCallback<I, O> callback) throws Exception {
        Collection<O> result = new LinkedList<>();
        for (I each : input) {
            result.add(callback.execute(key, each));
        }
        return result;
    }
    
    private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<ListenableFuture<Collection<O>>> restFutures) throws ExecutionException, InterruptedException {
        List<O> result = new LinkedList<>();
        result.addAll(firstResults);
+4 −2
Original line number Diff line number Diff line
@@ -17,6 +17,8 @@

package io.shardingsphere.core.executor;

import java.util.Collection;

/**
 * Sharding group execute callback.
 * 
@@ -31,9 +33,9 @@ public interface ShardingGroupExecuteCallback<I, O> {
     * Execute callback.
     * 
     * @param key input key
     * @param value input value
     * @param values input values
     * @return execute result
     * @throws Exception throw when execute failure
     */
    O execute(String key, I value) throws Exception;
    Collection<O> execute(String key, Collection<I> values) throws Exception;
}
+7 −2
Original line number Diff line number Diff line
@@ -27,6 +27,7 @@ import io.shardingsphere.core.executor.threadlocal.ExecutorExceptionHandler;
import lombok.RequiredArgsConstructor;

import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -56,8 +57,12 @@ public abstract class SQLExecuteCallback<T> implements ShardingExecuteCallback<S
    }
    
    @Override
    public final T execute(final String dataSourceName, final StatementExecuteUnit executeUnit) throws Exception {
        return executeInternal(executeUnit);
    public final Collection<T> execute(final String dataSourceName, final Collection<StatementExecuteUnit> executeUnits) throws Exception {
        Collection<T> result = new LinkedList<>();
        for (StatementExecuteUnit each : executeUnits) {
            result.add(executeInternal(each));
        }
        return result;
    }
    
    private T executeInternal(final StatementExecuteUnit executeUnit) throws Exception {