Unverified Commit bcf4a0e8 authored by dailidong's avatar dailidong Committed by GitHub
Browse files

Merge branch 'dev' into dev

parents 2bd3a9bf e837a73f
Loading
Loading
Loading
Loading
+12 −2
Original line number Diff line number Diff line
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.ShowType;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -54,11 +55,19 @@ public class EnterpriseWeChatUtilsTest {
    private static final String enterpriseWechatUsers="LiGang,journey";
    private static final String msg = "hello world";

    private static final String enterpriseWechatTeamSendMsg = "{\\\"toparty\\\":\\\"$toParty\\\",\\\"agentid\\\":\\\"$agentId\\\",\\\"msgtype\\\":\\\"text\\\",\\\"text\\\":{\\\"content\\\":\\\"$msg\\\"},\\\"safe\\\":\\\"0\\\"}";
    private static final String enterpriseWechatUserSendMsg = "{\\\"touser\\\":\\\"$toUser\\\",\\\"agentid\\\":\\\"$agentId\\\",\\\"msgtype\\\":\\\"markdown\\\",\\\"markdown\\\":{\\\"content\\\":\\\"$msg\\\"}}";

    @Test
    public void testIsEnable(){
    @Before
    public void init(){
        PowerMockito.mockStatic(PropertyUtils.class);
        Mockito.when(PropertyUtils.getBoolean(Constants.ENTERPRISE_WECHAT_ENABLE)).thenReturn(true);
        Mockito.when(PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_USER_SEND_MSG)).thenReturn(enterpriseWechatUserSendMsg);
        Mockito.when(PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_TEAM_SEND_MSG)).thenReturn(enterpriseWechatTeamSendMsg);
    }

    @Test
    public void testIsEnable(){
        Boolean weChartEnable = EnterpriseWeChatUtils.isEnable();
        Assert.assertTrue(weChartEnable);
    }
