Commit 3994ce1c authored by terrymanu's avatar terrymanu
Browse files

add ShardingGroupExecuteCallback

parent a73abaf0
Loading
Loading
Loading
Loading
+10 −8
Original line number Diff line number Diff line
@@ -30,6 +30,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -107,33 +108,34 @@ public final class ShardingExecuteEngine implements AutoCloseable {
     * @return execute result
     * @throws Exception throw if execute failure
     */
    public <I, O> List<O> groupExecute(final Map<String, Collection<I>> inputs, final ShardingExecuteCallback<I, O> callback) throws Exception {
    public <I, O> List<O> groupExecute(final Map<String, Collection<I>> inputs, final ShardingGroupExecuteCallback<I, O> callback) throws Exception {
        if (inputs.isEmpty()) {
            return Collections.emptyList();
        }
        Collection<I> firstInputs = inputs.remove(inputs.keySet().iterator().next());
        String firstKey = inputs.keySet().iterator().next();
        Collection<I> firstInputs = inputs.remove(firstKey);
        Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(inputs, callback);
        return getGroupResults(doGroupExecute(firstInputs, callback), restResultFutures);
        return getGroupResults(doGroupExecute(firstKey, firstInputs, callback), restResultFutures);
    }
    
    private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final Map<String, Collection<I>> inputs, final ShardingExecuteCallback<I, O> callback) {
    private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final Map<String, Collection<I>> inputs, final ShardingGroupExecuteCallback<I, O> callback) {
        Collection<ListenableFuture<Collection<O>>> result = new ArrayList<>(inputs.size());
        for (final Collection<I> each : inputs.values()) {
        for (final Entry<String, Collection<I>> entry : inputs.entrySet()) {
            result.add(executorService.submit(new Callable<Collection<O>>() {
                
                @Override
                public Collection<O> call() throws Exception {
                    return doGroupExecute(each, callback);
                    return doGroupExecute(entry.getKey(), entry.getValue(), callback);
                }
            }));
        }
        return result;
    }
    
    private <I, O> Collection<O> doGroupExecute(final Collection<I> input, final ShardingExecuteCallback<I, O> callback) throws Exception {
    private <I, O> Collection<O> doGroupExecute(final String key, final Collection<I> input, final ShardingGroupExecuteCallback<I, O> callback) throws Exception {
        Collection<O> result = new LinkedList<>();
        for (I each : input) {
            result.add(callback.execute(each));
            result.add(callback.execute(key, each));
        }
        return result;
    }
+39 −0
Original line number Diff line number Diff line
/*
 * Copyright 2016-2018 shardingsphere.io.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * </p>
 */

package io.shardingsphere.core.executor;

/**
 * Sharding group execute callback.
 * 
 * @author zhangliang
 * 
 * @param <I> type of input value
 * @param <O> type of output value
 */
public interface ShardingGroupExecuteCallback<I, O> {
    
    /**
     * Execute callback.
     * 
     * @param key input key
     * @param value input value
     * @return execute result
     * @throws Exception throw when execute failure
     */
    O execute(String key, I value) throws Exception;
}
+15 −6
Original line number Diff line number Diff line
@@ -40,7 +40,7 @@ import java.util.Map;
 * @param <T> class type of return value
 */
@RequiredArgsConstructor
public abstract class SQLExecuteCallback<T> implements ShardingExecuteCallback<StatementExecuteUnit, T> {
public abstract class SQLExecuteCallback<T> implements ShardingExecuteCallback<StatementExecuteUnit, T>, ShardingGroupExecuteCallback<StatementExecuteUnit, T> {
    
    private final SQLType sqlType;
    
@@ -51,17 +51,26 @@ public abstract class SQLExecuteCallback<T> implements ShardingExecuteCallback<S
    private final EventBus shardingEventBus = ShardingEventBusInstance.getInstance();
    
    @Override
    public final T execute(final StatementExecuteUnit input) throws Exception {
    public final T execute(final StatementExecuteUnit executeUnit) throws Exception {
        return executeInternal(executeUnit);
    }
    
    @Override
    public final T execute(final String dataSourceName, final StatementExecuteUnit executeUnit) throws Exception {
        return executeInternal(executeUnit);
    }
    
    private T executeInternal(final StatementExecuteUnit executeUnit) throws Exception {
        ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
        ExecutorDataMap.setDataMap(dataMap);
        List<SQLExecutionEvent> events = new LinkedList<>();
        for (List<Object> each : input.getSqlExecutionUnit().getSqlUnit().getParameterSets()) {
            SQLExecutionEvent event = SQLExecutionEventFactory.createEvent(sqlType, input, each);
        for (List<Object> each : executeUnit.getSqlExecutionUnit().getSqlUnit().getParameterSets()) {
            SQLExecutionEvent event = SQLExecutionEventFactory.createEvent(sqlType, executeUnit, each);
            events.add(event);
            shardingEventBus.post(event);
        }
        try {
            T result = executeSQL(input);
            T result = executeSQL(executeUnit);
            for (SQLExecutionEvent each : events) {
                each.setExecuteSuccess();
                shardingEventBus.post(each);