diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index 5d654bcc47..2d5868b251 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -103,4 +103,12 @@ public interface ExecutorService { * @return check result */ boolean checkSubProcessDefinitionValid(ProcessDefinition processDefinition); + + /** + * force start Task Instance + * @param loginUser + * @param queueId + * @return + */ + Map forceStartTaskInstance(User loginUser, int queueId); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 35d714a16c..fe562ef41d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -103,6 +103,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ @Autowired private ProcessTaskRelationMapper processTaskRelationMapper; + @Autowired + private TaskGroupQueueMapper taskGroupQueueMapper; + /** * execute process instance * @@ -350,6 +353,24 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ return result; } + @Override + public Map forceStartTaskInstance(User loginUser, int queueId) { + Map result = new HashMap<>(); + TaskGroupQueue taskGroupQueue = taskGroupQueueMapper.selectById(queueId); + // check process instance exist + ProcessInstance processInstance = processInstanceMapper.selectById(taskGroupQueue.getProcessId()); + if (processInstance == null) { + putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId()); + return result; + } + + // check master exists + if (!checkMasterExists(result)) { + return result; + } + return forceStart(processInstance, taskGroupQueue); + } + /** * check tenant suitable * @@ -445,16 +466,16 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * @param processInstance process instance * @return update result */ - private Map forceStartTaskInstance(ProcessInstance processInstance, int taskId) { + private Map forceStart(ProcessInstance processInstance, TaskGroupQueue taskGroupQueue) { Map result = new HashMap<>(); - TaskGroupQueue taskGroupQueue = processService.loadTaskGroupQueue(taskId); if (taskGroupQueue.getStatus() != TaskGroupQueueStatus.WAIT_QUEUE) { putMsg(result, Status.TASK_GROUP_QUEUE_ALREADY_START); return result; } + taskGroupQueue.setForceStart(Flag.YES.getCode()); processService.updateTaskGroupQueue(taskGroupQueue); - processService.sendStartTask2Master(processInstance,taskId + processService.sendStartTask2Master(processInstance, taskGroupQueue.getTaskId() ,org.apache.dolphinscheduler.remote.command.CommandType.TASK_FORCE_STATE_EVENT_REQUEST); putMsg(result, Status.SUCCESS); return result; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java index 2b6d148259..6655455427 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.api.service.impl; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.ExecutorService; import org.apache.dolphinscheduler.api.service.TaskGroupQueueService; import org.apache.dolphinscheduler.api.service.TaskGroupService; import org.apache.dolphinscheduler.api.utils.PageInfo; @@ -59,6 +60,9 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe @Autowired private ProcessService processService; + @Autowired + private ExecutorService executorService; + private static final Logger logger = LoggerFactory.getLogger(TaskGroupServiceImpl.class); /** @@ -303,11 +307,7 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe */ @Override public Map forceStartTask(User loginUser, int queueId) { - Map result = new HashMap<>(); - - taskGroupQueueService.forceStartTask(queueId, Flag.YES.getCode()); - putMsg(result, Status.SUCCESS); - return result; + return executorService.forceStartTaskInstance(loginUser, queueId); } @Override diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java index 3f2bd02575..9c0c1c3231 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java @@ -31,18 +31,22 @@ import org.apache.dolphinscheduler.common.enums.ComplementDependentMode; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.RunMode; +import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Schedule; +import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -95,6 +99,12 @@ public class ExecutorServiceTest { @Mock private MonitorService monitorService; + @Mock + private TaskGroupQueueMapper taskGroupQueueMapper; + + @Mock + private ProcessInstanceMapper processInstanceMapper; + private int processDefinitionId = 1; private long processDefinitionCode = 1L; @@ -105,10 +115,14 @@ public class ExecutorServiceTest { private int userId = 1; + private int taskQueueId = 1; + private ProcessDefinition processDefinition = new ProcessDefinition(); private ProcessInstance processInstance = new ProcessInstance(); + private TaskGroupQueue taskGroupQueue = new TaskGroupQueue(); + private User loginUser = new User(); private long projectCode = 1L; @@ -145,6 +159,11 @@ public class ExecutorServiceTest { project.setCode(projectCode); project.setName(projectName); + // taskGroupQueue + taskGroupQueue.setId(taskQueueId); + taskGroupQueue.setStatus(TaskGroupQueueStatus.WAIT_QUEUE); + taskGroupQueue.setProcessId(processInstanceId); + // cronRangeTime cronTime = "2020-01-01 00:00:00,2020-01-31 23:00:00"; @@ -157,6 +176,15 @@ public class ExecutorServiceTest { Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(getMasterServersList()); Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(processInstance); Mockito.when(processService.findProcessDefinition(1L, 1)).thenReturn(processDefinition); + Mockito.when(taskGroupQueueMapper.selectById(1)).thenReturn(taskGroupQueue); + Mockito.when(processInstanceMapper.selectById(1)).thenReturn(processInstance); + } + + @Test + public void testForceStartTaskInstance(){ + + Map result = executorService.forceStartTaskInstance(loginUser, taskQueueId); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); } /** diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java index d8110a0704..ddce9a310d 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java @@ -174,14 +174,4 @@ public class TaskGroupServiceTest { result = taskGroupService.startTaskGroup(loginUser, 1); Assert.assertEquals(Status.TASK_GROUP_STATUS_OPENED, result.get(Constants.STATUS)); } - - @Test - public void testWakeTaskFroceManually() { - - TreeMap tm = new TreeMap<>(); - tm.put(1, 1); - Map map1 = taskGroupService.forceStartTask(getLoginUser(), 1); - Assert.assertEquals(Status.SUCCESS, map1.get(Constants.STATUS)); - - } }