Loading dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java +31 −5 Original line number Diff line number Diff line Loading @@ -26,12 +26,8 @@ import org.apache.dolphinscheduler.common.model.DateInterval; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.queue.ITaskQueue; import org.apache.dolphinscheduler.common.queue.TaskQueueFactory; import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.IpUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.mapper.*; import org.apache.dolphinscheduler.dao.utils.cron.CronUtils; Loading @@ -45,6 +41,7 @@ import org.springframework.transaction.annotation.Transactional; import java.util.*; import java.util.stream.Collectors; import static java.util.stream.Collectors.toList; import static org.apache.dolphinscheduler.common.Constants.*; /** Loading Loading @@ -1763,5 +1760,34 @@ public class ProcessDao { return projectIdList; } /** * list unauthorized resource * @param userId user id * @param resNames resource name * @return unauthorized resource list */ public List<String> listUnauthorizedResource(int userId,String[] resNames){ List<String> resultList = new ArrayList<String>(); List<String> originResList = Arrays.asList(resNames); List<Resource> authorizedResourceList = resourceMapper.listAuthorizedResource(userId, resNames); if(CollectionUtils.isNotEmpty(authorizedResourceList)){ List<String> authorizedResNames = authorizedResourceList.stream().map(t -> t.getAlias()).collect(toList()); resultList = originResList.stream().filter(item -> !authorizedResNames.contains(item)).collect(toList()); } return resultList; } /** * get user by user id * @param userId user id * @return User */ public User getUserById(int userId){ return userMapper.queryDetailsById(userId); } } dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java +8 −0 Original line number Diff line number Diff line Loading @@ -83,4 +83,12 @@ public interface ResourceMapper extends BaseMapper<Resource> { * @return tenant code */ String queryTenantCodeByResourceName(@Param("resName") String resName); /** * list unauthorized resource * @param userId userId * @param resNames resource names * @return resource list */ List<Resource> listAuthorizedResource(@Param("userId") int userId,@Param("resNames")String[] resNames); } dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.xml +14 −0 Original line number Diff line number Diff line Loading @@ -73,4 +73,18 @@ where t.id = u.tenant_id and u.id = res.user_id and res.type=0 and res.alias= #{resName} </select> <select id="listAuthorizedResource" resultType="org.apache.dolphinscheduler.dao.entity.Resource"> select * from t_ds_resources where type=0 and id in (select resources_id from t_ds_relation_resources_user where user_id=#{userId} union select id as resources_id from t_ds_resources where user_id=#{userId}) <if test="resNames != null and resNames != ''"> and alias in <foreach collection="resNames" item="i" open="(" close=")" separator=","> #{i} </foreach> </if> </select> </mapper> dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java +117 −2 Original line number Diff line number Diff line Loading @@ -17,22 +17,36 @@ package org.apache.dolphinscheduler.dao.mapper; import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.dao.entity.*; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.ResourcesUser; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.User; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.annotation.Rollback; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.transaction.annotation.Transactional; import java.util.Arrays; import java.util.Date; import java.util.List; import static java.util.stream.Collectors.toList; import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; @RunWith(SpringRunner.class) @SpringBootTest @Transactional @Rollback(true) public class ResourceMapperTest { @Autowired Loading Loading @@ -61,6 +75,76 @@ public class ResourceMapperTest { return resource; } /** * create resource by user * @param user user * @return Resource */ private Resource createResource(User user){ //insertOne Resource resource = new Resource(); resource.setAlias(String.format("ut resource %s",user.getUserName())); resource.setType(ResourceType.FILE); resource.setUserId(user.getId()); resourceMapper.insert(resource); return resource; } /** * create user * @return User */ private User createGeneralUser(String userName){ User user = new User(); user.setUserName(userName); user.setUserPassword("1"); user.setEmail("xx@123.com"); user.setUserType(UserType.GENERAL_USER); user.setCreateTime(new Date()); user.setTenantId(1); user.setUpdateTime(new Date()); userMapper.insert(user); return user; } /** * create admin user * @return User */ private User createAdminUser(){ User user = new User(); user.setUserName("admin1"); user.setUserPassword("1"); user.setEmail("xx@123.com"); user.setUserType(UserType.ADMIN_USER); user.setCreateTime(new Date()); user.setTenantId(1); user.setUpdateTime(new Date()); userMapper.insert(user); return user; } /** * create resource user * @return ResourcesUser */ private ResourcesUser createResourcesUser(Resource resource,User user){ //insertOne ResourcesUser resourcesUser = new ResourcesUser(); resourcesUser.setCreateTime(new Date()); resourcesUser.setUpdateTime(new Date()); resourcesUser.setUserId(user.getId()); resourcesUser.setResourcesId(resource.getId()); resourceUserMapper.insert(resourcesUser); return resourcesUser; } @Test public void testInsert(){ Resource resource = insertOne(); assertNotNull(resource.getId()); assertThat(resource.getId(),greaterThan(0)); } /** * test update */ Loading Loading @@ -230,4 +314,35 @@ public class ResourceMapperTest { resourceMapper.deleteById(resource.getId()); } @Test public void testListAuthorizedResource(){ // create a general user User generalUser = createGeneralUser("user1"); User generalUser1 = createGeneralUser("user2"); User adminUser = createAdminUser(); // create one resource Resource resource = createResource(generalUser); Resource unauthorizedResource = createResource(generalUser1); // need download resources String[] resNames = new String[]{resource.getAlias(), unauthorizedResource.getAlias()}; List<Resource> resources = resourceMapper.listAuthorizedResource(generalUser.getId(), resNames); Assert.assertEquals(generalUser.getId(),resource.getUserId()); Assert.assertFalse(resources.stream().map(t -> t.getAlias()).collect(toList()).containsAll(Arrays.asList(resNames))); // authorize object unauthorizedResource to generalUser createResourcesUser(unauthorizedResource,generalUser); List<Resource> authorizedResources = resourceMapper.listAuthorizedResource(generalUser.getId(), resNames); Assert.assertTrue(authorizedResources.stream().map(t -> t.getAlias()).collect(toList()).containsAll(Arrays.asList(resNames))); // admin user List<Resource> adminAuthorizedResources = resourceMapper.listAuthorizedResource(adminUser.getId(), resNames); Assert.assertTrue(authorizedResources.stream().map(t -> t.getAlias()).collect(toList()).containsAll(Arrays.asList(resNames))); } } No newline at end of file dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java +37 −6 Original line number Diff line number Diff line Loading @@ -23,16 +23,19 @@ import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.server.utils.LoggerUtils; import org.apache.dolphinscheduler.server.worker.log.TaskLogDiscriminator; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; Loading Loading @@ -94,11 +97,15 @@ public class TaskScheduleThread implements Runnable { // task node TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class); // get resource files List<String> resourceFiles = createProjectResFiles(taskNode); // copy hdfs/minio file to local if (checkDownloadPermission(processDao,resourceFiles)) { copyHdfsToLocal(processDao, taskInstance.getExecutePath(), createProjectResFiles(taskNode), resourceFiles, logger); } // get process instance according to tak instance ProcessInstance processInstance = taskInstance.getProcessInstance(); Loading Loading @@ -205,7 +212,7 @@ public class TaskScheduleThread implements Runnable { /** * get task log path * @return * @return log path */ private String getTaskLogPath() { String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory()) Loading Loading @@ -321,4 +328,28 @@ public class TaskScheduleThread implements Runnable { } } } /** * check download resource permission * @param processDao process dao * @param projectRes project resources * @return if has download permission return true else false */ private boolean checkDownloadPermission(ProcessDao processDao, List<String> projectRes) { if(CollectionUtils.isNotEmpty(projectRes)){ // get user id int userId = taskInstance.getProcessInstance().getExecutorId(); // get user type in order to judge whether the user is admin User user = processDao.getUserById(userId); if (user.getUserType() != UserType.ADMIN_USER){ List<String> unauthorizedResource = processDao.listUnauthorizedResource(userId, (String[]) projectRes.toArray()); // if exist unauthorized resource if(CollectionUtils.isNotEmpty(unauthorizedResource)){ logger.error("user {} didn't has download permission of resource file: {}", user.getUserName(), unauthorizedResource.toString()); throw new RuntimeException(String.format("user %s didn't has download permission of resource file %s", user.getUserName(), unauthorizedResource.get(0))); } } } return true; } } No newline at end of file Loading
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java +31 −5 Original line number Diff line number Diff line Loading @@ -26,12 +26,8 @@ import org.apache.dolphinscheduler.common.model.DateInterval; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.queue.ITaskQueue; import org.apache.dolphinscheduler.common.queue.TaskQueueFactory; import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.IpUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.mapper.*; import org.apache.dolphinscheduler.dao.utils.cron.CronUtils; Loading @@ -45,6 +41,7 @@ import org.springframework.transaction.annotation.Transactional; import java.util.*; import java.util.stream.Collectors; import static java.util.stream.Collectors.toList; import static org.apache.dolphinscheduler.common.Constants.*; /** Loading Loading @@ -1763,5 +1760,34 @@ public class ProcessDao { return projectIdList; } /** * list unauthorized resource * @param userId user id * @param resNames resource name * @return unauthorized resource list */ public List<String> listUnauthorizedResource(int userId,String[] resNames){ List<String> resultList = new ArrayList<String>(); List<String> originResList = Arrays.asList(resNames); List<Resource> authorizedResourceList = resourceMapper.listAuthorizedResource(userId, resNames); if(CollectionUtils.isNotEmpty(authorizedResourceList)){ List<String> authorizedResNames = authorizedResourceList.stream().map(t -> t.getAlias()).collect(toList()); resultList = originResList.stream().filter(item -> !authorizedResNames.contains(item)).collect(toList()); } return resultList; } /** * get user by user id * @param userId user id * @return User */ public User getUserById(int userId){ return userMapper.queryDetailsById(userId); } }
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java +8 −0 Original line number Diff line number Diff line Loading @@ -83,4 +83,12 @@ public interface ResourceMapper extends BaseMapper<Resource> { * @return tenant code */ String queryTenantCodeByResourceName(@Param("resName") String resName); /** * list unauthorized resource * @param userId userId * @param resNames resource names * @return resource list */ List<Resource> listAuthorizedResource(@Param("userId") int userId,@Param("resNames")String[] resNames); }
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.xml +14 −0 Original line number Diff line number Diff line Loading @@ -73,4 +73,18 @@ where t.id = u.tenant_id and u.id = res.user_id and res.type=0 and res.alias= #{resName} </select> <select id="listAuthorizedResource" resultType="org.apache.dolphinscheduler.dao.entity.Resource"> select * from t_ds_resources where type=0 and id in (select resources_id from t_ds_relation_resources_user where user_id=#{userId} union select id as resources_id from t_ds_resources where user_id=#{userId}) <if test="resNames != null and resNames != ''"> and alias in <foreach collection="resNames" item="i" open="(" close=")" separator=","> #{i} </foreach> </if> </select> </mapper>
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java +117 −2 Original line number Diff line number Diff line Loading @@ -17,22 +17,36 @@ package org.apache.dolphinscheduler.dao.mapper; import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.dao.entity.*; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.ResourcesUser; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.User; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.annotation.Rollback; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.transaction.annotation.Transactional; import java.util.Arrays; import java.util.Date; import java.util.List; import static java.util.stream.Collectors.toList; import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; @RunWith(SpringRunner.class) @SpringBootTest @Transactional @Rollback(true) public class ResourceMapperTest { @Autowired Loading Loading @@ -61,6 +75,76 @@ public class ResourceMapperTest { return resource; } /** * create resource by user * @param user user * @return Resource */ private Resource createResource(User user){ //insertOne Resource resource = new Resource(); resource.setAlias(String.format("ut resource %s",user.getUserName())); resource.setType(ResourceType.FILE); resource.setUserId(user.getId()); resourceMapper.insert(resource); return resource; } /** * create user * @return User */ private User createGeneralUser(String userName){ User user = new User(); user.setUserName(userName); user.setUserPassword("1"); user.setEmail("xx@123.com"); user.setUserType(UserType.GENERAL_USER); user.setCreateTime(new Date()); user.setTenantId(1); user.setUpdateTime(new Date()); userMapper.insert(user); return user; } /** * create admin user * @return User */ private User createAdminUser(){ User user = new User(); user.setUserName("admin1"); user.setUserPassword("1"); user.setEmail("xx@123.com"); user.setUserType(UserType.ADMIN_USER); user.setCreateTime(new Date()); user.setTenantId(1); user.setUpdateTime(new Date()); userMapper.insert(user); return user; } /** * create resource user * @return ResourcesUser */ private ResourcesUser createResourcesUser(Resource resource,User user){ //insertOne ResourcesUser resourcesUser = new ResourcesUser(); resourcesUser.setCreateTime(new Date()); resourcesUser.setUpdateTime(new Date()); resourcesUser.setUserId(user.getId()); resourcesUser.setResourcesId(resource.getId()); resourceUserMapper.insert(resourcesUser); return resourcesUser; } @Test public void testInsert(){ Resource resource = insertOne(); assertNotNull(resource.getId()); assertThat(resource.getId(),greaterThan(0)); } /** * test update */ Loading Loading @@ -230,4 +314,35 @@ public class ResourceMapperTest { resourceMapper.deleteById(resource.getId()); } @Test public void testListAuthorizedResource(){ // create a general user User generalUser = createGeneralUser("user1"); User generalUser1 = createGeneralUser("user2"); User adminUser = createAdminUser(); // create one resource Resource resource = createResource(generalUser); Resource unauthorizedResource = createResource(generalUser1); // need download resources String[] resNames = new String[]{resource.getAlias(), unauthorizedResource.getAlias()}; List<Resource> resources = resourceMapper.listAuthorizedResource(generalUser.getId(), resNames); Assert.assertEquals(generalUser.getId(),resource.getUserId()); Assert.assertFalse(resources.stream().map(t -> t.getAlias()).collect(toList()).containsAll(Arrays.asList(resNames))); // authorize object unauthorizedResource to generalUser createResourcesUser(unauthorizedResource,generalUser); List<Resource> authorizedResources = resourceMapper.listAuthorizedResource(generalUser.getId(), resNames); Assert.assertTrue(authorizedResources.stream().map(t -> t.getAlias()).collect(toList()).containsAll(Arrays.asList(resNames))); // admin user List<Resource> adminAuthorizedResources = resourceMapper.listAuthorizedResource(adminUser.getId(), resNames); Assert.assertTrue(authorizedResources.stream().map(t -> t.getAlias()).collect(toList()).containsAll(Arrays.asList(resNames))); } } No newline at end of file
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java +37 −6 Original line number Diff line number Diff line Loading @@ -23,16 +23,19 @@ import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.server.utils.LoggerUtils; import org.apache.dolphinscheduler.server.worker.log.TaskLogDiscriminator; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; Loading Loading @@ -94,11 +97,15 @@ public class TaskScheduleThread implements Runnable { // task node TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class); // get resource files List<String> resourceFiles = createProjectResFiles(taskNode); // copy hdfs/minio file to local if (checkDownloadPermission(processDao,resourceFiles)) { copyHdfsToLocal(processDao, taskInstance.getExecutePath(), createProjectResFiles(taskNode), resourceFiles, logger); } // get process instance according to tak instance ProcessInstance processInstance = taskInstance.getProcessInstance(); Loading Loading @@ -205,7 +212,7 @@ public class TaskScheduleThread implements Runnable { /** * get task log path * @return * @return log path */ private String getTaskLogPath() { String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory()) Loading Loading @@ -321,4 +328,28 @@ public class TaskScheduleThread implements Runnable { } } } /** * check download resource permission * @param processDao process dao * @param projectRes project resources * @return if has download permission return true else false */ private boolean checkDownloadPermission(ProcessDao processDao, List<String> projectRes) { if(CollectionUtils.isNotEmpty(projectRes)){ // get user id int userId = taskInstance.getProcessInstance().getExecutorId(); // get user type in order to judge whether the user is admin User user = processDao.getUserById(userId); if (user.getUserType() != UserType.ADMIN_USER){ List<String> unauthorizedResource = processDao.listUnauthorizedResource(userId, (String[]) projectRes.toArray()); // if exist unauthorized resource if(CollectionUtils.isNotEmpty(unauthorizedResource)){ logger.error("user {} didn't has download permission of resource file: {}", user.getUserName(), unauthorizedResource.toString()); throw new RuntimeException(String.format("user %s didn't has download permission of resource file %s", user.getUserName(), unauthorizedResource.get(0))); } } } return true; } } No newline at end of file