Commit 12ccc776 authored by terrymanu's avatar terrymanu
Browse files

remove MemoryStrictlyExecutorEngine & ConnectionStrictlyExecutorEngine

parent ce3f813a
Loading
Loading
Loading
Loading
+23 −9
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@

package io.shardingsphere.core.executor;

import io.shardingsphere.core.constant.ConnectionMode;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.executor.event.overall.OverallExecutionEvent;
import io.shardingsphere.core.executor.threadlocal.ExecutorExceptionHandler;
@@ -25,8 +26,10 @@ import lombok.Getter;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * SQL execute engine.
@@ -36,32 +39,33 @@ import java.util.List;
 * @author maxiaoguang
 * @author panjuan
 */
public abstract class ExecutorEngine implements AutoCloseable {
public final class ExecutorEngine implements AutoCloseable {
    
    @Getter
    private final ShardingExecuteEngine shardingExecuteEngine;
    
    public ExecutorEngine(final int executorSize) {
    private final ConnectionMode connectionMode;
    
    public ExecutorEngine(final int executorSize, final ConnectionMode connectionMode) {
        shardingExecuteEngine = new ShardingExecuteEngine(executorSize);
        this.connectionMode = connectionMode;
    }
    
    /**
     * Execute.
     *
     * @param baseStatementUnits statement execute unitS
     * @param baseStatementUnits statement execute units
     * @param executeCallback prepared statement execute callback
     * @param <T> class type of return value
     * @return execute result
     * @throws SQLException SQL exception
     */
    public <T> List<T> execute(final Collection<? extends BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) throws SQLException {
        if (baseStatementUnits.isEmpty()) {
            return Collections.emptyList();
        }
        OverallExecutionEvent event = new OverallExecutionEvent(executeCallback.getSqlType(), baseStatementUnits.size() > 1);
        ShardingEventBusInstance.getInstance().post(event);
        try {
            List<T> result = getExecuteResults(new LinkedList<>(baseStatementUnits), executeCallback);
            List<T> result = ConnectionMode.MEMORY_STRICTLY == connectionMode ? shardingExecuteEngine.execute(new LinkedList<>(baseStatementUnits), executeCallback)
                    : shardingExecuteEngine.groupExecute(getBaseStatementUnitGroups(baseStatementUnits), executeCallback);
            event.setExecuteSuccess();
            return result;
            // CHECKSTYLE:OFF
@@ -75,10 +79,20 @@ public abstract class ExecutorEngine implements AutoCloseable {
        }
    }
    
    protected abstract <T> List<T> getExecuteResults(Collection<BaseStatementUnit> baseStatementUnits, ExecuteCallback<T> executeCallback) throws Exception;
    private Map<String, Collection<BaseStatementUnit>> getBaseStatementUnitGroups(final Collection<? extends BaseStatementUnit> baseStatementUnits) {
        Map<String, Collection<BaseStatementUnit>> result = new LinkedHashMap<>(baseStatementUnits.size(), 1);
        for (BaseStatementUnit each : baseStatementUnits) {
            String dataSourceName = each.getSqlExecutionUnit().getDataSource();
            if (!result.keySet().contains(dataSourceName)) {
                result.put(dataSourceName, new LinkedList<BaseStatementUnit>());
            }
            result.get(dataSourceName).add(each);
        }
        return result;
    }
    
    @Override
    public final void close() {
    public void close() {
        shardingExecuteEngine.close();
    }
}
+0 −57
Original line number Diff line number Diff line
/*
 * Copyright 2016-2018 shardingsphere.io.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * </p>
 */

package io.shardingsphere.core.executor.type.connection;

import io.shardingsphere.core.executor.BaseStatementUnit;
import io.shardingsphere.core.executor.ExecuteCallback;
import io.shardingsphere.core.executor.ExecutorEngine;

import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * Connection strictly execute engine.
 *
 * @author panjuan
 */
public final class ConnectionStrictlyExecutorEngine extends ExecutorEngine {
    
    public ConnectionStrictlyExecutorEngine(final int executorSize) {
        super(executorSize);
    }
    
    @Override
    protected <T> List<T> getExecuteResults(final Collection<BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) throws Exception {
        return getShardingExecuteEngine().groupExecute(getBaseStatementUnitGroups(baseStatementUnits), executeCallback);
    }
    
    private Map<String, Collection<BaseStatementUnit>> getBaseStatementUnitGroups(final Collection<? extends BaseStatementUnit> baseStatementUnits) {
        Map<String, Collection<BaseStatementUnit>> result = new LinkedHashMap<>(baseStatementUnits.size(), 1);
        for (BaseStatementUnit each : baseStatementUnits) {
            String dataSourceName = each.getSqlExecutionUnit().getDataSource();
            if (!result.keySet().contains(dataSourceName)) {
                result.put(dataSourceName, new LinkedList<BaseStatementUnit>());
            }
            result.get(dataSourceName).add(each);
        }
        return result;
    }
}
+0 −43
Original line number Diff line number Diff line
/*
 * Copyright 2016-2018 shardingsphere.io.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * </p>
 */

package io.shardingsphere.core.executor.type.memory;

import io.shardingsphere.core.executor.BaseStatementUnit;
import io.shardingsphere.core.executor.ExecuteCallback;
import io.shardingsphere.core.executor.ExecutorEngine;

import java.util.Collection;
import java.util.List;

/**
 * Memory strictly execute engine.
 *
 * @author panjuan
 */
public final class MemoryStrictlyExecutorEngine extends ExecutorEngine {
    
    public MemoryStrictlyExecutorEngine(final int executorSize) {
        super(executorSize);
    }
    
    @Override
    protected <T> List<T> getExecuteResults(final Collection<BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) throws Exception {
        return getShardingExecuteEngine().execute(baseStatementUnits, executeCallback);
    }
}
+2 −4
Original line number Diff line number Diff line
@@ -25,8 +25,6 @@ import io.shardingsphere.core.constant.properties.ShardingProperties;
import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant;
import io.shardingsphere.core.exception.ShardingException;
import io.shardingsphere.core.executor.ExecutorEngine;
import io.shardingsphere.core.executor.type.connection.ConnectionStrictlyExecutorEngine;
import io.shardingsphere.core.executor.type.memory.MemoryStrictlyExecutorEngine;
import io.shardingsphere.core.jdbc.adapter.AbstractDataSourceAdapter;
import io.shardingsphere.core.jdbc.core.ShardingContext;
import io.shardingsphere.core.jdbc.core.connection.ShardingConnection;
@@ -74,7 +72,7 @@ public class ShardingDataSource extends AbstractDataSourceAdapter implements Aut
        shardingProperties = new ShardingProperties(null == props ? new Properties() : props);
        int executorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);
        ConnectionMode connectionMode = ConnectionMode.valueOf(shardingProperties.<String>getValue(ShardingPropertiesConstant.CONNECTION_MODE));
        executorEngine = ConnectionMode.MEMORY_STRICTLY == connectionMode ? new MemoryStrictlyExecutorEngine(executorSize) : new ConnectionStrictlyExecutorEngine(executorSize);
        executorEngine = new ExecutorEngine(executorSize, connectionMode);
        ShardingMetaData shardingMetaData = new ShardingMetaData(getDataSourceURLs(dataSourceMap), shardingRule, getDatabaseType(), 
                executorEngine.getShardingExecuteEngine().getExecutorService(), new JDBCTableMetaDataConnectionManager(dataSourceMap));
        boolean showSQL = shardingProperties.getValue(ShardingPropertiesConstant.SQL_SHOW);
@@ -112,7 +110,7 @@ public class ShardingDataSource extends AbstractDataSourceAdapter implements Aut
        ConnectionMode newConnectionMode = ConnectionMode.valueOf(newShardingProperties.<String>getValue(ShardingPropertiesConstant.CONNECTION_MODE));
        if (originalExecutorSize != newExecutorSize || originalConnectionMode != newConnectionMode) {
            ExecutorEngine originalExecutorEngine = executorEngine;
            executorEngine = ConnectionMode.MEMORY_STRICTLY == newConnectionMode ? new MemoryStrictlyExecutorEngine(newExecutorSize) : new ConnectionStrictlyExecutorEngine(newExecutorSize);
            executorEngine = new ExecutorEngine(newExecutorSize, newConnectionMode);
            originalExecutorEngine.close();
        }
        shardingProperties = newShardingProperties;
+3 −3
Original line number Diff line number Diff line
@@ -17,6 +17,8 @@

package io.shardingsphere.core.executor.type;

import io.shardingsphere.core.constant.ConnectionMode;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.executor.ExecutorEngine;
import io.shardingsphere.core.executor.fixture.EventCaller;
import io.shardingsphere.core.executor.fixture.ExecutorTestUtil;
@@ -24,8 +26,6 @@ import io.shardingsphere.core.executor.fixture.TestDMLExecutionEventListener;
import io.shardingsphere.core.executor.fixture.TestDQLExecutionEventListener;
import io.shardingsphere.core.executor.fixture.TestOverallExecutionEventListener;
import io.shardingsphere.core.executor.threadlocal.ExecutorExceptionHandler;
import io.shardingsphere.core.executor.type.memory.MemoryStrictlyExecutorEngine;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import lombok.AccessLevel;
import lombok.Getter;
import org.junit.After;
@@ -51,7 +51,7 @@ public abstract class AbstractBaseExecutorTest {
    public void setUp() {
        MockitoAnnotations.initMocks(this);
        ExecutorExceptionHandler.setExceptionThrown(false);
        executorEngine = new MemoryStrictlyExecutorEngine(Runtime.getRuntime().availableProcessors());
        executorEngine = new ExecutorEngine(Runtime.getRuntime().availableProcessors(), ConnectionMode.MEMORY_STRICTLY);
        overallExecutionEventListener = new TestOverallExecutionEventListener(eventCaller);
        dqlExecutionEventListener = new TestDQLExecutionEventListener(eventCaller);
        dmlExecutionEventListener = new TestDMLExecutionEventListener(eventCaller);
Loading