Commit a6b2fdd0 authored by terrymanu's avatar terrymanu
Browse files

merge netty frontend userGroup and JDBC backend service executor, use...

merge netty frontend userGroup and JDBC backend service executor, use userGroup to deal with communicate MySQL
parent d4e5eb2f
Loading
Loading
Loading
Loading
+2 −4
Original line number Diff line number Diff line
@@ -27,6 +27,7 @@ import io.shardingsphere.core.rule.ShardingDataSourceNames;
import io.shardingsphere.core.rule.ShardingRule;
import io.shardingsphere.core.rule.TableRule;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

@@ -48,6 +49,7 @@ import java.util.concurrent.ExecutionException;
 * @author panjuan
 * @author zhaojun
 */
@RequiredArgsConstructor
@Getter
@Setter
@Slf4j
@@ -57,10 +59,6 @@ public abstract class ShardingMetaData {
    
    private Map<String, TableMetaData> tableMetaDataMap;
    
    public ShardingMetaData(final ListeningExecutorService executorService) {
        this.executorService = executorService;
    }
    
    /**
     * Initialize sharding metadata.
     *
+2 −3
Original line number Diff line number Diff line
@@ -44,6 +44,7 @@ import io.shardingsphere.proxy.transport.mysql.packet.command.text.query.FieldCo
import io.shardingsphere.proxy.transport.mysql.packet.generic.EofPacket;
import io.shardingsphere.proxy.transport.mysql.packet.generic.ErrPacket;
import io.shardingsphere.proxy.transport.mysql.packet.generic.OKPacket;
import io.shardingsphere.proxy.util.ExecutorContext;
import io.shardingsphere.transaction.xa.AtomikosUserTransaction;
import lombok.Getter;
import lombok.Setter;
@@ -62,7 +63,6 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/**
@@ -120,11 +120,10 @@ public abstract class JDBCBackendHandler implements BackendHandler {
            return new CommandResponsePackets(new ErrPacket(1, ServerErrorCode.ER_ERROR_ON_MODIFYING_GTID_EXECUTED_TABLE, 
                    routeResult.getSqlStatement().getTables().isSingleTable() ? routeResult.getSqlStatement().getTables().getSingleTableName() : "unknown_table"));
        }
        ExecutorService executorService = ruleRegistry.getExecutorService();
        List<Future<CommandResponsePackets>> futureList = new ArrayList<>(1024);
        for (SQLExecutionUnit each : routeResult.getExecutionUnits()) {
            Statement statement = prepareResource(each.getDataSource(), each.getSqlUnit().getSql(), routeResult.getSqlStatement());
            futureList.add(executorService.submit(newSubmitTask(statement, routeResult.getSqlStatement(), each.getSqlUnit().getSql())));
            futureList.add(ExecutorContext.getInstance().getUserGroup().submit(newSubmitTask(statement, routeResult.getSqlStatement(), each.getSqlUnit().getSql())));
        }
        List<CommandResponsePackets> packets = buildCommandResponsePackets(futureList);
        CommandResponsePackets result = merge(routeResult.getSqlStatement(), packets);
+3 −5
Original line number Diff line number Diff line
@@ -19,7 +19,6 @@ package io.shardingsphere.proxy.config;

import com.google.common.base.Preconditions;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.shardingsphere.core.api.config.ShardingRuleConfiguration;
import io.shardingsphere.core.constant.DatabaseType;
@@ -76,8 +75,6 @@ public final class RuleRegistry {
    
    private ShardingMetaData shardingMetaData;
    
    private ListeningExecutorService executorService;
    
    private boolean showSQL;
    
    private ProxyMode proxyMode;
@@ -121,9 +118,10 @@ public final class RuleRegistry {
                dataSourceConfigurationMap.put(entry.getKey(), entry.getValue());
            }
        }
        executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(maxWorkingThreads));
        shardingDataSourceMetaData = new ShardingDataSourceMetaData(dataSourceMap, shardingRule, DatabaseType.MySQL);
        shardingMetaData = new ProxyShardingMetaData(executorService, dataSourceMap);
        // TODO metadata only as pojo, executor service should out of metadata
        // TODO executor service leak here, should shutdown original executor service
        shardingMetaData = new ProxyShardingMetaData(MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(maxWorkingThreads)), dataSourceMap);
        if (!isMasterSlaveOnly()) {
            shardingMetaData.init(shardingRule);
        }
+16 −25
Original line number Diff line number Diff line
@@ -17,14 +17,12 @@

package io.shardingsphere.proxy.frontend;

import io.netty.channel.WriteBufferWaterMark;
import io.shardingsphere.proxy.frontend.common.netty.ServerHandlerInitializer;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
@@ -34,6 +32,8 @@ import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.shardingsphere.proxy.backend.ShardingProxyClient;
import io.shardingsphere.proxy.config.RuleRegistry;
import io.shardingsphere.proxy.frontend.common.netty.ServerHandlerInitializer;
import io.shardingsphere.proxy.util.ExecutorContext;

import java.net.MalformedURLException;

@@ -46,14 +46,14 @@ import java.net.MalformedURLException;
 */
public final class ShardingProxy {
    
    private static final int WORKER_MAX_THREADS = Runtime.getRuntime().availableProcessors() * 2;
    private final RuleRegistry ruleRegistry = RuleRegistry.getInstance();
    
    private final ExecutorContext executorContext = ExecutorContext.getInstance();
    
    private EventLoopGroup bossGroup;
    
    private EventLoopGroup workerGroup;
    
    private EventLoopGroup userGroup;
    
    /**
     * Start Sharding-Proxy.
     *
@@ -63,12 +63,11 @@ public final class ShardingProxy {
     */
    public void start(final int port) throws InterruptedException, MalformedURLException {
        try {
            if (RuleRegistry.getInstance().isWithoutJdbc()) {
            if (ruleRegistry.isWithoutJdbc()) {
                ShardingProxyClient.getInstance().start();
            }
            ServerBootstrap bootstrap = new ServerBootstrap();
            bossGroup = createEventLoopGroup();
            if (bossGroup instanceof EpollEventLoopGroup) {
            if (executorContext.canUseEpoll()) {
                groupsEpoll(bootstrap);
            } else {
                groupsNio(bootstrap);
@@ -78,24 +77,16 @@ public final class ShardingProxy {
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
            userGroup.shutdownGracefully();
            if (RuleRegistry.getInstance().isWithoutJdbc()) {
            executorContext.getUserGroup().shutdownGracefully();
            if (ruleRegistry.isWithoutJdbc()) {
                ShardingProxyClient.getInstance().stop();
            }
        }
    }
    
    private EventLoopGroup createEventLoopGroup() {
        try {
            return new EpollEventLoopGroup(1);
        } catch (final UnsatisfiedLinkError ex) {
            return new NioEventLoopGroup(1);
        }
    }
    
    private void groupsEpoll(final ServerBootstrap bootstrap) {
        workerGroup = new EpollEventLoopGroup(WORKER_MAX_THREADS);
        userGroup = new EpollEventLoopGroup(WORKER_MAX_THREADS);
        bossGroup = new EpollEventLoopGroup(1);
        workerGroup = new EpollEventLoopGroup(ruleRegistry.getMaxWorkingThreads());
        bootstrap.group(bossGroup, workerGroup)
                .channel(EpollServerSocketChannel.class)
                .option(EpollChannelOption.SO_BACKLOG, 128)
@@ -103,12 +94,12 @@ public final class ShardingProxy {
                .option(EpollChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childOption(EpollChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ServerHandlerInitializer(userGroup));
                .childHandler(new ServerHandlerInitializer());
    }
    
    private void groupsNio(final ServerBootstrap bootstrap) {
        workerGroup = new NioEventLoopGroup(WORKER_MAX_THREADS);
        userGroup = new NioEventLoopGroup(WORKER_MAX_THREADS);
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup(ruleRegistry.getMaxWorkingThreads());
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
@@ -117,6 +108,6 @@ public final class ShardingProxy {
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ServerHandlerInitializer(userGroup));
                .childHandler(new ServerHandlerInitializer());
    }
}
+2 −4
Original line number Diff line number Diff line
@@ -17,7 +17,6 @@

package io.shardingsphere.proxy.frontend.common;

import io.netty.channel.EventLoopGroup;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.proxy.frontend.mysql.MySQLFrontendHandler;
import lombok.AccessLevel;
@@ -36,13 +35,12 @@ public final class FrontendHandlerFactory {
     * Create frontend handler instance.
     *
     * @param databaseType database type
     * @param userGroup user thread pool
     * @return frontend handler instance
     */
    public static FrontendHandler createFrontendHandlerInstance(final DatabaseType databaseType, final EventLoopGroup userGroup) {
    public static FrontendHandler createFrontendHandlerInstance(final DatabaseType databaseType) {
        switch (databaseType) {
            case MySQL:
                return new MySQLFrontendHandler(userGroup);
                return new MySQLFrontendHandler();
            default:
                throw new UnsupportedOperationException(String.format("Cannot support database type '%s'", databaseType));
        }
Loading