Loading dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java +13 −0 Original line number Diff line number Diff line Loading @@ -90,6 +90,11 @@ public class FlinkParameters extends AbstractParameters { */ private String others; /** * flink version */ private String flinkVersion; /** * program type * 0 JAVA,1 SCALA,2 PYTHON Loading Loading @@ -200,6 +205,14 @@ public class FlinkParameters extends AbstractParameters { this.programType = programType; } public String getFlinkVersion() { return flinkVersion; } public void setFlinkVersion(String flinkVersion) { this.flinkVersion = flinkVersion; } @Override public boolean checkParameters() { return mainJar != null && programType != null; Loading dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/http/HttpParameters.java +27 −0 Original line number Diff line number Diff line Loading @@ -56,6 +56,17 @@ public class HttpParameters extends AbstractParameters { private String condition; /** * Connect Timeout * Unit: ms */ private int connectTimeout ; /** * Socket Timeout * Unit: ms */ private int socketTimeout ; @Override public boolean checkParameters() { Loading Loading @@ -106,4 +117,20 @@ public class HttpParameters extends AbstractParameters { public void setCondition(String condition) { this.condition = condition; } public int getConnectTimeout() { return connectTimeout; } public void setConnectTimeout(int connectTimeout) { this.connectTimeout = connectTimeout; } public int getSocketTimeout() { return socketTimeout; } public void setSocketTimeout(int socketTimeout) { this.socketTimeout = socketTimeout; } } dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/HttpParametersTest.java 0 → 100644 +91 −0 Original line number Diff line number Diff line /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.common.task; import java.util.ArrayList; import java.util.List; import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.enums.HttpCheckCondition; import org.apache.dolphinscheduler.common.enums.HttpMethod; import org.apache.dolphinscheduler.common.process.HttpProperty; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.http.HttpParameters; import org.junit.Assert; import org.junit.Test; import com.alibaba.fastjson.JSON; /** * http parameter */ public class HttpParametersTest { @Test public void testGenerator(){ String paramData = "{\"localParams\":[],\"httpParams\":[],\"url\":\"https://www.baidu.com/\"," + "\"httpMethod\":\"GET\",\"httpCheckCondition\":\"STATUS_CODE_DEFAULT\",\"condition\":\"\",\"connectTimeout\":\"10000\",\"socketTimeout\":\"10000\"}"; HttpParameters httpParameters = JSON.parseObject(paramData, HttpParameters.class); Assert.assertEquals(10000,httpParameters.getConnectTimeout() ); Assert.assertEquals(10000,httpParameters.getSocketTimeout()); Assert.assertEquals("https://www.baidu.com/",httpParameters.getUrl()); Assert.assertEquals(HttpMethod.GET,httpParameters.getHttpMethod()); Assert.assertEquals(HttpCheckCondition.STATUS_CODE_DEFAULT,httpParameters.getHttpCheckCondition()); Assert.assertEquals("",httpParameters.getCondition()); } @Test public void testCheckParameters(){ String paramData = "{\"localParams\":[],\"httpParams\":[],\"url\":\"https://www.baidu.com/\"," + "\"httpMethod\":\"GET\",\"httpCheckCondition\":\"STATUS_CODE_DEFAULT\",\"condition\":\"\",\"connectTimeout\":\"10000\",\"socketTimeout\":\"10000\"}"; HttpParameters httpParameters = JSON.parseObject(paramData, HttpParameters.class); Assert.assertTrue( httpParameters.checkParameters()); Assert.assertEquals(10000,httpParameters.getConnectTimeout() ); Assert.assertEquals(10000,httpParameters.getSocketTimeout()); Assert.assertEquals("https://www.baidu.com/",httpParameters.getUrl()); Assert.assertEquals(HttpMethod.GET,httpParameters.getHttpMethod()); Assert.assertEquals(HttpCheckCondition.STATUS_CODE_DEFAULT,httpParameters.getHttpCheckCondition()); Assert.assertEquals("",httpParameters.getCondition()); } @Test public void testCheckValues() { String paramData = "{\"localParams\":[],\"httpParams\":[],\"url\":\"https://www.baidu.com/\"," + "\"httpMethod\":\"GET\",\"httpCheckCondition\":\"STATUS_CODE_DEFAULT\",\"condition\":\"\",\"connectTimeout\":\"10000\",\"socketTimeout\":\"10000\"}"; HttpParameters httpParameters = JSON.parseObject(paramData, HttpParameters.class); Assert.assertTrue( httpParameters.checkParameters()); Assert.assertEquals(10000,httpParameters.getConnectTimeout() ); Assert.assertEquals(10000,httpParameters.getSocketTimeout()); Assert.assertEquals("https://www.baidu.com/",httpParameters.getUrl()); Assert.assertEquals(HttpMethod.GET,httpParameters.getHttpMethod()); Assert.assertEquals(HttpCheckCondition.STATUS_CODE_DEFAULT,httpParameters.getHttpCheckCondition()); Assert.assertEquals("",httpParameters.getCondition()); Assert.assertEquals(0,httpParameters.getLocalParametersMap().size()); Assert.assertEquals(0,httpParameters.getResourceFilesList().size()); } } dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java +11 −7 Original line number Diff line number Diff line Loading @@ -28,10 +28,12 @@ import java.util.List; /** * spark args utils * flink args utils */ public class FlinkArgsUtils { private static final String LOCAL_DEPLOY_MODE = "local"; private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10"; /** * build args * @param param flink parameters Loading @@ -44,7 +46,6 @@ public class FlinkArgsUtils { String tmpDeployMode = param.getDeployMode(); if (StringUtils.isNotEmpty(tmpDeployMode)) { deployMode = tmpDeployMode; } if (!LOCAL_DEPLOY_MODE.equals(deployMode)) { args.add(Constants.FLINK_RUN_MODE); //-m Loading @@ -63,12 +64,15 @@ public class FlinkArgsUtils { args.add(appName); } // judgy flink version,from flink1.10,the parameter -yn removed String flinkVersion = param.getFlinkVersion(); if (FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) { int taskManager = param.getTaskManager(); if (taskManager != 0) { //-yn args.add(Constants.FLINK_TASK_MANAGE); args.add(String.format("%d", taskManager)); } } String jobManagerMemory = param.getJobManagerMemory(); if (StringUtils.isNotEmpty(jobManagerMemory)) { args.add(Constants.FLINK_JOB_MANAGE_MEM); Loading dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java +1 −8 Original line number Diff line number Diff line Loading @@ -61,13 +61,6 @@ public class HttpTask extends AbstractTask { */ private HttpParameters httpParameters; /** * Convert mill seconds to second unit */ protected static final int MAX_CONNECTION_MILLISECONDS = 60000; /** * application json */ Loading Loading @@ -298,7 +291,7 @@ public class HttpTask extends AbstractTask { * @return RequestConfig */ private RequestConfig requestConfig() { return RequestConfig.custom().setSocketTimeout(MAX_CONNECTION_MILLISECONDS).setConnectTimeout(MAX_CONNECTION_MILLISECONDS).build(); return RequestConfig.custom().setSocketTimeout(httpParameters.getSocketTimeout()).setConnectTimeout(httpParameters.getConnectTimeout()).build(); } /** Loading Loading
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java +13 −0 Original line number Diff line number Diff line Loading @@ -90,6 +90,11 @@ public class FlinkParameters extends AbstractParameters { */ private String others; /** * flink version */ private String flinkVersion; /** * program type * 0 JAVA,1 SCALA,2 PYTHON Loading Loading @@ -200,6 +205,14 @@ public class FlinkParameters extends AbstractParameters { this.programType = programType; } public String getFlinkVersion() { return flinkVersion; } public void setFlinkVersion(String flinkVersion) { this.flinkVersion = flinkVersion; } @Override public boolean checkParameters() { return mainJar != null && programType != null; Loading
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/http/HttpParameters.java +27 −0 Original line number Diff line number Diff line Loading @@ -56,6 +56,17 @@ public class HttpParameters extends AbstractParameters { private String condition; /** * Connect Timeout * Unit: ms */ private int connectTimeout ; /** * Socket Timeout * Unit: ms */ private int socketTimeout ; @Override public boolean checkParameters() { Loading Loading @@ -106,4 +117,20 @@ public class HttpParameters extends AbstractParameters { public void setCondition(String condition) { this.condition = condition; } public int getConnectTimeout() { return connectTimeout; } public void setConnectTimeout(int connectTimeout) { this.connectTimeout = connectTimeout; } public int getSocketTimeout() { return socketTimeout; } public void setSocketTimeout(int socketTimeout) { this.socketTimeout = socketTimeout; } }
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/HttpParametersTest.java 0 → 100644 +91 −0 Original line number Diff line number Diff line /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.common.task; import java.util.ArrayList; import java.util.List; import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.enums.HttpCheckCondition; import org.apache.dolphinscheduler.common.enums.HttpMethod; import org.apache.dolphinscheduler.common.process.HttpProperty; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.http.HttpParameters; import org.junit.Assert; import org.junit.Test; import com.alibaba.fastjson.JSON; /** * http parameter */ public class HttpParametersTest { @Test public void testGenerator(){ String paramData = "{\"localParams\":[],\"httpParams\":[],\"url\":\"https://www.baidu.com/\"," + "\"httpMethod\":\"GET\",\"httpCheckCondition\":\"STATUS_CODE_DEFAULT\",\"condition\":\"\",\"connectTimeout\":\"10000\",\"socketTimeout\":\"10000\"}"; HttpParameters httpParameters = JSON.parseObject(paramData, HttpParameters.class); Assert.assertEquals(10000,httpParameters.getConnectTimeout() ); Assert.assertEquals(10000,httpParameters.getSocketTimeout()); Assert.assertEquals("https://www.baidu.com/",httpParameters.getUrl()); Assert.assertEquals(HttpMethod.GET,httpParameters.getHttpMethod()); Assert.assertEquals(HttpCheckCondition.STATUS_CODE_DEFAULT,httpParameters.getHttpCheckCondition()); Assert.assertEquals("",httpParameters.getCondition()); } @Test public void testCheckParameters(){ String paramData = "{\"localParams\":[],\"httpParams\":[],\"url\":\"https://www.baidu.com/\"," + "\"httpMethod\":\"GET\",\"httpCheckCondition\":\"STATUS_CODE_DEFAULT\",\"condition\":\"\",\"connectTimeout\":\"10000\",\"socketTimeout\":\"10000\"}"; HttpParameters httpParameters = JSON.parseObject(paramData, HttpParameters.class); Assert.assertTrue( httpParameters.checkParameters()); Assert.assertEquals(10000,httpParameters.getConnectTimeout() ); Assert.assertEquals(10000,httpParameters.getSocketTimeout()); Assert.assertEquals("https://www.baidu.com/",httpParameters.getUrl()); Assert.assertEquals(HttpMethod.GET,httpParameters.getHttpMethod()); Assert.assertEquals(HttpCheckCondition.STATUS_CODE_DEFAULT,httpParameters.getHttpCheckCondition()); Assert.assertEquals("",httpParameters.getCondition()); } @Test public void testCheckValues() { String paramData = "{\"localParams\":[],\"httpParams\":[],\"url\":\"https://www.baidu.com/\"," + "\"httpMethod\":\"GET\",\"httpCheckCondition\":\"STATUS_CODE_DEFAULT\",\"condition\":\"\",\"connectTimeout\":\"10000\",\"socketTimeout\":\"10000\"}"; HttpParameters httpParameters = JSON.parseObject(paramData, HttpParameters.class); Assert.assertTrue( httpParameters.checkParameters()); Assert.assertEquals(10000,httpParameters.getConnectTimeout() ); Assert.assertEquals(10000,httpParameters.getSocketTimeout()); Assert.assertEquals("https://www.baidu.com/",httpParameters.getUrl()); Assert.assertEquals(HttpMethod.GET,httpParameters.getHttpMethod()); Assert.assertEquals(HttpCheckCondition.STATUS_CODE_DEFAULT,httpParameters.getHttpCheckCondition()); Assert.assertEquals("",httpParameters.getCondition()); Assert.assertEquals(0,httpParameters.getLocalParametersMap().size()); Assert.assertEquals(0,httpParameters.getResourceFilesList().size()); } }
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java +11 −7 Original line number Diff line number Diff line Loading @@ -28,10 +28,12 @@ import java.util.List; /** * spark args utils * flink args utils */ public class FlinkArgsUtils { private static final String LOCAL_DEPLOY_MODE = "local"; private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10"; /** * build args * @param param flink parameters Loading @@ -44,7 +46,6 @@ public class FlinkArgsUtils { String tmpDeployMode = param.getDeployMode(); if (StringUtils.isNotEmpty(tmpDeployMode)) { deployMode = tmpDeployMode; } if (!LOCAL_DEPLOY_MODE.equals(deployMode)) { args.add(Constants.FLINK_RUN_MODE); //-m Loading @@ -63,12 +64,15 @@ public class FlinkArgsUtils { args.add(appName); } // judgy flink version,from flink1.10,the parameter -yn removed String flinkVersion = param.getFlinkVersion(); if (FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) { int taskManager = param.getTaskManager(); if (taskManager != 0) { //-yn args.add(Constants.FLINK_TASK_MANAGE); args.add(String.format("%d", taskManager)); } } String jobManagerMemory = param.getJobManagerMemory(); if (StringUtils.isNotEmpty(jobManagerMemory)) { args.add(Constants.FLINK_JOB_MANAGE_MEM); Loading
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java +1 −8 Original line number Diff line number Diff line Loading @@ -61,13 +61,6 @@ public class HttpTask extends AbstractTask { */ private HttpParameters httpParameters; /** * Convert mill seconds to second unit */ protected static final int MAX_CONNECTION_MILLISECONDS = 60000; /** * application json */ Loading Loading @@ -298,7 +291,7 @@ public class HttpTask extends AbstractTask { * @return RequestConfig */ private RequestConfig requestConfig() { return RequestConfig.custom().setSocketTimeout(MAX_CONNECTION_MILLISECONDS).setConnectTimeout(MAX_CONNECTION_MILLISECONDS).build(); return RequestConfig.custom().setSocketTimeout(httpParameters.getSocketTimeout()).setConnectTimeout(httpParameters.getConnectTimeout()).build(); } /** Loading