Commit 4f414c18 authored by terrymanu's avatar terrymanu
Browse files

throw SQLException on ShardingExecuteEngine

parent 81abe770
Loading
Loading
Loading
Loading
+4 −2
Original line number Diff line number Diff line
@@ -17,6 +17,8 @@

package io.shardingsphere.core.executor;

import java.sql.SQLException;

/**
 * Sharding execute callback.
 * 
@@ -32,7 +34,7 @@ public interface ShardingExecuteCallback<I, O> {
     * 
     * @param input input value
     * @return execute result
     * @throws Exception throw when execute failure
     * @throws SQLException throw when execute failure
     */
    O execute(I input) throws Exception;
    O execute(I input) throws SQLException;
}
+33 −16
Original line number Diff line number Diff line
@@ -21,7 +21,9 @@ import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.shardingsphere.core.exception.ShardingException;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -61,9 +63,9 @@ public final class ShardingExecuteEngine implements AutoCloseable {
     * @param <I> type of input value
     * @param <O> type of return value
     * @return execute result
     * @throws Exception throw if execute failure
     * @throws SQLException throw if execute failure
     */
    public <I, O> List<O> execute(final Collection<I> inputs, final ShardingExecuteCallback<I, O> callback) throws Exception {
    public <I, O> List<O> execute(final Collection<I> inputs, final ShardingExecuteCallback<I, O> callback) throws SQLException {
        if (inputs.isEmpty()) {
            return Collections.emptyList();
        }
@@ -82,9 +84,9 @@ public final class ShardingExecuteEngine implements AutoCloseable {
     * @param <I> type of input value
     * @param <O> type of return value
     * @return execute result
     * @throws Exception throw if execute failure
     * @throws SQLException throw if execute failure
     */
    public <I, O> List<O> execute(final Collection<I> inputs, final ShardingExecuteCallback<I, O> firstCallback, final ShardingExecuteCallback<I, O> callback) throws Exception {
    public <I, O> List<O> execute(final Collection<I> inputs, final ShardingExecuteCallback<I, O> firstCallback, final ShardingExecuteCallback<I, O> callback) throws SQLException {
        if (inputs.isEmpty()) {
            return Collections.emptyList();
        }
@@ -100,7 +102,7 @@ public final class ShardingExecuteEngine implements AutoCloseable {
            result.add(executorService.submit(new Callable<O>() {
                
                @Override
                public O call() throws Exception {
                public O call() throws SQLException {
                    return callback.execute(each);
                }
            }));
@@ -108,15 +110,19 @@ public final class ShardingExecuteEngine implements AutoCloseable {
        return result;
    }
    
    private <I, O> O syncExecute(final I input, final ShardingExecuteCallback<I, O> callback) throws Exception {
    private <I, O> O syncExecute(final I input, final ShardingExecuteCallback<I, O> callback) throws SQLException {
        return callback.execute(input);
    }
    
    private <O> List<O> getResults(final O firstResult, final Collection<ListenableFuture<O>> restFutures) throws ExecutionException, InterruptedException {
    private <O> List<O> getResults(final O firstResult, final Collection<ListenableFuture<O>> restFutures) throws SQLException {
        List<O> result = new LinkedList<>();
        result.add(firstResult);
        for (ListenableFuture<O> each : restFutures) {
            try {
                result.add(each.get());
            } catch (final InterruptedException | ExecutionException ex) {
                return throwException(ex);
            }
        }
        return result;
    }
@@ -129,9 +135,9 @@ public final class ShardingExecuteEngine implements AutoCloseable {
     * @param <I> type of input value
     * @param <O> type of return value
     * @return execute result
     * @throws Exception throw if execute failure
     * @throws SQLException throw if execute failure
     */
    public <I, O> List<O> groupExecute(final Map<String, Collection<I>> inputs, final ShardingGroupExecuteCallback<I, O> callback) throws Exception {
    public <I, O> List<O> groupExecute(final Map<String, Collection<I>> inputs, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
        if (inputs.isEmpty()) {
            return Collections.emptyList();
        }
@@ -150,10 +156,10 @@ public final class ShardingExecuteEngine implements AutoCloseable {
     * @param <I> type of input value
     * @param <O> type of return value
     * @return execute result
     * @throws Exception throw if execute failure
     * @throws SQLException throw if execute failure
     */
    public <I, O> List<O> groupExecute(
            final Map<String, Collection<I>> inputs, final ShardingGroupExecuteCallback<I, O> firstCallback, final ShardingGroupExecuteCallback<I, O> callback) throws Exception {
            final Map<String, Collection<I>> inputs, final ShardingGroupExecuteCallback<I, O> firstCallback, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
        if (inputs.isEmpty()) {
            return Collections.emptyList();
        }
@@ -169,7 +175,7 @@ public final class ShardingExecuteEngine implements AutoCloseable {
            result.add(executorService.submit(new Callable<Collection<O>>() {
                
                @Override
                public Collection<O> call() throws Exception {
                public Collection<O> call() throws SQLException {
                    return callback.execute(entry.getKey(), entry.getValue());
                }
            }));
@@ -177,19 +183,30 @@ public final class ShardingExecuteEngine implements AutoCloseable {
        return result;
    }
    
    private <I, O> Collection<O> syncGroupExecute(final String dataSourceName, final Collection<I> inputs, final ShardingGroupExecuteCallback<I, O> callback) throws Exception {
    private <I, O> Collection<O> syncGroupExecute(final String dataSourceName, final Collection<I> inputs, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
        return callback.execute(dataSourceName, inputs);
    }
    
    private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<ListenableFuture<Collection<O>>> restFutures) throws ExecutionException, InterruptedException {
    private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<ListenableFuture<Collection<O>>> restFutures) throws SQLException {
        List<O> result = new LinkedList<>();
        result.addAll(firstResults);
        for (ListenableFuture<Collection<O>> each : restFutures) {
            try {
                result.addAll(each.get());
            } catch (final InterruptedException | ExecutionException ex) {
                return throwException(ex);
            }
        }
        return result;
    }
    
    private <O> List<O> throwException(final Exception ex) throws SQLException {
        if (ex.getCause() instanceof SQLException) {
            throw (SQLException) ex.getCause();
        }
        throw new ShardingException(ex);
    }
    
    @Override
    public void close() {
        SHUTDOWN_EXECUTOR.execute(new Runnable() {
+3 −2
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@

package io.shardingsphere.core.executor;

import java.sql.SQLException;
import java.util.Collection;

/**
@@ -35,7 +36,7 @@ public interface ShardingGroupExecuteCallback<I, O> {
     * @param key input key
     * @param values input values
     * @return execute result
     * @throws Exception throw when execute failure
     * @throws SQLException throw when execute failure
     */
    Collection<O> execute(String key, Collection<I> values) throws Exception;
    Collection<O> execute(String key, Collection<I> values) throws SQLException;
}
+3 −3
Original line number Diff line number Diff line
@@ -52,12 +52,12 @@ public abstract class SQLExecuteCallback<T> implements ShardingExecuteCallback<S
    private final EventBus shardingEventBus = ShardingEventBusInstance.getInstance();
    
    @Override
    public final T execute(final StatementExecuteUnit executeUnit) throws Exception {
    public final T execute(final StatementExecuteUnit executeUnit) throws SQLException {
        return executeInternal(executeUnit);
    }
    
    @Override
    public final Collection<T> execute(final String dataSourceName, final Collection<StatementExecuteUnit> executeUnits) throws Exception {
    public final Collection<T> execute(final String dataSourceName, final Collection<StatementExecuteUnit> executeUnits) throws SQLException {
        Collection<T> result = new LinkedList<>();
        for (StatementExecuteUnit each : executeUnits) {
            result.add(executeInternal(each));
@@ -65,7 +65,7 @@ public abstract class SQLExecuteCallback<T> implements ShardingExecuteCallback<S
        return result;
    }
    
    private T executeInternal(final StatementExecuteUnit executeUnit) throws Exception {
    private T executeInternal(final StatementExecuteUnit executeUnit) throws SQLException {
        ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
        ExecutorDataMap.setDataMap(dataMap);
        List<SQLExecutionEvent> events = new LinkedList<>();
+12 −16
Original line number Diff line number Diff line
@@ -59,9 +59,8 @@ public final class ConnectionStrictlyExecuteEngine extends JDBCExecuteEngine {
    public ExecuteResponse execute(final SQLRouteResult routeResult, final boolean isReturnGeneratedKeys) throws SQLException {
        Map<String, Collection<SQLUnit>> sqlUnitGroups = routeResult.getSQLUnitGroups();
        Collection<ExecuteResponseUnit> executeResponseUnits;
        try {
        if (TransactionType.XA == RuleRegistry.getInstance().getTransactionType()) {
                final Map<String, Map<SQLUnit, Statement>> sqlUnitStatements = new HashMap<>(sqlUnitGroups.size(), 1);
            Map<String, Map<SQLUnit, Statement>> sqlUnitStatements = new HashMap<>(sqlUnitGroups.size(), 1);
            for (Entry<String, Collection<SQLUnit>> entry : sqlUnitGroups.entrySet()) {
                sqlUnitStatements.put(entry.getKey(), createSQLUnitStatement(entry.getKey(), entry.getValue(), isReturnGeneratedKeys));
            }
@@ -71,9 +70,6 @@ public final class ConnectionStrictlyExecuteEngine extends JDBCExecuteEngine {
            executeResponseUnits = getShardingExecuteEngine().groupExecute(
                    sqlUnitGroups, new FirstTransactionGroupExecuteCallback(isReturnGeneratedKeys), new LocalTransactionGroupExecuteCallback(isReturnGeneratedKeys));
        }
        } catch (final Exception ex) {
            throw new SQLException(ex);
        }
        return getExecuteQueryResponse(executeResponseUnits);
    }
    
@@ -119,7 +115,7 @@ public final class ConnectionStrictlyExecuteEngine extends JDBCExecuteEngine {
        private final boolean isReturnGeneratedKeys;
        
        @Override
        public Collection<ExecuteResponseUnit> execute(final String dataSourceName, final Collection<SQLUnit> sqlUnits) throws Exception {
        public Collection<ExecuteResponseUnit> execute(final String dataSourceName, final Collection<SQLUnit> sqlUnits) throws SQLException {
            Collection<ExecuteResponseUnit> result = new LinkedList<>();
            boolean hasMetaData = false;
            Connection connection = getBackendConnection().getConnection(dataSourceName);
@@ -147,7 +143,7 @@ public final class ConnectionStrictlyExecuteEngine extends JDBCExecuteEngine {
        private final Map<String, Map<SQLUnit, Statement>> sqlUnitStatements;
        
        @Override
        public Collection<ExecuteResponseUnit> execute(final String dataSourceName, final Collection<SQLUnit> sqlUnits) throws Exception {
        public Collection<ExecuteResponseUnit> execute(final String dataSourceName, final Collection<SQLUnit> sqlUnits) throws SQLException {
            Collection<ExecuteResponseUnit> result = new LinkedList<>();
            for (Entry<SQLUnit, Statement> each : sqlUnitStatements.get(dataSourceName).entrySet()) {
                result.add(executeWithoutMetadata(each.getValue(), each.getKey().getSql(), isReturnGeneratedKeys));
@@ -162,7 +158,7 @@ public final class ConnectionStrictlyExecuteEngine extends JDBCExecuteEngine {
        private final boolean isReturnGeneratedKeys;
        
        @Override
        public Collection<ExecuteResponseUnit> execute(final String dataSourceName, final Collection<SQLUnit> sqlUnits) throws Exception {
        public Collection<ExecuteResponseUnit> execute(final String dataSourceName, final Collection<SQLUnit> sqlUnits) throws SQLException {
            Collection<ExecuteResponseUnit> result = new LinkedList<>();
            for (Entry<SQLUnit, Statement> each : createSQLUnitStatement(dataSourceName, sqlUnits, isReturnGeneratedKeys).entrySet()) {
                result.add(executeWithoutMetadata(each.getValue(), each.getKey().getSql(), isReturnGeneratedKeys));
Loading