Loading dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java +5 −6 Original line number Diff line number Diff line Loading @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; Loading @@ -31,7 +32,7 @@ public class DataxParameters extends AbstractParameters { /** * if custom json config,eg 0, 1 */ private Integer customConfig; private int customConfig; /** * if customConfig eq 1 ,then json is usable Loading Loading @@ -88,11 +89,11 @@ public class DataxParameters extends AbstractParameters { */ private int jobSpeedRecord; public Integer getCustomConfig() { public int getCustomConfig() { return customConfig; } public void setCustomConfig(Integer customConfig) { public void setCustomConfig(int customConfig) { this.customConfig = customConfig; } Loading Loading @@ -184,11 +185,9 @@ public class DataxParameters extends AbstractParameters { this.jobSpeedRecord = jobSpeedRecord; } @Override public boolean checkParameters() { if (customConfig == null) return false; if (customConfig == 0) { if (customConfig == Flag.NO.ordinal()) { return dataSource != 0 && dataTarget != 0 && StringUtils.isNotEmpty(sql) Loading dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java +1 −1 Original line number Diff line number Diff line Loading @@ -48,7 +48,7 @@ public class ParameterUtils { * @return convert parameters place holders */ public static String convertParameterPlaceholders(String parameterString, Map<String, String> parameterMap) { if (StringUtils.isEmpty(parameterString)) { if (StringUtils.isEmpty(parameterString) || parameterMap == null) { return parameterString; } Loading dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java +17 −32 Original line number Diff line number Diff line Loading @@ -41,6 +41,7 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.DataType; import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.datax.DataxParameters; Loading Loading @@ -149,9 +150,16 @@ public class DataxTask extends AbstractTask { String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskExecutionContext.getTaskAppId()); Thread.currentThread().setName(threadLoggerInfoName); // combining local and global parameters Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), taskExecutionContext.getDefinedParams(), dataXParameters.getLocalParametersMap(), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); // run datax process String jsonFilePath = buildDataxJsonFile(); String shellCommandFilePath = buildShellCommandFile(jsonFilePath); String jsonFilePath = buildDataxJsonFile(paramsMap); String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap); CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath); setExitStatusCode(commandExecuteResult.getExitStatusCode()); Loading Loading @@ -184,7 +192,7 @@ public class DataxTask extends AbstractTask { * @return datax json file name * @throws Exception if error throws Exception */ private String buildDataxJsonFile() private String buildDataxJsonFile(Map<String, Property> paramsMap) throws Exception { // generate json String fileName = String.format("%s/%s_job.json", Loading @@ -197,26 +205,9 @@ public class DataxTask extends AbstractTask { return fileName; } if (dataXParameters.getCustomConfig() == 1){ if (dataXParameters.getCustomConfig() == Flag.YES.ordinal()){ json = dataXParameters.getJson().replaceAll("\\r\\n", "\n"); /** * combining local and global parameters */ Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), taskExecutionContext.getDefinedParams(), dataXParameters.getLocalParametersMap(), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); if (paramsMap != null){ json = ParameterUtils.convertParameterPlaceholders(json, ParamUtils.convert(paramsMap)); } }else { JSONObject job = new JSONObject(); job.put("content", buildDataxJobContentJson()); job.put("setting", buildDataxJobSettingJson()); Loading @@ -227,6 +218,9 @@ public class DataxTask extends AbstractTask { json = root.toString(); } // replace placeholder json = ParameterUtils.convertParameterPlaceholders(json, ParamUtils.convert(paramsMap)); logger.debug("datax job json : {}", json); // create datax json file Loading Loading @@ -359,7 +353,7 @@ public class DataxTask extends AbstractTask { * @return shell command file name * @throws Exception if error throws Exception */ private String buildShellCommandFile(String jobConfigFilePath) private String buildShellCommandFile(String jobConfigFilePath, Map<String, Property> paramsMap) throws Exception { // generate scripts String fileName = String.format("%s/%s_node.%s", Loading @@ -380,18 +374,9 @@ public class DataxTask extends AbstractTask { sbr.append(DATAX_HOME_EVN); sbr.append(" "); sbr.append(jobConfigFilePath); String dataxCommand = sbr.toString(); // combining local and global parameters // replace placeholder Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), taskExecutionContext.getDefinedParams(), dataXParameters.getLocalParametersMap(), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); if (paramsMap != null) { dataxCommand = ParameterUtils.convertParameterPlaceholders(dataxCommand, ParamUtils.convert(paramsMap)); } String dataxCommand = ParameterUtils.convertParameterPlaceholders(sbr.toString(), ParamUtils.convert(paramsMap)); logger.debug("raw script : {}", dataxCommand); Loading dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java +6 −5 Original line number Diff line number Diff line Loading @@ -21,9 +21,9 @@ import java.lang.reflect.Method; import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Map; import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; Loading Loading @@ -273,14 +273,15 @@ public class DataxTaskTest { setTaskParems(0); buildDataJson(); } catch (Exception e) { e.printStackTrace(); Assert.fail(e.getMessage()); } } public void buildDataJson() throws Exception { Method method = DataxTask.class.getDeclaredMethod("buildDataxJsonFile"); Method method = DataxTask.class.getDeclaredMethod("buildDataxJsonFile", new Class[]{Map.class}); method.setAccessible(true); String filePath = (String) method.invoke(dataxTask, null); String filePath = (String) method.invoke(dataxTask, new Object[]{null}); Assert.assertNotNull(filePath); } Loading Loading @@ -358,9 +359,9 @@ public class DataxTaskTest { public void testBuildShellCommandFile() throws Exception { try { Method method = DataxTask.class.getDeclaredMethod("buildShellCommandFile", String.class); Method method = DataxTask.class.getDeclaredMethod("buildShellCommandFile", String.class, Map.class); method.setAccessible(true); Assert.assertNotNull(method.invoke(dataxTask, "test.json")); Assert.assertNotNull(method.invoke(dataxTask, "test.json", null)); } catch (Exception e) { Assert.fail(e.getMessage()); Loading dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue +11 −0 Original line number Diff line number Diff line Loading @@ -236,6 +236,12 @@ _onPostStatements (a) { this.postStatements = a }, /** * return localParams */ _onLocalParams (a) { this.localParams = a }, /** * verification */ Loading @@ -246,6 +252,11 @@ return false } // localParams Subcomponent verification if (!this.$refs.refLocalParams._verifProp()) { return false } // storage this.$emit('on-params', { customConfig: this.customConfig, Loading Loading
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java +5 −6 Original line number Diff line number Diff line Loading @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; Loading @@ -31,7 +32,7 @@ public class DataxParameters extends AbstractParameters { /** * if custom json config,eg 0, 1 */ private Integer customConfig; private int customConfig; /** * if customConfig eq 1 ,then json is usable Loading Loading @@ -88,11 +89,11 @@ public class DataxParameters extends AbstractParameters { */ private int jobSpeedRecord; public Integer getCustomConfig() { public int getCustomConfig() { return customConfig; } public void setCustomConfig(Integer customConfig) { public void setCustomConfig(int customConfig) { this.customConfig = customConfig; } Loading Loading @@ -184,11 +185,9 @@ public class DataxParameters extends AbstractParameters { this.jobSpeedRecord = jobSpeedRecord; } @Override public boolean checkParameters() { if (customConfig == null) return false; if (customConfig == 0) { if (customConfig == Flag.NO.ordinal()) { return dataSource != 0 && dataTarget != 0 && StringUtils.isNotEmpty(sql) Loading
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java +1 −1 Original line number Diff line number Diff line Loading @@ -48,7 +48,7 @@ public class ParameterUtils { * @return convert parameters place holders */ public static String convertParameterPlaceholders(String parameterString, Map<String, String> parameterMap) { if (StringUtils.isEmpty(parameterString)) { if (StringUtils.isEmpty(parameterString) || parameterMap == null) { return parameterString; } Loading
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java +17 −32 Original line number Diff line number Diff line Loading @@ -41,6 +41,7 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.DataType; import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.datax.DataxParameters; Loading Loading @@ -149,9 +150,16 @@ public class DataxTask extends AbstractTask { String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskExecutionContext.getTaskAppId()); Thread.currentThread().setName(threadLoggerInfoName); // combining local and global parameters Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), taskExecutionContext.getDefinedParams(), dataXParameters.getLocalParametersMap(), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); // run datax process String jsonFilePath = buildDataxJsonFile(); String shellCommandFilePath = buildShellCommandFile(jsonFilePath); String jsonFilePath = buildDataxJsonFile(paramsMap); String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap); CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath); setExitStatusCode(commandExecuteResult.getExitStatusCode()); Loading Loading @@ -184,7 +192,7 @@ public class DataxTask extends AbstractTask { * @return datax json file name * @throws Exception if error throws Exception */ private String buildDataxJsonFile() private String buildDataxJsonFile(Map<String, Property> paramsMap) throws Exception { // generate json String fileName = String.format("%s/%s_job.json", Loading @@ -197,26 +205,9 @@ public class DataxTask extends AbstractTask { return fileName; } if (dataXParameters.getCustomConfig() == 1){ if (dataXParameters.getCustomConfig() == Flag.YES.ordinal()){ json = dataXParameters.getJson().replaceAll("\\r\\n", "\n"); /** * combining local and global parameters */ Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), taskExecutionContext.getDefinedParams(), dataXParameters.getLocalParametersMap(), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); if (paramsMap != null){ json = ParameterUtils.convertParameterPlaceholders(json, ParamUtils.convert(paramsMap)); } }else { JSONObject job = new JSONObject(); job.put("content", buildDataxJobContentJson()); job.put("setting", buildDataxJobSettingJson()); Loading @@ -227,6 +218,9 @@ public class DataxTask extends AbstractTask { json = root.toString(); } // replace placeholder json = ParameterUtils.convertParameterPlaceholders(json, ParamUtils.convert(paramsMap)); logger.debug("datax job json : {}", json); // create datax json file Loading Loading @@ -359,7 +353,7 @@ public class DataxTask extends AbstractTask { * @return shell command file name * @throws Exception if error throws Exception */ private String buildShellCommandFile(String jobConfigFilePath) private String buildShellCommandFile(String jobConfigFilePath, Map<String, Property> paramsMap) throws Exception { // generate scripts String fileName = String.format("%s/%s_node.%s", Loading @@ -380,18 +374,9 @@ public class DataxTask extends AbstractTask { sbr.append(DATAX_HOME_EVN); sbr.append(" "); sbr.append(jobConfigFilePath); String dataxCommand = sbr.toString(); // combining local and global parameters // replace placeholder Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), taskExecutionContext.getDefinedParams(), dataXParameters.getLocalParametersMap(), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); if (paramsMap != null) { dataxCommand = ParameterUtils.convertParameterPlaceholders(dataxCommand, ParamUtils.convert(paramsMap)); } String dataxCommand = ParameterUtils.convertParameterPlaceholders(sbr.toString(), ParamUtils.convert(paramsMap)); logger.debug("raw script : {}", dataxCommand); Loading
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java +6 −5 Original line number Diff line number Diff line Loading @@ -21,9 +21,9 @@ import java.lang.reflect.Method; import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Map; import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; Loading Loading @@ -273,14 +273,15 @@ public class DataxTaskTest { setTaskParems(0); buildDataJson(); } catch (Exception e) { e.printStackTrace(); Assert.fail(e.getMessage()); } } public void buildDataJson() throws Exception { Method method = DataxTask.class.getDeclaredMethod("buildDataxJsonFile"); Method method = DataxTask.class.getDeclaredMethod("buildDataxJsonFile", new Class[]{Map.class}); method.setAccessible(true); String filePath = (String) method.invoke(dataxTask, null); String filePath = (String) method.invoke(dataxTask, new Object[]{null}); Assert.assertNotNull(filePath); } Loading Loading @@ -358,9 +359,9 @@ public class DataxTaskTest { public void testBuildShellCommandFile() throws Exception { try { Method method = DataxTask.class.getDeclaredMethod("buildShellCommandFile", String.class); Method method = DataxTask.class.getDeclaredMethod("buildShellCommandFile", String.class, Map.class); method.setAccessible(true); Assert.assertNotNull(method.invoke(dataxTask, "test.json")); Assert.assertNotNull(method.invoke(dataxTask, "test.json", null)); } catch (Exception e) { Assert.fail(e.getMessage()); Loading
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue +11 −0 Original line number Diff line number Diff line Loading @@ -236,6 +236,12 @@ _onPostStatements (a) { this.postStatements = a }, /** * return localParams */ _onLocalParams (a) { this.localParams = a }, /** * verification */ Loading @@ -246,6 +252,11 @@ return false } // localParams Subcomponent verification if (!this.$refs.refLocalParams._verifProp()) { return false } // storage this.$emit('on-params', { customConfig: this.customConfig, Loading