Loading .github/workflows/ci_e2e.yml +1 −1 Original line number Diff line number Diff line Loading @@ -58,7 +58,7 @@ jobs: wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb sudo dpkg -i google-chrome*.deb sudo apt-get install -f -y wget -N https://chromedriver.storage.googleapis.com/80.0.3987.106/chromedriver_linux64.zip wget -N https://chromedriver.storage.googleapis.com/83.0.4103.39/chromedriver_linux64.zip unzip chromedriver_linux64.zip sudo mv -f chromedriver /usr/local/share/chromedriver sudo ln -s /usr/local/share/chromedriver /usr/local/bin/chromedriver Loading dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/RetryerUtils.java +30 −0 Original line number Diff line number Diff line Loading @@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.common.utils; import com.github.rholder.retry.*; import org.apache.dolphinscheduler.common.Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; Loading @@ -28,6 +30,7 @@ import java.util.concurrent.TimeUnit; * The Retryer util. */ public class RetryerUtils { private static final Logger logger = LoggerFactory.getLogger(RetryerUtils.class); private static Retryer<Boolean> defaultRetryerResultCheck; private static Retryer<Boolean> defaultRetryerResultNoCheck; Loading Loading @@ -105,4 +108,31 @@ public class RetryerUtils { public static Boolean retryCall(final Callable<Boolean> callable) throws ExecutionException, RetryException { return retryCall(callable, true); } /** * Retry call silent without exceptions thrown * * @param callable the callable * @param checkResult whether check result * @return if no exceptions ,it's result returned by callable ,else always false */ public static boolean retryCallSilent(final Callable<Boolean> callable, boolean checkResult) { boolean result = false; try { result = getDefaultRetryer(checkResult).call(callable); } catch (ExecutionException | RetryException e) { logger.warn("Retry call {} failed {}", callable, e.getMessage(), e); } return result; } /** * Retry call silent without exceptions thrown * * @param callable the callable * @return if no exceptions ,it's result returned by callable ,else always false */ public static boolean retryCallSilent(final Callable<Boolean> callable) { return retryCallSilent(callable, true); } } dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/RetryerUtilsTest.java +109 −0 Original line number Diff line number Diff line Loading @@ -213,4 +213,113 @@ public class RetryerUtilsTest { testRetryExceptionWithPara(true); testRetryExceptionWithPara(false); } @Test public void testRetrySilent() { try { for (int execTarget = 1; execTarget <= 3; execTarget++) { int finalExecTarget = execTarget; int[] execTime = {0}; boolean result = RetryerUtils.retryCallSilent(() -> { execTime[0]++; return execTime[0] == finalExecTarget; }); Assert.assertEquals(finalExecTarget, execTime[0]); Assert.assertTrue(result); } } catch (Exception e) { Assert.fail("Unexpected exception " + e.getMessage()); } int[] execTime = {0}; try { boolean result = RetryerUtils.retryCallSilent(() -> { execTime[0]++; return execTime[0] == 4; }); Assert.assertFalse(result); } catch (Exception e) { Assert.fail("Unexpected exception " + e.getMessage()); } } @Test public void testRetrySilentWithPara() { try { for (int execTarget = 1; execTarget <= 3; execTarget++) { int finalExecTarget = execTarget; int[] execTime = {0}; boolean result = RetryerUtils.retryCallSilent(() -> { execTime[0]++; return execTime[0] == finalExecTarget; }, true); Assert.assertEquals(finalExecTarget, execTime[0]); Assert.assertTrue(result); } } catch (Exception e) { Assert.fail("Unexpected exception " + e.getMessage()); } int[] execTime = {0}; try { boolean result = RetryerUtils.retryCallSilent(() -> { execTime[0]++; return execTime[0] == 4; }, true); Assert.assertFalse(result); } catch (Exception e) { Assert.fail("Unexpected exception " + e.getMessage()); } } @Test public void testRetrySilentNoCheckResult(){ try { for (int execTarget = 1; execTarget <= 5; execTarget++) { int[] execTime = {0}; boolean result = RetryerUtils.retryCallSilent(() -> { execTime[0]++; return execTime[0] > 1; }, false); Assert.assertEquals(1, execTime[0]); Assert.assertFalse(result); } } catch (Exception e) { Assert.fail("Unexpected exception " + e.getMessage()); } } private void testRetrySilentExceptionWithPara(boolean checkResult) { try { for (int execTarget = 1; execTarget <= 3; execTarget++) { int finalExecTarget = execTarget; int[] execTime = {0}; boolean result = RetryerUtils.retryCallSilent(() -> { execTime[0]++; if (execTime[0] != finalExecTarget) { throw new IllegalArgumentException(String.valueOf(execTime[0])); } return true; }, checkResult); Assert.assertEquals(finalExecTarget, execTime[0]); Assert.assertTrue(result); } } catch (Exception e) { Assert.fail("Unexpected exception " + e.getMessage()); } int[] execTime = {0}; try { boolean result = RetryerUtils.retryCallSilent(() -> { execTime[0]++; if (execTime[0] != 4) { throw new IllegalArgumentException(String.valueOf(execTime[0])); } return true; }, checkResult); Assert.assertFalse(result); } catch (Exception e) { Assert.fail("Unexpected exception " + e.getMessage()); } } @Test public void testRetrySilentException() { testRetrySilentExceptionWithPara(true); testRetrySilentExceptionWithPara(false); } } dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java +20 −34 Original line number Diff line number Diff line Loading @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.master.dispatch.executor; import org.apache.commons.collections.CollectionUtils; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; Loading @@ -36,10 +35,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.*; /** * netty executor manager Loading Loading @@ -87,17 +83,11 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{ */ @Override public Boolean execute(ExecutionContext context) throws ExecuteException { /** * all nodes */ Set<String> allNodes = getAllNodes(context); /** * fail nodes */ Set<String> failNodeSet = new HashSet<>(); LinkedList<String> allNodes = new LinkedList<>(); Set<String> nodes = getAllNodes(context); if (nodes != null) { allNodes.addAll(nodes); } /** * build command accord executeContext */ Loading @@ -106,29 +96,25 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{ /** * execute task host */ Host host = context.getHost(); String startHostAddress = context.getHost().getAddress(); // remove start host address and add it to head allNodes.remove(startHostAddress); allNodes.addFirst(startHostAddress); boolean success = false; while (!success) { for (String address : allNodes) { try { Host host = Host.of(address); doExecute(host, command); success = true; context.setHost(host); break; } catch (ExecuteException ex) { logger.error(String.format("execute command : %s error", command), ex); try { failNodeSet.add(host.getAddress()); Set<String> tmpAllIps = new HashSet<>(allNodes); Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet); if (remained != null && remained.size() > 0) { host = Host.of(remained.iterator().next()); logger.error("retry execute command : {} host : {}", command, host); } else { throw new ExecuteException("fail after try all nodes"); } } catch (Throwable t) { throw new ExecuteException("fail after try all nodes"); logger.error("retry execute command : {} host : {}", command, address); } } if (!success) { throw new ExecuteException("fail after try all nodes"); } return success; Loading dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss +3 −3 Original line number Diff line number Diff line Loading @@ -87,7 +87,7 @@ background: url("../img/toolbar_SPARK.png") no-repeat 50% 50%; } .icos-FLINK { background: url("../img/toobar_FLINK.png") no-repeat 50% 50%; background: url("../img/toolbar_FLINK.png") no-repeat 50% 50%; } .icos-MR { background: url("../img/toolbar_MR.png") no-repeat 50% 50%; Loading @@ -99,7 +99,7 @@ background: url("../img/toolbar_DEPENDENT.png") no-repeat 50% 50%; } .icos-HTTP { background: url("../img/toobar_HTTP.png") no-repeat 50% 50%; background: url("../img/toolbar_HTTP.png") no-repeat 50% 50%; } .icos-DATAX { background: url("../img/toolbar_DATAX.png") no-repeat 50% 50%; Loading @@ -108,7 +108,7 @@ background: url("../img/toolbar_SQOOP.png") no-repeat 50% 50%; } .icos-CONDITIONS { background: url("../img/toobar_CONDITIONS.png") no-repeat 50% 50%; background: url("../img/toolbar_CONDITIONS.png") no-repeat 50% 50%; } .toolbar { width: 60px; Loading Loading
.github/workflows/ci_e2e.yml +1 −1 Original line number Diff line number Diff line Loading @@ -58,7 +58,7 @@ jobs: wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb sudo dpkg -i google-chrome*.deb sudo apt-get install -f -y wget -N https://chromedriver.storage.googleapis.com/80.0.3987.106/chromedriver_linux64.zip wget -N https://chromedriver.storage.googleapis.com/83.0.4103.39/chromedriver_linux64.zip unzip chromedriver_linux64.zip sudo mv -f chromedriver /usr/local/share/chromedriver sudo ln -s /usr/local/share/chromedriver /usr/local/bin/chromedriver Loading
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/RetryerUtils.java +30 −0 Original line number Diff line number Diff line Loading @@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.common.utils; import com.github.rholder.retry.*; import org.apache.dolphinscheduler.common.Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; Loading @@ -28,6 +30,7 @@ import java.util.concurrent.TimeUnit; * The Retryer util. */ public class RetryerUtils { private static final Logger logger = LoggerFactory.getLogger(RetryerUtils.class); private static Retryer<Boolean> defaultRetryerResultCheck; private static Retryer<Boolean> defaultRetryerResultNoCheck; Loading Loading @@ -105,4 +108,31 @@ public class RetryerUtils { public static Boolean retryCall(final Callable<Boolean> callable) throws ExecutionException, RetryException { return retryCall(callable, true); } /** * Retry call silent without exceptions thrown * * @param callable the callable * @param checkResult whether check result * @return if no exceptions ,it's result returned by callable ,else always false */ public static boolean retryCallSilent(final Callable<Boolean> callable, boolean checkResult) { boolean result = false; try { result = getDefaultRetryer(checkResult).call(callable); } catch (ExecutionException | RetryException e) { logger.warn("Retry call {} failed {}", callable, e.getMessage(), e); } return result; } /** * Retry call silent without exceptions thrown * * @param callable the callable * @return if no exceptions ,it's result returned by callable ,else always false */ public static boolean retryCallSilent(final Callable<Boolean> callable) { return retryCallSilent(callable, true); } }
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/RetryerUtilsTest.java +109 −0 Original line number Diff line number Diff line Loading @@ -213,4 +213,113 @@ public class RetryerUtilsTest { testRetryExceptionWithPara(true); testRetryExceptionWithPara(false); } @Test public void testRetrySilent() { try { for (int execTarget = 1; execTarget <= 3; execTarget++) { int finalExecTarget = execTarget; int[] execTime = {0}; boolean result = RetryerUtils.retryCallSilent(() -> { execTime[0]++; return execTime[0] == finalExecTarget; }); Assert.assertEquals(finalExecTarget, execTime[0]); Assert.assertTrue(result); } } catch (Exception e) { Assert.fail("Unexpected exception " + e.getMessage()); } int[] execTime = {0}; try { boolean result = RetryerUtils.retryCallSilent(() -> { execTime[0]++; return execTime[0] == 4; }); Assert.assertFalse(result); } catch (Exception e) { Assert.fail("Unexpected exception " + e.getMessage()); } } @Test public void testRetrySilentWithPara() { try { for (int execTarget = 1; execTarget <= 3; execTarget++) { int finalExecTarget = execTarget; int[] execTime = {0}; boolean result = RetryerUtils.retryCallSilent(() -> { execTime[0]++; return execTime[0] == finalExecTarget; }, true); Assert.assertEquals(finalExecTarget, execTime[0]); Assert.assertTrue(result); } } catch (Exception e) { Assert.fail("Unexpected exception " + e.getMessage()); } int[] execTime = {0}; try { boolean result = RetryerUtils.retryCallSilent(() -> { execTime[0]++; return execTime[0] == 4; }, true); Assert.assertFalse(result); } catch (Exception e) { Assert.fail("Unexpected exception " + e.getMessage()); } } @Test public void testRetrySilentNoCheckResult(){ try { for (int execTarget = 1; execTarget <= 5; execTarget++) { int[] execTime = {0}; boolean result = RetryerUtils.retryCallSilent(() -> { execTime[0]++; return execTime[0] > 1; }, false); Assert.assertEquals(1, execTime[0]); Assert.assertFalse(result); } } catch (Exception e) { Assert.fail("Unexpected exception " + e.getMessage()); } } private void testRetrySilentExceptionWithPara(boolean checkResult) { try { for (int execTarget = 1; execTarget <= 3; execTarget++) { int finalExecTarget = execTarget; int[] execTime = {0}; boolean result = RetryerUtils.retryCallSilent(() -> { execTime[0]++; if (execTime[0] != finalExecTarget) { throw new IllegalArgumentException(String.valueOf(execTime[0])); } return true; }, checkResult); Assert.assertEquals(finalExecTarget, execTime[0]); Assert.assertTrue(result); } } catch (Exception e) { Assert.fail("Unexpected exception " + e.getMessage()); } int[] execTime = {0}; try { boolean result = RetryerUtils.retryCallSilent(() -> { execTime[0]++; if (execTime[0] != 4) { throw new IllegalArgumentException(String.valueOf(execTime[0])); } return true; }, checkResult); Assert.assertFalse(result); } catch (Exception e) { Assert.fail("Unexpected exception " + e.getMessage()); } } @Test public void testRetrySilentException() { testRetrySilentExceptionWithPara(true); testRetrySilentExceptionWithPara(false); } }
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java +20 −34 Original line number Diff line number Diff line Loading @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.master.dispatch.executor; import org.apache.commons.collections.CollectionUtils; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; Loading @@ -36,10 +35,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.*; /** * netty executor manager Loading Loading @@ -87,17 +83,11 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{ */ @Override public Boolean execute(ExecutionContext context) throws ExecuteException { /** * all nodes */ Set<String> allNodes = getAllNodes(context); /** * fail nodes */ Set<String> failNodeSet = new HashSet<>(); LinkedList<String> allNodes = new LinkedList<>(); Set<String> nodes = getAllNodes(context); if (nodes != null) { allNodes.addAll(nodes); } /** * build command accord executeContext */ Loading @@ -106,29 +96,25 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{ /** * execute task host */ Host host = context.getHost(); String startHostAddress = context.getHost().getAddress(); // remove start host address and add it to head allNodes.remove(startHostAddress); allNodes.addFirst(startHostAddress); boolean success = false; while (!success) { for (String address : allNodes) { try { Host host = Host.of(address); doExecute(host, command); success = true; context.setHost(host); break; } catch (ExecuteException ex) { logger.error(String.format("execute command : %s error", command), ex); try { failNodeSet.add(host.getAddress()); Set<String> tmpAllIps = new HashSet<>(allNodes); Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet); if (remained != null && remained.size() > 0) { host = Host.of(remained.iterator().next()); logger.error("retry execute command : {} host : {}", command, host); } else { throw new ExecuteException("fail after try all nodes"); } } catch (Throwable t) { throw new ExecuteException("fail after try all nodes"); logger.error("retry execute command : {} host : {}", command, address); } } if (!success) { throw new ExecuteException("fail after try all nodes"); } return success; Loading
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss +3 −3 Original line number Diff line number Diff line Loading @@ -87,7 +87,7 @@ background: url("../img/toolbar_SPARK.png") no-repeat 50% 50%; } .icos-FLINK { background: url("../img/toobar_FLINK.png") no-repeat 50% 50%; background: url("../img/toolbar_FLINK.png") no-repeat 50% 50%; } .icos-MR { background: url("../img/toolbar_MR.png") no-repeat 50% 50%; Loading @@ -99,7 +99,7 @@ background: url("../img/toolbar_DEPENDENT.png") no-repeat 50% 50%; } .icos-HTTP { background: url("../img/toobar_HTTP.png") no-repeat 50% 50%; background: url("../img/toolbar_HTTP.png") no-repeat 50% 50%; } .icos-DATAX { background: url("../img/toolbar_DATAX.png") no-repeat 50% 50%; Loading @@ -108,7 +108,7 @@ background: url("../img/toolbar_SQOOP.png") no-repeat 50% 50%; } .icos-CONDITIONS { background: url("../img/toobar_CONDITIONS.png") no-repeat 50% 50%; background: url("../img/toolbar_CONDITIONS.png") no-repeat 50% 50%; } .toolbar { width: 60px; Loading