Commit 5d03d899 authored by gaohongtao's avatar gaohongtao
Browse files

fix #16

parent 60169850
Loading
Loading
Loading
Loading
+24 −34
Original line number Diff line number Diff line
@@ -18,19 +18,20 @@
package com.dangdang.ddframe.rdb.sharding.api;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.SQLException;
import java.util.Properties;

import javax.sql.DataSource;

import com.dangdang.ddframe.rdb.sharding.api.config.ShardingConfiguration;
import com.dangdang.ddframe.rdb.sharding.api.config.ShardingConfigurationConstant;
import com.dangdang.ddframe.rdb.sharding.api.rule.ShardingRule;
import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
import com.dangdang.ddframe.rdb.sharding.executor.ExecutorEngine;
import com.dangdang.ddframe.rdb.sharding.jdbc.ShardingConnection;
import com.dangdang.ddframe.rdb.sharding.jdbc.ShardingContext;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractDataSourceAdapter;
import com.dangdang.ddframe.rdb.sharding.metrics.MetricsContext;
import com.dangdang.ddframe.rdb.sharding.router.SQLRouteEngine;
import com.dangdang.ddframe.rdb.sharding.threadlocal.ThreadLocalObjectRepository;
import com.google.common.base.Preconditions;

/**
@@ -40,51 +41,40 @@ import com.google.common.base.Preconditions;
 */
public class ShardingDataSource extends AbstractDataSourceAdapter {
    
    private final ShardingRule shardingRule;
    
    private final DatabaseMetaData databaseMetaData;
    private final ThreadLocalObjectRepository threadLocalObjectRepository = new ThreadLocalObjectRepository();
    
    private final ShardingConfiguration configuration;
    
    private final MetricsContext metricsContext;
    private final ShardingContext context;
    
    public ShardingDataSource(final ShardingRule shardingRule) {
        this(shardingRule, new Properties());
    }
    
    public ShardingDataSource(final ShardingRule shardingRule, final Properties props) {
        this.shardingRule = shardingRule;
        databaseMetaData = getDatabaseMetaData();
        configuration = new ShardingConfiguration(props);
        metricsContext = new MetricsContext(configuration.getConfig(ShardingConfigurationConstant.METRICS_ENABLE, boolean.class), 
                configuration.getConfig(ShardingConfigurationConstant.METRICS_SECOND_PERIOD, long.class), 
                configuration.getConfig(ShardingConfigurationConstant.METRICS_PACKAGE_NAME, String.class));
    }
    
