Loading escheduler-api/src/main/java/cn/escheduler/api/enums/Status.java +1 −0 Original line number Diff line number Diff line Loading @@ -212,6 +212,7 @@ public enum Status { DELETE_SCHEDULE_CRON_BY_ID_ERROR(50024,"delete schedule by id error"), BATCH_DELETE_PROCESS_DEFINE_ERROR(50025,"batch delete process definition error"), BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR(50026,"batch delete process definition by ids {0} error"), TENANT_NOT_SUITABLE(50027,"there is not any tenant suitable, please choose a tenant available."), HDFS_NOT_STARTUP(60001,"hdfs not startup"), HDFS_TERANT_RESOURCES_FILE_EXISTS(60002,"resource file exists,please delete resource first"), Loading escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java +24 −7 Original line number Diff line number Diff line Loading @@ -110,6 +110,13 @@ public class ExecutorService extends BaseService{ return result; } if (!checkTenantSuitable(processDefinition)){ logger.error("there is not any vaild tenant for the process definition: id:{},name:{}, ", processDefinition.getId(), processDefinition.getName()); putMsg(result, Status.TENANT_NOT_SUITABLE); return result; } /** * create command */ Loading Loading @@ -190,15 +197,10 @@ public class ExecutorService extends BaseService{ if (status != Status.SUCCESS) { return checkResult; } // checkTenantExists(); Tenant tenant = processDao.getTenantForProcess(processDefinition.getTenantId(), processDefinition.getUserId()); if(tenant == null){ if (!checkTenantSuitable(processDefinition)){ logger.error("there is not any vaild tenant for the process definition: id:{},name:{}, ", processDefinition.getId(), processDefinition.getName()); putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); return result; putMsg(result, Status.TENANT_NOT_SUITABLE); } switch (executeType) { Loading Loading @@ -240,6 +242,21 @@ public class ExecutorService extends BaseService{ return result; } /** * check tenant suitable * @param processDefinition * @return */ private boolean checkTenantSuitable(ProcessDefinition processDefinition) { // checkTenantExists(); Tenant tenant = processDao.getTenantForProcess(processDefinition.getTenantId(), processDefinition.getUserId()); if(tenant == null){ return false; } return true; } /** * Check the state of process instance and the type of operation match * Loading escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java +7 −4 Original line number Diff line number Diff line Loading @@ -228,12 +228,12 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { int j = 0; List<String> taskslist = new ArrayList<>(tasksNum); while(iterator.hasNext()){ if(j++ < tasksNum){ if(j++ >= tasksNum){ break; } String task = iterator.next(); taskslist.add(getOriginTaskFormat(task)); } } return taskslist; } Loading @@ -245,6 +245,9 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { */ private String getOriginTaskFormat(String formatTask){ String[] taskArray = formatTask.split(Constants.UNDERLINE); if(taskArray.length< 4){ return formatTask; } int processInstanceId = Integer.parseInt(taskArray[1]); int taskId = Integer.parseInt(taskArray[3]); Loading escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java +5 −0 Original line number Diff line number Diff line Loading @@ -210,6 +210,11 @@ public class FetchTaskThread implements Runnable{ Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(), processDefine.getUserId()); if(tenant == null){ logger.error("cannot find suitable tenant for the task:{}, process instance tenant:{}, process definition tenant:{}", taskInstance.getName(),processInstance.getTenantId(), processDefine.getTenantId()); continue; } // check and create Linux users FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, Loading Loading
escheduler-api/src/main/java/cn/escheduler/api/enums/Status.java +1 −0 Original line number Diff line number Diff line Loading @@ -212,6 +212,7 @@ public enum Status { DELETE_SCHEDULE_CRON_BY_ID_ERROR(50024,"delete schedule by id error"), BATCH_DELETE_PROCESS_DEFINE_ERROR(50025,"batch delete process definition error"), BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR(50026,"batch delete process definition by ids {0} error"), TENANT_NOT_SUITABLE(50027,"there is not any tenant suitable, please choose a tenant available."), HDFS_NOT_STARTUP(60001,"hdfs not startup"), HDFS_TERANT_RESOURCES_FILE_EXISTS(60002,"resource file exists,please delete resource first"), Loading
escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java +24 −7 Original line number Diff line number Diff line Loading @@ -110,6 +110,13 @@ public class ExecutorService extends BaseService{ return result; } if (!checkTenantSuitable(processDefinition)){ logger.error("there is not any vaild tenant for the process definition: id:{},name:{}, ", processDefinition.getId(), processDefinition.getName()); putMsg(result, Status.TENANT_NOT_SUITABLE); return result; } /** * create command */ Loading Loading @@ -190,15 +197,10 @@ public class ExecutorService extends BaseService{ if (status != Status.SUCCESS) { return checkResult; } // checkTenantExists(); Tenant tenant = processDao.getTenantForProcess(processDefinition.getTenantId(), processDefinition.getUserId()); if(tenant == null){ if (!checkTenantSuitable(processDefinition)){ logger.error("there is not any vaild tenant for the process definition: id:{},name:{}, ", processDefinition.getId(), processDefinition.getName()); putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); return result; putMsg(result, Status.TENANT_NOT_SUITABLE); } switch (executeType) { Loading Loading @@ -240,6 +242,21 @@ public class ExecutorService extends BaseService{ return result; } /** * check tenant suitable * @param processDefinition * @return */ private boolean checkTenantSuitable(ProcessDefinition processDefinition) { // checkTenantExists(); Tenant tenant = processDao.getTenantForProcess(processDefinition.getTenantId(), processDefinition.getUserId()); if(tenant == null){ return false; } return true; } /** * Check the state of process instance and the type of operation match * Loading
escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java +7 −4 Original line number Diff line number Diff line Loading @@ -228,12 +228,12 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { int j = 0; List<String> taskslist = new ArrayList<>(tasksNum); while(iterator.hasNext()){ if(j++ < tasksNum){ if(j++ >= tasksNum){ break; } String task = iterator.next(); taskslist.add(getOriginTaskFormat(task)); } } return taskslist; } Loading @@ -245,6 +245,9 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { */ private String getOriginTaskFormat(String formatTask){ String[] taskArray = formatTask.split(Constants.UNDERLINE); if(taskArray.length< 4){ return formatTask; } int processInstanceId = Integer.parseInt(taskArray[1]); int taskId = Integer.parseInt(taskArray[3]); Loading
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java +5 −0 Original line number Diff line number Diff line Loading @@ -210,6 +210,11 @@ public class FetchTaskThread implements Runnable{ Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(), processDefine.getUserId()); if(tenant == null){ logger.error("cannot find suitable tenant for the task:{}, process instance tenant:{}, process definition tenant:{}", taskInstance.getName(),processInstance.getTenantId(), processDefine.getTenantId()); continue; } // check and create Linux users FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, Loading