Unverified Commit 06036642 authored by chenqy's avatar chenqy Committed by GitHub
Browse files

Merge pull request #21 from sharding-sphere/dev

update from origin
parents 0705dc0d 3beaf92e
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -19,7 +19,7 @@ package io.shardingsphere.transaction.bed.sync;

import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import io.shardingsphere.core.executor.event.DMLExecutionEvent;
import io.shardingsphere.core.executor.event.sql.DMLExecutionEvent;
import io.shardingsphere.transaction.api.SoftTransactionManager;
import io.shardingsphere.transaction.api.config.SoftTransactionConfiguration;
import io.shardingsphere.transaction.bed.BEDSoftTransaction;
+23 −31
Original line number Diff line number Diff line
@@ -17,16 +17,16 @@

package io.shardingsphere.core.executor;

import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.shardingsphere.core.constant.SQLType;
import io.shardingsphere.core.executor.event.ExecutionEvent;
import io.shardingsphere.core.executor.event.DMLExecutionEvent;
import io.shardingsphere.core.executor.event.DQLExecutionEvent;
import io.shardingsphere.core.executor.event.OverallExecutionEvent;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.executor.event.overall.OverallExecutionEvent;
import io.shardingsphere.core.executor.event.sql.SQLExecutionEvent;
import io.shardingsphere.core.executor.event.sql.SQLExecutionEventFactory;
import io.shardingsphere.core.executor.threadlocal.ExecutorDataMap;
import io.shardingsphere.core.executor.threadlocal.ExecutorExceptionHandler;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@@ -56,10 +56,13 @@ public abstract class ExecutorEngine implements AutoCloseable {
    @Getter
    private final ListeningExecutorService executorService;
    
    private final EventBus shardingEventBus;
    
    public ExecutorEngine(final int executorSize) {
        executorService = MoreExecutors.listeningDecorator(
                0 == executorSize ? Executors.newCachedThreadPool(ShardingThreadFactoryBuilder.build()) : Executors.newFixedThreadPool(executorSize, ShardingThreadFactoryBuilder.build()));
        MoreExecutors.addDelayedShutdownHook(executorService, 60, TimeUnit.SECONDS);
        shardingEventBus = ShardingEventBusInstance.getInstance();
    }
    
    /**
@@ -78,19 +81,19 @@ public abstract class ExecutorEngine implements AutoCloseable {
            return Collections.emptyList();
        }
        OverallExecutionEvent event = new OverallExecutionEvent(sqlType, baseStatementUnits.size() > 1);
        ShardingEventBusInstance.getInstance().post(event);
        shardingEventBus.post(event);
        try {
            List<T> result = getExecuteResults(sqlType, baseStatementUnits, executeCallback);
            event.setExecuteSuccess();
            ShardingEventBusInstance.getInstance().post(event);
            return result;
            // CHECKSTYLE:OFF
        } catch (final Exception ex) {
            // CHECKSTYLE:ON
            event.setExecuteFailure(ex);
            ShardingEventBusInstance.getInstance().post(event);
            ExecutorExceptionHandler.handleException(ex);
            return null;
            return Collections.emptyList();
        } finally {
            shardingEventBus.post(event);
        }
    }
    
@@ -101,38 +104,27 @@ public abstract class ExecutorEngine implements AutoCloseable {
        T result;
        ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
        ExecutorDataMap.setDataMap(dataMap);
        List<ExecutionEvent> events = new LinkedList<>();
        List<SQLExecutionEvent> events = new LinkedList<>();
        for (List<Object> each : baseStatementUnit.getSqlExecutionUnit().getSqlUnit().getParameterSets()) {
            events.add(getExecutionEvent(sqlType, baseStatementUnit, each));
        }
        for (ExecutionEvent event : events) {
            ShardingEventBusInstance.getInstance().post(event);
            SQLExecutionEvent event = SQLExecutionEventFactory.createEvent(sqlType, baseStatementUnit, each);
            events.add(event);
            shardingEventBus.post(event);
        }
        try {
            result = executeCallback.execute(baseStatementUnit);
            for (SQLExecutionEvent each : events) {
                each.setExecuteSuccess();
                shardingEventBus.post(each);
            }
            return result;
        } catch (final SQLException ex) {
            for (ExecutionEvent each : events) {
            for (SQLExecutionEvent each : events) {
                each.setExecuteFailure(ex);
                ShardingEventBusInstance.getInstance().post(each);
                shardingEventBus.post(each);
                ExecutorExceptionHandler.handleException(ex);
            }
            return null;
        }
        for (ExecutionEvent each : events) {
            each.setExecuteSuccess();
            ShardingEventBusInstance.getInstance().post(each);
        }
        return result;
    }
    
    private ExecutionEvent getExecutionEvent(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<Object> parameters) {
        ExecutionEvent result;
        if (SQLType.DQL == sqlType) {
            result = new DQLExecutionEvent(baseStatementUnit.getSqlExecutionUnit().getDataSource(), baseStatementUnit.getSqlExecutionUnit().getSqlUnit(), parameters);
        } else {
            result = new DMLExecutionEvent(baseStatementUnit.getSqlExecutionUnit().getDataSource(), baseStatementUnit.getSqlExecutionUnit().getSqlUnit(), parameters);
        }
        return result;
    }
    
    @Override
+3 −2
Original line number Diff line number Diff line
@@ -15,9 +15,10 @@
 * </p>
 */

package io.shardingsphere.core.executor.event;
package io.shardingsphere.core.executor.event.overall;

import io.shardingsphere.core.constant.SQLType;
import io.shardingsphere.core.event.ShardingEvent;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

@@ -28,7 +29,7 @@ import lombok.RequiredArgsConstructor;
 */
@RequiredArgsConstructor
@Getter
public final class OverallExecutionEvent extends ExecutionEvent {
public final class OverallExecutionEvent extends ShardingEvent {
    
    private final SQLType sqlType;
    
+1 −1
Original line number Diff line number Diff line
@@ -15,7 +15,7 @@
 * </p>
 */

package io.shardingsphere.core.executor.event;
package io.shardingsphere.core.executor.event.sql;

import io.shardingsphere.core.routing.SQLUnit;

+1 −1
Original line number Diff line number Diff line
@@ -15,7 +15,7 @@
 * </p>
 */

package io.shardingsphere.core.executor.event;
package io.shardingsphere.core.executor.event.sql;

import io.shardingsphere.core.routing.SQLUnit;

Loading