Commit ef8fb160 authored by terrymanu's avatar terrymanu
Browse files

use ShardingExecuteEngine on Proxy execute

parent 1ff33de5
Loading
Loading
Loading
Loading
+43 −2
Original line number Diff line number Diff line
@@ -21,7 +21,6 @@ import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Getter;

import java.util.ArrayList;
import java.util.Collection;
@@ -46,7 +45,6 @@ public final class ShardingExecuteEngine implements AutoCloseable {
    
    private static final ExecutorService SHUTDOWN_EXECUTOR = Executors.newSingleThreadExecutor(ShardingThreadFactoryBuilder.build("Executor-Engine-Closer"));
    
    @Getter
    private final ListeningExecutorService executorService;
    
    public ShardingExecuteEngine(final int executorSize) {
@@ -75,6 +73,27 @@ public final class ShardingExecuteEngine implements AutoCloseable {
        return getResults(callback.execute(firstInput), restFutures);
    }
    
    /**
     * Execute all callbacks.
     *
     * @param inputs input values
     * @param firstCallback first sharding execute callback
     * @param callback sharding execute callback
     * @param <I> type of input value
     * @param <O> type of return value
     * @return execute result
     * @throws Exception throw if execute failure
     */
    public <I, O> List<O> execute(final Collection<I> inputs, final ShardingExecuteCallback<I, O> firstCallback, final ShardingExecuteCallback<I, O> callback) throws Exception {
        if (inputs.isEmpty()) {
            return Collections.emptyList();
        }
        Iterator<I> inputIterator = inputs.iterator();
        I firstInput = inputIterator.next();
        Collection<ListenableFuture<O>> restFutures = asyncExecute(Lists.newArrayList(inputIterator), callback);
        return getResults(firstCallback.execute(firstInput), restFutures);
    }
    
    private <I, O> Collection<ListenableFuture<O>> asyncExecute(final Collection<I> inputs, final ShardingExecuteCallback<I, O> callback) {
        Collection<ListenableFuture<O>> result = new ArrayList<>(inputs.size());
        for (final I each : inputs) {
@@ -118,6 +137,28 @@ public final class ShardingExecuteEngine implements AutoCloseable {
        return getGroupResults(callback.execute(firstKey, firstInputs), restResultFutures);
    }
    
    /**
     * execute all callbacks for group.
     *
     * @param inputs input value's map
     * @param callback sharding execute callback
     * @param firstCallback first sharding execute callback
     * @param <I> type of input value
     * @param <O> type of return value
     * @return execute result
     * @throws Exception throw if execute failure
     */
    public <I, O> List<O> groupExecute(
            final Map<String, Collection<I>> inputs, final ShardingGroupExecuteCallback<I, O> firstCallback, final ShardingGroupExecuteCallback<I, O> callback) throws Exception {
        if (inputs.isEmpty()) {
            return Collections.emptyList();
        }
        String firstKey = inputs.keySet().iterator().next();
        Collection<I> firstInputs = inputs.remove(firstKey);
        Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(inputs, callback);
        return getGroupResults(firstCallback.execute(firstKey, firstInputs), restResultFutures);
    }
    
    private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final Map<String, Collection<I>> inputs, final ShardingGroupExecuteCallback<I, O> callback) {
        Collection<ListenableFuture<Collection<O>>> result = new ArrayList<>(inputs.size());
        for (final Entry<String, Collection<I>> entry : inputs.entrySet()) {
+5 −5
Original line number Diff line number Diff line
@@ -17,7 +17,9 @@

package io.shardingsphere.proxy.backend.jdbc.execute;

import io.shardingsphere.core.executor.ShardingExecuteEngine;
import io.shardingsphere.core.merger.QueryResult;
import io.shardingsphere.proxy.backend.BackendExecutorContext;
import io.shardingsphere.proxy.backend.SQLExecuteEngine;
import io.shardingsphere.proxy.backend.jdbc.connection.BackendConnection;
import io.shardingsphere.proxy.backend.jdbc.execute.response.unit.ExecuteQueryResponseUnit;
@@ -25,12 +27,11 @@ import io.shardingsphere.proxy.backend.jdbc.execute.response.unit.ExecuteRespons
import io.shardingsphere.proxy.backend.jdbc.execute.response.unit.ExecuteUpdateResponseUnit;
import io.shardingsphere.proxy.backend.jdbc.wrapper.JDBCExecutorWrapper;
import io.shardingsphere.proxy.transport.mysql.constant.ColumnType;
import io.shardingsphere.proxy.transport.mysql.packet.command.query.QueryResponsePackets;
import io.shardingsphere.proxy.transport.mysql.packet.command.query.ColumnDefinition41Packet;
import io.shardingsphere.proxy.transport.mysql.packet.command.query.FieldCountPacket;
import io.shardingsphere.proxy.transport.mysql.packet.command.query.QueryResponsePackets;
import io.shardingsphere.proxy.transport.mysql.packet.generic.EofPacket;
import io.shardingsphere.proxy.transport.mysql.packet.generic.OKPacket;
import io.shardingsphere.proxy.backend.BackendExecutorContext;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
@@ -42,7 +43,6 @@ import java.sql.Statement;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;

/**
 * SQL Execute engine for JDBC.
@@ -55,12 +55,12 @@ import java.util.concurrent.ExecutorService;
@RequiredArgsConstructor
public abstract class JDBCExecuteEngine implements SQLExecuteEngine {
    
    private final ShardingExecuteEngine shardingExecuteEngine = BackendExecutorContext.getInstance().getShardingExecuteEngine();
    
    private final List<QueryResult> queryResults = new LinkedList<>();
    
    private final BackendConnection backendConnection;
    
    private final ExecutorService executorService = BackendExecutorContext.getInstance().getShardingExecuteEngine().getExecutorService();
    
    private final JDBCExecutorWrapper jdbcExecutorWrapper;
    
    private int columnCount;
+83 −104
Original line number Diff line number Diff line
@@ -18,7 +18,7 @@
package io.shardingsphere.proxy.backend.jdbc.execute.memory;

import io.shardingsphere.core.constant.transaction.TransactionType;
import io.shardingsphere.core.exception.ShardingException;
import io.shardingsphere.core.executor.ShardingGroupExecuteCallback;
import io.shardingsphere.core.merger.QueryResult;
import io.shardingsphere.core.routing.SQLRouteResult;
import io.shardingsphere.core.routing.SQLUnit;
@@ -29,9 +29,9 @@ import io.shardingsphere.proxy.backend.jdbc.execute.response.ExecuteResponse;
import io.shardingsphere.proxy.backend.jdbc.execute.response.ExecuteUpdateResponse;
import io.shardingsphere.proxy.backend.jdbc.execute.response.unit.ExecuteQueryResponseUnit;
import io.shardingsphere.proxy.backend.jdbc.execute.response.unit.ExecuteResponseUnit;
import io.shardingsphere.proxy.backend.jdbc.execute.response.unit.ExecuteUpdateResponseUnit;
import io.shardingsphere.proxy.backend.jdbc.wrapper.JDBCExecutorWrapper;
import io.shardingsphere.proxy.config.RuleRegistry;
import lombok.RequiredArgsConstructor;

import java.sql.Connection;
import java.sql.ResultSet;
@@ -40,12 +40,8 @@ import java.sql.Statement;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * Connection strictly execute engine.
@@ -61,69 +57,69 @@ public final class ConnectionStrictlyExecuteEngine extends JDBCExecuteEngine {
    
    @Override
    public ExecuteResponse execute(final SQLRouteResult routeResult, final boolean isReturnGeneratedKeys) throws SQLException {
        Map<String, Collection<SQLUnit>> sqlExecutionUnits = routeResult.getSQLUnitGroups();
        Entry<String, Collection<SQLUnit>> firstEntry = sqlExecutionUnits.entrySet().iterator().next();
        sqlExecutionUnits.remove(firstEntry.getKey());
        List<Future<Collection<ExecuteResponseUnit>>> futureList;
        Map<String, Collection<SQLUnit>> sqlUnitGroups = routeResult.getSQLUnitGroups();
        Collection<ExecuteResponseUnit> executeResponseUnits;
        try {
            if (TransactionType.XA == RuleRegistry.getInstance().getTransactionType()) {
            futureList = asyncExecuteWithXA(isReturnGeneratedKeys, sqlExecutionUnits);
                final Map<String, Map<SQLUnit, Statement>> sqlUnitStatements = new HashMap<>(sqlUnitGroups.size(), 1);
                for (Entry<String, Collection<SQLUnit>> entry : sqlUnitGroups.entrySet()) {
                    sqlUnitStatements.put(entry.getKey(), createSQLUnitStatement(entry.getKey(), entry.getValue(), isReturnGeneratedKeys));
                }
                executeResponseUnits = getShardingExecuteEngine().groupExecute(
                        sqlUnitGroups, new FirstTransactionGroupExecuteCallback(isReturnGeneratedKeys), new XATransactionGroupExecuteCallback(isReturnGeneratedKeys, sqlUnitStatements));
            } else {
            futureList = asyncExecuteWithoutXA(isReturnGeneratedKeys, sqlExecutionUnits);
                executeResponseUnits = getShardingExecuteEngine().groupExecute(
                        sqlUnitGroups, new FirstTransactionGroupExecuteCallback(isReturnGeneratedKeys), new LocalTransactionGroupExecuteCallback(isReturnGeneratedKeys));
            }
        Collection<ExecuteResponseUnit> firstExecuteResponseUnits = syncExecute(isReturnGeneratedKeys, firstEntry.getKey(), firstEntry.getValue());
        return getExecuteQueryResponse(firstExecuteResponseUnits, futureList);
        } catch (final Exception ex) {
            throw new SQLException(ex);
        }
    
    private List<Future<Collection<ExecuteResponseUnit>>> asyncExecuteWithXA(final boolean isReturnGeneratedKeys, final Map<String, Collection<SQLUnit>> sqlUnitGroups) throws SQLException {
        List<Future<Collection<ExecuteResponseUnit>>> result = new LinkedList<>();
        for (Entry<String, Collection<SQLUnit>> entry : sqlUnitGroups.entrySet()) {
            final Map<SQLUnit, Statement> sqlUnitStatementMap = createSQLUnitStatement(entry.getKey(), entry.getValue(), isReturnGeneratedKeys);
            result.add(getExecutorService().submit(new Callable<Collection<ExecuteResponseUnit>>() {
                
                @Override
                public Collection<ExecuteResponseUnit> call() throws SQLException {
                    Collection<ExecuteResponseUnit> result = new LinkedList<>();
                    for (Entry<SQLUnit, Statement> each : sqlUnitStatementMap.entrySet()) {
                        result.add(executeWithoutMetadata(each.getValue(), each.getKey().getSql(), isReturnGeneratedKeys));
        return getExecuteQueryResponse(executeResponseUnits);
    }
                    return result;
                }
            }));
    
    private Map<SQLUnit, Statement> createSQLUnitStatement(final String dataSourceName, final Collection<SQLUnit> sqlUnits, final boolean isReturnGeneratedKeys) throws SQLException {
        Map<SQLUnit, Statement> result = new HashMap<>(sqlUnits.size(), 1);
        Connection connection = getBackendConnection().getConnection(dataSourceName);
        for (SQLUnit each : sqlUnits) {
            result.put(each, getJdbcExecutorWrapper().createStatement(connection, each.getSql(), isReturnGeneratedKeys));
        }
        return result;
    }
    
    private List<Future<Collection<ExecuteResponseUnit>>> asyncExecuteWithoutXA(final boolean isReturnGeneratedKeys, final Map<String, Collection<SQLUnit>> sqlUnitGroups) {
        List<Future<Collection<ExecuteResponseUnit>>> result = new LinkedList<>();
        for (Entry<String, Collection<SQLUnit>> entry : sqlUnitGroups.entrySet()) {
            final String dataSourceName = entry.getKey();
            final Collection<SQLUnit> sqlUnits = entry.getValue();
            result.add(getExecutorService().submit(new Callable<Collection<ExecuteResponseUnit>>() {
    private ExecuteResponse getExecuteQueryResponse(final Collection<ExecuteResponseUnit> executeResponseUnits) {
        ExecuteResponseUnit firstExecuteResponseUnit = executeResponseUnits.iterator().next();
        return firstExecuteResponseUnit instanceof ExecuteQueryResponseUnit
                ? getExecuteQueryResponse((ExecuteQueryResponseUnit) firstExecuteResponseUnit, executeResponseUnits) : getExecuteUpdateResponse(executeResponseUnits);
    }
    
                @Override
                public Collection<ExecuteResponseUnit> call() throws SQLException {
                    Collection<ExecuteResponseUnit> result = new LinkedList<>();
                    Map<SQLUnit, Statement> sqlUnitStatementMap = createSQLUnitStatement(dataSourceName, sqlUnits, isReturnGeneratedKeys);
                    for (Entry<SQLUnit, Statement> each : sqlUnitStatementMap.entrySet()) {
                        result.add(executeWithoutMetadata(each.getValue(), each.getKey().getSql(), isReturnGeneratedKeys));
    private ExecuteResponse getExecuteQueryResponse(final ExecuteQueryResponseUnit firstExecuteResponseUnit, final Collection<ExecuteResponseUnit> executeResponseUnits) {
        ExecuteQueryResponse result = new ExecuteQueryResponse(firstExecuteResponseUnit.getQueryResponsePackets());
        for (ExecuteResponseUnit each : executeResponseUnits) {
            result.getQueryResults().add(((ExecuteQueryResponseUnit) each).getQueryResult());
        }
        return result;
    }
            }));
        }
        return result;
    
    private ExecuteResponse getExecuteUpdateResponse(final Collection<ExecuteResponseUnit> executeResponseUnits) {
        return new ExecuteUpdateResponse(executeResponseUnits);
    }
    
    private Map<SQLUnit, Statement> createSQLUnitStatement(final String dataSourceName, final Collection<SQLUnit> sqlUnits, final boolean isReturnGeneratedKeys) throws SQLException {
        Map<SQLUnit, Statement> result = new HashMap<>(sqlUnits.size(), 1);
        Connection connection = getBackendConnection().getConnection(dataSourceName);
        for (SQLUnit each : sqlUnits) {
            result.put(each, getJdbcExecutorWrapper().createStatement(connection, each.getSql(), isReturnGeneratedKeys));
    @Override
    protected void setFetchSize(final Statement statement) {
    }
        return result;
    
    @Override
    protected QueryResult createQueryResult(final ResultSet resultSet) throws SQLException {
        return new MemoryQueryResult(resultSet);
    }
    
    private Collection<ExecuteResponseUnit> syncExecute(final boolean isReturnGeneratedKeys, final String dataSourceName, final Collection<SQLUnit> sqlUnits) throws SQLException {
    @RequiredArgsConstructor
    class FirstTransactionGroupExecuteCallback implements ShardingGroupExecuteCallback<SQLUnit, ExecuteResponseUnit> {
        
        private final boolean isReturnGeneratedKeys;
        
        @Override
        public Collection<ExecuteResponseUnit> execute(final String dataSourceName, final Collection<SQLUnit> sqlUnits) throws Exception {
            Collection<ExecuteResponseUnit> result = new LinkedList<>();
            boolean hasMetaData = false;
            Connection connection = getBackendConnection().getConnection(dataSourceName);
@@ -141,54 +137,37 @@ public final class ConnectionStrictlyExecuteEngine extends JDBCExecuteEngine {
            }
            return result;
        }
    
    private ExecuteResponse getExecuteQueryResponse(final Collection<ExecuteResponseUnit> firstExecuteResponseUnits, final List<Future<Collection<ExecuteResponseUnit>>> futureList) {
        ExecuteResponseUnit firstExecuteResponseUnit = firstExecuteResponseUnits.iterator().next();
        return firstExecuteResponseUnit instanceof ExecuteQueryResponseUnit
                ? getExecuteQueryResponse((ExecuteQueryResponseUnit) firstExecuteResponseUnit, firstExecuteResponseUnits, futureList) : getExecuteUpdateResponse(firstExecuteResponseUnits, futureList);
    }
    
    private ExecuteResponse getExecuteQueryResponse(
            final ExecuteQueryResponseUnit firstExecuteResponseUnit, final Collection<ExecuteResponseUnit> firstExecuteResponseUnits, final List<Future<Collection<ExecuteResponseUnit>>> futureList) {
        ExecuteQueryResponse result = new ExecuteQueryResponse(firstExecuteResponseUnit.getQueryResponsePackets());
        for (ExecuteResponseUnit each : firstExecuteResponseUnits) {
            result.getQueryResults().add(((ExecuteQueryResponseUnit) each).getQueryResult());
        }
        for (Future<Collection<ExecuteResponseUnit>> each : futureList) {
            try {
                Collection<ExecuteResponseUnit> executeResponses = each.get();
                for (ExecuteResponseUnit executeResponse : executeResponses) {
                    if (executeResponse instanceof ExecuteQueryResponseUnit) {
                        result.getQueryResults().add(((ExecuteQueryResponseUnit) executeResponse).getQueryResult());
                    }
                }
            } catch (final InterruptedException | ExecutionException ex) {
                throw new ShardingException(ex.getMessage(), ex);
            }
        }
        return result;
    }
    @RequiredArgsConstructor
    class XATransactionGroupExecuteCallback implements ShardingGroupExecuteCallback<SQLUnit, ExecuteResponseUnit> {
    
    private ExecuteResponse getExecuteUpdateResponse(final Collection<ExecuteResponseUnit> firstExecuteResponseUnits, final List<Future<Collection<ExecuteResponseUnit>>> futureList) {
        ExecuteUpdateResponse result = new ExecuteUpdateResponse(firstExecuteResponseUnits);
        for (Future<Collection<ExecuteResponseUnit>> each : futureList) {
            try {
                for (ExecuteResponseUnit executeResponse : each.get()) {
                    result.getPackets().add(((ExecuteUpdateResponseUnit) executeResponse).getOkPacket());
                }
            } catch (final InterruptedException | ExecutionException ex) {
                throw new ShardingException(ex.getMessage(), ex);
            }
        private final boolean isReturnGeneratedKeys;
    
        private final Map<String, Map<SQLUnit, Statement>> sqlUnitStatements;
        
        @Override
        public Collection<ExecuteResponseUnit> execute(final String dataSourceName, final Collection<SQLUnit> sqlUnits) throws Exception {
            Collection<ExecuteResponseUnit> result = new LinkedList<>();
            for (Entry<SQLUnit, Statement> each : sqlUnitStatements.get(dataSourceName).entrySet()) {
                result.add(executeWithoutMetadata(each.getValue(), each.getKey().getSql(), isReturnGeneratedKeys));
            }
            return result;
        }
    
    @Override
    protected void setFetchSize(final Statement statement) {
    }
    
    @RequiredArgsConstructor
    class LocalTransactionGroupExecuteCallback implements ShardingGroupExecuteCallback<SQLUnit, ExecuteResponseUnit> {
    
        private final boolean isReturnGeneratedKeys;
        
        @Override
    protected QueryResult createQueryResult(final ResultSet resultSet) throws SQLException {
        return new MemoryQueryResult(resultSet);
        public Collection<ExecuteResponseUnit> execute(final String dataSourceName, final Collection<SQLUnit> sqlUnits) throws Exception {
            Collection<ExecuteResponseUnit> result = new LinkedList<>();
            for (Entry<SQLUnit, Statement> each : createSQLUnitStatement(dataSourceName, sqlUnits, isReturnGeneratedKeys).entrySet()) {
                result.add(executeWithoutMetadata(each.getValue(), each.getKey().getSql(), isReturnGeneratedKeys));
            }
            return result;
        }
    }
}
+0 −4
Original line number Diff line number Diff line
@@ -37,10 +37,6 @@ public final class ExecuteUpdateResponse implements ExecuteResponse {
    @Getter
    private final List<OKPacket> packets = new LinkedList<>();
    
    public ExecuteUpdateResponse(final OKPacket packet) {
        packets.add(packet);
    }
    
    public ExecuteUpdateResponse(final Collection<ExecuteResponseUnit> responseUnits) {
        for (ExecuteResponseUnit each : responseUnits) {
            packets.add(((ExecuteUpdateResponseUnit) each).getOkPacket());
+67 −76

File changed.

Preview size limit exceeded, changes collapsed.