Commit 71af5027 authored by terrymanu's avatar terrymanu
Browse files

add ShardingExecuteEngine

parent 3beaf92e
Loading
Loading
Loading
Loading
+12 −11
Original line number Diff line number Diff line
@@ -15,23 +15,24 @@
 * </p>
 */

package io.shardingsphere.opentracing.config;

import com.google.common.base.Optional;
package io.shardingsphere.core.executor;

/**
 * Parse config.
 * Sharding execute callback.
 * 
 * @author zhangliang
 * 
 * @author gaohongtao
 * @author wangkai
 * @param <I> type of input value
 * @param <O> type of output value
 */
interface ConfigurationParser {
public interface ShardingExecuteCallback<I, O> {
    
    /**
     * Parse config item to config value.
     * Execute callback.
     * 
     * @param configItem config item
     * @return config value
     * @param input input value
     * @return execute result
     * @throws Exception throw when execute failure
     */
    Optional<String> parse(String configItem);
    O execute(I input) throws Exception;
}
+116 −0
Original line number Diff line number Diff line
/*
 * Copyright 2016-2018 shardingsphere.io.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * </p>
 */

package io.shardingsphere.core.executor;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Getter;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Sharding execute engine.
 * 
 * @author zhangliang
 */
public final class ShardingExecuteEngine implements AutoCloseable {
    
    private static final ExecutorService SHUTDOWN_EXECUTOR = Executors.newSingleThreadExecutor(ShardingThreadFactoryBuilder.build("Executor-Engine-Closer"));
    
    @Getter
    private final ListeningExecutorService executorService;
    
    public ShardingExecuteEngine(final int executorSize) {
        executorService = MoreExecutors.listeningDecorator(
                0 == executorSize ? Executors.newCachedThreadPool(ShardingThreadFactoryBuilder.build()) : Executors.newFixedThreadPool(executorSize, ShardingThreadFactoryBuilder.build()));
        MoreExecutors.addDelayedShutdownHook(executorService, 60, TimeUnit.SECONDS);
    }
    
    /**
     * Execute all callbacks.
     *
     * @param inputs sharding execute callbacks
     * @param callback sharding execute callback
     * @param <I> type of input value
     * @param <O> type of return value
     * @return execute result
     * @throws Exception throw if execute failure
     */
    public <I, O> List<O> execute(final Collection<I> inputs, final ShardingExecuteCallback<I, O> callback) throws Exception {
        if (inputs.isEmpty()) {
            return Collections.emptyList();
        }
        Iterator<I> inputIterator = inputs.iterator();
        I firstInput = inputIterator.next();
        Collection<ListenableFuture<O>> restFutures = asyncExecute(Lists.newArrayList(inputIterator), callback);
        return getResults(callback.execute(firstInput), restFutures);
    }
    
    private <I, O> Collection<ListenableFuture<O>> asyncExecute(final Collection<I> inputs, final ShardingExecuteCallback<I, O> callback) {
        Collection<ListenableFuture<O>> result = new ArrayList<>(inputs.size());
        for (final I each : inputs) {
            result.add(executorService.submit(new Callable<O>() {
                
                @Override
                public O call() throws Exception {
                    return callback.execute(each);
                }
            }));
        }
        return result;
    }
    
    private <O> List<O> getResults(final O firstResult, final Collection<ListenableFuture<O>> restFutures) throws ExecutionException, InterruptedException {
        List<O> result = new LinkedList<>();
        result.add(firstResult);
        for (ListenableFuture<O> each : restFutures) {
            result.add(each.get());
        }
        return result;
    }
    
