diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java index 24e9242d11..0a5c8a10dd 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java @@ -60,7 +60,7 @@ import springfox.documentation.annotations.ApiIgnore; */ @Api(tags = "EXECUTOR_TAG") @RestController -@RequestMapping("projects/{projectName}/executors") +@RequestMapping("projects/{projectCode}/executors") public class ExecutorController extends BaseController { @Autowired @@ -70,8 +70,8 @@ public class ExecutorController extends BaseController { * execute process instance * * @param loginUser login user - * @param projectName project name - * @param processDefinitionId process definition id + * @param projectCode project code + * @param processDefinitionCode process definition code * @param scheduleTime schedule time * @param failureStrategy failure strategy * @param startNodeList start nodes list @@ -87,26 +87,26 @@ public class ExecutorController extends BaseController { */ @ApiOperation(value = "startProcessInstance", notes = "RUN_PROCESS_INSTANCE_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"), - @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", required = true, dataType = "String"), - @ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", required = true, dataType = "FailureStrategy"), - @ApiImplicitParam(name = "startNodeList", value = "START_NODE_LIST", dataType = "String"), - @ApiImplicitParam(name = "taskDependType", value = "TASK_DEPEND_TYPE", dataType = "TaskDependType"), - @ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", dataType = "CommandType"), - @ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", required = true, dataType = "WarningType"), - @ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID", required = true, dataType = "Int", example = "100"), - @ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataType = "RunMode"), - @ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority"), - @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String", example = "default"), - @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int", example = "100"), + @ApiImplicitParam(name = "processDefinitionCode", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "100"), + @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", required = true, dataType = "String"), + @ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", required = true, dataType = "FailureStrategy"), + @ApiImplicitParam(name = "startNodeList", value = "START_NODE_LIST", dataType = "String"), + @ApiImplicitParam(name = "taskDependType", value = "TASK_DEPEND_TYPE", dataType = "TaskDependType"), + @ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", dataType = "CommandType"), + @ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", required = true, dataType = "WarningType"), + @ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataType = "RunMode"), + @ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority"), + @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String", example = "default"), + @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int", example = "100"), }) @PostMapping(value = "start-process-instance") @ResponseStatus(HttpStatus.OK) @ApiException(START_PROCESS_INSTANCE_ERROR) @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result startProcessInstance(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, - @RequestParam(value = "processDefinitionId") int processDefinitionId, + @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @RequestParam(value = "processDefinitionCode") int processDefinitionCode, @RequestParam(value = "scheduleTime", required = false) String scheduleTime, @RequestParam(value = "failureStrategy", required = true) FailureStrategy failureStrategy, @RequestParam(value = "startNodeList", required = false) String startNodeList, @@ -127,58 +127,55 @@ public class ExecutorController extends BaseController { if (startParams != null) { startParamMap = JSONUtils.toMap(startParams); } - Map result = execService.execProcessInstance(loginUser, projectName, processDefinitionId, scheduleTime, execType, failureStrategy, + Map result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode, scheduleTime, execType, failureStrategy, startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority, workerGroup, timeout, startParamMap); return returnDataList(result); } - /** * do action to process instance:pause, stop, repeat, recover from pause, recover from stop * * @param loginUser login user - * @param projectName project name + * @param projectCode project code * @param processInstanceId process instance id * @param executeType execute type * @return execute result code */ @ApiOperation(value = "execute", notes = "EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"), - @ApiImplicitParam(name = "executeType", value = "EXECUTE_TYPE", required = true, dataType = "ExecuteType") + @ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "executeType", value = "EXECUTE_TYPE", required = true, dataType = "ExecuteType") }) @PostMapping(value = "/execute") @ResponseStatus(HttpStatus.OK) @ApiException(EXECUTE_PROCESS_INSTANCE_ERROR) @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result execute(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, + @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, @RequestParam("processInstanceId") Integer processInstanceId, @RequestParam("executeType") ExecuteType executeType ) { - Map result = execService.execute(loginUser, projectName, processInstanceId, executeType); + Map result = execService.execute(loginUser, projectCode, processInstanceId, executeType); return returnDataList(result); } /** * check process definition and all of the son process definitions is on line. * - * @param loginUser login user - * @param processDefinitionId process definition id + * @param processDefinitionCode process definition code * @return check result code */ @ApiOperation(value = "startCheckProcessDefinition", notes = "START_CHECK_PROCESS_DEFINITION_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100") + @ApiImplicitParam(name = "processDefinitionCode", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "100") }) @PostMapping(value = "/start-check") @ResponseStatus(HttpStatus.OK) @ApiException(CHECK_PROCESS_DEFINITION_ERROR) @AccessLogAnnotation(ignoreRequestArgs = "loginUser") - public Result startCheckProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @RequestParam(value = "processDefinitionId") int processDefinitionId) { - Map result = execService.startCheckByProcessDefinedId(processDefinitionId); + public Result startCheckProcessDefinition(@RequestParam(value = "processDefinitionCode") int processDefinitionCode) { + Map result = execService.startCheckByProcessDefinedCode(processDefinitionCode); return returnDataList(result); } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index 72a0089bc1..ac850fff89 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -38,8 +38,8 @@ public interface ExecutorService { * execute process instance * * @param loginUser login user - * @param projectName project name - * @param processDefinitionId process Definition Id + * @param projectCode project code + * @param processDefinitionCode process definition code * @param cronTime cron time * @param commandType command type * @param failureStrategy failuer strategy @@ -54,8 +54,8 @@ public interface ExecutorService { * @param startParams the global param values which pass to new process instance * @return execute process instance code */ - Map execProcessInstance(User loginUser, String projectName, - int processDefinitionId, String cronTime, CommandType commandType, + Map execProcessInstance(User loginUser, long projectCode, + long processDefinitionCode, String cronTime, CommandType commandType, FailureStrategy failureStrategy, String startNodeList, TaskDependType taskDependType, WarningType warningType, int warningGroupId, RunMode runMode, @@ -75,18 +75,18 @@ public interface ExecutorService { * do action to process instance:pause, stop, repeat, recover from pause, recover from stop * * @param loginUser login user - * @param projectName project name + * @param projectCode project code * @param processInstanceId process instance id * @param executeType execute type * @return execute result code */ - Map execute(User loginUser, String projectName, Integer processInstanceId, ExecuteType executeType); + Map execute(User loginUser, long projectCode, Integer processInstanceId, ExecuteType executeType); /** * check if sub processes are offline before starting process definition * - * @param processDefineId process definition id + * @param processDefinitionCode process definition code * @return check result code */ - Map startCheckByProcessDefinedId(int processDefineId); + Map startCheckByProcessDefinedCode(long processDefinitionCode); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 16213be7b7..1d944d0737 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -102,8 +102,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * execute process instance * * @param loginUser login user - * @param projectName project name - * @param processDefinitionId process Definition Id + * @param projectCode project code + * @param processDefinitionCode process definition code * @param cronTime cron time * @param commandType command type * @param failureStrategy failuer strategy @@ -119,8 +119,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * @return execute process instance code */ @Override - public Map execProcessInstance(User loginUser, String projectName, - int processDefinitionId, String cronTime, CommandType commandType, + public Map execProcessInstance(User loginUser, long projectCode, + long processDefinitionCode, String cronTime, CommandType commandType, FailureStrategy failureStrategy, String startNodeList, TaskDependType taskDependType, WarningType warningType, int warningGroupId, RunMode runMode, @@ -132,15 +132,15 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ putMsg(result, Status.TASK_TIMEOUT_PARAMS_ERROR); return result; } - Project project = projectMapper.queryByName(projectName); - Map checkResultAndAuth = checkResultAndAuth(loginUser, projectName, project); + Project project = projectMapper.queryByCode(projectCode); + Map checkResultAndAuth = checkResultAndAuth(loginUser, project.getName(), project); if (checkResultAndAuth != null) { return checkResultAndAuth; } // check process define release state - ProcessDefinition processDefinition = processDefinitionMapper.selectById(processDefinitionId); - result = checkProcessDefinitionValid(processDefinition, processDefinitionId); + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); + result = checkProcessDefinitionValid(processDefinition, processDefinitionCode); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -160,7 +160,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ /** * create command */ - int create = this.createCommand(commandType, processDefinitionId, + int create = this.createCommand(commandType, processDefinition.getId(), taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(), warningGroupId, runMode, processInstancePriority, workerGroup, startParams); @@ -218,17 +218,17 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * do action to process instance:pause, stop, repeat, recover from pause, recover from stop * * @param loginUser login user - * @param projectName project name + * @param projectCode project code * @param processInstanceId process instance id * @param executeType execute type * @return execute result code */ @Override - public Map execute(User loginUser, String projectName, Integer processInstanceId, ExecuteType executeType) { + public Map execute(User loginUser, long projectCode, Integer processInstanceId, ExecuteType executeType) { Map result = new HashMap<>(); - Project project = projectMapper.queryByName(projectName); + Project project = projectMapper.queryByCode(projectCode); - Map checkResult = checkResultAndAuth(loginUser, projectName, project); + Map checkResult = checkResultAndAuth(loginUser, project.getName(), project); if (checkResult != null) { return checkResult; } @@ -433,31 +433,35 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ /** * check if sub processes are offline before starting process definition * - * @param processDefineId process definition id + * @param processDefinitionCode process definition code * @return check result code */ @Override - public Map startCheckByProcessDefinedId(int processDefineId) { + public Map startCheckByProcessDefinedCode(long processDefinitionCode) { Map result = new HashMap<>(); - if (processDefineId == 0) { - logger.error("process definition id is null"); - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "process definition id"); + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); + + if (processDefinition == null) { + logger.error("process definition is not found"); + putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "processDefinitionCode"); + return result; } + List ids = new ArrayList<>(); - processService.recurseFindSubProcessId(processDefineId, ids); + processService.recurseFindSubProcessId(processDefinition.getId(), ids); Integer[] idArray = ids.toArray(new Integer[ids.size()]); if (!ids.isEmpty()) { List processDefinitionList = processDefinitionMapper.queryDefinitionListByIdList(idArray); if (processDefinitionList != null) { - for (ProcessDefinition processDefinition : processDefinitionList) { + for (ProcessDefinition processDefinitionTmp : processDefinitionList) { /** * if there is no online process, exit directly */ - if (processDefinition.getReleaseState() != ReleaseState.ONLINE) { - putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName()); + if (processDefinitionTmp.getReleaseState() != ReleaseState.ONLINE) { + putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinitionTmp.getName()); logger.info("not release process definition id: {} , name : {}", - processDefinition.getId(), processDefinition.getName()); + processDefinitionTmp.getId(), processDefinitionTmp.getName()); return result; } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java index b3e093a067..1bf10ae942 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java @@ -23,16 +23,22 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.ExecutorService; import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import java.util.HashMap; +import java.util.Map; + import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.http.MediaType; import org.springframework.test.web.servlet.MvcResult; import org.springframework.util.LinkedMultiValueMap; @@ -45,26 +51,34 @@ public class ExecutorControllerTest extends AbstractControllerTest { private static Logger logger = LoggerFactory.getLogger(ExecutorControllerTest.class); - @Ignore + @MockBean + private ExecutorService executorService; + @Test public void testStartProcessInstance() throws Exception { + Map resultData = new HashMap<>(); + resultData.put(Constants.STATUS, Status.SUCCESS); + Mockito.when(executorService.execProcessInstance(Mockito.any(), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(), + Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyInt(), + Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyInt(), Mockito.any())).thenReturn(resultData); + MultiValueMap paramsMap = new LinkedMultiValueMap<>(); - paramsMap.add("processDefinitionId", "40"); + paramsMap.add("processDefinitionCode", "1"); paramsMap.add("scheduleTime", ""); paramsMap.add("failureStrategy", String.valueOf(FailureStrategy.CONTINUE)); paramsMap.add("startNodeList", ""); paramsMap.add("taskDependType", ""); paramsMap.add("execType", ""); paramsMap.add("warningType", String.valueOf(WarningType.NONE)); - paramsMap.add("warningGroupId", ""); + paramsMap.add("warningGroupId", "1"); paramsMap.add("receivers", ""); paramsMap.add("receiversCc", ""); paramsMap.add("runMode", ""); paramsMap.add("processInstancePriority", ""); - paramsMap.add("workerGroupId", ""); + paramsMap.add("workerGroupId", "1"); paramsMap.add("timeout", ""); - MvcResult mvcResult = mockMvc.perform(post("/projects/{projectName}/executors/start-process-instance", "cxc_1113") + MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/executors/start-process-instance", 1L) .header("sessionId", sessionId) .params(paramsMap)) .andExpect(status().isOk()) @@ -75,14 +89,17 @@ public class ExecutorControllerTest extends AbstractControllerTest { logger.info(mvcResult.getResponse().getContentAsString()); } - @Ignore @Test public void testExecute() throws Exception { + Map resultData = new HashMap<>(); + resultData.put(Constants.STATUS, Status.SUCCESS); + Mockito.when(executorService.execute(Mockito.any(), Mockito.anyLong(), Mockito.anyInt(), Mockito.any())).thenReturn(resultData); + MultiValueMap paramsMap = new LinkedMultiValueMap<>(); paramsMap.add("processInstanceId", "40"); paramsMap.add("executeType", String.valueOf(ExecuteType.NONE)); - MvcResult mvcResult = mockMvc.perform(post("/projects/{projectName}/executors/execute", "cxc_1113") + MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/executors/execute", 1L) .header("sessionId", sessionId) .params(paramsMap)) .andExpect(status().isOk()) @@ -94,11 +111,14 @@ public class ExecutorControllerTest extends AbstractControllerTest { } @Test - public void testStartCheckProcessDefinition() throws Exception { + public void testStartCheck() throws Exception { + Map resultData = new HashMap<>(); + resultData.put(Constants.STATUS, Status.SUCCESS); + Mockito.when(executorService.startCheckByProcessDefinedCode(Mockito.anyLong())).thenReturn(resultData); - MvcResult mvcResult = mockMvc.perform(post("/projects/{projectName}/executors/start-check", "cxc_1113") + MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/executors/start-check", 1L) .header(SESSION_ID, sessionId) - .param("processDefinitionId", "40")) + .param("processDefinitionCode", "1")) .andExpect(status().isOk()) .andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8)) .andReturn(); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java deleted file mode 100644 index e389d0b621..0000000000 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java +++ /dev/null @@ -1,309 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.api.service; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import org.apache.dolphinscheduler.api.enums.ExecuteType; -import org.apache.dolphinscheduler.api.enums.Status; -import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl; -import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.CommandType; -import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.common.enums.Priority; -import org.apache.dolphinscheduler.common.enums.ReleaseState; -import org.apache.dolphinscheduler.common.enums.RunMode; -import org.apache.dolphinscheduler.common.model.Server; -import org.apache.dolphinscheduler.dao.entity.Command; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.Project; -import org.apache.dolphinscheduler.dao.entity.Schedule; -import org.apache.dolphinscheduler.dao.entity.Tenant; -import org.apache.dolphinscheduler.dao.entity.User; -import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; -import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; -import org.apache.dolphinscheduler.service.process.ProcessService; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.junit.MockitoJUnitRunner; - -/** - * executor service 2 test - */ -@RunWith(MockitoJUnitRunner.Silent.class) -public class ExecutorService2Test { - - @InjectMocks - private ExecutorServiceImpl executorService; - - @Mock - private ProcessService processService; - - @Mock - private ProcessDefinitionMapper processDefinitionMapper; - - @Mock - private ProjectMapper projectMapper; - - @Mock - private ProjectServiceImpl projectService; - - @Mock - private MonitorService monitorService; - - private int processDefinitionId = 1; - - private int processInstanceId = 1; - - private int tenantId = 1; - - private int userId = 1; - - private ProcessDefinition processDefinition = new ProcessDefinition(); - - private ProcessInstance processInstance = new ProcessInstance(); - - private User loginUser = new User(); - - private String projectName = "projectName"; - - private Project project = new Project(); - - private String cronTime; - - @Before - public void init() { - // user - loginUser.setId(userId); - - // processDefinition - processDefinition.setId(processDefinitionId); - processDefinition.setReleaseState(ReleaseState.ONLINE); - processDefinition.setTenantId(tenantId); - processDefinition.setUserId(userId); - processDefinition.setVersion(1); - processDefinition.setCode(1L); - - // processInstance - processInstance.setId(processInstanceId); - processInstance.setState(ExecutionStatus.FAILURE); - processInstance.setExecutorId(userId); - processInstance.setTenantId(tenantId); - processInstance.setProcessDefinitionVersion(1); - processInstance.setProcessDefinitionCode(1L); - - // project - project.setName(projectName); - - // cronRangeTime - cronTime = "2020-01-01 00:00:00,2020-01-31 23:00:00"; - - // mock - Mockito.when(projectMapper.queryByName(projectName)).thenReturn(project); - Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(checkProjectAndAuth()); - Mockito.when(processDefinitionMapper.selectById(processDefinitionId)).thenReturn(processDefinition); - Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant()); - Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1); - Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(getMasterServersList()); - Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(processInstance); - Mockito.when(processService.findProcessDefinition(1L, 1)).thenReturn(processDefinition); - } - - /** - * not complement - */ - @Test - public void testNoComplement() { - - Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); - Map result = executorService.execProcessInstance(loginUser, projectName, - processDefinitionId, cronTime, CommandType.START_PROCESS, - null, null, - null, null, 0, - RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); - Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); - verify(processService, times(1)).createCommand(any(Command.class)); - - } - - /** - * not complement - */ - @Test - public void testComplementWithStartNodeList() { - - Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); - Map result = executorService.execProcessInstance(loginUser, projectName, - processDefinitionId, cronTime, CommandType.START_PROCESS, - null, "n1,n2", - null, null, 0, - RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); - Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); - verify(processService, times(1)).createCommand(any(Command.class)); - - } - - - /** - * date error - */ - @Test - public void testDateError() { - - Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); - Map result = executorService.execProcessInstance(loginUser, projectName, - processDefinitionId, "2020-01-31 23:00:00,2020-01-01 00:00:00", CommandType.COMPLEMENT_DATA, - null, null, - null, null, 0, - RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); - Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS)); - verify(processService, times(0)).createCommand(any(Command.class)); - } - - /** - * serial - */ - @Test - public void testSerial() { - - Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); - Map result = executorService.execProcessInstance(loginUser, projectName, - processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA, - null, null, - null, null, 0, - RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); - Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); - verify(processService, times(1)).createCommand(any(Command.class)); - - } - - /** - * without schedule - */ - @Test - public void testParallelWithOutSchedule() { - - Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); - Map result = executorService.execProcessInstance(loginUser, projectName, - processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA, - null, null, - null, null, 0, - RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); - Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); - verify(processService, times(31)).createCommand(any(Command.class)); - - } - - /** - * with schedule - */ - @Test - public void testParallelWithSchedule() { - - Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList()); - Map result = executorService.execProcessInstance(loginUser, projectName, - processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA, - null, null, - null, null, 0, - RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); - Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); - verify(processService, times(15)).createCommand(any(Command.class)); - - } - - @Test - public void testNoMsterServers() { - Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(new ArrayList<>()); - - Map result = executorService.execProcessInstance(loginUser, projectName, - processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA, - null, null, - null, null, 0, - RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); - Assert.assertEquals(result.get(Constants.STATUS), Status.MASTER_NOT_EXISTS); - - } - - @Test - public void testExecuteRepeatRunning() { - Mockito.when(processService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true); - - Map result = executorService.execute(loginUser, projectName, processInstanceId, ExecuteType.REPEAT_RUNNING); - Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); - } - - private List getMasterServersList() { - List masterServerList = new ArrayList<>(); - Server masterServer1 = new Server(); - masterServer1.setId(1); - masterServer1.setHost("192.168.220.188"); - masterServer1.setPort(1121); - masterServerList.add(masterServer1); - - Server masterServer2 = new Server(); - masterServer2.setId(2); - masterServer2.setHost("192.168.220.189"); - masterServer2.setPort(1122); - masterServerList.add(masterServer2); - - return masterServerList; - - } - - private List zeroSchedulerList() { - return Collections.EMPTY_LIST; - } - - private List oneSchedulerList() { - List schedulerList = new LinkedList<>(); - Schedule schedule = new Schedule(); - schedule.setCrontab("0 0 0 1/2 * ?"); - schedulerList.add(schedule); - return schedulerList; - } - - private Map checkProjectAndAuth() { - Map result = new HashMap<>(); - result.put(Constants.STATUS, Status.SUCCESS); - return result; - } -} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java index 071b77c756..624fa6e124 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java @@ -17,52 +17,313 @@ package org.apache.dolphinscheduler.api.service; -import org.apache.dolphinscheduler.api.controller.AbstractControllerTest; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl; +import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.ReleaseState; +import org.apache.dolphinscheduler.common.enums.RunMode; +import org.apache.dolphinscheduler.common.model.Server; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.Schedule; +import org.apache.dolphinscheduler.dao.entity.Tenant; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.service.process.ProcessService; -import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import org.junit.Assert; -import org.junit.Ignore; +import org.junit.Before; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; /** - * executor service test + * executor service 2 test */ -public class ExecutorServiceTest extends AbstractControllerTest { - - private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceTest.class); +@RunWith(MockitoJUnitRunner.Silent.class) +public class ExecutorServiceTest { - @Autowired + @InjectMocks private ExecutorServiceImpl executorService; - @Ignore + @Mock + private ProcessService processService; + + @Mock + private ProcessDefinitionMapper processDefinitionMapper; + + @Mock + private ProjectMapper projectMapper; + + @Mock + private ProjectServiceImpl projectService; + + @Mock + private MonitorService monitorService; + + private int processDefinitionId = 1; + + private long processDefinitionCode = 1L; + + private int processInstanceId = 1; + + private int tenantId = 1; + + private int userId = 1; + + private ProcessDefinition processDefinition = new ProcessDefinition(); + + private ProcessInstance processInstance = new ProcessInstance(); + + private User loginUser = new User(); + + private long projectCode = 1L; + + private String projectName = "projectName"; + + private Project project = new Project(); + + private String cronTime; + + @Before + public void init() { + // user + loginUser.setId(userId); + + // processDefinition + processDefinition.setId(processDefinitionId); + processDefinition.setReleaseState(ReleaseState.ONLINE); + processDefinition.setTenantId(tenantId); + processDefinition.setUserId(userId); + processDefinition.setVersion(1); + processDefinition.setCode(1L); + + // processInstance + processInstance.setId(processInstanceId); + processInstance.setState(ExecutionStatus.FAILURE); + processInstance.setExecutorId(userId); + processInstance.setTenantId(tenantId); + processInstance.setProcessDefinitionVersion(1); + processInstance.setProcessDefinitionCode(1L); + + // project + project.setCode(projectCode); + project.setName(projectName); + + // cronRangeTime + cronTime = "2020-01-01 00:00:00,2020-01-31 23:00:00"; + + // mock + Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); + Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(checkProjectAndAuth()); + Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition); + Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant()); + Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1); + Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(getMasterServersList()); + Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(processInstance); + Mockito.when(processService.findProcessDefinition(1L, 1)).thenReturn(processDefinition); + } + + /** + * not complement + */ + @Test + public void testNoComplement() { + + Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Map result = executorService.execProcessInstance(loginUser, projectCode, + processDefinitionCode, cronTime, CommandType.START_PROCESS, + null, null, + null, null, 0, + RunMode.RUN_MODE_SERIAL, + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + verify(processService, times(1)).createCommand(any(Command.class)); + + } + + /** + * not complement + */ + @Test + public void testComplementWithStartNodeList() { + + Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Map result = executorService.execProcessInstance(loginUser, projectCode, + processDefinitionCode, cronTime, CommandType.START_PROCESS, + null, "n1,n2", + null, null, 0, + RunMode.RUN_MODE_SERIAL, + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + verify(processService, times(1)).createCommand(any(Command.class)); + + } + + /** + * date error + */ + @Test + public void testDateError() { + + Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Map result = executorService.execProcessInstance(loginUser, projectCode, + processDefinitionCode, "2020-01-31 23:00:00,2020-01-01 00:00:00", CommandType.COMPLEMENT_DATA, + null, null, + null, null, 0, + RunMode.RUN_MODE_SERIAL, + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); + Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS)); + verify(processService, times(0)).createCommand(any(Command.class)); + } + + /** + * serial + */ + @Test + public void testSerial() { + + Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Map result = executorService.execProcessInstance(loginUser, projectCode, + processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA, + null, null, + null, null, 0, + RunMode.RUN_MODE_SERIAL, + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + verify(processService, times(1)).createCommand(any(Command.class)); + } + + /** + * without schedule + */ @Test - public void startCheckByProcessDefinedId() { - Map map = executorService.startCheckByProcessDefinedId(1234); - Assert.assertNull(map); + public void testParallelWithOutSchedule() { + + Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Map result = executorService.execProcessInstance(loginUser, projectCode, + processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA, + null, null, + null, null, 0, + RunMode.RUN_MODE_PARALLEL, + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + verify(processService, times(31)).createCommand(any(Command.class)); + } + /** + * with schedule + */ @Test - public void putMsgWithParamsTest() { - Map map = new HashMap<>(); - putMsgWithParams(map, Status.PROJECT_ALREADY_EXISTS); - logger.info(map.toString()); + public void testParallelWithSchedule() { + + Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList()); + Map result = executorService.execProcessInstance(loginUser, projectCode, + processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA, + null, null, + null, null, 0, + RunMode.RUN_MODE_PARALLEL, + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + verify(processService, times(15)).createCommand(any(Command.class)); + + } + + @Test + public void testNoMasterServers() { + Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(new ArrayList<>()); + + Map result = executorService.execProcessInstance(loginUser, projectCode, + processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA, + null, null, + null, null, 0, + RunMode.RUN_MODE_PARALLEL, + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); + Assert.assertEquals(result.get(Constants.STATUS), Status.MASTER_NOT_EXISTS); + + } + + @Test + public void testExecuteRepeatRunning() { + Mockito.when(processService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true); + + Map result = executorService.execute(loginUser, projectCode, processInstanceId, ExecuteType.REPEAT_RUNNING); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + } + + @Test + public void testStartCheckByProcessDefinedCode() { + List ids = new ArrayList<>(); + ids.add(1); + Mockito.doNothing().when(processService).recurseFindSubProcessId(1, ids); + + List processDefinitionList = new ArrayList<>(); + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setId(1); + processDefinition.setReleaseState(ReleaseState.ONLINE); + processDefinitionList.add(processDefinition); + Mockito.when(processDefinitionMapper.queryDefinitionListByIdList(new Integer[ids.size()])) + .thenReturn(processDefinitionList); + + Map result = executorService.startCheckByProcessDefinedCode(1L); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + } + + private List getMasterServersList() { + List masterServerList = new ArrayList<>(); + Server masterServer1 = new Server(); + masterServer1.setId(1); + masterServer1.setHost("192.168.220.188"); + masterServer1.setPort(1121); + masterServerList.add(masterServer1); + + Server masterServer2 = new Server(); + masterServer2.setId(2); + masterServer2.setHost("192.168.220.189"); + masterServer2.setPort(1122); + masterServerList.add(masterServer2); + + return masterServerList; + } + + private List zeroSchedulerList() { + return Collections.EMPTY_LIST; + } + + private List oneSchedulerList() { + List schedulerList = new LinkedList<>(); + Schedule schedule = new Schedule(); + schedule.setCrontab("0 0 0 1/2 * ?"); + schedulerList.add(schedule); + return schedulerList; } - void putMsgWithParams(Map result, Status status, Object... statusParams) { - result.put(Constants.STATUS, status); - if (statusParams != null && statusParams.length > 0) { - result.put(Constants.MSG, MessageFormat.format(status.getMsg(), statusParams)); - } else { - result.put(Constants.MSG, status.getMsg()); - } + private Map checkProjectAndAuth() { + Map result = new HashMap<>(); + result.put(Constants.STATUS, Status.SUCCESS); + return result; } -} \ No newline at end of file +} diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue index d730f508b5..70d86ab1b3 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue @@ -210,7 +210,7 @@ */ _start (item) { this.getWorkerGroupsAll() - this.getStartCheck({ processDefinitionId: item.id }).then(res => { + this.getStartCheck({ processDefinitionCode: item.code }).then(res => { this.startData = item this.startDialog = true }).catch(e => { diff --git a/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js b/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js index f7b8c0e73a..63bce501d2 100644 --- a/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js +++ b/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js @@ -124,7 +124,7 @@ export default { */ editExecutorsState ({ state }, payload) { return new Promise((resolve, reject) => { - io.post(`projects/${state.projectName}/executors/execute`, { + io.post(`projects/${state.projectCode}/executors/execute`, { processInstanceId: payload.processInstanceId, executeType: payload.executeType }, res => { @@ -506,7 +506,7 @@ export default { */ processStart ({ state }, payload) { return new Promise((resolve, reject) => { - io.post(`projects/${state.projectName}/executors/start-process-instance`, payload, res => { + io.post(`projects/${state.projectCode}/executors/start-process-instance`, payload, res => { resolve(res) }).catch(e => { reject(e) @@ -543,7 +543,7 @@ export default { */ getStartCheck ({ state }, payload) { return new Promise((resolve, reject) => { - io.post(`projects/${state.projectName}/executors/start-check`, payload, res => { + io.post(`projects/${state.projectCode}/executors/start-check`, payload, res => { resolve(res) }).catch(e => { reject(e)