Loading sharding-proxy/src/main/java/io/shardingsphere/proxy/frontend/mysql/MySQLFrontendHandler.java +11 −6 Original line number Diff line number Diff line Loading @@ -27,6 +27,8 @@ import io.shardingsphere.proxy.transport.mysql.constant.ServerErrorCode; import io.shardingsphere.proxy.transport.mysql.packet.MySQLPacketPayload; import io.shardingsphere.proxy.transport.mysql.packet.command.CommandPacket; import io.shardingsphere.proxy.transport.mysql.packet.command.CommandPacketFactory; import io.shardingsphere.proxy.transport.mysql.packet.command.QueryCommandPacket; import io.shardingsphere.proxy.transport.mysql.packet.command.reponse.CommandResponsePackets; import io.shardingsphere.proxy.transport.mysql.packet.generic.EofPacket; import io.shardingsphere.proxy.transport.mysql.packet.generic.ErrPacket; import io.shardingsphere.proxy.transport.mysql.packet.generic.OKPacket; Loading @@ -38,7 +40,6 @@ import io.shardingsphere.proxy.util.MySQLResultCache; import lombok.RequiredArgsConstructor; import java.sql.SQLException; import java.util.Collection; /** * MySQL frontend handler. Loading Loading @@ -85,20 +86,24 @@ public final class MySQLFrontendHandler extends FrontendHandler { int sequenceId = payload.readInt1(); int connectionId = MySQLResultCache.getInstance().getConnection(context.channel().id().asShortText()); CommandPacket commandPacket = CommandPacketFactory.getCommandPacket(sequenceId, connectionId, payload, backendConnection); Collection<DatabasePacket> packets = commandPacket.execute().getPackets(); for (DatabasePacket each : packets) { CommandResponsePackets responsePackets = commandPacket.execute(); for (DatabasePacket each : responsePackets.getPackets()) { context.writeAndFlush(each); if (each instanceof OKPacket || each instanceof ErrPacket) { return; } } currentSequenceId = packets.size(); while (commandPacket.next()) { if (!(commandPacket instanceof QueryCommandPacket)) { return; } QueryCommandPacket queryCommandPacket = (QueryCommandPacket) commandPacket; currentSequenceId = responsePackets.getPackets().size(); while (queryCommandPacket.next()) { // TODO try to use wait notify while (!context.channel().isWritable()) { continue; } DatabasePacket resultValue = commandPacket.getResultValue(); DatabasePacket resultValue = queryCommandPacket.getResultValue(); currentSequenceId = resultValue.getSequenceId(); context.writeAndFlush(resultValue); } Loading sharding-proxy/src/main/java/io/shardingsphere/proxy/transport/mysql/packet/command/CommandPacket.java +0 −19 Original line number Diff line number Diff line Loading @@ -17,12 +17,9 @@ package io.shardingsphere.proxy.transport.mysql.packet.command; import io.shardingsphere.proxy.transport.common.packet.DatabasePacket; import io.shardingsphere.proxy.transport.mysql.packet.MySQLPacket; import io.shardingsphere.proxy.transport.mysql.packet.command.reponse.CommandResponsePackets; import java.sql.SQLException; /** * Command packet. * Loading @@ -37,20 +34,4 @@ public interface CommandPacket extends MySQLPacket { * @return result packets to be sent */ CommandResponsePackets execute(); /** * Goto next result value. * * @return has more result value or not * @throws SQLException SQL exception */ boolean next() throws SQLException; /** * Get result value. * * @return database packet of result value * @throws SQLException SQL exception */ DatabasePacket getResultValue() throws SQLException; } sharding-proxy/src/main/java/io/shardingsphere/proxy/transport/mysql/packet/command/QueryCommandPacket.java 0 → 100644 +47 −0 Original line number Diff line number Diff line /* * Copyright 2016-2018 shardingsphere.io. * <p> * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * </p> */ package io.shardingsphere.proxy.transport.mysql.packet.command; import io.shardingsphere.proxy.transport.common.packet.DatabasePacket; import java.sql.SQLException; /** * Query command packet. * * @author zhangliang * @author wangkai */ public interface QueryCommandPacket extends CommandPacket { /** * Goto next result value. * * @return has more result value or not * @throws SQLException SQL exception */ boolean next() throws SQLException; /** * Get result value. * * @return database packet of result value * @throws SQLException SQL exception */ DatabasePacket getResultValue() throws SQLException; } sharding-proxy/src/main/java/io/shardingsphere/proxy/transport/mysql/packet/command/UnsupportedCommandPacket.java +0 −11 Original line number Diff line number Diff line Loading @@ -17,7 +17,6 @@ package io.shardingsphere.proxy.transport.mysql.packet.command; import io.shardingsphere.proxy.transport.common.packet.DatabasePacket; import io.shardingsphere.proxy.transport.mysql.constant.ServerErrorCode; import io.shardingsphere.proxy.transport.mysql.packet.MySQLPacketPayload; import io.shardingsphere.proxy.transport.mysql.packet.command.reponse.CommandResponsePackets; Loading Loading @@ -46,14 +45,4 @@ public final class UnsupportedCommandPacket implements CommandPacket { @Override public void write(final MySQLPacketPayload payload) { } @Override public boolean next() { return false; } @Override public DatabasePacket getResultValue() { return null; } } sharding-proxy/src/main/java/io/shardingsphere/proxy/transport/mysql/packet/command/statement/close/ComStmtClosePacket.java +0 −11 Original line number Diff line number Diff line Loading @@ -17,7 +17,6 @@ package io.shardingsphere.proxy.transport.mysql.packet.command.statement.close; import io.shardingsphere.proxy.transport.common.packet.DatabasePacket; import io.shardingsphere.proxy.transport.mysql.packet.MySQLPacketPayload; import io.shardingsphere.proxy.transport.mysql.packet.command.CommandPacket; import io.shardingsphere.proxy.transport.mysql.packet.command.reponse.CommandResponsePackets; Loading Loading @@ -53,14 +52,4 @@ public class ComStmtClosePacket implements CommandPacket { log.debug("COM_STMT_CLOSE received for Sharding-Proxy: {}", statementId); return new CommandResponsePackets(new DummyPacket()); } @Override public boolean next() { return false; } @Override public DatabasePacket getResultValue() { return null; } } Loading
sharding-proxy/src/main/java/io/shardingsphere/proxy/frontend/mysql/MySQLFrontendHandler.java +11 −6 Original line number Diff line number Diff line Loading @@ -27,6 +27,8 @@ import io.shardingsphere.proxy.transport.mysql.constant.ServerErrorCode; import io.shardingsphere.proxy.transport.mysql.packet.MySQLPacketPayload; import io.shardingsphere.proxy.transport.mysql.packet.command.CommandPacket; import io.shardingsphere.proxy.transport.mysql.packet.command.CommandPacketFactory; import io.shardingsphere.proxy.transport.mysql.packet.command.QueryCommandPacket; import io.shardingsphere.proxy.transport.mysql.packet.command.reponse.CommandResponsePackets; import io.shardingsphere.proxy.transport.mysql.packet.generic.EofPacket; import io.shardingsphere.proxy.transport.mysql.packet.generic.ErrPacket; import io.shardingsphere.proxy.transport.mysql.packet.generic.OKPacket; Loading @@ -38,7 +40,6 @@ import io.shardingsphere.proxy.util.MySQLResultCache; import lombok.RequiredArgsConstructor; import java.sql.SQLException; import java.util.Collection; /** * MySQL frontend handler. Loading Loading @@ -85,20 +86,24 @@ public final class MySQLFrontendHandler extends FrontendHandler { int sequenceId = payload.readInt1(); int connectionId = MySQLResultCache.getInstance().getConnection(context.channel().id().asShortText()); CommandPacket commandPacket = CommandPacketFactory.getCommandPacket(sequenceId, connectionId, payload, backendConnection); Collection<DatabasePacket> packets = commandPacket.execute().getPackets(); for (DatabasePacket each : packets) { CommandResponsePackets responsePackets = commandPacket.execute(); for (DatabasePacket each : responsePackets.getPackets()) { context.writeAndFlush(each); if (each instanceof OKPacket || each instanceof ErrPacket) { return; } } currentSequenceId = packets.size(); while (commandPacket.next()) { if (!(commandPacket instanceof QueryCommandPacket)) { return; } QueryCommandPacket queryCommandPacket = (QueryCommandPacket) commandPacket; currentSequenceId = responsePackets.getPackets().size(); while (queryCommandPacket.next()) { // TODO try to use wait notify while (!context.channel().isWritable()) { continue; } DatabasePacket resultValue = commandPacket.getResultValue(); DatabasePacket resultValue = queryCommandPacket.getResultValue(); currentSequenceId = resultValue.getSequenceId(); context.writeAndFlush(resultValue); } Loading
sharding-proxy/src/main/java/io/shardingsphere/proxy/transport/mysql/packet/command/CommandPacket.java +0 −19 Original line number Diff line number Diff line Loading @@ -17,12 +17,9 @@ package io.shardingsphere.proxy.transport.mysql.packet.command; import io.shardingsphere.proxy.transport.common.packet.DatabasePacket; import io.shardingsphere.proxy.transport.mysql.packet.MySQLPacket; import io.shardingsphere.proxy.transport.mysql.packet.command.reponse.CommandResponsePackets; import java.sql.SQLException; /** * Command packet. * Loading @@ -37,20 +34,4 @@ public interface CommandPacket extends MySQLPacket { * @return result packets to be sent */ CommandResponsePackets execute(); /** * Goto next result value. * * @return has more result value or not * @throws SQLException SQL exception */ boolean next() throws SQLException; /** * Get result value. * * @return database packet of result value * @throws SQLException SQL exception */ DatabasePacket getResultValue() throws SQLException; }
sharding-proxy/src/main/java/io/shardingsphere/proxy/transport/mysql/packet/command/QueryCommandPacket.java 0 → 100644 +47 −0 Original line number Diff line number Diff line /* * Copyright 2016-2018 shardingsphere.io. * <p> * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * </p> */ package io.shardingsphere.proxy.transport.mysql.packet.command; import io.shardingsphere.proxy.transport.common.packet.DatabasePacket; import java.sql.SQLException; /** * Query command packet. * * @author zhangliang * @author wangkai */ public interface QueryCommandPacket extends CommandPacket { /** * Goto next result value. * * @return has more result value or not * @throws SQLException SQL exception */ boolean next() throws SQLException; /** * Get result value. * * @return database packet of result value * @throws SQLException SQL exception */ DatabasePacket getResultValue() throws SQLException; }
sharding-proxy/src/main/java/io/shardingsphere/proxy/transport/mysql/packet/command/UnsupportedCommandPacket.java +0 −11 Original line number Diff line number Diff line Loading @@ -17,7 +17,6 @@ package io.shardingsphere.proxy.transport.mysql.packet.command; import io.shardingsphere.proxy.transport.common.packet.DatabasePacket; import io.shardingsphere.proxy.transport.mysql.constant.ServerErrorCode; import io.shardingsphere.proxy.transport.mysql.packet.MySQLPacketPayload; import io.shardingsphere.proxy.transport.mysql.packet.command.reponse.CommandResponsePackets; Loading Loading @@ -46,14 +45,4 @@ public final class UnsupportedCommandPacket implements CommandPacket { @Override public void write(final MySQLPacketPayload payload) { } @Override public boolean next() { return false; } @Override public DatabasePacket getResultValue() { return null; } }
sharding-proxy/src/main/java/io/shardingsphere/proxy/transport/mysql/packet/command/statement/close/ComStmtClosePacket.java +0 −11 Original line number Diff line number Diff line Loading @@ -17,7 +17,6 @@ package io.shardingsphere.proxy.transport.mysql.packet.command.statement.close; import io.shardingsphere.proxy.transport.common.packet.DatabasePacket; import io.shardingsphere.proxy.transport.mysql.packet.MySQLPacketPayload; import io.shardingsphere.proxy.transport.mysql.packet.command.CommandPacket; import io.shardingsphere.proxy.transport.mysql.packet.command.reponse.CommandResponsePackets; Loading Loading @@ -53,14 +52,4 @@ public class ComStmtClosePacket implements CommandPacket { log.debug("COM_STMT_CLOSE received for Sharding-Proxy: {}", statementId); return new CommandResponsePackets(new DummyPacket()); } @Override public boolean next() { return false; } @Override public DatabasePacket getResultValue() { return null; } }