Commit f0a4c32e authored by terrymanu's avatar terrymanu
Browse files

add JDBCExecuteCallback

parent 6079ee6c
Loading
Loading
Loading
Loading
+48 −19
Original line number Diff line number Diff line
@@ -17,8 +17,19 @@

package io.shardingsphere.core.executor;

import com.google.common.eventbus.EventBus;
import io.shardingsphere.core.constant.SQLType;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.executor.event.sql.SQLExecutionEvent;
import io.shardingsphere.core.executor.event.sql.SQLExecutionEventFactory;
import io.shardingsphere.core.executor.threadlocal.ExecutorDataMap;
import io.shardingsphere.core.executor.threadlocal.ExecutorExceptionHandler;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
@@ -29,26 +40,44 @@ import java.util.Map;
 * 
 * @param <T> class type of return value
 */
public interface ExecuteCallback<T> extends ShardingExecuteCallback<BaseStatementUnit, T> {
@RequiredArgsConstructor
public final class ExecuteCallback<T> implements ShardingExecuteCallback<BaseStatementUnit, T> {
    
    /**
     * Get SQL type.
     * 
     * @return SQL type
     */
    SQLType getSQLType();
    @Getter
    private final SQLType sqlType;
    
    /**
     * Judge is exception thrown or not.
     * 
     * @return is exception thrown or not
     */
    boolean isExceptionThrown();
    private final boolean isExceptionThrown;
    
    /**
     * Get data map.
     * 
     * @return data map
     */
    Map<String, Object> getDataMap();
    private final Map<String, Object> dataMap;
    
    private final JDBCExecuteCallback<T> jdbcCallback;
    
    private final EventBus shardingEventBus = ShardingEventBusInstance.getInstance();
    
