Loading escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java +11 −1 Original line number Diff line number Diff line Loading @@ -191,6 +191,16 @@ public class ExecutorService extends BaseService{ return checkResult; } // checkTenantExists(); Tenant tenant = processDao.getTenantForProcess(processDefinition.getTenantId(), processDefinition.getUserId()); if(tenant == null){ logger.error("there is not any vaild tenant for the process definition: id:{},name:{}, ", processDefinition.getId(), processDefinition.getName()); putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); return result; } switch (executeType) { case REPEAT_RUNNING: result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.REPEAT_RUNNING); Loading Loading @@ -260,7 +270,7 @@ public class ExecutorService extends BaseService{ } break; case RECOVER_SUSPENDED_PROCESS: if (executionStatus.typeIsPause()) { if (executionStatus.typeIsPause()|| executionStatus.typeIsCancel()) { checkResult = true; } default: Loading escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java +19 −4 Original line number Diff line number Diff line Loading @@ -38,10 +38,7 @@ import cn.escheduler.common.utils.JSONUtils; import cn.escheduler.common.utils.ParameterUtils; import cn.escheduler.common.utils.placeholder.BusinessTimeUtils; import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.mapper.ProcessDefinitionMapper; import cn.escheduler.dao.mapper.ProcessInstanceMapper; import cn.escheduler.dao.mapper.ProjectMapper; import cn.escheduler.dao.mapper.TaskInstanceMapper; import cn.escheduler.dao.mapper.*; import cn.escheduler.dao.model.*; import com.alibaba.fastjson.JSON; import org.apache.commons.lang3.StringUtils; Loading Loading @@ -97,6 +94,9 @@ public class ProcessInstanceService extends BaseDAGService { @Autowired LoggerService loggerService; @Autowired WorkerGroupMapper workerGroupMapper; /** * query process instance by id * Loading @@ -115,6 +115,21 @@ public class ProcessInstanceService extends BaseDAGService { return checkResult; } ProcessInstance processInstance = processDao.findProcessInstanceDetailById(processId); String workerGroupName = ""; if(processInstance.getWorkerGroupId() == -1){ workerGroupName = DEFAULT; }else{ WorkerGroup workerGroup = workerGroupMapper.queryById(processInstance.getWorkerGroupId()); if(workerGroup != null){ workerGroupName = DEFAULT; }else{ workerGroupName = workerGroup.getName(); } } processInstance.setWorkerGroupName(workerGroupName); ProcessDefinition processDefinition = processDao.findProcessDefineById(processInstance.getProcessDefinitionId()); processInstance.setReceivers(processDefinition.getReceivers()); processInstance.setReceiversCc(processDefinition.getReceiversCc()); result.put(Constants.DATA_LIST, processInstance); putMsg(result, Status.SUCCESS); Loading escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java +6 −1 Original line number Diff line number Diff line Loading @@ -369,7 +369,12 @@ public class ResourcesService extends BaseService { public Map<String, Object> queryResourceList(User loginUser, ResourceType type) { Map<String, Object> result = new HashMap<>(5); List<Resource> resourceList = resourcesMapper.queryResourceListAuthored(loginUser.getId(), type.ordinal()); List<Resource> resourceList; if(isAdmin(loginUser)){ resourceList = resourcesMapper.listAllResourceByType(type.ordinal()); }else{ resourceList = resourcesMapper.queryResourceListAuthored(loginUser.getId(), type.ordinal()); } result.put(Constants.DATA_LIST, resourceList); putMsg(result,Status.SUCCESS); Loading escheduler-common/src/main/java/cn/escheduler/common/Constants.java +2 −0 Original line number Diff line number Diff line Loading @@ -488,6 +488,8 @@ public final class Constants { public static final String TASK_RECORD_PWD = "task.record.datasource.password"; public static final String DEFAULT = "Default"; public static String TASK_RECORD_TABLE_HIVE_LOG = "eamp_hive_log_hd"; public static String TASK_RECORD_TABLE_HISTORY_HIVE_LOG = "eamp_hive_hist_log_hd"; Loading escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java +25 −4 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ import cn.escheduler.common.utils.Bytes; import cn.escheduler.common.utils.IpUtils; import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.zk.AbstractZKClient; import org.apache.commons.lang3.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; Loading Loading @@ -157,7 +158,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { String taskDetail = list.get(i); String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE); //向前版本兼容 //向前版本兼ProcessInstanceService容 if(taskDetailArrs.length >= 4){ //format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} Loading @@ -166,9 +167,8 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { String taskHosts = taskDetailArrs[4]; //task can assign to any worker host if equals default ip value of worker server if(!taskHosts.equals(Constants.DEFAULT_WORKER_ID)){ if(!taskHosts.equals(String.valueOf(Constants.DEFAULT_WORKER_ID))){ String[] taskHostsArr = taskHosts.split(Constants.COMMA); if(!Arrays.asList(taskHostsArr).contains(workerIpLongStr)){ continue; } Loading Loading @@ -210,12 +210,33 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { while(iterator.hasNext()){ if(j++ < tasksNum){ String task = iterator.next(); taskslist.add(task); taskslist.add(getOriginTaskFormat(task)); } } return taskslist; } /** * format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} * processInstanceId and task id need to be convert to int. * @param formatTask * @return */ private String getOriginTaskFormat(String formatTask){ String[] taskArray = formatTask.split(Constants.UNDERLINE); int processInstanceId = Integer.parseInt(taskArray[1]); int taskId = Integer.parseInt(taskArray[3]); String suffix = ""; for(int index =4; index < taskArray.length; index++){ suffix += taskArray[index] + Constants.UNDERLINE; } String destTask = String.format("%s_%s_%s_%s", taskArray[0], processInstanceId, taskArray[3], taskId); if(StringUtils.isNotEmpty(suffix)){ destTask += Constants.UNDERLINE + suffix; } return destTask; } @Override public void removeNode(String key, String nodeValue){ Loading Loading
escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java +11 −1 Original line number Diff line number Diff line Loading @@ -191,6 +191,16 @@ public class ExecutorService extends BaseService{ return checkResult; } // checkTenantExists(); Tenant tenant = processDao.getTenantForProcess(processDefinition.getTenantId(), processDefinition.getUserId()); if(tenant == null){ logger.error("there is not any vaild tenant for the process definition: id:{},name:{}, ", processDefinition.getId(), processDefinition.getName()); putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); return result; } switch (executeType) { case REPEAT_RUNNING: result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.REPEAT_RUNNING); Loading Loading @@ -260,7 +270,7 @@ public class ExecutorService extends BaseService{ } break; case RECOVER_SUSPENDED_PROCESS: if (executionStatus.typeIsPause()) { if (executionStatus.typeIsPause()|| executionStatus.typeIsCancel()) { checkResult = true; } default: Loading
escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java +19 −4 Original line number Diff line number Diff line Loading @@ -38,10 +38,7 @@ import cn.escheduler.common.utils.JSONUtils; import cn.escheduler.common.utils.ParameterUtils; import cn.escheduler.common.utils.placeholder.BusinessTimeUtils; import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.mapper.ProcessDefinitionMapper; import cn.escheduler.dao.mapper.ProcessInstanceMapper; import cn.escheduler.dao.mapper.ProjectMapper; import cn.escheduler.dao.mapper.TaskInstanceMapper; import cn.escheduler.dao.mapper.*; import cn.escheduler.dao.model.*; import com.alibaba.fastjson.JSON; import org.apache.commons.lang3.StringUtils; Loading Loading @@ -97,6 +94,9 @@ public class ProcessInstanceService extends BaseDAGService { @Autowired LoggerService loggerService; @Autowired WorkerGroupMapper workerGroupMapper; /** * query process instance by id * Loading @@ -115,6 +115,21 @@ public class ProcessInstanceService extends BaseDAGService { return checkResult; } ProcessInstance processInstance = processDao.findProcessInstanceDetailById(processId); String workerGroupName = ""; if(processInstance.getWorkerGroupId() == -1){ workerGroupName = DEFAULT; }else{ WorkerGroup workerGroup = workerGroupMapper.queryById(processInstance.getWorkerGroupId()); if(workerGroup != null){ workerGroupName = DEFAULT; }else{ workerGroupName = workerGroup.getName(); } } processInstance.setWorkerGroupName(workerGroupName); ProcessDefinition processDefinition = processDao.findProcessDefineById(processInstance.getProcessDefinitionId()); processInstance.setReceivers(processDefinition.getReceivers()); processInstance.setReceiversCc(processDefinition.getReceiversCc()); result.put(Constants.DATA_LIST, processInstance); putMsg(result, Status.SUCCESS); Loading
escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java +6 −1 Original line number Diff line number Diff line Loading @@ -369,7 +369,12 @@ public class ResourcesService extends BaseService { public Map<String, Object> queryResourceList(User loginUser, ResourceType type) { Map<String, Object> result = new HashMap<>(5); List<Resource> resourceList = resourcesMapper.queryResourceListAuthored(loginUser.getId(), type.ordinal()); List<Resource> resourceList; if(isAdmin(loginUser)){ resourceList = resourcesMapper.listAllResourceByType(type.ordinal()); }else{ resourceList = resourcesMapper.queryResourceListAuthored(loginUser.getId(), type.ordinal()); } result.put(Constants.DATA_LIST, resourceList); putMsg(result,Status.SUCCESS); Loading
escheduler-common/src/main/java/cn/escheduler/common/Constants.java +2 −0 Original line number Diff line number Diff line Loading @@ -488,6 +488,8 @@ public final class Constants { public static final String TASK_RECORD_PWD = "task.record.datasource.password"; public static final String DEFAULT = "Default"; public static String TASK_RECORD_TABLE_HIVE_LOG = "eamp_hive_log_hd"; public static String TASK_RECORD_TABLE_HISTORY_HIVE_LOG = "eamp_hive_hist_log_hd"; Loading
escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java +25 −4 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ import cn.escheduler.common.utils.Bytes; import cn.escheduler.common.utils.IpUtils; import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.zk.AbstractZKClient; import org.apache.commons.lang3.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; Loading Loading @@ -157,7 +158,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { String taskDetail = list.get(i); String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE); //向前版本兼容 //向前版本兼ProcessInstanceService容 if(taskDetailArrs.length >= 4){ //format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} Loading @@ -166,9 +167,8 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { String taskHosts = taskDetailArrs[4]; //task can assign to any worker host if equals default ip value of worker server if(!taskHosts.equals(Constants.DEFAULT_WORKER_ID)){ if(!taskHosts.equals(String.valueOf(Constants.DEFAULT_WORKER_ID))){ String[] taskHostsArr = taskHosts.split(Constants.COMMA); if(!Arrays.asList(taskHostsArr).contains(workerIpLongStr)){ continue; } Loading Loading @@ -210,12 +210,33 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { while(iterator.hasNext()){ if(j++ < tasksNum){ String task = iterator.next(); taskslist.add(task); taskslist.add(getOriginTaskFormat(task)); } } return taskslist; } /** * format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} * processInstanceId and task id need to be convert to int. * @param formatTask * @return */ private String getOriginTaskFormat(String formatTask){ String[] taskArray = formatTask.split(Constants.UNDERLINE); int processInstanceId = Integer.parseInt(taskArray[1]); int taskId = Integer.parseInt(taskArray[3]); String suffix = ""; for(int index =4; index < taskArray.length; index++){ suffix += taskArray[index] + Constants.UNDERLINE; } String destTask = String.format("%s_%s_%s_%s", taskArray[0], processInstanceId, taskArray[3], taskId); if(StringUtils.isNotEmpty(suffix)){ destTask += Constants.UNDERLINE + suffix; } return destTask; } @Override public void removeNode(String key, String nodeValue){ Loading