Commit 4d8d74ac authored by Yelli's avatar Yelli Committed by khadgarmage
Browse files

#839 enhancement : add Spark Task Component can switch Spark Version (#1494)

* add Spark Version in Spark Component

add Spark Version in Spark Component

* add license for SparkVersion.class

add license

* 1 add spark task UT
2 add spark version param check

* add assert check for sparkTaskTest
parent 4cf84587
Loading
Loading
Loading
Loading
+40 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.common.enums;

import com.baomidou.mybatisplus.annotation.EnumValue;
import lombok.Getter;

@Getter
public enum SparkVersion {

    /**
     * 0 SPARK1
     * 1 SPARK2
     */
    SPARK1(0, "SPARK1"),
    SPARK2(1, "SPARK2");

    SparkVersion(int code, String descp){
        this.code = code;
        this.descp = descp;
    }

    @EnumValue
    private final int code;
    private final String descp;
}
+15 −2
Original line number Diff line number Diff line
@@ -95,6 +95,11 @@ public class SparkParameters extends AbstractParameters {
   */
  private ProgramType programType;

  /**
   * spark version
   */
  private String sparkVersion;

  public ResourceInfo getMainJar() {
    return mainJar;
  }
@@ -200,9 +205,17 @@ public class SparkParameters extends AbstractParameters {
    this.programType = programType;
  }

  public String getSparkVersion() {
    return sparkVersion;
  }

  public void setSparkVersion(String sparkVersion) {
    this.sparkVersion = sparkVersion;
  }

  @Override
  public boolean checkParameters() {
    return mainJar != null && programType != null;
    return mainJar != null && programType != null && sparkVersion != null;
  }


@@ -211,7 +224,7 @@ public class SparkParameters extends AbstractParameters {
    if(resourceList !=null ) {
      this.resourceList.add(mainJar);
      return resourceList.stream()
              .map(p -> p.getRes()).collect(Collectors.toList());
              .map(ResourceInfo::getRes).collect(Collectors.toList());
    }
    return null;
  }
+17 −4
Original line number Diff line number Diff line
@@ -16,6 +16,8 @@
 */
package org.apache.dolphinscheduler.server.worker.task.spark;

import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.enums.SparkVersion;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
@@ -25,7 +27,6 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;

import java.util.ArrayList;
@@ -38,9 +39,14 @@ import java.util.Map;
public class SparkTask extends AbstractYarnTask {

  /**
   * spark command
   * spark1 command
   */
  private static final String SPARK1_COMMAND = "${SPARK_HOME1}/bin/spark-submit";

  /**
   * spark2 command
   */
  private static final String SPARK_COMMAND = "spark-submit";
  private static final String SPARK2_COMMAND = "${SPARK_HOME2}/bin/spark-submit";

  /**
   *  spark parameters
@@ -89,7 +95,14 @@ public class SparkTask extends AbstractYarnTask {
  protected String buildCommand() {
    List<String> args = new ArrayList<>();

    args.add(SPARK_COMMAND);
    //spark version
    String sparkCommand = SPARK2_COMMAND;

    if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
      sparkCommand = SPARK1_COMMAND;
    }

    args.add(sparkCommand);

    // other parameters
    args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
+141 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.server.worker.task.spark;

import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.enums.SparkVersion;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;


public class SparkTaskTest {

    private static final Logger logger = LoggerFactory.getLogger(SparkTaskTest.class);

    /**
     * spark1 command
     */
    private static final String SPARK1_COMMAND = "${SPARK_HOME1}/bin/spark-submit";

    /**
     * spark2 command
     */
    private static final String SPARK2_COMMAND = "${SPARK_HOME2}/bin/spark-submit";

    @Test
    public void testSparkTaskInit() {

        TaskProps taskProps = new TaskProps();

        String spark1Params = "{" +
                "\"mainArgs\":\"\", " +
                "\"driverMemory\":\"1G\", " +
                "\"executorMemory\":\"2G\", " +
                "\"programType\":\"SCALA\", " +
                "\"mainClass\":\"basicetl.GlobalUserCar\", " +
                "\"driverCores\":\"2\", " +
                "\"deployMode\":\"cluster\", " +
                "\"executorCores\":2, " +
                "\"mainJar\":{\"res\":\"test-1.0-SNAPSHOT.jar\"}, " +
                "\"sparkVersion\":\"SPARK1\", " +
                "\"numExecutors\":\"10\", " +
                "\"localParams\":[], " +
                "\"others\":\"\", " +
                "\"resourceList\":[]" +
                "}";

        String spark2Params = "{" +
                "\"mainArgs\":\"\", " +
                "\"driverMemory\":\"1G\", " +
                "\"executorMemory\":\"2G\", " +
                "\"programType\":\"SCALA\", " +
                "\"mainClass\":\"basicetl.GlobalUserCar\", " +
                "\"driverCores\":\"2\", " +
                "\"deployMode\":\"cluster\", " +
                "\"executorCores\":2, " +
                "\"mainJar\":{\"res\":\"test-1.0-SNAPSHOT.jar\"}, " +
                "\"sparkVersion\":\"SPARK2\", " +
                "\"numExecutors\":\"10\", " +
                "\"localParams\":[], " +
                "\"others\":\"\", " +
                "\"resourceList\":[]" +
                "}";

        taskProps.setTaskParams(spark2Params);

        logger.info("spark task params {}", taskProps.getTaskParams());

        SparkParameters sparkParameters = JSONUtils.parseObject(taskProps.getTaskParams(), SparkParameters.class);

        assert sparkParameters != null;
        if (!sparkParameters.checkParameters()) {
            throw new RuntimeException("spark task params is not valid");
        }
        sparkParameters.setQueue(taskProps.getQueue());

        if (StringUtils.isNotEmpty(sparkParameters.getMainArgs())) {
            String args = sparkParameters.getMainArgs();

            /**
             *  combining local and global parameters
             */
            Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
                    taskProps.getDefinedParams(),
                    sparkParameters.getLocalParametersMap(),
                    taskProps.getCmdTypeIfComplement(),
                    taskProps.getScheduleTime());
            if (paramsMap != null) {
                args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap));
            }
            sparkParameters.setMainArgs(args);
        }

        List<String> args = new ArrayList<>();

        //spark version
        String sparkCommand = SPARK2_COMMAND;

        if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
            sparkCommand = SPARK1_COMMAND;
        }

        args.add(sparkCommand);

        // other parameters
        args.addAll(SparkArgsUtils.buildArgs(sparkParameters));

        String sparkArgs = String.join(" ", args);

        logger.info("spark task command : {}", sparkArgs);

        Assert.assertEquals(sparkArgs.split(" ")[0], SPARK2_COMMAND );

    }
}
+24 −2
Original line number Diff line number Diff line
@@ -32,6 +32,22 @@
        </x-select>
      </div>
    </m-list-box>
    <m-list-box>
      <div slot="text">{{$t('Spark Version')}}</div>
      <div slot="content">
        <x-select
                style="width: 130px;"
                v-model="sparkVersion"
                :disabled="isDetails">
          <x-option
                  v-for="city in sparkVersionList"
                  :key="city.code"
                  :value="city.code"
                  :label="city.code">
          </x-option>
        </x-select>
      </div>
    </m-list-box>
    <m-list-box v-if="programType !== 'PYTHON'">
      <div slot="text">{{$t('Main class')}}</div>
      <div slot="content">
@@ -224,7 +240,11 @@
        // Program type
        programType: 'SCALA',
        // Program type(List)
        programTypeList: [{ code: 'JAVA' }, { code: 'SCALA' }, { code: 'PYTHON' }]
        programTypeList: [{ code: 'JAVA' }, { code: 'SCALA' }, { code: 'PYTHON' }],
        // Spark version
        sparkVersion: 'SPARK2',
        // Spark version(LIst)
        sparkVersionList: [{ code: 'SPARK2' }, { code: 'SPARK1' }]
      }
    },
    props: {
@@ -318,7 +338,8 @@
          executorCores: this.executorCores,
          mainArgs: this.mainArgs,
          others: this.others,
          programType: this.programType
          programType: this.programType,
          sparkVersion: this.sparkVersion
        })
        return true
      },
@@ -366,6 +387,7 @@
          this.mainArgs = o.params.mainArgs || ''
          this.others = o.params.others
          this.programType = o.params.programType || 'SCALA'
          this.sparkVersion = o.params.sparkVersion || 'SPARK2'

          // backfill resourceList
          let resourceList = o.params.resourceList || []
Loading