Unverified Commit a7fd0a52 authored by Rubik-W's avatar Rubik-W Committed by GitHub
Browse files

Merge pull request #2943 from Eights-Li/dev-sqoop-optimization

Sqoop task optimization
parents 2749c7e5 a420644a
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -218,7 +218,7 @@ public enum Status {
    DATA_IS_NOT_VALID(50017,"data {0} not valid", "数据[{0}]无效"),
    DATA_IS_NULL(50018,"data {0} is null", "数据[{0}]不能为空"),
    PROCESS_NODE_HAS_CYCLE(50019,"process node has cycle", "流程节点间存在循环依赖"),
    PROCESS_NODE_S_PARAMETER_INVALID(50020,"process node %s parameter invalid", "流程节点[%s]参数无效"),
    PROCESS_NODE_S_PARAMETER_INVALID(50020,"process node {0} parameter invalid", "流程节点[{0}]参数无效"),
    PROCESS_DEFINE_STATE_ONLINE(50021, "process definition {0} is already on line", "工作流定义[{0}]已上线"),
    DELETE_PROCESS_DEFINE_BY_ID_ERROR(50022,"delete process definition by id error", "删除工作流定义错误"),
    SCHEDULE_CRON_STATE_ONLINE(50023,"the status of schedule {0} is already on line", "调度配置[{0}]已上线"),
+41 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.common.enums;

import com.baomidou.mybatisplus.annotation.EnumValue;

public enum  SqoopJobType {
    CUSTOM(0, "CUSTOM"),
    TEMPLATE(1, "TEMPLATE");

    SqoopJobType(int code, String descp){
        this.code = code;
        this.descp = descp;
    }

    @EnumValue
    private final int code;
    private final String descp;

    public int getCode() {
        return code;
    }

    public String getDescp() {
        return descp;
    }
}
+92 −7
Original line number Diff line number Diff line
@@ -16,6 +16,8 @@
 */
package org.apache.dolphinscheduler.common.task.sqoop;

import org.apache.dolphinscheduler.common.enums.SqoopJobType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.StringUtils;
@@ -28,6 +30,23 @@ import java.util.List;
 */
public class SqoopParameters  extends AbstractParameters {

    /**
     * sqoop job type:
     * CUSTOM - custom sqoop job
     * TEMPLATE - sqoop template job
     */
    private String jobType;

    /**
     * customJob eq 1, use customShell
     */
    private String customShell;

    /**
     * sqoop job name - map-reduce job name
     */
    private String jobName;

    /**
     * model type
     */
@@ -53,6 +72,16 @@ public class SqoopParameters extends AbstractParameters {
     */
    private String targetParams;

    /**
     * hadoop custom param for sqoop job
     */
    private List<Property> hadoopCustomParams;

    /**
     * sqoop advanced param
     */
    private List<Property> sqoopAdvancedParams;

    public String getModelType() {
        return modelType;
    }
@@ -101,14 +130,70 @@ public class SqoopParameters extends AbstractParameters {
        this.targetParams = targetParams;
    }

    public String getJobType() {
        return jobType;
    }

    public void setJobType(String jobType) {
        this.jobType = jobType;
    }

    public String getJobName() {
        return jobName;
    }

    public void setJobName(String jobName) {
        this.jobName = jobName;
    }

    public String getCustomShell() {
        return customShell;
    }

    public void setCustomShell(String customShell) {
        this.customShell = customShell;
    }

    public List<Property> getHadoopCustomParams() {
        return hadoopCustomParams;
    }

    public void setHadoopCustomParams(List<Property> hadoopCustomParams) {
        this.hadoopCustomParams = hadoopCustomParams;
    }

    public List<Property> getSqoopAdvancedParams() {
        return sqoopAdvancedParams;
    }

    public void setSqoopAdvancedParams(List<Property> sqoopAdvancedParams) {
        this.sqoopAdvancedParams = sqoopAdvancedParams;
    }

    @Override
    public boolean checkParameters() {
        return StringUtils.isNotEmpty(modelType)&&

        boolean sqoopParamsCheck = false;

        if (StringUtils.isEmpty(jobType)) {
            return sqoopParamsCheck;
        }

        if (SqoopJobType.TEMPLATE.getDescp().equals(jobType)) {
            sqoopParamsCheck = StringUtils.isEmpty(customShell) &&
                    StringUtils.isNotEmpty(modelType) &&
                    StringUtils.isNotEmpty(jobName) &&
                    concurrency != 0 &&
                    StringUtils.isNotEmpty(sourceType) &&
                    StringUtils.isNotEmpty(targetType) &&
                    StringUtils.isNotEmpty(sourceParams) &&
                    StringUtils.isNotEmpty(targetParams);
        } else if (SqoopJobType.CUSTOM.getDescp().equals(jobType)) {
            sqoopParamsCheck = StringUtils.isNotEmpty(customShell) &&
                    StringUtils.isEmpty(jobName);
        }

        return sqoopParamsCheck;
    }

    @Override
+1 −1
Original line number Diff line number Diff line
@@ -106,7 +106,7 @@ public class TargetMysqlParameter {
        this.preQuery = preQuery;
    }

    public boolean isUpdate() {
    public boolean getIsUpdate() {
        return isUpdate;
    }

+18 −14
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.consumer;

import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.SqoopJobType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.model.TaskNode;
@@ -258,13 +259,15 @@ public class TaskPriorityQueueConsumer extends Thread{


    /**
     * set datax task relation
     * set sqoop task relation
     * @param sqoopTaskExecutionContext sqoopTaskExecutionContext
     * @param taskNode taskNode
     */
    private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskNode taskNode) {
        SqoopParameters sqoopParameters = JSONObject.parseObject(taskNode.getParams(), SqoopParameters.class);

        // sqoop job type is template set task relation
        if (sqoopParameters.getJobType().equals(SqoopJobType.TEMPLATE.getDescp())) {
            SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceMysqlParameter.class);
            TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetMysqlParameter.class);

@@ -283,6 +286,7 @@ public class TaskPriorityQueueConsumer extends Thread{
                sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
            }
        }
    }

    /**
     * set SQL task relation
Loading