Commit 1ff33de5 authored by terrymanu's avatar terrymanu
Browse files

use ShardingExecuteEngine on TableMetaDataLoader

parent 59c84067
Loading
Loading
Loading
Loading
+4 −5
Original line number Diff line number Diff line
@@ -17,8 +17,8 @@

package io.shardingsphere.core.metadata;

import com.google.common.util.concurrent.MoreExecutors;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.executor.ShardingExecuteEngine;
import io.shardingsphere.core.metadata.datasource.ShardingDataSourceMetaData;
import io.shardingsphere.core.metadata.table.ShardingTableMetaData;
import io.shardingsphere.core.metadata.table.executor.TableMetaDataConnectionManager;
@@ -27,7 +27,6 @@ import io.shardingsphere.core.rule.ShardingRule;
import lombok.Getter;

import java.util.Map;
import java.util.concurrent.ExecutorService;

/**
 * Sharding meta data.
@@ -42,8 +41,8 @@ public final class ShardingMetaData {
    private final ShardingTableMetaData table;
    
    public ShardingMetaData(final Map<String, String> dataSourceURLs, final ShardingRule shardingRule,
                            final DatabaseType databaseType, final ExecutorService executorService, final TableMetaDataConnectionManager connectionManager) {
                            final DatabaseType databaseType, final ShardingExecuteEngine shardingExecuteEngine, final TableMetaDataConnectionManager connectionManager) {
        dataSource = new ShardingDataSourceMetaData(dataSourceURLs, shardingRule, databaseType);
        table = new ShardingTableMetaData(new TableMetaDataInitializer(dataSource, MoreExecutors.listeningDecorator(executorService), connectionManager).load(shardingRule));
        table = new ShardingTableMetaData(new TableMetaDataInitializer(dataSource, shardingExecuteEngine, connectionManager).load(shardingRule));
    }
}
+3 −3
Original line number Diff line number Diff line
@@ -18,8 +18,8 @@
package io.shardingsphere.core.metadata.table.executor;

import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.shardingsphere.core.exception.ShardingException;
import io.shardingsphere.core.executor.ShardingExecuteEngine;
import io.shardingsphere.core.metadata.datasource.ShardingDataSourceMetaData;
import io.shardingsphere.core.metadata.table.TableMetaData;
import io.shardingsphere.core.rule.ShardingRule;
@@ -45,9 +45,9 @@ public final class TableMetaDataInitializer {
    private final TableMetaDataLoader tableMetaDataLoader;
    
    public TableMetaDataInitializer(
            final ShardingDataSourceMetaData shardingDataSourceMetaData, final ListeningExecutorService executorService, final TableMetaDataConnectionManager connectionManager) {
            final ShardingDataSourceMetaData shardingDataSourceMetaData, final ShardingExecuteEngine shardingExecuteEngine, final TableMetaDataConnectionManager connectionManager) {
        this.connectionManager = connectionManager;
        tableMetaDataLoader = new TableMetaDataLoader(shardingDataSourceMetaData, executorService, connectionManager);
        tableMetaDataLoader = new TableMetaDataLoader(shardingDataSourceMetaData, shardingExecuteEngine, connectionManager);
    }
    
    /**
+11 −25
Original line number Diff line number Diff line
@@ -17,10 +17,9 @@

package io.shardingsphere.core.metadata.table.executor;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.shardingsphere.core.exception.ShardingException;
import io.shardingsphere.core.executor.ShardingExecuteEngine;
import io.shardingsphere.core.executor.ShardingGroupExecuteCallback;
import io.shardingsphere.core.metadata.datasource.DataSourceMetaData;
import io.shardingsphere.core.metadata.datasource.ShardingDataSourceMetaData;
import io.shardingsphere.core.metadata.table.ColumnMetaData;
@@ -38,9 +37,6 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

/**
 * Table meta data loader.
@@ -52,7 +48,7 @@ public final class TableMetaDataLoader {
    
    private final ShardingDataSourceMetaData shardingDataSourceMetaData;
    
    private final ListeningExecutorService executorService;
    private final ShardingExecuteEngine shardingExecuteEngine;
    
    private final TableMetaDataConnectionManager connectionManager;
    
@@ -70,27 +66,17 @@ public final class TableMetaDataLoader {
    }
    
    private List<TableMetaData> load(final Map<String, Collection<String>> dataNodeGroups, final ShardingDataSourceNames shardingDataSourceNames) {
        List<ListenableFuture<Collection<TableMetaData>>> futures = new LinkedList<>();
        for (Entry<String, Collection<String>> entry : dataNodeGroups.entrySet()) {
            final String dataSourceName = shardingDataSourceNames.getRawMasterDataSourceName(entry.getKey());
            DataSourceMetaData dataSourceMetaData = shardingDataSourceMetaData.getActualDataSourceMetaData(entry.getKey());
            final String catalog = null == dataSourceMetaData ? null : dataSourceMetaData.getSchemeName();
            final Collection<String> actualTableNames = entry.getValue();
            futures.add(executorService.submit(new Callable<Collection<TableMetaData>>() {
        try {
            return shardingExecuteEngine.groupExecute(dataNodeGroups, new ShardingGroupExecuteCallback<String, TableMetaData>() {
                
                @Override
                public Collection<TableMetaData> call() throws SQLException {
                    return load(dataSourceName, catalog, actualTableNames);
                }
            }));
        }
        List<TableMetaData> result = new LinkedList<>();
        try {
            for (Collection<TableMetaData> each : Futures.allAsList(futures).get()) {
                result.addAll(each);
                public Collection<TableMetaData> execute(final String dataSourceName, final Collection<String> actualTableNames) throws SQLException {
                    DataSourceMetaData dataSourceMetaData = shardingDataSourceMetaData.getActualDataSourceMetaData(dataSourceName);
                    final String catalog = null == dataSourceMetaData ? null : dataSourceMetaData.getSchemeName();
                    return load(shardingDataSourceNames.getRawMasterDataSourceName(dataSourceName), catalog, actualTableNames);
                }
            return result;
        } catch (final InterruptedException | ExecutionException ex) {
            });
        } catch (final Exception ex) {
            throw new ShardingException(ex);
        }
    }
+4 −4
Original line number Diff line number Diff line
@@ -73,8 +73,8 @@ public class ShardingDataSource extends AbstractDataSourceAdapter implements Aut
        int executorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);
        ConnectionMode connectionMode = ConnectionMode.valueOf(shardingProperties.<String>getValue(ShardingPropertiesConstant.CONNECTION_MODE));
        executeEngine = new ShardingExecuteEngine(executorSize);
        ShardingMetaData shardingMetaData = new ShardingMetaData(getDataSourceURLs(dataSourceMap), shardingRule, getDatabaseType(), 
                executeEngine.getExecutorService(), new JDBCTableMetaDataConnectionManager(dataSourceMap));
        ShardingMetaData shardingMetaData = new ShardingMetaData(
                getDataSourceURLs(dataSourceMap), shardingRule, getDatabaseType(), executeEngine, new JDBCTableMetaDataConnectionManager(dataSourceMap));
        boolean showSQL = shardingProperties.getValue(ShardingPropertiesConstant.SQL_SHOW);
        shardingContext = new ShardingContext(dataSourceMap, shardingRule, getDatabaseType(), executeEngine, shardingMetaData, connectionMode, showSQL);
    }
@@ -113,8 +113,8 @@ public class ShardingDataSource extends AbstractDataSourceAdapter implements Aut
            originalExecuteEngine.close();
        }
        shardingProperties = newShardingProperties;
        ShardingMetaData shardingMetaData = new ShardingMetaData(getDataSourceURLs(newDataSourceMap), newShardingRule, getDatabaseType(), 
                executeEngine.getExecutorService(), new JDBCTableMetaDataConnectionManager(newDataSourceMap));
        ShardingMetaData shardingMetaData = new ShardingMetaData(
                getDataSourceURLs(newDataSourceMap), newShardingRule, getDatabaseType(), executeEngine, new JDBCTableMetaDataConnectionManager(newDataSourceMap));
        boolean newShowSQL = newShardingProperties.getValue(ShardingPropertiesConstant.SQL_SHOW);
        shardingContext = new ShardingContext(newDataSourceMap, newShardingRule, getDatabaseType(), executeEngine, shardingMetaData, newConnectionMode, newShowSQL);
    }
+1 −2
Original line number Diff line number Diff line
@@ -248,8 +248,7 @@ public final class ShardingPreparedStatement extends AbstractShardingPreparedSta
        if (null != routeResult && null != connection && SQLType.DDL == routeResult.getSqlStatement().getType() && !routeResult.getSqlStatement().getTables().isEmpty()) {
            String logicTableName = routeResult.getSqlStatement().getTables().getSingleTableName();
            TableMetaDataLoader tableMetaDataLoader = new TableMetaDataLoader(connection.getShardingContext().getMetaData().getDataSource(), 
                    connection.getShardingContext().getExecuteEngine().getExecutorService(), 
                    new JDBCTableMetaDataConnectionManager(connection.getShardingContext().getDataSourceMap()));
                    connection.getShardingContext().getExecuteEngine(), new JDBCTableMetaDataConnectionManager(connection.getShardingContext().getDataSourceMap()));
            connection.getShardingContext().getMetaData().getTable().put(logicTableName, tableMetaDataLoader.load(logicTableName, connection.getShardingContext().getShardingRule()));
        }
    }
Loading