Commit ce3f813a authored by terrymanu's avatar terrymanu
Browse files

use ShardingExecuteEngine on ExecutorEngine

parent f3c4292e
Loading
Loading
Loading
Loading
+52 −1
Original line number Diff line number Diff line
@@ -29,6 +29,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -56,7 +57,7 @@ public final class ShardingExecuteEngine implements AutoCloseable {
    /**
     * Execute all callbacks.
     *
     * @param inputs sharding execute callbacks
     * @param inputs input values
     * @param callback sharding execute callback
     * @param <I> type of input value
     * @param <O> type of return value
@@ -96,6 +97,56 @@ public final class ShardingExecuteEngine implements AutoCloseable {
        return result;
    }
    
    /**
     * execute all callbacks for group.
     *
     * @param inputs input value's map
     * @param callback sharding execute callback
     * @param <I> type of input value
     * @param <O> type of return value
     * @return execute result
     * @throws Exception throw if execute failure
     */
    public <I, O> List<O> groupExecute(final Map<String, Collection<I>> inputs, final ShardingExecuteCallback<I, O> callback) throws Exception {
        if (inputs.isEmpty()) {
            return Collections.emptyList();
        }
        Collection<I> firstInputs = inputs.remove(inputs.keySet().iterator().next());
        Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(inputs, callback);
        return getGroupResults(doGroupExecute(firstInputs, callback), restResultFutures);
    }
    
    private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final Map<String, Collection<I>> inputs, final ShardingExecuteCallback<I, O> callback) {
        Collection<ListenableFuture<Collection<O>>> result = new ArrayList<>(inputs.size());
        for (final Collection<I> each : inputs.values()) {
            result.add(executorService.submit(new Callable<Collection<O>>() {
                
                @Override
                public Collection<O> call() throws Exception {
                    return doGroupExecute(each, callback);
                }
            }));
        }
        return result;
    }
    
    private <I, O> Collection<O> doGroupExecute(final Collection<I> input, final ShardingExecuteCallback<I, O> callback) throws Exception {
        Collection<O> result = new LinkedList<>();
        for (I each : input) {
            result.add(callback.execute(each));
        }
        return result;
    }
    
    private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<ListenableFuture<Collection<O>>> restFutures) throws ExecutionException, InterruptedException {
        List<O> result = new LinkedList<>();
        result.addAll(firstResults);
        for (ListenableFuture<Collection<O>> each : restFutures) {
            result.addAll(each.get());
        }
        return result;
    }
    
    @Override
    public void close() {
        SHUTDOWN_EXECUTOR.execute(new Runnable() {
+8 −35
Original line number Diff line number Diff line
@@ -17,22 +17,16 @@

package io.shardingsphere.core.executor;

import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.executor.event.overall.OverallExecutionEvent;
import io.shardingsphere.core.executor.threadlocal.ExecutorExceptionHandler;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * SQL execute engine.
@@ -42,21 +36,13 @@ import java.util.concurrent.TimeUnit;
 * @author maxiaoguang
 * @author panjuan
 */
@Slf4j
public abstract class ExecutorEngine implements AutoCloseable {
    
    private static final ExecutorService SHUTDOWN_EXECUTOR = Executors.newSingleThreadExecutor(ShardingThreadFactoryBuilder.build("ExecutorEngineCloser"));
    
    @Getter
    private final ListeningExecutorService executorService;
    
    private final EventBus shardingEventBus;
    private final ShardingExecuteEngine shardingExecuteEngine;
    
    public ExecutorEngine(final int executorSize) {
        executorService = MoreExecutors.listeningDecorator(
                0 == executorSize ? Executors.newCachedThreadPool(ShardingThreadFactoryBuilder.build()) : Executors.newFixedThreadPool(executorSize, ShardingThreadFactoryBuilder.build()));
        MoreExecutors.addDelayedShutdownHook(executorService, 60, TimeUnit.SECONDS);
        shardingEventBus = ShardingEventBusInstance.getInstance();
        shardingExecuteEngine = new ShardingExecuteEngine(executorSize);
    }
    
    /**
@@ -73,9 +59,9 @@ public abstract class ExecutorEngine implements AutoCloseable {
            return Collections.emptyList();
        }
        OverallExecutionEvent event = new OverallExecutionEvent(executeCallback.getSqlType(), baseStatementUnits.size() > 1);
        shardingEventBus.post(event);
        ShardingEventBusInstance.getInstance().post(event);
        try {
            List<T> result = getExecuteResults(baseStatementUnits, executeCallback);
            List<T> result = getExecuteResults(new LinkedList<>(baseStatementUnits), executeCallback);
            event.setExecuteSuccess();
            return result;
            // CHECKSTYLE:OFF
@@ -85,27 +71,14 @@ public abstract class ExecutorEngine implements AutoCloseable {
            ExecutorExceptionHandler.handleException(ex);
            return Collections.emptyList();
        } finally {
            shardingEventBus.post(event);
            ShardingEventBusInstance.getInstance().post(event);
        }
    }
    
    protected abstract <T> List<T> getExecuteResults(Collection<? extends BaseStatementUnit> baseStatementUnits, ExecuteCallback<T> executeCallback) throws Exception;
    protected abstract <T> List<T> getExecuteResults(Collection<BaseStatementUnit> baseStatementUnits, ExecuteCallback<T> executeCallback) throws Exception;
    
    @Override
    public final void close() {
        SHUTDOWN_EXECUTOR.execute(new Runnable() {
            
            @Override
            public void run() {
                try {
                    executorService.shutdown();
                    while (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
                        executorService.shutdownNow();
                    }
                } catch (final InterruptedException ex) {
                    log.error("ExecutorEngine can not been terminated", ex);
                }
            }
        });
        shardingExecuteEngine.close();
    }
}
+2 −44
Original line number Diff line number Diff line
@@ -17,19 +17,15 @@

package io.shardingsphere.core.executor.type.connection;

import com.google.common.util.concurrent.ListenableFuture;
import io.shardingsphere.core.executor.BaseStatementUnit;
import io.shardingsphere.core.executor.ExecuteCallback;
import io.shardingsphere.core.executor.ExecutorEngine;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

/**
 * Connection strictly execute engine.
@@ -43,11 +39,8 @@ public final class ConnectionStrictlyExecutorEngine extends ExecutorEngine {
    }
    
    @Override
    protected <T> List<T> getExecuteResults(final Collection<? extends BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) throws Exception {
        Map<String, Collection<BaseStatementUnit>> baseStatementUnitGroups = getBaseStatementUnitGroups(baseStatementUnits);
        Collection<T> firstOutputs = syncExecute(baseStatementUnitGroups.remove(baseStatementUnitGroups.keySet().iterator().next()), executeCallback);
        Collection<ListenableFuture<Collection<T>>> restResultFutures = asyncExecute(baseStatementUnitGroups, executeCallback);
        return getResultList(firstOutputs, restResultFutures);
    protected <T> List<T> getExecuteResults(final Collection<BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) throws Exception {
        return getShardingExecuteEngine().groupExecute(getBaseStatementUnitGroups(baseStatementUnits), executeCallback);
    }
    
    private Map<String, Collection<BaseStatementUnit>> getBaseStatementUnitGroups(final Collection<? extends BaseStatementUnit> baseStatementUnits) {
@@ -61,39 +54,4 @@ public final class ConnectionStrictlyExecutorEngine extends ExecutorEngine {
        }
        return result;
    }
    
    private <T> Collection<ListenableFuture<Collection<T>>> asyncExecute(final Map<String, Collection<BaseStatementUnit>> baseStatementUnitGroups, final ExecuteCallback<T> executeCallback) {
        Collection<ListenableFuture<Collection<T>>> result = new ArrayList<>(baseStatementUnitGroups.size());
        for (Map.Entry<String, Collection<BaseStatementUnit>> entry : baseStatementUnitGroups.entrySet()) {
            final Collection<BaseStatementUnit> baseStatementUnits = entry.getValue();
            result.add(getExecutorService().submit(new Callable<Collection<T>>() {
                @Override
                public Collection<T> call() throws Exception {
                    Collection<T> result = new LinkedList<>();
                    for (BaseStatementUnit each : baseStatementUnits) {
                        result.add(executeCallback.execute(each));
                    }
                    return result;
                }
            }));
        }
        return result;
    }
    
    private <T> Collection<T> syncExecute(final Collection<? extends BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) throws Exception {
        Collection<T> result = new LinkedList<>();
        for (BaseStatementUnit each : baseStatementUnits) {
            result.add(executeCallback.execute(each));
        }
        return result;
    }
    
    private <T> List<T> getResultList(final Collection<T> firstOutputs, final Collection<ListenableFuture<Collection<T>>> restResultFutures) throws ExecutionException, InterruptedException {
        List<T> result = new LinkedList<>();
        result.addAll(firstOutputs);
        for (ListenableFuture<Collection<T>> each : restResultFutures) {
            result.addAll(each.get());
        }
        return result;
    }
}
+2 −39
Original line number Diff line number Diff line
@@ -17,19 +17,12 @@

package io.shardingsphere.core.executor.type.memory;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import io.shardingsphere.core.executor.BaseStatementUnit;
import io.shardingsphere.core.executor.ExecuteCallback;
import io.shardingsphere.core.executor.ExecutorEngine;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

/**
 * Memory strictly execute engine.
@@ -43,38 +36,8 @@ public final class MemoryStrictlyExecutorEngine extends ExecutorEngine {
    }
    
    @Override
    protected <T> List<T> getExecuteResults(final Collection<? extends BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) throws Exception {
        Iterator<? extends BaseStatementUnit> iterator = baseStatementUnits.iterator();
        T firstOutput = syncExecute(iterator.next(), executeCallback);
        Collection<ListenableFuture<T>> restFutures = asyncExecute(Lists.newArrayList(iterator), executeCallback);
        return getResultList(firstOutput, restFutures);
    }
    
    private <T> Collection<ListenableFuture<T>> asyncExecute(final Collection<BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) {
        List<ListenableFuture<T>> result = new ArrayList<>(baseStatementUnits.size());
        for (final BaseStatementUnit each : baseStatementUnits) {
            result.add(getExecutorService().submit(new Callable<T>() {
                
                @Override
                public T call() throws Exception {
                    return executeCallback.execute(each);
                }
            }));
        }
        return result;
    }
    
    private <T> T syncExecute(final BaseStatementUnit baseStatementUnit, final ExecuteCallback<T> executeCallback) throws Exception {
        return executeCallback.execute(baseStatementUnit);
    }
    
    private <T> List<T> getResultList(final T firstOutput, final Collection<ListenableFuture<T>> restResultFutures) throws ExecutionException, InterruptedException {
        List<T> result = new LinkedList<>();
        result.add(firstOutput);
        for (ListenableFuture<T> each : restResultFutures) {
            result.add(each.get());
        }
        return result;
    protected <T> List<T> getExecuteResults(final Collection<BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) throws Exception {
        return getShardingExecuteEngine().execute(baseStatementUnits, executeCallback);
    }
}
+4 −4
Original line number Diff line number Diff line
@@ -75,8 +75,8 @@ public class ShardingDataSource extends AbstractDataSourceAdapter implements Aut
        int executorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);
        ConnectionMode connectionMode = ConnectionMode.valueOf(shardingProperties.<String>getValue(ShardingPropertiesConstant.CONNECTION_MODE));
        executorEngine = ConnectionMode.MEMORY_STRICTLY == connectionMode ? new MemoryStrictlyExecutorEngine(executorSize) : new ConnectionStrictlyExecutorEngine(executorSize);
        ShardingMetaData shardingMetaData = new ShardingMetaData(
                getDataSourceURLs(dataSourceMap), shardingRule, getDatabaseType(), executorEngine.getExecutorService(), new JDBCTableMetaDataConnectionManager(dataSourceMap));
        ShardingMetaData shardingMetaData = new ShardingMetaData(getDataSourceURLs(dataSourceMap), shardingRule, getDatabaseType(), 
                executorEngine.getShardingExecuteEngine().getExecutorService(), new JDBCTableMetaDataConnectionManager(dataSourceMap));
        boolean showSQL = shardingProperties.getValue(ShardingPropertiesConstant.SQL_SHOW);
        shardingContext = new ShardingContext(dataSourceMap, shardingRule, getDatabaseType(), executorEngine, shardingMetaData, connectionMode, showSQL);
    }
@@ -116,8 +116,8 @@ public class ShardingDataSource extends AbstractDataSourceAdapter implements Aut
            originalExecutorEngine.close();
        }
        shardingProperties = newShardingProperties;
        ShardingMetaData shardingMetaData = new ShardingMetaData(
                getDataSourceURLs(newDataSourceMap), newShardingRule, getDatabaseType(), executorEngine.getExecutorService(), new JDBCTableMetaDataConnectionManager(newDataSourceMap));
        ShardingMetaData shardingMetaData = new ShardingMetaData(getDataSourceURLs(newDataSourceMap), newShardingRule, getDatabaseType(), 
                executorEngine.getShardingExecuteEngine().getExecutorService(), new JDBCTableMetaDataConnectionManager(newDataSourceMap));
        boolean newShowSQL = newShardingProperties.getValue(ShardingPropertiesConstant.SQL_SHOW);
        shardingContext = new ShardingContext(newDataSourceMap, newShardingRule, getDatabaseType(), executorEngine, shardingMetaData, newConnectionMode, newShowSQL);
    }
Loading