Loading sharding-jdbc/src/main/java/io/shardingsphere/core/executor/ExecuteCallback.java +16 −0 Original line number Diff line number Diff line Loading @@ -19,6 +19,8 @@ package io.shardingsphere.core.executor; import io.shardingsphere.core.constant.SQLType; import java.util.Map; /** * Statement execute callback interface. * Loading @@ -35,4 +37,18 @@ public interface ExecuteCallback<T> extends ShardingExecuteCallback<BaseStatemen * @return SQL type */ SQLType getSQLType(); /** * Judge is exception thrown or not. * * @return is exception thrown or not */ boolean isExceptionThrown(); /** * Get data map. * * @return data map */ Map<String, Object> getDataMap(); } sharding-jdbc/src/main/java/io/shardingsphere/core/executor/ExecutorEngine.java +3 −4 Original line number Diff line number Diff line Loading @@ -34,7 +34,6 @@ import java.util.Collection; import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; Loading Loading @@ -97,10 +96,10 @@ public abstract class ExecutorEngine implements AutoCloseable { protected abstract <T> List<T> getExecuteResults(Collection<? extends BaseStatementUnit> baseStatementUnits, ExecuteCallback<T> executeCallback) throws Exception; protected final <T> T executeInternal( final BaseStatementUnit baseStatementUnit, final ExecuteCallback<T> executeCallback, final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception { final BaseStatementUnit baseStatementUnit, final ExecuteCallback<T> executeCallback) throws Exception { T result; ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown); ExecutorDataMap.setDataMap(dataMap); ExecutorExceptionHandler.setExceptionThrown(executeCallback.isExceptionThrown()); ExecutorDataMap.setDataMap(executeCallback.getDataMap()); List<SQLExecutionEvent> events = new LinkedList<>(); for (List<Object> each : baseStatementUnit.getSqlExecutionUnit().getSqlUnit().getParameterSets()) { SQLExecutionEvent event = SQLExecutionEventFactory.createEvent(executeCallback.getSQLType(), baseStatementUnit, each); Loading sharding-jdbc/src/main/java/io/shardingsphere/core/executor/type/batch/BatchPreparedStatementExecutor.java +14 −0 Original line number Diff line number Diff line Loading @@ -22,6 +22,8 @@ import io.shardingsphere.core.constant.SQLType; import io.shardingsphere.core.executor.BaseStatementUnit; import io.shardingsphere.core.executor.ExecuteCallback; import io.shardingsphere.core.executor.ExecutorEngine; import io.shardingsphere.core.executor.threadlocal.ExecutorDataMap; import io.shardingsphere.core.executor.threadlocal.ExecutorExceptionHandler; import lombok.RequiredArgsConstructor; import java.sql.SQLException; Loading Loading @@ -55,6 +57,8 @@ public final class BatchPreparedStatementExecutor { * @throws SQLException SQL exception */ public int[] executeBatch() throws SQLException { final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown(); final Map<String, Object> dataMap = ExecutorDataMap.getDataMap(); return accumulate(executorEngine.execute(batchPreparedStatementUnits, new ExecuteCallback<int[]>() { @Override Loading @@ -66,6 +70,16 @@ public final class BatchPreparedStatementExecutor { public SQLType getSQLType() { return sqlType; } @Override public boolean isExceptionThrown() { return isExceptionThrown; } @Override public Map<String, Object> getDataMap() { return dataMap; } })); } Loading sharding-jdbc/src/main/java/io/shardingsphere/core/executor/type/connection/ConnectionStrictlyExecutorEngine.java +2 −6 Original line number Diff line number Diff line Loading @@ -21,8 +21,6 @@ import com.google.common.util.concurrent.ListenableFuture; import io.shardingsphere.core.executor.BaseStatementUnit; import io.shardingsphere.core.executor.ExecuteCallback; import io.shardingsphere.core.executor.ExecutorEngine; import io.shardingsphere.core.executor.threadlocal.ExecutorDataMap; import io.shardingsphere.core.executor.threadlocal.ExecutorExceptionHandler; import java.util.ArrayList; import java.util.Collection; Loading Loading @@ -66,8 +64,6 @@ public final class ConnectionStrictlyExecutorEngine extends ExecutorEngine { private <T> Collection<ListenableFuture<Collection<T>>> asyncExecute(final Map<String, Collection<BaseStatementUnit>> baseStatementUnitGroups, final ExecuteCallback<T> executeCallback) { Collection<ListenableFuture<Collection<T>>> result = new ArrayList<>(baseStatementUnitGroups.size()); final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown(); final Map<String, Object> dataMap = ExecutorDataMap.getDataMap(); for (Map.Entry<String, Collection<BaseStatementUnit>> entry : baseStatementUnitGroups.entrySet()) { final Collection<BaseStatementUnit> baseStatementUnits = entry.getValue(); result.add(getExecutorService().submit(new Callable<Collection<T>>() { Loading @@ -75,7 +71,7 @@ public final class ConnectionStrictlyExecutorEngine extends ExecutorEngine { public Collection<T> call() throws Exception { Collection<T> result = new LinkedList<>(); for (BaseStatementUnit each : baseStatementUnits) { result.add(executeInternal(each, executeCallback, isExceptionThrown, dataMap)); result.add(executeInternal(each, executeCallback)); } return result; } Loading @@ -87,7 +83,7 @@ public final class ConnectionStrictlyExecutorEngine extends ExecutorEngine { private <T> Collection<T> syncExecute(final Collection<? extends BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) throws Exception { Collection<T> result = new LinkedList<>(); for (BaseStatementUnit each : baseStatementUnits) { result.add(executeInternal(each, executeCallback, ExecutorExceptionHandler.isExceptionThrown(), ExecutorDataMap.getDataMap())); result.add(executeInternal(each, executeCallback)); } return result; } Loading sharding-jdbc/src/main/java/io/shardingsphere/core/executor/type/memory/MemoryStrictlyExecutorEngine.java +2 −7 Original line number Diff line number Diff line Loading @@ -22,15 +22,12 @@ import com.google.common.util.concurrent.ListenableFuture; import io.shardingsphere.core.executor.BaseStatementUnit; import io.shardingsphere.core.executor.ExecuteCallback; import io.shardingsphere.core.executor.ExecutorEngine; import io.shardingsphere.core.executor.threadlocal.ExecutorDataMap; import io.shardingsphere.core.executor.threadlocal.ExecutorExceptionHandler; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; Loading @@ -55,14 +52,12 @@ public final class MemoryStrictlyExecutorEngine extends ExecutorEngine { private <T> Collection<ListenableFuture<T>> asyncExecute(final Collection<BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) { List<ListenableFuture<T>> result = new ArrayList<>(baseStatementUnits.size()); final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown(); final Map<String, Object> dataMap = ExecutorDataMap.getDataMap(); for (final BaseStatementUnit each : baseStatementUnits) { result.add(getExecutorService().submit(new Callable<T>() { @Override public T call() throws Exception { return executeInternal(each, executeCallback, isExceptionThrown, dataMap); return executeInternal(each, executeCallback); } })); } Loading @@ -70,7 +65,7 @@ public final class MemoryStrictlyExecutorEngine extends ExecutorEngine { } private <T> T syncExecute(final BaseStatementUnit baseStatementUnit, final ExecuteCallback<T> executeCallback) throws Exception { return executeInternal(baseStatementUnit, executeCallback, ExecutorExceptionHandler.isExceptionThrown(), ExecutorDataMap.getDataMap()); return executeInternal(baseStatementUnit, executeCallback); } private <T> List<T> getResultList(final T firstOutput, final Collection<ListenableFuture<T>> restResultFutures) throws ExecutionException, InterruptedException { Loading Loading
sharding-jdbc/src/main/java/io/shardingsphere/core/executor/ExecuteCallback.java +16 −0 Original line number Diff line number Diff line Loading @@ -19,6 +19,8 @@ package io.shardingsphere.core.executor; import io.shardingsphere.core.constant.SQLType; import java.util.Map; /** * Statement execute callback interface. * Loading @@ -35,4 +37,18 @@ public interface ExecuteCallback<T> extends ShardingExecuteCallback<BaseStatemen * @return SQL type */ SQLType getSQLType(); /** * Judge is exception thrown or not. * * @return is exception thrown or not */ boolean isExceptionThrown(); /** * Get data map. * * @return data map */ Map<String, Object> getDataMap(); }
sharding-jdbc/src/main/java/io/shardingsphere/core/executor/ExecutorEngine.java +3 −4 Original line number Diff line number Diff line Loading @@ -34,7 +34,6 @@ import java.util.Collection; import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; Loading Loading @@ -97,10 +96,10 @@ public abstract class ExecutorEngine implements AutoCloseable { protected abstract <T> List<T> getExecuteResults(Collection<? extends BaseStatementUnit> baseStatementUnits, ExecuteCallback<T> executeCallback) throws Exception; protected final <T> T executeInternal( final BaseStatementUnit baseStatementUnit, final ExecuteCallback<T> executeCallback, final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception { final BaseStatementUnit baseStatementUnit, final ExecuteCallback<T> executeCallback) throws Exception { T result; ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown); ExecutorDataMap.setDataMap(dataMap); ExecutorExceptionHandler.setExceptionThrown(executeCallback.isExceptionThrown()); ExecutorDataMap.setDataMap(executeCallback.getDataMap()); List<SQLExecutionEvent> events = new LinkedList<>(); for (List<Object> each : baseStatementUnit.getSqlExecutionUnit().getSqlUnit().getParameterSets()) { SQLExecutionEvent event = SQLExecutionEventFactory.createEvent(executeCallback.getSQLType(), baseStatementUnit, each); Loading
sharding-jdbc/src/main/java/io/shardingsphere/core/executor/type/batch/BatchPreparedStatementExecutor.java +14 −0 Original line number Diff line number Diff line Loading @@ -22,6 +22,8 @@ import io.shardingsphere.core.constant.SQLType; import io.shardingsphere.core.executor.BaseStatementUnit; import io.shardingsphere.core.executor.ExecuteCallback; import io.shardingsphere.core.executor.ExecutorEngine; import io.shardingsphere.core.executor.threadlocal.ExecutorDataMap; import io.shardingsphere.core.executor.threadlocal.ExecutorExceptionHandler; import lombok.RequiredArgsConstructor; import java.sql.SQLException; Loading Loading @@ -55,6 +57,8 @@ public final class BatchPreparedStatementExecutor { * @throws SQLException SQL exception */ public int[] executeBatch() throws SQLException { final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown(); final Map<String, Object> dataMap = ExecutorDataMap.getDataMap(); return accumulate(executorEngine.execute(batchPreparedStatementUnits, new ExecuteCallback<int[]>() { @Override Loading @@ -66,6 +70,16 @@ public final class BatchPreparedStatementExecutor { public SQLType getSQLType() { return sqlType; } @Override public boolean isExceptionThrown() { return isExceptionThrown; } @Override public Map<String, Object> getDataMap() { return dataMap; } })); } Loading
sharding-jdbc/src/main/java/io/shardingsphere/core/executor/type/connection/ConnectionStrictlyExecutorEngine.java +2 −6 Original line number Diff line number Diff line Loading @@ -21,8 +21,6 @@ import com.google.common.util.concurrent.ListenableFuture; import io.shardingsphere.core.executor.BaseStatementUnit; import io.shardingsphere.core.executor.ExecuteCallback; import io.shardingsphere.core.executor.ExecutorEngine; import io.shardingsphere.core.executor.threadlocal.ExecutorDataMap; import io.shardingsphere.core.executor.threadlocal.ExecutorExceptionHandler; import java.util.ArrayList; import java.util.Collection; Loading Loading @@ -66,8 +64,6 @@ public final class ConnectionStrictlyExecutorEngine extends ExecutorEngine { private <T> Collection<ListenableFuture<Collection<T>>> asyncExecute(final Map<String, Collection<BaseStatementUnit>> baseStatementUnitGroups, final ExecuteCallback<T> executeCallback) { Collection<ListenableFuture<Collection<T>>> result = new ArrayList<>(baseStatementUnitGroups.size()); final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown(); final Map<String, Object> dataMap = ExecutorDataMap.getDataMap(); for (Map.Entry<String, Collection<BaseStatementUnit>> entry : baseStatementUnitGroups.entrySet()) { final Collection<BaseStatementUnit> baseStatementUnits = entry.getValue(); result.add(getExecutorService().submit(new Callable<Collection<T>>() { Loading @@ -75,7 +71,7 @@ public final class ConnectionStrictlyExecutorEngine extends ExecutorEngine { public Collection<T> call() throws Exception { Collection<T> result = new LinkedList<>(); for (BaseStatementUnit each : baseStatementUnits) { result.add(executeInternal(each, executeCallback, isExceptionThrown, dataMap)); result.add(executeInternal(each, executeCallback)); } return result; } Loading @@ -87,7 +83,7 @@ public final class ConnectionStrictlyExecutorEngine extends ExecutorEngine { private <T> Collection<T> syncExecute(final Collection<? extends BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) throws Exception { Collection<T> result = new LinkedList<>(); for (BaseStatementUnit each : baseStatementUnits) { result.add(executeInternal(each, executeCallback, ExecutorExceptionHandler.isExceptionThrown(), ExecutorDataMap.getDataMap())); result.add(executeInternal(each, executeCallback)); } return result; } Loading
sharding-jdbc/src/main/java/io/shardingsphere/core/executor/type/memory/MemoryStrictlyExecutorEngine.java +2 −7 Original line number Diff line number Diff line Loading @@ -22,15 +22,12 @@ import com.google.common.util.concurrent.ListenableFuture; import io.shardingsphere.core.executor.BaseStatementUnit; import io.shardingsphere.core.executor.ExecuteCallback; import io.shardingsphere.core.executor.ExecutorEngine; import io.shardingsphere.core.executor.threadlocal.ExecutorDataMap; import io.shardingsphere.core.executor.threadlocal.ExecutorExceptionHandler; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; Loading @@ -55,14 +52,12 @@ public final class MemoryStrictlyExecutorEngine extends ExecutorEngine { private <T> Collection<ListenableFuture<T>> asyncExecute(final Collection<BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) { List<ListenableFuture<T>> result = new ArrayList<>(baseStatementUnits.size()); final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown(); final Map<String, Object> dataMap = ExecutorDataMap.getDataMap(); for (final BaseStatementUnit each : baseStatementUnits) { result.add(getExecutorService().submit(new Callable<T>() { @Override public T call() throws Exception { return executeInternal(each, executeCallback, isExceptionThrown, dataMap); return executeInternal(each, executeCallback); } })); } Loading @@ -70,7 +65,7 @@ public final class MemoryStrictlyExecutorEngine extends ExecutorEngine { } private <T> T syncExecute(final BaseStatementUnit baseStatementUnit, final ExecuteCallback<T> executeCallback) throws Exception { return executeInternal(baseStatementUnit, executeCallback, ExecutorExceptionHandler.isExceptionThrown(), ExecutorDataMap.getDataMap()); return executeInternal(baseStatementUnit, executeCallback); } private <T> List<T> getResultList(final T firstOutput, final Collection<ListenableFuture<T>> restResultFutures) throws ExecutionException, InterruptedException { Loading