    private DatabaseMetaData getDatabaseMetaData() {
        String databaseProductName = null;
        DatabaseMetaData result = null;
        for (DataSource each : shardingRule.getDataSourceRule().getDataSources()) {
            String databaseProductNameInEach;
            DatabaseMetaData metaDataInEach;
        Preconditions.checkNotNull(shardingRule);
        Preconditions.checkNotNull(props);
        ShardingConfiguration configuration = new ShardingConfiguration(props);
        initThreadLocalObjectRepository(configuration);
        DatabaseType type;
        try {
                metaDataInEach = each.getConnection().getMetaData();
                databaseProductNameInEach = metaDataInEach.getDatabaseProductName();
            } catch (final SQLException ex) {
                throw new ShardingJdbcException("Can not get data source DatabaseProductName", ex);
            type = DatabaseType.valueFrom(ShardingConnection.getDatabaseMetaDataFromDataSource(shardingRule.getDataSourceRule().getDataSources()).getDatabaseProductName());
        } catch (final SQLException e) {
            throw new ShardingJdbcException("Can not get database product name", e);
        }
        context = new ShardingContext(shardingRule, new SQLRouteEngine(shardingRule, type), new ExecutorEngine(configuration));
    }
            Preconditions.checkState(null == databaseProductName || databaseProductName.equals(databaseProductNameInEach), 
                    String.format("Database type inconsistent with '%s' and '%s'", databaseProductName, databaseProductNameInEach));
            databaseProductName = databaseProductNameInEach;
            result = metaDataInEach;
    
    private void initThreadLocalObjectRepository(final ShardingConfiguration configuration) {
        boolean enableMetrics = configuration.getConfig(ShardingConfigurationConstant.METRICS_ENABLE, boolean.class);
        if (enableMetrics) {
            threadLocalObjectRepository.addItem(new MetricsContext(configuration.getConfig(ShardingConfigurationConstant.METRICS_SECOND_PERIOD, long.class),
                    configuration.getConfig(ShardingConfigurationConstant.METRICS_PACKAGE_NAME, String.class)));
        }
        return result;
    }
    
    @Override
    public ShardingConnection getConnection() throws SQLException {
        metricsContext.register();
        return new ShardingConnection(shardingRule, databaseMetaData);
        threadLocalObjectRepository.open();
        return new ShardingConnection(context);
    }
    
    @Override
+27 −1
Original line number Diff line number Diff line
@@ -17,6 +17,8 @@

package com.dangdang.ddframe.rdb.sharding.api.config;

import java.util.concurrent.TimeUnit;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

@@ -45,9 +47,33 @@ public enum ShardingConfigurationConstant {
    /**
     * 度量输出在日志中的标识名称.
     */
    METRICS_PACKAGE_NAME("metrics.package.name", "com.dangdang.ddframe.rdb.sharding.metrics");
    METRICS_PACKAGE_NAME("metrics.package.name", "com.dangdang.ddframe.rdb.sharding.metrics"),
    
    /**
     * 最小空闲工作现成数量.
     */
    PARALLEL_EXECUTOR_WORKER_MIN_IDLE_SIZE("parallelExecutor.worker.minIdleSize", "0"),
    
    /**
     * 最大工作现成数量.
     */
    PARALLEL_EXECUTOR_WORKER_MAX_SIZE("parallelExecutor.worker.maxSize", defaultMaxThread()),
    
    /**
     * 工作线程空闲时超时时间.
     */
    PARALLEL_EXECUTOR_WORKER_MAX_IDLE_TIMEOUT("parallelExecutor.worker.maxIdleTimeout", "60"),
    
    /**
     * 工作线程空闲时超时时间单位.
     */
    PARALLEL_EXECUTOR_WORKER_MAX_IDLE_TIMEOUT_TIME_UNIT("parallelExecutor.worker.maxIdleTimeout.timeUnit", TimeUnit.SECONDS.toString());
    
    private final String key;
    
    private final String defaultValue;
    
    private static String defaultMaxThread() {
        return String.valueOf(Runtime.getRuntime().availableProcessors() * 2);
    }
}
+22 −14
Original line number Diff line number Diff line
@@ -23,17 +23,18 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.dangdang.ddframe.rdb.sharding.api.config.ShardingConfiguration;
import com.dangdang.ddframe.rdb.sharding.api.config.ShardingConfigurationConstant;
import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
@@ -41,10 +42,20 @@ import lombok.extern.slf4j.Slf4j;
 * 
 * @author gaohongtao
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j
public final class ExecutorEngine {
    
    private final ListeningExecutorService executorService;
    
    public ExecutorEngine(final ShardingConfiguration configuration) {
        executorService = MoreExecutors.listeningDecorator(MoreExecutors.getExitingExecutorService(
                new ThreadPoolExecutor(configuration.getConfig(ShardingConfigurationConstant.PARALLEL_EXECUTOR_WORKER_MIN_IDLE_SIZE, int.class),
                configuration.getConfig(ShardingConfigurationConstant.PARALLEL_EXECUTOR_WORKER_MAX_SIZE, int.class),
                configuration.getConfig(ShardingConfigurationConstant.PARALLEL_EXECUTOR_WORKER_MAX_IDLE_TIMEOUT, long.class),
                TimeUnit.valueOf(configuration.getConfig(ShardingConfigurationConstant.PARALLEL_EXECUTOR_WORKER_MAX_IDLE_TIMEOUT_TIME_UNIT)),
                new LinkedBlockingQueue<Runnable>())));
    }
    
    /**
     * 多线程执行任务.
     * 
@@ -54,7 +65,7 @@ public final class ExecutorEngine {
     * @param <O> 出参类型
     * @return 执行结果
     */
    public static <I, O> List<O> execute(final Collection<I> inputs, final ExecuteUnit<I, O> executeUnit) {
    public <I, O> List<O> execute(final Collection<I> inputs, final ExecuteUnit<I, O> executeUnit) {
        ListenableFuture<List<O>> futures = submitFutures(inputs, executeUnit);
        addCallback(futures);
        return getFutureResults(futures);
@@ -71,15 +82,14 @@ public final class ExecutorEngine {
     * @param <O> 最终结果类型
     * @return 执行结果
     */
    public static <I, M, O> O execute(final Collection<I> inputs, final ExecuteUnit<I, M> executeUnit, final MergeUnit<M, O> mergeUnit) {
    public <I, M, O> O execute(final Collection<I> inputs, final ExecuteUnit<I, M> executeUnit, final MergeUnit<M, O> mergeUnit) {
        return mergeUnit.merge(execute(inputs, executeUnit));
    }
    
    private static <I, O> ListenableFuture<List<O>> submitFutures(final Collection<I> inputs, final ExecuteUnit<I, O> executeUnit) {
    private <I, O> ListenableFuture<List<O>> submitFutures(final Collection<I> inputs, final ExecuteUnit<I, O> executeUnit) {
        Set<ListenableFuture<O>> result = new HashSet<>(inputs.size());
        ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(inputs.size()));
        for (final I each : inputs) {
            result.add(service.submit(new Callable<O>() {
            result.add(executorService.submit(new Callable<O>() {
                
                @Override
                public O call() throws Exception {
@@ -87,13 +97,11 @@ public final class ExecutorEngine {
                }
            }));
        }
        service.shutdown();
        return Futures.allAsList(result);
    }
    
    private static <T> void addCallback(final ListenableFuture<T> allFutures) {
    private <T> void addCallback(final ListenableFuture<T> allFutures) {
        Futures.addCallback(allFutures, new FutureCallback<T>() {
            
            @Override
            public void onSuccess(final T result) {
                log.trace("Concurrent execute result success {}", result);
@@ -106,7 +114,7 @@ public final class ExecutorEngine {
        });
    }
    
    private static <O> O getFutureResults(final ListenableFuture<O> futures) {
    private <O> O getFutureResults(final ListenableFuture<O> futures) {
        try {
            return futures.get();
        } catch (final InterruptedException | ExecutionException ex) {
+5 −4
Original line number Diff line number Diff line
@@ -26,7 +26,6 @@ import java.util.List;

import com.codahale.metrics.Timer.Context;
import com.dangdang.ddframe.rdb.sharding.metrics.MetricsContext;

import lombok.RequiredArgsConstructor;

/**
@@ -37,6 +36,8 @@ import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public final class PreparedStatementExecutor {
    
    private final ExecutorEngine executorEngine;
    
    private final Collection<PreparedStatement> preparedStatements;
    
    /**
@@ -53,7 +54,7 @@ public final class PreparedStatementExecutor {
            MetricsContext.stop(context);
            return result;
        }
        result = ExecutorEngine.execute(preparedStatements, new ExecuteUnit<PreparedStatement, ResultSet>() {
        result = executorEngine.execute(preparedStatements, new ExecuteUnit<PreparedStatement, ResultSet>() {
            
            @Override
            public ResultSet execute(final PreparedStatement input) throws Exception {
@@ -78,7 +79,7 @@ public final class PreparedStatementExecutor {
            MetricsContext.stop(context);
            return result;
        }
        result = ExecutorEngine.execute(preparedStatements, new ExecuteUnit<PreparedStatement, Integer>() {
        result = executorEngine.execute(preparedStatements, new ExecuteUnit<PreparedStatement, Integer>() {
            
            @Override
            public Integer execute(final PreparedStatement input) throws Exception {
@@ -112,7 +113,7 @@ public final class PreparedStatementExecutor {
            MetricsContext.stop(context);
            return result;
        }
        List<Boolean> result = ExecutorEngine.execute(preparedStatements, new ExecuteUnit<PreparedStatement, Boolean>() {
        List<Boolean> result = executorEngine.execute(preparedStatements, new ExecuteUnit<PreparedStatement, Boolean>() {
            
            @Override
            public Boolean execute(final PreparedStatement input) throws Exception {
+5 −4
Original line number Diff line number Diff line
@@ -27,7 +27,6 @@ import java.util.List;

import com.codahale.metrics.Timer.Context;
import com.dangdang.ddframe.rdb.sharding.metrics.MetricsContext;

import lombok.RequiredArgsConstructor;

/**
@@ -38,6 +37,8 @@ import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public final class StatementExecutor {
    
    private final ExecutorEngine executorEngine;
    
    private final Collection<StatementEntity> statements = new ArrayList<>();
    
    /**
@@ -65,7 +66,7 @@ public final class StatementExecutor {
            MetricsContext.stop(context);
            return result;
        }
        result = ExecutorEngine.execute(statements, new ExecuteUnit<StatementEntity, ResultSet>() {
        result = executorEngine.execute(statements, new ExecuteUnit<StatementEntity, ResultSet>() {
            
            @Override
            public ResultSet execute(final StatementEntity input) throws Exception {
@@ -131,7 +132,7 @@ public final class StatementExecutor {
            MetricsContext.stop(context);
            return result;
        }
        result = ExecutorEngine.execute(statements, new ExecuteUnit<StatementEntity, Integer>() {
        result = executorEngine.execute(statements, new ExecuteUnit<StatementEntity, Integer>() {
            
            @Override
            public Integer execute(final StatementEntity input) throws Exception {
@@ -206,7 +207,7 @@ public final class StatementExecutor {
            MetricsContext.stop(context);
            return result;
        }
        List<Boolean> result = ExecutorEngine.execute(statements, new ExecuteUnit<StatementEntity, Boolean>() {
        List<Boolean> result = executorEngine.execute(statements, new ExecuteUnit<StatementEntity, Boolean>() {
            
            @Override
            public Boolean execute(final StatementEntity input) throws Exception {
Loading