Loading dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java +40 −35 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.utils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; import org.apache.commons.lang.StringUtils; import org.slf4j.LoggerFactory; Loading @@ -44,9 +45,11 @@ public class FlinkArgsUtils { */ public static List<String> buildArgs(FlinkParameters param) { List<String> args = new ArrayList<>(); String deployMode = "cluster"; if (StringUtils.isNotEmpty(param.getDeployMode())) { deployMode = param.getDeployMode(); String tmpDeployMode = param.getDeployMode(); if (StringUtils.isNotEmpty(tmpDeployMode)) { deployMode = tmpDeployMode; } if (!"local".equals(deployMode)) { Loading @@ -54,68 +57,70 @@ public class FlinkArgsUtils { args.add(Constants.FLINK_YARN_CLUSTER); //yarn-cluster if (param.getSlot() != 0) { int slot = param.getSlot(); if (slot != 0) { args.add(Constants.FLINK_YARN_SLOT); args.add(String.format("%d", param.getSlot())); //-ys args.add(String.format("%d", slot)); //-ys } if (StringUtils.isNotEmpty(param.getAppName())) { //-ynm String appName = param.getAppName(); if (StringUtils.isNotEmpty(appName)) { //-ynm args.add(Constants.FLINK_APP_NAME); args.add(param.getAppName()); args.add(appName); } if (param.getTaskManager() != 0) { //-yn int taskManager = param.getTaskManager(); if (taskManager != 0) { //-yn args.add(Constants.FLINK_TASK_MANAGE); args.add(String.format("%d", param.getTaskManager())); args.add(String.format("%d", taskManager)); } if (StringUtils.isNotEmpty(param.getJobManagerMemory())) { String jobManagerMemory = param.getJobManagerMemory(); if (StringUtils.isNotEmpty(jobManagerMemory)) { args.add(Constants.FLINK_JOB_MANAGE_MEM); args.add(param.getJobManagerMemory()); //-yjm args.add(jobManagerMemory); //-yjm } if (StringUtils.isNotEmpty(param.getTaskManagerMemory())) { // -ytm String taskManagerMemory = param.getTaskManagerMemory(); if (StringUtils.isNotEmpty(taskManagerMemory)) { // -ytm args.add(Constants.FLINK_TASK_MANAGE_MEM); args.add(param.getTaskManagerMemory()); args.add(taskManagerMemory); } args.add(Constants.FLINK_detach); //-d } if (param.getProgramType() != null) { if (param.getProgramType() != ProgramType.PYTHON) { if (StringUtils.isNotEmpty(param.getMainClass())) { ProgramType programType = param.getProgramType(); String mainClass = param.getMainClass(); if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) { args.add(Constants.FLINK_MAIN_CLASS); //-c args.add(param.getMainClass()); //main class } } } if (param.getMainJar() != null) { args.add(param.getMainJar().getRes()); ResourceInfo mainJar = param.getMainJar(); if (mainJar != null) { args.add(mainJar.getRes()); } if (StringUtils.isNotEmpty(param.getMainArgs())) { args.add(param.getMainArgs()); String mainArgs = param.getMainArgs(); if (StringUtils.isNotEmpty(mainArgs)) { args.add(mainArgs); } // --files --conf --libjar ... if (StringUtils.isNotEmpty(param.getOthers())) { String others = param.getOthers(); if (!others.contains("--qu")) { if (StringUtils.isNotEmpty(param.getQueue()) && !deployMode.equals("local")) { String queue = param.getQueue(); if (StringUtils.isNotEmpty(others)) { if (!others.contains(Constants.FLINK_QUEUE) && StringUtils.isNotEmpty(queue) && !deployMode.equals("local")) { args.add(Constants.FLINK_QUEUE); args.add(param.getQueue()); } } args.add(param.getOthers()); } else if (StringUtils.isNotEmpty(param.getQueue()) && !deployMode.equals("local")) { args.add(others); } else if (StringUtils.isNotEmpty(queue) && !deployMode.equals("local")) { args.add(Constants.FLINK_QUEUE); args.add(param.getQueue()); } return args; Loading dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java 0 → 100644 +131 −0 Original line number Diff line number Diff line /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.server.utils; import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; /** * Test FlinkArgsUtils */ public class FlinkArgsUtilsTest { private static final Logger logger = LoggerFactory.getLogger(FlinkArgsUtilsTest.class); public String mode = "cluster"; public int slot = 2; public String appName = "testFlink"; public int taskManager = 4; public String taskManagerMemory = "2G"; public String jobManagerMemory = "4G"; public ProgramType programType = ProgramType.JAVA; public String mainClass = "com.test"; public ResourceInfo mainJar = null; public String mainArgs = "testArgs"; public String queue = "queue1"; public String others = "--input file:///home"; @Before public void setUp() throws Exception { ResourceInfo main = new ResourceInfo(); main.setRes("testflink-1.0.0-SNAPSHOT.jar"); mainJar = main; } /** * Test buildArgs */ @Test public void testBuildArgs() { //Define params FlinkParameters param = new FlinkParameters(); param.setDeployMode(mode); param.setMainClass(mainClass); param.setAppName(appName); param.setSlot(slot); param.setTaskManager(taskManager); param.setJobManagerMemory(jobManagerMemory); param.setTaskManagerMemory(taskManagerMemory); param.setMainJar(mainJar); param.setProgramType(programType); param.setMainArgs(mainArgs); param.setQueue(queue); param.setOthers(others); //Invoke buildArgs List<String> result = FlinkArgsUtils.buildArgs(param); for (String s : result) { logger.info(s); } //Expected values and order assertEquals(result.size(),20); assertEquals(result.get(0),"-m"); assertEquals(result.get(1),"yarn-cluster"); assertEquals(result.get(2),"-ys"); assertSame(Integer.valueOf(result.get(3)),slot); assertEquals(result.get(4),"-ynm"); assertEquals(result.get(5),appName); assertEquals(result.get(6),"-yn"); assertSame(Integer.valueOf(result.get(7)),taskManager); assertEquals(result.get(8),"-yjm"); assertEquals(result.get(9),jobManagerMemory); assertEquals(result.get(10),"-ytm"); assertEquals(result.get(11),taskManagerMemory); assertEquals(result.get(12),"-d"); assertEquals(result.get(13),"-c"); assertEquals(result.get(14),mainClass); assertEquals(result.get(15),mainJar.getRes()); assertEquals(result.get(16),mainArgs); assertEquals(result.get(17),"--qu"); assertEquals(result.get(18),queue); assertEquals(result.get(19),others); //Others param without --qu FlinkParameters param1 = new FlinkParameters(); param1.setQueue(queue); param1.setDeployMode(mode); result = FlinkArgsUtils.buildArgs(param1); assertEquals(result.size(),5); } } No newline at end of file pom.xml +1 −0 Original line number Diff line number Diff line Loading @@ -655,6 +655,7 @@ <include>**/alert/utils/JSONUtilsTest.java</include> <include>**/alert/utils/PropertyUtilsTest.java</include> <include>**/server/utils/SparkArgsUtilsTest.java</include> <include>**/server/utils/FlinkArgsUtilsTest.java</include> </includes> <!-- <skip>true</skip> --> </configuration> Loading Loading
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java +40 −35 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.utils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; import org.apache.commons.lang.StringUtils; import org.slf4j.LoggerFactory; Loading @@ -44,9 +45,11 @@ public class FlinkArgsUtils { */ public static List<String> buildArgs(FlinkParameters param) { List<String> args = new ArrayList<>(); String deployMode = "cluster"; if (StringUtils.isNotEmpty(param.getDeployMode())) { deployMode = param.getDeployMode(); String tmpDeployMode = param.getDeployMode(); if (StringUtils.isNotEmpty(tmpDeployMode)) { deployMode = tmpDeployMode; } if (!"local".equals(deployMode)) { Loading @@ -54,68 +57,70 @@ public class FlinkArgsUtils { args.add(Constants.FLINK_YARN_CLUSTER); //yarn-cluster if (param.getSlot() != 0) { int slot = param.getSlot(); if (slot != 0) { args.add(Constants.FLINK_YARN_SLOT); args.add(String.format("%d", param.getSlot())); //-ys args.add(String.format("%d", slot)); //-ys } if (StringUtils.isNotEmpty(param.getAppName())) { //-ynm String appName = param.getAppName(); if (StringUtils.isNotEmpty(appName)) { //-ynm args.add(Constants.FLINK_APP_NAME); args.add(param.getAppName()); args.add(appName); } if (param.getTaskManager() != 0) { //-yn int taskManager = param.getTaskManager(); if (taskManager != 0) { //-yn args.add(Constants.FLINK_TASK_MANAGE); args.add(String.format("%d", param.getTaskManager())); args.add(String.format("%d", taskManager)); } if (StringUtils.isNotEmpty(param.getJobManagerMemory())) { String jobManagerMemory = param.getJobManagerMemory(); if (StringUtils.isNotEmpty(jobManagerMemory)) { args.add(Constants.FLINK_JOB_MANAGE_MEM); args.add(param.getJobManagerMemory()); //-yjm args.add(jobManagerMemory); //-yjm } if (StringUtils.isNotEmpty(param.getTaskManagerMemory())) { // -ytm String taskManagerMemory = param.getTaskManagerMemory(); if (StringUtils.isNotEmpty(taskManagerMemory)) { // -ytm args.add(Constants.FLINK_TASK_MANAGE_MEM); args.add(param.getTaskManagerMemory()); args.add(taskManagerMemory); } args.add(Constants.FLINK_detach); //-d } if (param.getProgramType() != null) { if (param.getProgramType() != ProgramType.PYTHON) { if (StringUtils.isNotEmpty(param.getMainClass())) { ProgramType programType = param.getProgramType(); String mainClass = param.getMainClass(); if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) { args.add(Constants.FLINK_MAIN_CLASS); //-c args.add(param.getMainClass()); //main class } } } if (param.getMainJar() != null) { args.add(param.getMainJar().getRes()); ResourceInfo mainJar = param.getMainJar(); if (mainJar != null) { args.add(mainJar.getRes()); } if (StringUtils.isNotEmpty(param.getMainArgs())) { args.add(param.getMainArgs()); String mainArgs = param.getMainArgs(); if (StringUtils.isNotEmpty(mainArgs)) { args.add(mainArgs); } // --files --conf --libjar ... if (StringUtils.isNotEmpty(param.getOthers())) { String others = param.getOthers(); if (!others.contains("--qu")) { if (StringUtils.isNotEmpty(param.getQueue()) && !deployMode.equals("local")) { String queue = param.getQueue(); if (StringUtils.isNotEmpty(others)) { if (!others.contains(Constants.FLINK_QUEUE) && StringUtils.isNotEmpty(queue) && !deployMode.equals("local")) { args.add(Constants.FLINK_QUEUE); args.add(param.getQueue()); } } args.add(param.getOthers()); } else if (StringUtils.isNotEmpty(param.getQueue()) && !deployMode.equals("local")) { args.add(others); } else if (StringUtils.isNotEmpty(queue) && !deployMode.equals("local")) { args.add(Constants.FLINK_QUEUE); args.add(param.getQueue()); } return args; Loading
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java 0 → 100644 +131 −0 Original line number Diff line number Diff line /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.server.utils; import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; /** * Test FlinkArgsUtils */ public class FlinkArgsUtilsTest { private static final Logger logger = LoggerFactory.getLogger(FlinkArgsUtilsTest.class); public String mode = "cluster"; public int slot = 2; public String appName = "testFlink"; public int taskManager = 4; public String taskManagerMemory = "2G"; public String jobManagerMemory = "4G"; public ProgramType programType = ProgramType.JAVA; public String mainClass = "com.test"; public ResourceInfo mainJar = null; public String mainArgs = "testArgs"; public String queue = "queue1"; public String others = "--input file:///home"; @Before public void setUp() throws Exception { ResourceInfo main = new ResourceInfo(); main.setRes("testflink-1.0.0-SNAPSHOT.jar"); mainJar = main; } /** * Test buildArgs */ @Test public void testBuildArgs() { //Define params FlinkParameters param = new FlinkParameters(); param.setDeployMode(mode); param.setMainClass(mainClass); param.setAppName(appName); param.setSlot(slot); param.setTaskManager(taskManager); param.setJobManagerMemory(jobManagerMemory); param.setTaskManagerMemory(taskManagerMemory); param.setMainJar(mainJar); param.setProgramType(programType); param.setMainArgs(mainArgs); param.setQueue(queue); param.setOthers(others); //Invoke buildArgs List<String> result = FlinkArgsUtils.buildArgs(param); for (String s : result) { logger.info(s); } //Expected values and order assertEquals(result.size(),20); assertEquals(result.get(0),"-m"); assertEquals(result.get(1),"yarn-cluster"); assertEquals(result.get(2),"-ys"); assertSame(Integer.valueOf(result.get(3)),slot); assertEquals(result.get(4),"-ynm"); assertEquals(result.get(5),appName); assertEquals(result.get(6),"-yn"); assertSame(Integer.valueOf(result.get(7)),taskManager); assertEquals(result.get(8),"-yjm"); assertEquals(result.get(9),jobManagerMemory); assertEquals(result.get(10),"-ytm"); assertEquals(result.get(11),taskManagerMemory); assertEquals(result.get(12),"-d"); assertEquals(result.get(13),"-c"); assertEquals(result.get(14),mainClass); assertEquals(result.get(15),mainJar.getRes()); assertEquals(result.get(16),mainArgs); assertEquals(result.get(17),"--qu"); assertEquals(result.get(18),queue); assertEquals(result.get(19),others); //Others param without --qu FlinkParameters param1 = new FlinkParameters(); param1.setQueue(queue); param1.setDeployMode(mode); result = FlinkArgsUtils.buildArgs(param1); assertEquals(result.size(),5); } } No newline at end of file
pom.xml +1 −0 Original line number Diff line number Diff line Loading @@ -655,6 +655,7 @@ <include>**/alert/utils/JSONUtilsTest.java</include> <include>**/alert/utils/PropertyUtilsTest.java</include> <include>**/server/utils/SparkArgsUtilsTest.java</include> <include>**/server/utils/FlinkArgsUtilsTest.java</include> </includes> <!-- <skip>true</skip> --> </configuration> Loading