Commit 0baaad2d authored by tristaZero's avatar tristaZero
Browse files

Merge branch 'dev' of ssh://github.com/shardingjdbc/sharding-jdbc into dev

parents 620d05a6 a6b2fdd0
Loading
Loading
Loading
Loading
+2 −4
Original line number Diff line number Diff line
@@ -27,6 +27,7 @@ import io.shardingsphere.core.rule.ShardingDataSourceNames;
import io.shardingsphere.core.rule.ShardingRule;
import io.shardingsphere.core.rule.TableRule;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

@@ -48,6 +49,7 @@ import java.util.concurrent.ExecutionException;
 * @author panjuan
 * @author zhaojun
 */
@RequiredArgsConstructor
@Getter
@Setter
@Slf4j
@@ -57,10 +59,6 @@ public abstract class ShardingMetaData {
    
    private Map<String, TableMetaData> tableMetaDataMap;
    
    public ShardingMetaData(final ListeningExecutorService executorService) {
        this.executorService = executorService;
    }
    
    /**
     * Initialize sharding metadata.
     *
+2 −3
Original line number Diff line number Diff line
@@ -44,6 +44,7 @@ import io.shardingsphere.proxy.transport.mysql.packet.command.text.query.FieldCo
import io.shardingsphere.proxy.transport.mysql.packet.generic.EofPacket;
import io.shardingsphere.proxy.transport.mysql.packet.generic.ErrPacket;
import io.shardingsphere.proxy.transport.mysql.packet.generic.OKPacket;
import io.shardingsphere.proxy.util.ExecutorContext;
import io.shardingsphere.transaction.xa.AtomikosUserTransaction;
import lombok.Getter;
import lombok.Setter;
@@ -62,7 +63,6 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/**
@@ -120,11 +120,10 @@ public abstract class JDBCBackendHandler implements BackendHandler {
            return new CommandResponsePackets(new ErrPacket(1, ServerErrorCode.ER_ERROR_ON_MODIFYING_GTID_EXECUTED_TABLE, 
                    routeResult.getSqlStatement().getTables().isSingleTable() ? routeResult.getSqlStatement().getTables().getSingleTableName() : "unknown_table"));
        }
        ExecutorService executorService = ruleRegistry.getExecutorService();
        List<Future<CommandResponsePackets>> futureList = new ArrayList<>(1024);
        for (SQLExecutionUnit each : routeResult.getExecutionUnits()) {
            Statement statement = prepareResource(each.getDataSource(), each.getSqlUnit().getSql(), routeResult.getSqlStatement());
            futureList.add(executorService.submit(newSubmitTask(statement, routeResult.getSqlStatement(), each.getSqlUnit().getSql())));
            futureList.add(ExecutorContext.getInstance().getUserGroup().submit(newSubmitTask(statement, routeResult.getSqlStatement(), each.getSqlUnit().getSql())));
        }
        List<CommandResponsePackets> packets = buildCommandResponsePackets(futureList);
        CommandResponsePackets result = merge(routeResult.getSqlStatement(), packets);
+3 −5
Original line number Diff line number Diff line
@@ -19,7 +19,6 @@ package io.shardingsphere.proxy.config;

import com.google.common.base.Preconditions;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.shardingsphere.core.api.config.ShardingRuleConfiguration;
import io.shardingsphere.core.constant.DatabaseType;
@@ -76,8 +75,6 @@ public final class RuleRegistry {
    
    private ShardingMetaData shardingMetaData;
    
    private ListeningExecutorService executorService;
    
    private boolean showSQL;
    
    private ProxyMode proxyMode;
@@ -121,9 +118,10 @@ public final class RuleRegistry {
                dataSourceConfigurationMap.put(entry.getKey(), entry.getValue());
            }
        }
        executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(maxWorkingThreads));
        shardingDataSourceMetaData = new ShardingDataSourceMetaData(dataSourceMap, shardingRule, DatabaseType.MySQL);
        shardingMetaData = new ProxyShardingMetaData(executorService, dataSourceMap);
        // TODO metadata only as pojo, executor service should out of metadata
        // TODO executor service leak here, should shutdown original executor service
        shardingMetaData = new ProxyShardingMetaData(MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(maxWorkingThreads)), dataSourceMap);
        if (!isMasterSlaveOnly()) {
            shardingMetaData.init(shardingRule);
        }
+16 −25
Original line number Diff line number Diff line
@@ -17,14 +17,12 @@

package io.shardingsphere.proxy.frontend;

import io.netty.channel.WriteBufferWaterMark;
import io.shardingsphere.proxy.frontend.common.netty.ServerHandlerInitializer;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
@@ -34,6 +32,8 @@ import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.shardingsphere.proxy.backend.ShardingProxyClient;
import io.shardingsphere.proxy.config.RuleRegistry;
import io.shardingsphere.proxy.frontend.common.netty.ServerHandlerInitializer;
import io.shardingsphere.proxy.util.ExecutorContext;

import java.net.MalformedURLException;

@@ -46,14 +46,14 @@ import java.net.MalformedURLException;
 */
public final class ShardingProxy {
    
    private static final int WORKER_MAX_THREADS = Runtime.getRuntime().availableProcessors() * 2;
    private final RuleRegistry ruleRegistry = RuleRegistry.getInstance();
    
    private final ExecutorContext executorContext = ExecutorContext.getInstance();
    
    private EventLoopGroup bossGroup;
    
    private EventLoopGroup workerGroup;
    
    private EventLoopGroup userGroup;
    
    /**
     * Start Sharding-Proxy.
     *
@@ -63,12 +63,11 @@ public final class ShardingProxy {
     */
    public void start(final int port) throws InterruptedException, MalformedURLException {
        try {
            if (RuleRegistry.getInstance().isWithoutJdbc()) {
            if (ruleRegistry.isWithoutJdbc()) {
                ShardingProxyClient.getInstance().start();
            }
            ServerBootstrap bootstrap = new ServerBootstrap();
            bossGroup = createEventLoopGroup();
            if (bossGroup instanceof EpollEventLoopGroup) {
            if (executorContext.canUseEpoll()) {
                groupsEpoll(bootstrap);
            } else {
                groupsNio(bootstrap);
@@ -78,24 +77,16 @@ public final class ShardingProxy {
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
            userGroup.shutdownGracefully();
            if (RuleRegistry.getInstance().isWithoutJdbc()) {
            executorContext.getUserGroup().shutdownGracefully();
            if (ruleRegistry.isWithoutJdbc()) {
                ShardingProxyClient.getInstance().stop();
            }
        }
    }
    
    private EventLoopGroup createEventLoopGroup() {
        try {
            return new EpollEventLoopGroup(1);
        } catch (final UnsatisfiedLinkError ex) {
            return new NioEventLoopGroup(1);
        }
    }
    
    private void groupsEpoll(final ServerBootstrap bootstrap) {
        workerGroup = new EpollEventLoopGroup(WORKER_MAX_THREADS);
        userGroup = new EpollEventLoopGroup(WORKER_MAX_THREADS);
        bossGroup = new EpollEventLoopGroup(1);
        workerGroup = new EpollEventLoopGroup(ruleRegistry.getMaxWorkingThreads());
        bootstrap.group(bossGroup, workerGroup)
                .channel(EpollServerSocketChannel.class)
                .option(EpollChannelOption.SO_BACKLOG, 128)
@@ -103,12 +94,12 @@ public final class ShardingProxy {
                .option(EpollChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childOption(EpollChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ServerHandlerInitializer(userGroup));
                .childHandler(new ServerHandlerInitializer());
    }
    
    private void groupsNio(final ServerBootstrap bootstrap) {
        workerGroup = new NioEventLoopGroup(WORKER_MAX_THREADS);
        userGroup = new NioEventLoopGroup(WORKER_MAX_THREADS);
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup(ruleRegistry.getMaxWorkingThreads());
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
@@ -117,6 +108,6 @@ public final class ShardingProxy {
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ServerHandlerInitializer(userGroup));
                .childHandler(new ServerHandlerInitializer());
    }
}
+2 −4
Original line number Diff line number Diff line
@@ -17,7 +17,6 @@

package io.shardingsphere.proxy.frontend.common;

import io.netty.channel.EventLoopGroup;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.proxy.frontend.mysql.MySQLFrontendHandler;
import lombok.AccessLevel;
@@ -36,13 +35,12 @@ public final class FrontendHandlerFactory {
     * Create frontend handler instance.
     *
     * @param databaseType database type
     * @param userGroup user thread pool
     * @return frontend handler instance
     */
    public static FrontendHandler createFrontendHandlerInstance(final DatabaseType databaseType, final EventLoopGroup userGroup) {
    public static FrontendHandler createFrontendHandlerInstance(final DatabaseType databaseType) {
        switch (databaseType) {
            case MySQL:
                return new MySQLFrontendHandler(userGroup);
                return new MySQLFrontendHandler();
            default:
                throw new UnsupportedOperationException(String.format("Cannot support database type '%s'", databaseType));
        }
Loading