    @Override
    public T execute(final BaseStatementUnit input) throws Exception {
        ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
        ExecutorDataMap.setDataMap(dataMap);
        List<SQLExecutionEvent> events = new LinkedList<>();
        for (List<Object> each : input.getSqlExecutionUnit().getSqlUnit().getParameterSets()) {
            SQLExecutionEvent event = SQLExecutionEventFactory.createEvent(sqlType, input, each);
            events.add(event);
            shardingEventBus.post(event);
        }
        try {
            T result = jdbcCallback.execute(input);
            for (SQLExecutionEvent each : events) {
                each.setExecuteSuccess();
                shardingEventBus.post(each);
            }
            return result;
        } catch (final SQLException ex) {
            for (SQLExecutionEvent each : events) {
                each.setExecuteFailure(ex);
                shardingEventBus.post(each);
                ExecutorExceptionHandler.handleException(ex);
            }
            return null;
        }
    }
}
+1 −33
Original line number Diff line number Diff line
@@ -22,9 +22,6 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.executor.event.overall.OverallExecutionEvent;
import io.shardingsphere.core.executor.event.sql.SQLExecutionEvent;
import io.shardingsphere.core.executor.event.sql.SQLExecutionEventFactory;
import io.shardingsphere.core.executor.threadlocal.ExecutorDataMap;
import io.shardingsphere.core.executor.threadlocal.ExecutorExceptionHandler;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -32,7 +29,6 @@ import lombok.extern.slf4j.Slf4j;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -76,7 +72,7 @@ public abstract class ExecutorEngine implements AutoCloseable {
        if (baseStatementUnits.isEmpty()) {
            return Collections.emptyList();
        }
        OverallExecutionEvent event = new OverallExecutionEvent(executeCallback.getSQLType(), baseStatementUnits.size() > 1);
        OverallExecutionEvent event = new OverallExecutionEvent(executeCallback.getSqlType(), baseStatementUnits.size() > 1);
        shardingEventBus.post(event);
        try {
            List<T> result = getExecuteResults(baseStatementUnits, executeCallback);
@@ -95,34 +91,6 @@ public abstract class ExecutorEngine implements AutoCloseable {
    
    protected abstract <T> List<T> getExecuteResults(Collection<? extends BaseStatementUnit> baseStatementUnits, ExecuteCallback<T> executeCallback) throws Exception;
    
    protected final <T> T executeInternal(
            final BaseStatementUnit baseStatementUnit, final ExecuteCallback<T> executeCallback) throws Exception {
        T result;
        ExecutorExceptionHandler.setExceptionThrown(executeCallback.isExceptionThrown());
        ExecutorDataMap.setDataMap(executeCallback.getDataMap());
        List<SQLExecutionEvent> events = new LinkedList<>();
        for (List<Object> each : baseStatementUnit.getSqlExecutionUnit().getSqlUnit().getParameterSets()) {
            SQLExecutionEvent event = SQLExecutionEventFactory.createEvent(executeCallback.getSQLType(), baseStatementUnit, each);
            events.add(event);
            shardingEventBus.post(event);
        }
        try {
            result = executeCallback.execute(baseStatementUnit);
            for (SQLExecutionEvent each : events) {
                each.setExecuteSuccess();
                shardingEventBus.post(each);
            }
            return result;
        } catch (final SQLException ex) {
            for (SQLExecutionEvent each : events) {
                each.setExecuteFailure(ex);
                shardingEventBus.post(each);
                ExecutorExceptionHandler.handleException(ex);
            }
            return null;
        }
    }
    
    @Override
    public final void close() {
        SHUTDOWN_EXECUTOR.execute(new Runnable() {
+39 −0
Original line number Diff line number Diff line
/*
 * Copyright 2016-2018 shardingsphere.io.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * </p>
 */

package io.shardingsphere.core.executor;

import java.sql.SQLException;

/**
 * JDBC execute callback interface.
 *
 * @author zhangliang
 * 
 * @param <T> class type of return value
 */
public interface JDBCExecuteCallback<T> {
    
    /**
     * execute JDBC.
     * 
     * @param baseStatementUnit base statement unit
     * @return execute result
     * @throws SQLException SQL exception
     */
    T execute(BaseStatementUnit baseStatementUnit) throws SQLException;
}
+6 −19
Original line number Diff line number Diff line
@@ -22,6 +22,7 @@ import io.shardingsphere.core.constant.SQLType;
import io.shardingsphere.core.executor.BaseStatementUnit;
import io.shardingsphere.core.executor.ExecuteCallback;
import io.shardingsphere.core.executor.ExecutorEngine;
import io.shardingsphere.core.executor.JDBCExecuteCallback;
import io.shardingsphere.core.executor.threadlocal.ExecutorDataMap;
import io.shardingsphere.core.executor.threadlocal.ExecutorExceptionHandler;
import lombok.RequiredArgsConstructor;
@@ -59,28 +60,14 @@ public final class BatchPreparedStatementExecutor {
    public int[] executeBatch() throws SQLException {
        final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
        final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
        return accumulate(executorEngine.execute(batchPreparedStatementUnits, new ExecuteCallback<int[]>() {
        ExecuteCallback<int[]> executeCallback = new ExecuteCallback<>(sqlType, isExceptionThrown, dataMap, new JDBCExecuteCallback<int[]>() {
        
            @Override
            public int[] execute(final BaseStatementUnit baseStatementUnit) throws Exception {
            public int[] execute(final BaseStatementUnit baseStatementUnit) throws SQLException {
                return baseStatementUnit.getStatement().executeBatch();
            }
            
            @Override
            public SQLType getSQLType() {
                return sqlType;
            }
            
            @Override
            public boolean isExceptionThrown() {
                return isExceptionThrown;
            }
            
            @Override
            public Map<String, Object> getDataMap() {
                return dataMap;
            }
        }));
        });
        return accumulate(executorEngine.execute(batchPreparedStatementUnits, executeCallback));
    }
    
    private int[] accumulate(final List<int[]> results) {
+2 −2
Original line number Diff line number Diff line
@@ -71,7 +71,7 @@ public final class ConnectionStrictlyExecutorEngine extends ExecutorEngine {
                public Collection<T> call() throws Exception {
                    Collection<T> result = new LinkedList<>();
                    for (BaseStatementUnit each : baseStatementUnits) {
                        result.add(executeInternal(each, executeCallback));
                        result.add(executeCallback.execute(each));
                    }
                    return result;
                }
@@ -83,7 +83,7 @@ public final class ConnectionStrictlyExecutorEngine extends ExecutorEngine {
    private <T> Collection<T> syncExecute(final Collection<? extends BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) throws Exception {
        Collection<T> result = new LinkedList<>();
        for (BaseStatementUnit each : baseStatementUnits) {
            result.add(executeInternal(each, executeCallback));
            result.add(executeCallback.execute(each));
        }
        return result;
    }
Loading