Loading sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/JDBCBackendHandler.java +1 −24 Original line number Diff line number Diff line Loading @@ -52,9 +52,7 @@ import javax.transaction.Status; import javax.transaction.SystemException; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.LinkedList; import java.util.List; /** Loading Loading @@ -131,14 +129,7 @@ public abstract class JDBCBackendHandler implements BackendHandler { private CommandResponsePackets merge(final SQLStatement sqlStatement) throws SQLException { if (executeResponse instanceof ExecuteUpdateResponse) { Collection<DatabasePacket> headPackets = new LinkedList<>(); for (DatabasePacket each : ((ExecuteUpdateResponse) executeResponse).getPackets()) { if (each instanceof ErrPacket) { return new CommandResponsePackets(each); } headPackets.add(each); } return mergeUpdate(headPackets); return ((ExecuteUpdateResponse) executeResponse).merge(); } QueryResponsePackets result = ((ExecuteQueryResponse) executeResponse).getQueryResponsePackets(); currentSequenceId += result.getPackets().size(); Loading @@ -151,20 +142,6 @@ public abstract class JDBCBackendHandler implements BackendHandler { return MergeEngineFactory.newInstance(ruleRegistry.getShardingRule(), ((ExecuteQueryResponse) executeResponse).getQueryResults(), sqlStatement, ruleRegistry.getShardingMetaData()).merge(); } private CommandResponsePackets mergeUpdate(final Collection<DatabasePacket> packets) { int affectedRows = 0; long lastInsertId = 0; for (DatabasePacket each : packets) { if (each instanceof OKPacket) { OKPacket okPacket = (OKPacket) each; affectedRows += okPacket.getAffectedRows(); // TODO consider about insert multiple values lastInsertId = okPacket.getLastInsertId(); } } return new CommandResponsePackets(new OKPacket(1, affectedRows, lastInsertId)); } private SQLRouteResult doMasterSlaveRoute() { SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge(); SQLRouteResult result = new SQLRouteResult(sqlStatement); Loading sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/execute/response/ExecuteUpdateResponse.java +22 −5 Original line number Diff line number Diff line Loading @@ -19,6 +19,8 @@ package io.shardingsphere.proxy.backend.common.jdbc.execute.response; import io.shardingsphere.proxy.backend.common.jdbc.execute.response.unit.ExecuteResponseUnit; import io.shardingsphere.proxy.transport.common.packet.DatabasePacket; import io.shardingsphere.proxy.transport.mysql.packet.command.reponse.CommandResponsePackets; import io.shardingsphere.proxy.transport.mysql.packet.generic.OKPacket; import lombok.Getter; import java.util.Collection; Loading @@ -30,22 +32,37 @@ import java.util.List; * * @author zhangliang */ @Getter public final class ExecuteUpdateResponse implements ExecuteResponse { @Getter private final List<DatabasePacket> packets = new LinkedList<>(); private final DatabasePacket firstPacket; public ExecuteUpdateResponse(final DatabasePacket packet) { packets.add(packet); firstPacket = packets.iterator().next(); } public ExecuteUpdateResponse(final Collection<ExecuteResponseUnit> responseUnits) { for (ExecuteResponseUnit each : responseUnits) { packets.add(each.getCommandResponsePackets().getHeadPacket()); } firstPacket = packets.iterator().next(); } /** * Merge packets. * * @return merged packet. */ public CommandResponsePackets merge() { int affectedRows = 0; long lastInsertId = 0; for (DatabasePacket each : packets) { if (each instanceof OKPacket) { OKPacket okPacket = (OKPacket) each; affectedRows += okPacket.getAffectedRows(); // TODO consider about insert multiple values lastInsertId = okPacket.getLastInsertId(); } } return new CommandResponsePackets(new OKPacket(1, affectedRows, lastInsertId)); } } Loading
sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/JDBCBackendHandler.java +1 −24 Original line number Diff line number Diff line Loading @@ -52,9 +52,7 @@ import javax.transaction.Status; import javax.transaction.SystemException; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.LinkedList; import java.util.List; /** Loading Loading @@ -131,14 +129,7 @@ public abstract class JDBCBackendHandler implements BackendHandler { private CommandResponsePackets merge(final SQLStatement sqlStatement) throws SQLException { if (executeResponse instanceof ExecuteUpdateResponse) { Collection<DatabasePacket> headPackets = new LinkedList<>(); for (DatabasePacket each : ((ExecuteUpdateResponse) executeResponse).getPackets()) { if (each instanceof ErrPacket) { return new CommandResponsePackets(each); } headPackets.add(each); } return mergeUpdate(headPackets); return ((ExecuteUpdateResponse) executeResponse).merge(); } QueryResponsePackets result = ((ExecuteQueryResponse) executeResponse).getQueryResponsePackets(); currentSequenceId += result.getPackets().size(); Loading @@ -151,20 +142,6 @@ public abstract class JDBCBackendHandler implements BackendHandler { return MergeEngineFactory.newInstance(ruleRegistry.getShardingRule(), ((ExecuteQueryResponse) executeResponse).getQueryResults(), sqlStatement, ruleRegistry.getShardingMetaData()).merge(); } private CommandResponsePackets mergeUpdate(final Collection<DatabasePacket> packets) { int affectedRows = 0; long lastInsertId = 0; for (DatabasePacket each : packets) { if (each instanceof OKPacket) { OKPacket okPacket = (OKPacket) each; affectedRows += okPacket.getAffectedRows(); // TODO consider about insert multiple values lastInsertId = okPacket.getLastInsertId(); } } return new CommandResponsePackets(new OKPacket(1, affectedRows, lastInsertId)); } private SQLRouteResult doMasterSlaveRoute() { SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge(); SQLRouteResult result = new SQLRouteResult(sqlStatement); Loading
sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/execute/response/ExecuteUpdateResponse.java +22 −5 Original line number Diff line number Diff line Loading @@ -19,6 +19,8 @@ package io.shardingsphere.proxy.backend.common.jdbc.execute.response; import io.shardingsphere.proxy.backend.common.jdbc.execute.response.unit.ExecuteResponseUnit; import io.shardingsphere.proxy.transport.common.packet.DatabasePacket; import io.shardingsphere.proxy.transport.mysql.packet.command.reponse.CommandResponsePackets; import io.shardingsphere.proxy.transport.mysql.packet.generic.OKPacket; import lombok.Getter; import java.util.Collection; Loading @@ -30,22 +32,37 @@ import java.util.List; * * @author zhangliang */ @Getter public final class ExecuteUpdateResponse implements ExecuteResponse { @Getter private final List<DatabasePacket> packets = new LinkedList<>(); private final DatabasePacket firstPacket; public ExecuteUpdateResponse(final DatabasePacket packet) { packets.add(packet); firstPacket = packets.iterator().next(); } public ExecuteUpdateResponse(final Collection<ExecuteResponseUnit> responseUnits) { for (ExecuteResponseUnit each : responseUnits) { packets.add(each.getCommandResponsePackets().getHeadPacket()); } firstPacket = packets.iterator().next(); } /** * Merge packets. * * @return merged packet. */ public CommandResponsePackets merge() { int affectedRows = 0; long lastInsertId = 0; for (DatabasePacket each : packets) { if (each instanceof OKPacket) { OKPacket okPacket = (OKPacket) each; affectedRows += okPacket.getAffectedRows(); // TODO consider about insert multiple values lastInsertId = okPacket.getLastInsertId(); } } return new CommandResponsePackets(new OKPacket(1, affectedRows, lastInsertId)); } }