@@ -88,6 +97,7 @@ public class EnterpriseWeChatUtilsTest {

    @Test
    public void tesMakeUserSendMsg1(){

        String sendMsg = EnterpriseWeChatUtils.makeUserSendMsg(enterpriseWechatUsers, enterpriseWechatAgentId, msg);
        Assert.assertTrue(sendMsg.contains(enterpriseWechatUsers));
        Assert.assertTrue(sendMsg.contains(enterpriseWechatAgentId));
+56 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.api.exceptions;

import org.apache.dolphinscheduler.api.enums.Status;


/**
 * service exception
 */
public class ServiceException extends RuntimeException {

    /**
     * code
     */
    private Integer code;

    public ServiceException() {
    }

    public ServiceException(Status status) {
        super(status.getMsg());
        this.code = status.getCode();
    }

    public ServiceException(Integer code,String message) {
        super(message);
        this.code = code;
    }

    public ServiceException(String message) {
        super(message);
    }

    public Integer getCode() {
        return this.code;
    }

    public void setCode(Integer code) {
        this.code = code;
    }
}
 No newline at end of file
+23 −10
Original line number Diff line number Diff line
@@ -65,25 +65,24 @@ public class LoggerService {

    TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);

    if (taskInstance == null){
      return new Result(Status.TASK_INSTANCE_NOT_FOUND.getCode(), Status.TASK_INSTANCE_NOT_FOUND.getMsg());
    }

    String host = Host.of(taskInstance.getHost()).getIp();
    if(StringUtils.isEmpty(host)){
    if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())){
      return new Result(Status.TASK_INSTANCE_NOT_FOUND.getCode(), Status.TASK_INSTANCE_NOT_FOUND.getMsg());
    }

    String host = getHost(taskInstance.getHost());

    Result result = new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());

    logger.info("log host : {} , logPath : {} , logServer port : {}",host,taskInstance.getLogPath(),Constants.RPC_PORT);

    String log = logClient.rollViewLog(host, Constants.RPC_PORT, taskInstance.getLogPath(),skipLineNum,limit);
    result.setData(log);
    logger.info(log);
    return result;
  }




  /**
   * get log size
   *
@@ -92,10 +91,24 @@ public class LoggerService {
   */
  public byte[] getLogBytes(int taskInstId) {
    TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
    if (taskInstance == null){
      throw new RuntimeException("task instance is null");
    if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())){
      throw new RuntimeException("task instance is null or host is null");
    }
    String host = Host.of(taskInstance.getHost()).getIp();
    String host = getHost(taskInstance.getHost());

    return logClient.getLogBytes(host, Constants.RPC_PORT, taskInstance.getLogPath());
  }


  /**
   * get host
   * @param address address
   * @return old version return true ,otherwise return false
   */
  private String getHost(String address){
    if (Host.isOldVersion(address)){
      return address;
    }
    return Host.of(address).getIp();
  }
}
+16 −42
Original line number Diff line number Diff line
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.api.dto.resources.filter.ResourceFilter;
import org.apache.dolphinscheduler.api.dto.resources.visitor.ResourceTreeVisitor;
import org.apache.dolphinscheduler.api.dto.resources.visitor.Visitor;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
@@ -234,9 +235,6 @@ public class ResourcesService extends BaseService {
        }

        Date now = new Date();



        Resource resource = new Resource(pid,name,fullName,false,desc,file.getOriginalFilename(),loginUser.getId(),type,file.getSize(),now,now);

        try {
@@ -342,7 +340,6 @@ public class ResourcesService extends BaseService {
        String originResourceName = resource.getAlias();
        if (!resource.isDirectory()) {
            //get the file suffix

            String suffix = originResourceName.substring(originResourceName.lastIndexOf("."));

            //if the name without suffix then add it ,else use the origin name
@@ -352,7 +349,7 @@ public class ResourcesService extends BaseService {
        }

        // updateResource data
        List<Integer> childrenResource = listAllChildren(resource);
        List<Integer> childrenResource = listAllChildren(resource,false);
        String oldFullName = resource.getFullName();
        Date now = new Date();

@@ -385,16 +382,16 @@ public class ResourcesService extends BaseService {
            result.setData(resultMap);
        } catch (Exception e) {
            logger.error(Status.UPDATE_RESOURCE_ERROR.getMsg(), e);
            throw new RuntimeException(Status.UPDATE_RESOURCE_ERROR.getMsg());
            throw new ServiceException(Status.UPDATE_RESOURCE_ERROR);
        }
        // if name unchanged, return directly without moving on HDFS
        if (originResourceName.equals(name)) {
            return result;
        }

        // get file hdfs path
        // delete hdfs file by type
        // get the path of origin file in hdfs
        String originHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,originFullName);
        // get the path of dest file in hdfs
        String destHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,fullName);

        try {
@@ -408,6 +405,7 @@ public class ResourcesService extends BaseService {
        } catch (Exception e) {
            logger.error(MessageFormat.format("hdfs copy {0} -> {1} fail", originHdfsFileName, destHdfsFileName), e);
            putMsg(result,Status.HDFS_COPY_FAIL);
            throw new ServiceException(Status.HDFS_COPY_FAIL);
        }

        return result;
@@ -542,34 +540,6 @@ public class ResourcesService extends BaseService {
        return result;
    }

    /**
     * get all resources
     * @param loginUser     login user
     * @return all resource set
     */
    /*private Set<Resource> getAllResources(User loginUser, ResourceType type) {
        int userId = loginUser.getId();
        boolean listChildren = true;
        if(isAdmin(loginUser)){
            userId = 0;
            listChildren = false;
        }
        List<Resource> resourceList = resourcesMapper.queryResourceListAuthored(userId, type.ordinal());
        Set<Resource> allResourceList = new HashSet<>(resourceList);
        if (listChildren) {
            Set<Integer> authorizedIds = new HashSet<>();
            List<Resource> authorizedDirecoty = resourceList.stream().filter(t->t.getUserId() != loginUser.getId() && t.isDirectory()).collect(Collectors.toList());
            if (CollectionUtils.isNotEmpty(authorizedDirecoty)) {
                for(Resource resource : authorizedDirecoty){
                    authorizedIds.addAll(listAllChildren(resource));
                }
                List<Resource> childrenResources = resourcesMapper.listResourceByIds(authorizedIds.toArray(new Integer[authorizedIds.size()]));
                allResourceList.addAll(childrenResources);
            }
        }
        return allResourceList;
    }*/

    /**
     * query resource list
     *
@@ -580,8 +550,11 @@ public class ResourcesService extends BaseService {
    public Map<String, Object> queryResourceJarList(User loginUser, ResourceType type) {

        Map<String, Object> result = new HashMap<>(5);

        List<Resource> allResourceList = resourcesMapper.queryResourceListAuthored(loginUser.getId(), type.ordinal(),0);
        int userId = loginUser.getId();
        if(isAdmin(loginUser)){
            userId = 0;
        }
        List<Resource> allResourceList = resourcesMapper.queryResourceListAuthored(userId, type.ordinal(),0);
        List<Resource> resources = new ResourceFilter(".jar",new ArrayList<>(allResourceList)).filter();
        Visitor resourceTreeVisitor = new ResourceTreeVisitor(resources);
        result.put(Constants.DATA_LIST, resourceTreeVisitor.visit().getChildren());
@@ -631,7 +604,7 @@ public class ResourcesService extends BaseService {
        Map<Integer, Set<Integer>> resourceProcessMap = ResourceProcessDefinitionUtils.getResourceProcessDefinitionMap(list);
        Set<Integer> resourceIdSet = resourceProcessMap.keySet();
        // get all children of the resource
        List<Integer> allChildren = listAllChildren(resource);
        List<Integer> allChildren = listAllChildren(resource,true);
        Integer[] needDeleteResourceIdArray = allChildren.toArray(new Integer[allChildren.size()]);

        //if resource type is UDF,need check whether it is bound by UDF functon
@@ -1194,11 +1167,12 @@ public class ResourcesService extends BaseService {
    /**
     * list all children id
     * @param resource    resource
     * @param containSelf whether add self to children list
     * @return all children id
     */
    List<Integer> listAllChildren(Resource resource){
    List<Integer> listAllChildren(Resource resource,boolean containSelf){
        List<Integer> childList = new ArrayList<>();
        if (resource.getId() != -1) {
        if (resource.getId() != -1 && containSelf) {
            childList.add(resource.getId());
        }

+46 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.api.exceptions;

import org.apache.dolphinscheduler.api.enums.Status;
import org.junit.Assert;
import org.junit.Test;

public class ServiceExceptionTest {
    @Test
    public void getCodeTest(){
        ServiceException serviceException = new ServiceException();
        Assert.assertNull(serviceException.getCode());

        serviceException = new ServiceException(Status.ALERT_GROUP_EXIST);
        Assert.assertNotNull(serviceException.getCode());

        serviceException = new ServiceException(10012, "alarm group already exists");
        Assert.assertNotNull(serviceException.getCode());
    }
    @Test
    public void getMessageTest(){
        ServiceException serviceException = new ServiceException();
        Assert.assertNull(serviceException.getMessage());

        serviceException = new ServiceException(Status.ALERT_GROUP_EXIST);
        Assert.assertNotNull(serviceException.getMessage());

        serviceException = new ServiceException(10012, "alarm group already exists");
        Assert.assertNotNull(serviceException.getMessage());
    }
}
Loading