Unverified Commit 1a78c115 authored by Gabry.wu's avatar Gabry.wu Committed by GitHub
Browse files

simply NettyExecutorManager.execute logic making it readable (#2744)



* simply NettyExecutorManager.execute logic making it readable

* fix NPE

* remove unused import

Co-authored-by: default avatardailidong <dailidong66@gmail.com>
parent 772a8e2b
Loading
Loading
Loading
Loading
+20 −34
Original line number Diff line number Diff line
@@ -17,7 +17,6 @@

package org.apache.dolphinscheduler.server.master.dispatch.executor;

import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -36,10 +35,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.*;

/**
 *  netty executor manager
@@ -87,17 +83,11 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
     */
    @Override
    public Boolean execute(ExecutionContext context) throws ExecuteException {

        /**
         *  all nodes
         */
        Set<String> allNodes = getAllNodes(context);

        /**
         * fail nodes
         */
        Set<String> failNodeSet = new HashSet<>();

        LinkedList<String> allNodes = new LinkedList<>();
        Set<String> nodes = getAllNodes(context);
        if (nodes != null) {
            allNodes.addAll(nodes);
        }
        /**
         *  build command accord executeContext
         */
@@ -106,29 +96,25 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
        /**
         * execute task host
         */
        Host host = context.getHost();
        String startHostAddress = context.getHost().getAddress();
        // remove start host address and add it to head
        allNodes.remove(startHostAddress);
        allNodes.addFirst(startHostAddress);
 
        boolean success = false;
        while (!success) {
        for (String address : allNodes) {
            try {
                Host host = Host.of(address);
                doExecute(host, command);
                success = true;
                context.setHost(host);
                break;
            } catch (ExecuteException ex) {
                logger.error(String.format("execute command : %s error", command), ex);
                try {
                    failNodeSet.add(host.getAddress());
                    Set<String> tmpAllIps = new HashSet<>(allNodes);
                    Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
                    if (remained != null && remained.size() > 0) {
                        host = Host.of(remained.iterator().next());
                        logger.error("retry execute command : {} host : {}", command, host);
                    } else {
                        throw new ExecuteException("fail after try all nodes");
                    }
                } catch (Throwable t) {
                    throw new ExecuteException("fail after try all nodes");
                logger.error("retry execute command : {} host : {}", command, address);
            }
        }
        if (!success) {
            throw new ExecuteException("fail after try all nodes");
        }
        
        return success;