Unverified Commit 733acdfd authored by lgcareer's avatar lgcareer Committed by GitHub
Browse files

It is necessary to check whether the resource is valid (#2399)



* init full_name in dml of version 1.2.2

* redefine request parameter resourceIds

* redefine request parameter resourceIds

* Actually grant resource file if choose the directory

* To cancel an authorized resource, check whether it is used by a process definition that is online

* When releasing a process definition online, it is necessary to check whether its resources are valid

* update ResourceServiceTest and ResourceMapperTest

* add batchUpdateResourceTest

* add getHdfsFileNameTest and getHdfsResourceFileNameTest

* update ResourceServiceTest and ResourceMapperTest

* extract getResourceProcessMap to ResourceProcessDefinitionUtils

Co-authored-by: default avatardailidong <dailidong66@gmail.com>
parent 4c90f04b
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -190,6 +190,7 @@ public enum Status {
    UDF_RESOURCE_IS_BOUND(20013, "udf resource file is bound by UDF functions:{0}","udf函数绑定了资源文件[{0}]"),
    RESOURCE_IS_USED(20014, "resource file is used by process definition","资源文件被上线的流程定义使用了"),
    PARENT_RESOURCE_NOT_EXIST(20015, "parent resource not exist","父资源文件不存在"),
    RESOURCE_NOT_EXIST_OR_NO_PERMISSION(20016, "resource not exist or no permission,please view the task node and remove error resource","请检查任务节点并移除无权限或者已删除的资源"),


    USER_NO_OPERATION_PERM(30001, "user has no operation privilege", "当前用户没有操作权限"),
+17 −0
Original line number Diff line number Diff line
@@ -44,6 +44,7 @@ import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.service.permission.PermissionCheck;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -143,6 +144,7 @@ public class ProcessDefinitionService extends BaseDAGService {
        processDefine.setTimeout(processData.getTimeout());
        processDefine.setTenantId(processData.getTenantId());
        processDefine.setModifyBy(loginUser.getUserName());
        processDefine.setResourceIds(getResourceIds(processData));

        //custom global params
        List<Property> globalParamsList = processData.getGlobalParams();
@@ -333,6 +335,7 @@ public class ProcessDefinitionService extends BaseDAGService {
        processDefine.setTimeout(processData.getTimeout());
        processDefine.setTenantId(processData.getTenantId());
        processDefine.setModifyBy(loginUser.getUserName());
        processDefine.setResourceIds(getResourceIds(processData));

        //custom global params
        List<Property> globalParamsList = new ArrayList<>();
@@ -476,6 +479,20 @@ public class ProcessDefinitionService extends BaseDAGService {

        switch (state) {
            case ONLINE:
                // To check resources whether they are already cancel authorized or deleted
                String resourceIds = processDefinition.getResourceIds();
                if (StringUtils.isNotBlank(resourceIds)) {
                    Integer[] resourceIdArray = Arrays.stream(resourceIds.split(",")).map(Integer::parseInt).toArray(Integer[]::new);
                    PermissionCheck<Integer> permissionCheck = new PermissionCheck(AuthorizationType.RESOURCE_FILE_ID,processService,resourceIdArray,loginUser.getId(),logger);
                    try {
                        permissionCheck.checkPermission();
                    } catch (Exception e) {
                        logger.error(e.getMessage(),e);
                        putMsg(result, Status.RESOURCE_NOT_EXIST_OR_NO_PERMISSION, "releaseState");
                        return result;
                    }
                }

                processDefinition.setReleaseState(state);
                processDefineMapper.updateById(processDefinition);
                break;
+48 −51
Original line number Diff line number Diff line
@@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.dao.utils.ResourceProcessDefinitionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -176,6 +177,21 @@ public class ResourcesService extends BaseService {
            putMsg(result, Status.HDFS_NOT_STARTUP);
            return result;
        }

        if (pid != -1) {
            Resource parentResource = resourcesMapper.selectById(pid);

            if (parentResource == null) {
                putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST);
                return result;
            }

            if (!hasPerm(loginUser, parentResource.getUserId())) {
                putMsg(result, Status.USER_NO_OPERATION_PERM);
                return result;
            }
        }

        // file is empty
        if (file.isEmpty()) {
            logger.error("file is empty: {}", file.getOriginalFilename());
@@ -416,6 +432,14 @@ public class ResourcesService extends BaseService {
        if (isAdmin(loginUser)) {
            userId= 0;
        }
        if (direcotryId != -1) {
            Resource directory = resourcesMapper.selectById(direcotryId);
            if (directory == null) {
                putMsg(result, Status.RESOURCE_NOT_EXIST);
                return result;
            }
        }

        IPage<Resource> resourceIPage = resourcesMapper.queryResourcePaging(page,
                userId,direcotryId, type.ordinal(), searchVal);
        PageInfo pageInfo = new PageInfo<Resource>(pageNo, pageSize);
@@ -505,8 +529,12 @@ public class ResourcesService extends BaseService {

        Map<String, Object> result = new HashMap<>(5);

        Set<Resource> allResourceList = getAllResources(loginUser, type);
        Visitor resourceTreeVisitor = new ResourceTreeVisitor(new ArrayList<>(allResourceList));
        int userId = loginUser.getId();
        if(isAdmin(loginUser)){
            userId = 0;
        }
        List<Resource> allResourceList = resourcesMapper.queryResourceListAuthored(userId, type.ordinal(),0);
        Visitor resourceTreeVisitor = new ResourceTreeVisitor(allResourceList);
        //JSONArray jsonArray = JSON.parseArray(JSON.toJSONString(resourceTreeVisitor.visit().getChildren(), SerializerFeature.SortField));
        result.put(Constants.DATA_LIST, resourceTreeVisitor.visit().getChildren());
        putMsg(result,Status.SUCCESS);
@@ -519,7 +547,7 @@ public class ResourcesService extends BaseService {
     * @param loginUser     login user
     * @return all resource set
     */
    private Set<Resource> getAllResources(User loginUser, ResourceType type) {
    /*private Set<Resource> getAllResources(User loginUser, ResourceType type) {
        int userId = loginUser.getId();
        boolean listChildren = true;
        if(isAdmin(loginUser)){
@@ -540,7 +568,7 @@ public class ResourcesService extends BaseService {
            }
        }
        return allResourceList;
    }
    }*/

    /**
     * query resource list
@@ -553,7 +581,7 @@ public class ResourcesService extends BaseService {

        Map<String, Object> result = new HashMap<>(5);

        Set<Resource> allResourceList = getAllResources(loginUser, type);
        List<Resource> allResourceList = resourcesMapper.queryResourceListAuthored(loginUser.getId(), type.ordinal(),0);
        List<Resource> resources = new ResourceFilter(".jar",new ArrayList<>(allResourceList)).filter();
        Visitor resourceTreeVisitor = new ResourceTreeVisitor(resources);
        result.put(Constants.DATA_LIST, resourceTreeVisitor.visit().getChildren());
@@ -592,15 +620,6 @@ public class ResourcesService extends BaseService {
            putMsg(result, Status.USER_NO_OPERATION_PERM);
            return result;
        }
        //if resource type is UDF,need check whether it is bound by UDF functon
        if (resource.getType() == (ResourceType.UDF)) {
            List<UdfFunc> udfFuncs = udfFunctionMapper.listUdfByResourceId(new int[]{resourceId});
            if (CollectionUtils.isNotEmpty(udfFuncs)) {
                logger.error("can't be deleted,because it is bound by UDF functions:{}",udfFuncs.toString());
                putMsg(result,Status.UDF_RESOURCE_IS_BOUND,udfFuncs.get(0).getFuncName());
                return result;
            }
        }

        String tenantCode = getTenantCode(resource.getUserId(),result);
        if (StringUtils.isEmpty(tenantCode)){
@@ -608,10 +627,22 @@ public class ResourcesService extends BaseService {
        }

        // get all resource id of process definitions those is released
        Map<Integer, Set<Integer>> resourceProcessMap = getResourceProcessMap();
        List<Map<String, Object>> list = processDefinitionMapper.listResources();
        Map<Integer, Set<Integer>> resourceProcessMap = ResourceProcessDefinitionUtils.getResourceProcessDefinitionMap(list);
        Set<Integer> resourceIdSet = resourceProcessMap.keySet();
        // get all children of the resource
        List<Integer> allChildren = listAllChildren(resource);
        Integer[] needDeleteResourceIdArray = allChildren.toArray(new Integer[allChildren.size()]);

        //if resource type is UDF,need check whether it is bound by UDF functon
        if (resource.getType() == (ResourceType.UDF)) {
            List<UdfFunc> udfFuncs = udfFunctionMapper.listUdfByResourceId(needDeleteResourceIdArray);
            if (CollectionUtils.isNotEmpty(udfFuncs)) {
                logger.error("can't be deleted,because it is bound by UDF functions:{}",udfFuncs.toString());
                putMsg(result,Status.UDF_RESOURCE_IS_BOUND,udfFuncs.get(0).getFuncName());
                return result;
            }
        }

        if (resourceIdSet.contains(resource.getPid())) {
            logger.error("can't be deleted,because it is used of process definition");
@@ -632,8 +663,8 @@ public class ResourcesService extends BaseService {
        String hdfsFilename = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, resource.getFullName());

        //delete data in database
        resourcesMapper.deleteIds(allChildren.toArray(new Integer[allChildren.size()]));
        resourceUserMapper.deleteResourceUser(0, resourceId);
        resourcesMapper.deleteIds(needDeleteResourceIdArray);
        resourceUserMapper.deleteResourceUserArray(0, needDeleteResourceIdArray);

        //delete file on hdfs
        HadoopUtils.getInstance().delete(hdfsFilename, true);
@@ -1191,38 +1222,4 @@ public class ResourcesService extends BaseService {
        }
    }

    /**
     * get resource process map key is resource id,value is the set of process definition
     * @return resource process definition map
     */
    private Map<Integer,Set<Integer>> getResourceProcessMap(){
        Map<Integer, String> map = new HashMap<>();
        Map<Integer, Set<Integer>> result = new HashMap<>();
        List<Map<String, Object>> list = processDefinitionMapper.listResources();
        if (CollectionUtils.isNotEmpty(list)) {
            for (Map<String, Object> tempMap : list) {

                map.put((Integer) tempMap.get("id"), (String)tempMap.get("resource_ids"));
            }
        }

        for (Map.Entry<Integer, String> entry : map.entrySet()) {
            Integer mapKey = entry.getKey();
            String[] arr = entry.getValue().split(",");
            Set<Integer> mapValues = Arrays.stream(arr).map(Integer::parseInt).collect(Collectors.toSet());
            for (Integer value : mapValues) {
                if (result.containsKey(value)) {
                    Set<Integer> set = result.get(value);
                    set.add(mapKey);
                    result.put(value, set);
                } else {
                    Set<Integer> set = new HashSet<>();
                    set.add(mapKey);
                    result.put(value, set);
                }
            }
        }
        return result;
    }

}
+60 −11
Original line number Diff line number Diff line
@@ -16,6 +16,8 @@
 */
package org.apache.dolphinscheduler.api.service;

import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
@@ -23,15 +25,10 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.EncryptionUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.dao.utils.ResourceProcessDefinitionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -39,6 +36,7 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.*;
import java.util.stream.Collectors;

/**
 * user service
@@ -72,6 +70,9 @@ public class UsersService extends BaseService {
    @Autowired
    private AlertGroupMapper alertGroupMapper;

    @Autowired
    private ProcessDefinitionMapper processDefinitionMapper;


    /**
     * create user, only system admin have permission
@@ -483,23 +484,71 @@ public class UsersService extends BaseService {
            return result;
        }

        String[] resourceFullIdArr = resourceIds.split(",");
        // need authorize resource id set
        Set<Integer> needAuthorizeResIds = new HashSet();
        for (String resourceFullId : resourceFullIdArr) {
            String[] resourceIdArr = resourceFullId.split("-");
            for (int i=0;i<=resourceIdArr.length-1;i++) {
                int resourceIdValue = Integer.parseInt(resourceIdArr[i]);
                needAuthorizeResIds.add(resourceIdValue);
            }
        }

        //get the authorized resource id list by user id
        List<Resource> oldAuthorizedRes = resourceMapper.queryAuthorizedResourceList(userId);
        //if resource type is UDF,need check whether it is bound by UDF functon
        Set<Integer> oldAuthorizedResIds = oldAuthorizedRes.stream().map(t -> t.getId()).collect(Collectors.toSet());

        //get the unauthorized resource id list
        oldAuthorizedResIds.removeAll(needAuthorizeResIds);

        if (CollectionUtils.isNotEmpty(oldAuthorizedResIds)) {

            // get all resource id of process definitions those is released
            List<Map<String, Object>> list = processDefinitionMapper.listResources();
            Map<Integer, Set<Integer>> resourceProcessMap = ResourceProcessDefinitionUtils.getResourceProcessDefinitionMap(list);
            Set<Integer> resourceIdSet = resourceProcessMap.keySet();

            resourceIdSet.retainAll(oldAuthorizedResIds);
            if (CollectionUtils.isNotEmpty(resourceIdSet)) {
                logger.error("can't be deleted,because it is used of process definition");
                for (Integer resId : resourceIdSet) {
                    logger.error("resource id:{} is used of process definition {}",resId,resourceProcessMap.get(resId));
                }
                putMsg(result, Status.RESOURCE_IS_USED);
                return result;
            }

        }

        resourcesUserMapper.deleteResourceUser(userId, 0);

        if (check(result, StringUtils.isEmpty(resourceIds), Status.SUCCESS)) {
            return result;
        }

        String[] resourcesIdArr = resourceIds.split(",");
        for (int resourceIdValue : needAuthorizeResIds) {
            Resource resource = resourceMapper.selectById(resourceIdValue);
            if (resource == null) {
                putMsg(result, Status.RESOURCE_NOT_EXIST);
                return result;
            }

        for (String resourceId : resourcesIdArr) {
            Date now = new Date();
            ResourcesUser resourcesUser = new ResourcesUser();
            resourcesUser.setUserId(userId);
            resourcesUser.setResourcesId(Integer.parseInt(resourceId));
            resourcesUser.setPerm(7);
            resourcesUser.setResourcesId(resourceIdValue);
            if (resource.isDirectory()) {
                resourcesUser.setPerm(Constants.AUTHORIZE_READABLE_PERM);
            }else{
                resourcesUser.setPerm(Constants.AUTHORIZE_WRITABLE_PERM);
            }

            resourcesUser.setCreateTime(now);
            resourcesUser.setUpdateTime(now);
            resourcesUserMapper.insert(resourcesUser);

        }

        putMsg(result, Status.SUCCESS);
+1 −1
Original line number Diff line number Diff line
@@ -242,7 +242,7 @@ public class ResourcesServiceTest {
        User loginUser = new User();
        loginUser.setId(0);
        loginUser.setUserType(UserType.ADMIN_USER);
        Mockito.when(resourcesMapper.queryResourceListAuthored(0, 0)).thenReturn(getResourceList());
        Mockito.when(resourcesMapper.queryResourceListAuthored(0, 0,0)).thenReturn(getResourceList());
        Map<String, Object> result = resourcesService.queryResourceList(loginUser, ResourceType.FILE);
        logger.info(result.toString());
        Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
Loading