Unverified Commit 8031c162 authored by superScala's avatar superScala Committed by GitHub
Browse files

add break & default logic for NettyDecoder.decode (#2969)



* add break & default logic for NettyDecoder.decode

* fix the bad code smells

Co-authored-by: default avatargabry <wu_shao_jie@qq.com>
Co-authored-by: default avatargabry.wu <8545796+gabrywu@users.noreply.github.com>
Co-authored-by: default avatardailidong <dailidong66@gmail.com>
parent f7ea4f19
Loading
Loading
Loading
Loading
+10 −0
Original line number Diff line number Diff line
@@ -24,6 +24,8 @@ import io.netty.handler.codec.ReplayingDecoder;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandHeader;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

@@ -31,6 +33,7 @@ import java.util.List;
 *  netty decoder
 */
public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
    private static final Logger logger = LoggerFactory.getLogger(NettyDecoder.class);

    public NettyDecoder(){
        super(State.MAGIC);
@@ -52,15 +55,19 @@ public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
            case MAGIC:
                checkMagic(in.readByte());
                checkpoint(State.COMMAND);
                break;
            case COMMAND:
                commandHeader.setType(in.readByte());
                checkpoint(State.OPAQUE);
                break;
            case OPAQUE:
                commandHeader.setOpaque(in.readLong());
                checkpoint(State.BODY_LENGTH);
                break;
            case BODY_LENGTH:
                commandHeader.setBodyLength(in.readInt());
                checkpoint(State.BODY);
                break;
            case BODY:
                byte[] body = new byte[commandHeader.getBodyLength()];
                in.readBytes(body);
@@ -72,6 +79,9 @@ public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
                out.add(packet);
                //
                checkpoint(State.MAGIC);
                break;
            default:
                logger.warn("unknown decoder state {}", state());
        }
    }