Commit 1c77708a authored by 老佛爷's avatar 老佛爷 Committed by qiaozhanwei
Browse files

Supplementary data by schedule (#1830)



* Supplementary data by schedule

* fix sonar check bug

* fix code duplicated blocks

* ut

* loop by day

* MasterExecThread test

* test add licene

Co-authored-by: default avatardailidong <dailidong66@gmail.com>
parent 53814425
Loading
Loading
Loading
Loading
+40 −13
Original line number Diff line number Diff line
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
@@ -29,6 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.server.utils.ScheduleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -499,12 +501,32 @@ public class ExecutorService extends BaseService{

        if(commandType == CommandType.COMPLEMENT_DATA){
            runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
            if(null != start && null != end && start.before(end)){
                if(runMode == RunMode.RUN_MODE_SERIAL){
                    cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start));
                    cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end));
                    command.setCommandParam(JSONUtils.toJson(cmdParam));
                    return processDao.createCommand(command);
                }else if (runMode == RunMode.RUN_MODE_PARALLEL){
                    List<Schedule> schedules = processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefineId);
                    List<Date> listDate = new LinkedList<>();
                    if(!CollectionUtils.isEmpty(schedules)){
                        for (Schedule item : schedules) {
                            List<Date> list = ScheduleUtils.getRecentTriggerTime(item.getCrontab(), start, end);
                            listDate.addAll(list);
                        }
                    }
                    if(!CollectionUtils.isEmpty(listDate)){
                        // loop by schedule date
                        for (Date date : listDate) {
                            cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(date));
                            cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(date));
                            command.setCommandParam(JSONUtils.toJson(cmdParam));
                            processDao.createCommand(command);
                        }
                        return listDate.size();
                    }else{
                        // loop by day
                        int runCunt = 0;
                        while(!start.after(end)) {
                            runCunt += 1;
@@ -516,6 +538,11 @@ public class ExecutorService extends BaseService{
                        }
                        return runCunt;
                    }
                }
            }else{
                logger.error("there is not vaild schedule date for the process definition: id:{},date:{}",
                        processDefineId, schedule);
            }
        }else{
            command.setCommandParam(JSONUtils.toJson(cmdParam));
            return processDao.createCommand(command);
+229 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.api.service;

import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

import java.text.ParseException;
import java.util.*;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;

/**
 * test for ExecutorService
 */
@RunWith(MockitoJUnitRunner.Silent.class)
public class ExecutorService2Test {

    @InjectMocks
    private ExecutorService executorService;

    @Mock
    private ProcessDao processDao;

    @Mock
    private ProcessDefinitionMapper processDefinitionMapper;

    @Mock
    private ProjectMapper projectMapper;

    @Mock
    private ProjectService projectService;

    private int processDefinitionId = 1;

    private int tenantId = 1;

    private int userId = 1;

    private ProcessDefinition processDefinition = new ProcessDefinition();

    private User loginUser = new User();

    private String projectName = "projectName";

    private Project project = new Project();

    private String cronTime;

    @Before
    public void init(){
        // user
        loginUser.setId(userId);

        // processDefinition
        processDefinition.setId(processDefinitionId);
        processDefinition.setReleaseState(ReleaseState.ONLINE);
        processDefinition.setTenantId(tenantId);
        processDefinition.setUserId(userId);

        // project
        project.setName(projectName);

        // cronRangeTime
        cronTime = "2020-01-01 00:00:00,2020-01-31 23:00:00";

        // mock
        Mockito.when(projectMapper.queryByName(projectName)).thenReturn(project);
        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(checkProjectAndAuth());
        Mockito.when(processDefinitionMapper.selectById(processDefinitionId)).thenReturn(processDefinition);
        Mockito.when(processDao.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant());
        Mockito.when(processDao.createCommand(any(Command.class))).thenReturn(1);
    }

    /**
     * not complement
     * @throws ParseException
     */
    @Test
    public void testNoComplement() throws ParseException {
        try {
            Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
            Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
                    processDefinitionId, cronTime, CommandType.START_PROCESS,
                    null, null,
                    null, null, 0,
                    "", "", RunMode.RUN_MODE_SERIAL,
                    Priority.LOW, 0, 110);
            Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
            verify(processDao, times(1)).createCommand(any(Command.class));
        }catch (Exception e){
            Assert.assertTrue(false);
        }
    }

    /**
     * date error
     * @throws ParseException
     */
    @Test
    public void testDateError() throws ParseException {
        try {
            Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
            Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
                    processDefinitionId, "2020-01-31 23:00:00,2020-01-01 00:00:00", CommandType.COMPLEMENT_DATA,
                    null, null,
                    null, null, 0,
                    "", "", RunMode.RUN_MODE_SERIAL,
                    Priority.LOW, 0, 110);
            Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS));
            verify(processDao, times(0)).createCommand(any(Command.class));
        }catch (Exception e){
            Assert.assertTrue(false);
        }
    }

    /**
     * serial
     * @throws ParseException
     */
    @Test
    public void testSerial() throws ParseException {
        try {
            Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
            Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
                    processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA,
                    null, null,
                    null, null, 0,
                    "", "", RunMode.RUN_MODE_SERIAL,
                    Priority.LOW, 0, 110);
            Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
            verify(processDao, times(1)).createCommand(any(Command.class));
        }catch (Exception e){
            Assert.assertTrue(false);
        }
    }

    /**
     * without schedule
     * @throws ParseException
     */
    @Test
    public void testParallelWithOutSchedule() throws ParseException {
        try{
            Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
            Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
                    processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA,
                    null, null,
                    null, null, 0,
                    "", "", RunMode.RUN_MODE_PARALLEL,
                    Priority.LOW, 0, 110);
            Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
            verify(processDao, times(31)).createCommand(any(Command.class));
        }catch (Exception e){
            Assert.assertTrue(false);
        }
    }

    /**
     * with schedule
     * @throws ParseException
     */
    @Test
    public void testParallelWithSchedule() throws ParseException {
        try{
            Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList());
            Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
                    processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA,
                    null, null,
                    null, null, 0,
                    "", "", RunMode.RUN_MODE_PARALLEL,
                    Priority.LOW, 0, 110);
            Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
            verify(processDao, times(16)).createCommand(any(Command.class));
        }catch (Exception e){
            Assert.assertTrue(false);
        }
    }

    private List<Schedule> zeroSchedulerList(){
        return Collections.EMPTY_LIST;
    }

    private List<Schedule> oneSchedulerList(){
        List<Schedule> schedulerList = new LinkedList<>();
        Schedule schedule = new Schedule();
        schedule.setCrontab("0 0 0 1/2 * ?");
        schedulerList.add(schedule);
        return schedulerList;
    }

    private Map<String, Object> checkProjectAndAuth(){
        Map<String, Object> result = new HashMap<>();
        result.put(Constants.STATUS, Status.SUCCESS);
        return result;
    }
}
 No newline at end of file