    @Override
    public void close() {
        SHUTDOWN_EXECUTOR.execute(new Runnable() {
            
            @Override
            public void run() {
                try {
                    executorService.shutdown();
                    while (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
                        executorService.shutdownNow();
                    }
                } catch (final InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        });
    }
}
+56 −0
Original line number Diff line number Diff line
/*
 * Copyright 2016-2018 shardingsphere.io.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * </p>
 */

package io.shardingsphere.core.executor;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

import java.util.concurrent.ThreadFactory;

/**
 * Sharding thread factory builder.
 *
 * @author zhangliang
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ShardingThreadFactoryBuilder {
    
    private static final String NAME_FORMAT_PREFIX = "Sharding-Sphere-";
    
    private static final String DEFAULT_EXECUTOR_NAME_FORMAT = NAME_FORMAT_PREFIX + "%d";
    
    /**
     * Build default sharding thread factory.
     *
     * @return default sharding thread factory
     */
    public static ThreadFactory build() {
        return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(DEFAULT_EXECUTOR_NAME_FORMAT).build();
    }
    
    /**
     * Build sharding thread factory.
     * 
     * @param nameFormat thread name format
     * @return sharding thread factory
     */
    public static ThreadFactory build(final String nameFormat) {
        return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(NAME_FORMAT_PREFIX + nameFormat).build();
    }
}
+6 −6
Original line number Diff line number Diff line
@@ -17,6 +17,8 @@

package io.shardingsphere.core.executor;

import io.shardingsphere.core.constant.SQLType;

/**
 * Statement execute callback interface.
 *
@@ -25,14 +27,12 @@ package io.shardingsphere.core.executor;
 * 
 * @param <T> class type of return value
 */
public interface ExecuteCallback<T> {
public interface ExecuteCallback<T> extends ShardingExecuteCallback<BaseStatementUnit, T> {
    
    /**
     * Execute task.
     * Get SQL type.
     * 
     * @param baseStatementUnit statement execute unit
     * @return execute result
     * @throws Exception execute exception
     * @return SQL type
     */
    T execute(BaseStatementUnit baseStatementUnit) throws Exception;
    SQLType getSQLType();
}
+7 −10
Original line number Diff line number Diff line
@@ -20,7 +20,6 @@ package io.shardingsphere.core.executor;
import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.shardingsphere.core.constant.SQLType;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.executor.event.overall.OverallExecutionEvent;
import io.shardingsphere.core.executor.event.sql.SQLExecutionEvent;
@@ -68,22 +67,20 @@ public abstract class ExecutorEngine implements AutoCloseable {
    /**
     * Execute.
     *
     * @param sqlType SQL type
     * @param baseStatementUnits statement execute unitS
     * @param executeCallback prepared statement execute callback
     * @param <T> class type of return value
     * @return execute result
     * @throws SQLException SQL exception
     */
    public <T> List<T> execute(
            final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) throws SQLException {
    public <T> List<T> execute(final Collection<? extends BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) throws SQLException {
        if (baseStatementUnits.isEmpty()) {
            return Collections.emptyList();
        }
        OverallExecutionEvent event = new OverallExecutionEvent(sqlType, baseStatementUnits.size() > 1);
        OverallExecutionEvent event = new OverallExecutionEvent(executeCallback.getSQLType(), baseStatementUnits.size() > 1);
        shardingEventBus.post(event);
        try {
            List<T> result = getExecuteResults(sqlType, baseStatementUnits, executeCallback);
            List<T> result = getExecuteResults(baseStatementUnits, executeCallback);
            event.setExecuteSuccess();
            return result;
            // CHECKSTYLE:OFF
@@ -97,16 +94,16 @@ public abstract class ExecutorEngine implements AutoCloseable {
        }
    }
    
    protected abstract <T> List<T> getExecuteResults(SQLType sqlType, Collection<? extends BaseStatementUnit> baseStatementUnits, ExecuteCallback<T> executeCallback) throws Exception;
    protected abstract <T> List<T> getExecuteResults(Collection<? extends BaseStatementUnit> baseStatementUnits, ExecuteCallback<T> executeCallback) throws Exception;
    
    protected final <T> T executeInternal(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, 
                                          final ExecuteCallback<T> executeCallback, final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception {
    protected final <T> T executeInternal(
            final BaseStatementUnit baseStatementUnit, final ExecuteCallback<T> executeCallback, final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception {
        T result;
        ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
        ExecutorDataMap.setDataMap(dataMap);
        List<SQLExecutionEvent> events = new LinkedList<>();
        for (List<Object> each : baseStatementUnit.getSqlExecutionUnit().getSqlUnit().getParameterSets()) {
            SQLExecutionEvent event = SQLExecutionEventFactory.createEvent(sqlType, baseStatementUnit, each);
            SQLExecutionEvent event = SQLExecutionEventFactory.createEvent(executeCallback.getSQLType(), baseStatementUnit, each);
            events.add(event);
            shardingEventBus.post(event);
        }
Loading