Commit 10eeaf08 authored by terrymanu's avatar terrymanu
Browse files

refactor MySQLFrontendHandler

parent 79b7567f
Loading
Loading
Loading
Loading
+46 −35
Original line number Diff line number Diff line
@@ -76,28 +76,43 @@ public final class MySQLFrontendHandler extends FrontendHandler {
    
    @Override
    protected void executeCommand(final ChannelHandlerContext context, final ByteBuf message) {
        new ExecutorGroup(context.channel().id()).getExecutorService().execute(new Runnable() {
        new ExecutorGroup(context.channel().id()).getExecutorService().execute(new CommandExecutor(context, message));
    }
    
    @RequiredArgsConstructor
    static class CommandExecutor implements Runnable {
        
        private final ChannelHandlerContext context;
        
        private final ByteBuf message;
        
        private int currentSequenceId;
        
        @Override
        public void run() {
                int currentSequenceId = 0;
            try (MySQLPacketPayload payload = new MySQLPacketPayload(message);
                 BackendConnection backendConnection = new BackendConnection()) {
                    int sequenceId = payload.readInt1();
                    int connectionId = MySQLResultCache.getInstance().getConnection(context.channel().id().asShortText());
                    CommandPacket commandPacket = CommandPacketFactory.getCommandPacket(sequenceId, connectionId, payload, backendConnection);
                CommandPacket commandPacket = getCommandPacket(payload, backendConnection);
                CommandResponsePackets responsePackets = commandPacket.execute();
                for (DatabasePacket each : responsePackets.getPackets()) {
                    context.writeAndFlush(each);
                        if (each instanceof OKPacket || each instanceof ErrPacket) {
                            return;
                }
                if (commandPacket instanceof QueryCommandPacket && !(responsePackets.getHeadPacket() instanceof OKPacket) && !(responsePackets.getHeadPacket() instanceof ErrPacket)) {
                    writeMoreResults((QueryCommandPacket) commandPacket, responsePackets.getPackets().size());
                }
                    if (!(commandPacket instanceof QueryCommandPacket)) {
                        return;
            } catch (final SQLException ex) {
                context.writeAndFlush(new ErrPacket(++currentSequenceId, ex));
            }
                    QueryCommandPacket queryCommandPacket = (QueryCommandPacket) commandPacket;
                    currentSequenceId = responsePackets.getPackets().size();
        }
        
        private CommandPacket getCommandPacket(final MySQLPacketPayload payload, final BackendConnection backendConnection) {
            int sequenceId = payload.readInt1();
            int connectionId = MySQLResultCache.getInstance().getConnection(context.channel().id().asShortText());
            return CommandPacketFactory.getCommandPacket(sequenceId, connectionId, payload, backendConnection);
        }
        
        private void writeMoreResults(final QueryCommandPacket queryCommandPacket, final int headPacketsCount) throws SQLException {
            currentSequenceId = headPacketsCount;
            while (queryCommandPacket.next()) {
                // TODO try to use wait notify
                while (!context.channel().isWritable()) {
@@ -108,10 +123,6 @@ public final class MySQLFrontendHandler extends FrontendHandler {
                context.writeAndFlush(resultValue);
            }
            context.writeAndFlush(new EofPacket(++currentSequenceId));
                } catch (final SQLException ex) {
                    context.writeAndFlush(new ErrPacket(++currentSequenceId, ex));
                }
        }
        });
    }
}