+9 −0
Original line number Diff line number Diff line
@@ -1461,6 +1461,15 @@ public class ProcessDao {
        return scheduleMapper.selectById(id);
    }

    /**
     * query Schedule by processDefinitionId
     * @param processDefinitionId processDefinitionId
     * @see Schedule
     */
    public List<Schedule> queryReleaseSchedulerListByProcessDefinitionId(int processDefinitionId) {
        return scheduleMapper.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId);
    }

    /**
     * query need failover process instance
     * @param host host
+7 −0
Original line number Diff line number Diff line
@@ -60,4 +60,11 @@ public interface ScheduleMapper extends BaseMapper<Schedule> {
     */
    List<Schedule> queryByProcessDefinitionId(@Param("processDefinitionId") int processDefinitionId);

    /**
     * query schedule list by process definition id
     * @param processDefinitionId
     * @return
     */
    List<Schedule> queryReleaseSchedulerListByProcessDefinitionId(@Param("processDefinitionId") int processDefinitionId);

}
+5 −0
Original line number Diff line number Diff line
@@ -55,4 +55,9 @@
        from t_ds_schedules
        where process_definition_id =#{processDefinitionId}
    </select>
    <select id="queryReleaseSchedulerListByProcessDefinitionId" resultType="org.apache.dolphinscheduler.dao.entity.Schedule">
        select *
        from t_ds_schedules
        where process_definition_id =#{processDefinitionId} and release_state = 1
    </select>
</mapper>
 No newline at end of file
Loading