Browse Source

[Fix-5519] The executor api, projectName change to projectCode (#5863)

* fix: the executor api, projectName change to projectCode

* fix checkstyle

* fix: when process definition is null, return error message

Co-authored-by: wen-hemin <wenhemin@apache.com>
2.0.7-release
wen-hemin 3 years ago committed by GitHub
parent
commit
c5bc4fc48e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 57
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
  2. 16
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
  3. 50
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  4. 42
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java
  5. 309
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
  6. 315
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
  7. 2
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue
  8. 6
      dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js

57
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java

@ -60,7 +60,7 @@ import springfox.documentation.annotations.ApiIgnore;
*/
@Api(tags = "EXECUTOR_TAG")
@RestController
@RequestMapping("projects/{projectName}/executors")
@RequestMapping("projects/{projectCode}/executors")
public class ExecutorController extends BaseController {
@Autowired
@ -70,8 +70,8 @@ public class ExecutorController extends BaseController {
* execute process instance
*
* @param loginUser login user
* @param projectName project name
* @param processDefinitionId process definition id
* @param projectCode project code
* @param processDefinitionCode process definition code
* @param scheduleTime schedule time
* @param failureStrategy failure strategy
* @param startNodeList start nodes list
@ -87,26 +87,26 @@ public class ExecutorController extends BaseController {
*/
@ApiOperation(value = "startProcessInstance", notes = "RUN_PROCESS_INSTANCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", required = true, dataType = "String"),
@ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", required = true, dataType = "FailureStrategy"),
@ApiImplicitParam(name = "startNodeList", value = "START_NODE_LIST", dataType = "String"),
@ApiImplicitParam(name = "taskDependType", value = "TASK_DEPEND_TYPE", dataType = "TaskDependType"),
@ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", dataType = "CommandType"),
@ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", required = true, dataType = "WarningType"),
@ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataType = "RunMode"),
@ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority"),
@ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String", example = "default"),
@ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "processDefinitionCode", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "100"),
@ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", required = true, dataType = "String"),
@ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", required = true, dataType = "FailureStrategy"),
@ApiImplicitParam(name = "startNodeList", value = "START_NODE_LIST", dataType = "String"),
@ApiImplicitParam(name = "taskDependType", value = "TASK_DEPEND_TYPE", dataType = "TaskDependType"),
@ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", dataType = "CommandType"),
@ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", required = true, dataType = "WarningType"),
@ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataType = "RunMode"),
@ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority"),
@ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String", example = "default"),
@ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int", example = "100"),
})
@PostMapping(value = "start-process-instance")
@ResponseStatus(HttpStatus.OK)
@ApiException(START_PROCESS_INSTANCE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result startProcessInstance(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
@RequestParam(value = "processDefinitionId") int processDefinitionId,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "processDefinitionCode") int processDefinitionCode,
@RequestParam(value = "scheduleTime", required = false) String scheduleTime,
@RequestParam(value = "failureStrategy", required = true) FailureStrategy failureStrategy,
@RequestParam(value = "startNodeList", required = false) String startNodeList,
@ -127,58 +127,55 @@ public class ExecutorController extends BaseController {
if (startParams != null) {
startParamMap = JSONUtils.toMap(startParams);
}
Map<String, Object> result = execService.execProcessInstance(loginUser, projectName, processDefinitionId, scheduleTime, execType, failureStrategy,
Map<String, Object> result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode, scheduleTime, execType, failureStrategy,
startNodeList, taskDependType, warningType,
warningGroupId, runMode, processInstancePriority, workerGroup, timeout, startParamMap);
return returnDataList(result);
}
/**
* do action to process instancepause, stop, repeat, recover from pause, recover from stop
*
* @param loginUser login user
* @param projectName project name
* @param projectCode project code
* @param processInstanceId process instance id
* @param executeType execute type
* @return execute result code
*/
@ApiOperation(value = "execute", notes = "EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "executeType", value = "EXECUTE_TYPE", required = true, dataType = "ExecuteType")
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "executeType", value = "EXECUTE_TYPE", required = true, dataType = "ExecuteType")
})
@PostMapping(value = "/execute")
@ResponseStatus(HttpStatus.OK)
@ApiException(EXECUTE_PROCESS_INSTANCE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result execute(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam("processInstanceId") Integer processInstanceId,
@RequestParam("executeType") ExecuteType executeType
) {
Map<String, Object> result = execService.execute(loginUser, projectName, processInstanceId, executeType);
Map<String, Object> result = execService.execute(loginUser, projectCode, processInstanceId, executeType);
return returnDataList(result);
}
/**
* check process definition and all of the son process definitions is on line.
*
* @param loginUser login user
* @param processDefinitionId process definition id
* @param processDefinitionCode process definition code
* @return check result code
*/
@ApiOperation(value = "startCheckProcessDefinition", notes = "START_CHECK_PROCESS_DEFINITION_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100")
@ApiImplicitParam(name = "processDefinitionCode", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "100")
})
@PostMapping(value = "/start-check")
@ResponseStatus(HttpStatus.OK)
@ApiException(CHECK_PROCESS_DEFINITION_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result startCheckProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "processDefinitionId") int processDefinitionId) {
Map<String, Object> result = execService.startCheckByProcessDefinedId(processDefinitionId);
public Result startCheckProcessDefinition(@RequestParam(value = "processDefinitionCode") int processDefinitionCode) {
Map<String, Object> result = execService.startCheckByProcessDefinedCode(processDefinitionCode);
return returnDataList(result);
}
}

16
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@ -38,8 +38,8 @@ public interface ExecutorService {
* execute process instance
*
* @param loginUser login user
* @param projectName project name
* @param processDefinitionId process Definition Id
* @param projectCode project code
* @param processDefinitionCode process definition code
* @param cronTime cron time
* @param commandType command type
* @param failureStrategy failuer strategy
@ -54,8 +54,8 @@ public interface ExecutorService {
* @param startParams the global param values which pass to new process instance
* @return execute process instance code
*/
Map<String, Object> execProcessInstance(User loginUser, String projectName,
int processDefinitionId, String cronTime, CommandType commandType,
Map<String, Object> execProcessInstance(User loginUser, long projectCode,
long processDefinitionCode, String cronTime, CommandType commandType,
FailureStrategy failureStrategy, String startNodeList,
TaskDependType taskDependType, WarningType warningType, int warningGroupId,
RunMode runMode,
@ -75,18 +75,18 @@ public interface ExecutorService {
* do action to process instancepause, stop, repeat, recover from pause, recover from stop
*
* @param loginUser login user
* @param projectName project name
* @param projectCode project code
* @param processInstanceId process instance id
* @param executeType execute type
* @return execute result code
*/
Map<String, Object> execute(User loginUser, String projectName, Integer processInstanceId, ExecuteType executeType);
Map<String, Object> execute(User loginUser, long projectCode, Integer processInstanceId, ExecuteType executeType);
/**
* check if sub processes are offline before starting process definition
*
* @param processDefineId process definition id
* @param processDefinitionCode process definition code
* @return check result code
*/
Map<String, Object> startCheckByProcessDefinedId(int processDefineId);
Map<String, Object> startCheckByProcessDefinedCode(long processDefinitionCode);
}

50
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -102,8 +102,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* execute process instance
*
* @param loginUser login user
* @param projectName project name
* @param processDefinitionId process Definition Id
* @param projectCode project code
* @param processDefinitionCode process definition code
* @param cronTime cron time
* @param commandType command type
* @param failureStrategy failuer strategy
@ -119,8 +119,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* @return execute process instance code
*/
@Override
public Map<String, Object> execProcessInstance(User loginUser, String projectName,
int processDefinitionId, String cronTime, CommandType commandType,
public Map<String, Object> execProcessInstance(User loginUser, long projectCode,
long processDefinitionCode, String cronTime, CommandType commandType,
FailureStrategy failureStrategy, String startNodeList,
TaskDependType taskDependType, WarningType warningType, int warningGroupId,
RunMode runMode,
@ -132,15 +132,15 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
putMsg(result, Status.TASK_TIMEOUT_PARAMS_ERROR);
return result;
}
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResultAndAuth = checkResultAndAuth(loginUser, projectName, project);
Project project = projectMapper.queryByCode(projectCode);
Map<String, Object> checkResultAndAuth = checkResultAndAuth(loginUser, project.getName(), project);
if (checkResultAndAuth != null) {
return checkResultAndAuth;
}
// check process define release state
ProcessDefinition processDefinition = processDefinitionMapper.selectById(processDefinitionId);
result = checkProcessDefinitionValid(processDefinition, processDefinitionId);
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
result = checkProcessDefinitionValid(processDefinition, processDefinitionCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
@ -160,7 +160,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
/**
* create command
*/
int create = this.createCommand(commandType, processDefinitionId,
int create = this.createCommand(commandType, processDefinition.getId(),
taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(),
warningGroupId, runMode, processInstancePriority, workerGroup, startParams);
@ -218,17 +218,17 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* do action to process instancepause, stop, repeat, recover from pause, recover from stop
*
* @param loginUser login user
* @param projectName project name
* @param projectCode project code
* @param processInstanceId process instance id
* @param executeType execute type
* @return execute result code
*/
@Override
public Map<String, Object> execute(User loginUser, String projectName, Integer processInstanceId, ExecuteType executeType) {
public Map<String, Object> execute(User loginUser, long projectCode, Integer processInstanceId, ExecuteType executeType) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Project project = projectMapper.queryByCode(projectCode);
Map<String, Object> checkResult = checkResultAndAuth(loginUser, projectName, project);
Map<String, Object> checkResult = checkResultAndAuth(loginUser, project.getName(), project);
if (checkResult != null) {
return checkResult;
}
@ -433,31 +433,35 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
/**
* check if sub processes are offline before starting process definition
*
* @param processDefineId process definition id
* @param processDefinitionCode process definition code
* @return check result code
*/
@Override
public Map<String, Object> startCheckByProcessDefinedId(int processDefineId) {
public Map<String, Object> startCheckByProcessDefinedCode(long processDefinitionCode) {
Map<String, Object> result = new HashMap<>();
if (processDefineId == 0) {
logger.error("process definition id is null");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "process definition id");
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
logger.error("process definition is not found");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "processDefinitionCode");
return result;
}
List<Integer> ids = new ArrayList<>();
processService.recurseFindSubProcessId(processDefineId, ids);
processService.recurseFindSubProcessId(processDefinition.getId(), ids);
Integer[] idArray = ids.toArray(new Integer[ids.size()]);
if (!ids.isEmpty()) {
List<ProcessDefinition> processDefinitionList = processDefinitionMapper.queryDefinitionListByIdList(idArray);
if (processDefinitionList != null) {
for (ProcessDefinition processDefinition : processDefinitionList) {
for (ProcessDefinition processDefinitionTmp : processDefinitionList) {
/**
* if there is no online process, exit directly
*/
if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
if (processDefinitionTmp.getReleaseState() != ReleaseState.ONLINE) {
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinitionTmp.getName());
logger.info("not release process definition id: {} , name : {}",
processDefinition.getId(), processDefinition.getName());
processDefinitionTmp.getId(), processDefinitionTmp.getName());
return result;
}
}

42
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java

@ -23,16 +23,22 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.util.LinkedMultiValueMap;
@ -45,26 +51,34 @@ public class ExecutorControllerTest extends AbstractControllerTest {
private static Logger logger = LoggerFactory.getLogger(ExecutorControllerTest.class);
@Ignore
@MockBean
private ExecutorService executorService;
@Test
public void testStartProcessInstance() throws Exception {
Map<String, Object> resultData = new HashMap<>();
resultData.put(Constants.STATUS, Status.SUCCESS);
Mockito.when(executorService.execProcessInstance(Mockito.any(), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(),
Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyInt(),
Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyInt(), Mockito.any())).thenReturn(resultData);
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("processDefinitionId", "40");
paramsMap.add("processDefinitionCode", "1");
paramsMap.add("scheduleTime", "");
paramsMap.add("failureStrategy", String.valueOf(FailureStrategy.CONTINUE));
paramsMap.add("startNodeList", "");
paramsMap.add("taskDependType", "");
paramsMap.add("execType", "");
paramsMap.add("warningType", String.valueOf(WarningType.NONE));
paramsMap.add("warningGroupId", "");
paramsMap.add("warningGroupId", "1");
paramsMap.add("receivers", "");
paramsMap.add("receiversCc", "");
paramsMap.add("runMode", "");
paramsMap.add("processInstancePriority", "");
paramsMap.add("workerGroupId", "");
paramsMap.add("workerGroupId", "1");
paramsMap.add("timeout", "");
MvcResult mvcResult = mockMvc.perform(post("/projects/{projectName}/executors/start-process-instance", "cxc_1113")
MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/executors/start-process-instance", 1L)
.header("sessionId", sessionId)
.params(paramsMap))
.andExpect(status().isOk())
@ -75,14 +89,17 @@ public class ExecutorControllerTest extends AbstractControllerTest {
logger.info(mvcResult.getResponse().getContentAsString());
}
@Ignore
@Test
public void testExecute() throws Exception {
Map<String, Object> resultData = new HashMap<>();
resultData.put(Constants.STATUS, Status.SUCCESS);
Mockito.when(executorService.execute(Mockito.any(), Mockito.anyLong(), Mockito.anyInt(), Mockito.any())).thenReturn(resultData);
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("processInstanceId", "40");
paramsMap.add("executeType", String.valueOf(ExecuteType.NONE));
MvcResult mvcResult = mockMvc.perform(post("/projects/{projectName}/executors/execute", "cxc_1113")
MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/executors/execute", 1L)
.header("sessionId", sessionId)
.params(paramsMap))
.andExpect(status().isOk())
@ -94,11 +111,14 @@ public class ExecutorControllerTest extends AbstractControllerTest {
}
@Test
public void testStartCheckProcessDefinition() throws Exception {
public void testStartCheck() throws Exception {
Map<String, Object> resultData = new HashMap<>();
resultData.put(Constants.STATUS, Status.SUCCESS);
Mockito.when(executorService.startCheckByProcessDefinedCode(Mockito.anyLong())).thenReturn(resultData);
MvcResult mvcResult = mockMvc.perform(post("/projects/{projectName}/executors/start-check", "cxc_1113")
MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/executors/start-check", 1L)
.header(SESSION_ID, sessionId)
.param("processDefinitionId", "40"))
.param("processDefinitionCode", "1"))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
.andReturn();

309
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java

@ -1,309 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
/**
* executor service 2 test
*/
@RunWith(MockitoJUnitRunner.Silent.class)
public class ExecutorService2Test {
@InjectMocks
private ExecutorServiceImpl executorService;
@Mock
private ProcessService processService;
@Mock
private ProcessDefinitionMapper processDefinitionMapper;
@Mock
private ProjectMapper projectMapper;
@Mock
private ProjectServiceImpl projectService;
@Mock
private MonitorService monitorService;
private int processDefinitionId = 1;
private int processInstanceId = 1;
private int tenantId = 1;
private int userId = 1;
private ProcessDefinition processDefinition = new ProcessDefinition();
private ProcessInstance processInstance = new ProcessInstance();
private User loginUser = new User();
private String projectName = "projectName";
private Project project = new Project();
private String cronTime;
@Before
public void init() {
// user
loginUser.setId(userId);
// processDefinition
processDefinition.setId(processDefinitionId);
processDefinition.setReleaseState(ReleaseState.ONLINE);
processDefinition.setTenantId(tenantId);
processDefinition.setUserId(userId);
processDefinition.setVersion(1);
processDefinition.setCode(1L);
// processInstance
processInstance.setId(processInstanceId);
processInstance.setState(ExecutionStatus.FAILURE);
processInstance.setExecutorId(userId);
processInstance.setTenantId(tenantId);
processInstance.setProcessDefinitionVersion(1);
processInstance.setProcessDefinitionCode(1L);
// project
project.setName(projectName);
// cronRangeTime
cronTime = "2020-01-01 00:00:00,2020-01-31 23:00:00";
// mock
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(project);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(checkProjectAndAuth());
Mockito.when(processDefinitionMapper.selectById(processDefinitionId)).thenReturn(processDefinition);
Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant());
Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1);
Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(getMasterServersList());
Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(processInstance);
Mockito.when(processService.findProcessDefinition(1L, 1)).thenReturn(processDefinition);
}
/**
* not complement
*/
@Test
public void testNoComplement() {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
processDefinitionId, cronTime, CommandType.START_PROCESS,
null, null,
null, null, 0,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class));
}
/**
* not complement
*/
@Test
public void testComplementWithStartNodeList() {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
processDefinitionId, cronTime, CommandType.START_PROCESS,
null, "n1,n2",
null, null, 0,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class));
}
/**
* date error
*/
@Test
public void testDateError() {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
processDefinitionId, "2020-01-31 23:00:00,2020-01-01 00:00:00", CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS));
verify(processService, times(0)).createCommand(any(Command.class));
}
/**
* serial
*/
@Test
public void testSerial() {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class));
}
/**
* without schedule
*/
@Test
public void testParallelWithOutSchedule() {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(31)).createCommand(any(Command.class));
}
/**
* with schedule
*/
@Test
public void testParallelWithSchedule() {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(15)).createCommand(any(Command.class));
}
@Test
public void testNoMsterServers() {
Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(new ArrayList<>());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
Assert.assertEquals(result.get(Constants.STATUS), Status.MASTER_NOT_EXISTS);
}
@Test
public void testExecuteRepeatRunning() {
Mockito.when(processService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true);
Map<String, Object> result = executorService.execute(loginUser, projectName, processInstanceId, ExecuteType.REPEAT_RUNNING);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
private List<Server> getMasterServersList() {
List<Server> masterServerList = new ArrayList<>();
Server masterServer1 = new Server();
masterServer1.setId(1);
masterServer1.setHost("192.168.220.188");
masterServer1.setPort(1121);
masterServerList.add(masterServer1);
Server masterServer2 = new Server();
masterServer2.setId(2);
masterServer2.setHost("192.168.220.189");
masterServer2.setPort(1122);
masterServerList.add(masterServer2);
return masterServerList;
}
private List zeroSchedulerList() {
return Collections.EMPTY_LIST;
}
private List<Schedule> oneSchedulerList() {
List<Schedule> schedulerList = new LinkedList<>();
Schedule schedule = new Schedule();
schedule.setCrontab("0 0 0 1/2 * ?");
schedulerList.add(schedule);
return schedulerList;
}
private Map<String, Object> checkProjectAndAuth() {
Map<String, Object> result = new HashMap<>();
result.put(Constants.STATUS, Status.SUCCESS);
return result;
}
}

315
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java

@ -17,52 +17,313 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.controller.AbstractControllerTest;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
/**
* executor service test
* executor service 2 test
*/
public class ExecutorServiceTest extends AbstractControllerTest {
private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceTest.class);
@RunWith(MockitoJUnitRunner.Silent.class)
public class ExecutorServiceTest {
@Autowired
@InjectMocks
private ExecutorServiceImpl executorService;
@Ignore
@Mock
private ProcessService processService;
@Mock
private ProcessDefinitionMapper processDefinitionMapper;
@Mock
private ProjectMapper projectMapper;
@Mock
private ProjectServiceImpl projectService;
@Mock
private MonitorService monitorService;
private int processDefinitionId = 1;
private long processDefinitionCode = 1L;
private int processInstanceId = 1;
private int tenantId = 1;
private int userId = 1;
private ProcessDefinition processDefinition = new ProcessDefinition();
private ProcessInstance processInstance = new ProcessInstance();
private User loginUser = new User();
private long projectCode = 1L;
private String projectName = "projectName";
private Project project = new Project();
private String cronTime;
@Before
public void init() {
// user
loginUser.setId(userId);
// processDefinition
processDefinition.setId(processDefinitionId);
processDefinition.setReleaseState(ReleaseState.ONLINE);
processDefinition.setTenantId(tenantId);
processDefinition.setUserId(userId);
processDefinition.setVersion(1);
processDefinition.setCode(1L);
// processInstance
processInstance.setId(processInstanceId);
processInstance.setState(ExecutionStatus.FAILURE);
processInstance.setExecutorId(userId);
processInstance.setTenantId(tenantId);
processInstance.setProcessDefinitionVersion(1);
processInstance.setProcessDefinitionCode(1L);
// project
project.setCode(projectCode);
project.setName(projectName);
// cronRangeTime
cronTime = "2020-01-01 00:00:00,2020-01-31 23:00:00";
// mock
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(checkProjectAndAuth());
Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition);
Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant());
Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1);
Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(getMasterServersList());
Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(processInstance);
Mockito.when(processService.findProcessDefinition(1L, 1)).thenReturn(processDefinition);
}
/**
* not complement
*/
@Test
public void testNoComplement() {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, cronTime, CommandType.START_PROCESS,
null, null,
null, null, 0,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class));
}
/**
* not complement
*/
@Test
public void testComplementWithStartNodeList() {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, cronTime, CommandType.START_PROCESS,
null, "n1,n2",
null, null, 0,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class));
}
/**
* date error
*/
@Test
public void testDateError() {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, "2020-01-31 23:00:00,2020-01-01 00:00:00", CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS));
verify(processService, times(0)).createCommand(any(Command.class));
}
/**
* serial
*/
@Test
public void testSerial() {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class));
}
/**
* without schedule
*/
@Test
public void startCheckByProcessDefinedId() {
Map<String, Object> map = executorService.startCheckByProcessDefinedId(1234);
Assert.assertNull(map);
public void testParallelWithOutSchedule() {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(31)).createCommand(any(Command.class));
}
/**
* with schedule
*/
@Test
public void putMsgWithParamsTest() {
Map<String, Object> map = new HashMap<>();
putMsgWithParams(map, Status.PROJECT_ALREADY_EXISTS);
logger.info(map.toString());
public void testParallelWithSchedule() {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(15)).createCommand(any(Command.class));
}
@Test
public void testNoMasterServers() {
Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(new ArrayList<>());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
Assert.assertEquals(result.get(Constants.STATUS), Status.MASTER_NOT_EXISTS);
}
@Test
public void testExecuteRepeatRunning() {
Mockito.when(processService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true);
Map<String, Object> result = executorService.execute(loginUser, projectCode, processInstanceId, ExecuteType.REPEAT_RUNNING);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
@Test
public void testStartCheckByProcessDefinedCode() {
List<Integer> ids = new ArrayList<>();
ids.add(1);
Mockito.doNothing().when(processService).recurseFindSubProcessId(1, ids);
List<ProcessDefinition> processDefinitionList = new ArrayList<>();
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(1);
processDefinition.setReleaseState(ReleaseState.ONLINE);
processDefinitionList.add(processDefinition);
Mockito.when(processDefinitionMapper.queryDefinitionListByIdList(new Integer[ids.size()]))
.thenReturn(processDefinitionList);
Map<String, Object> result = executorService.startCheckByProcessDefinedCode(1L);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
private List<Server> getMasterServersList() {
List<Server> masterServerList = new ArrayList<>();
Server masterServer1 = new Server();
masterServer1.setId(1);
masterServer1.setHost("192.168.220.188");
masterServer1.setPort(1121);
masterServerList.add(masterServer1);
Server masterServer2 = new Server();
masterServer2.setId(2);
masterServer2.setHost("192.168.220.189");
masterServer2.setPort(1122);
masterServerList.add(masterServer2);
return masterServerList;
}
private List zeroSchedulerList() {
return Collections.EMPTY_LIST;
}
private List<Schedule> oneSchedulerList() {
List<Schedule> schedulerList = new LinkedList<>();
Schedule schedule = new Schedule();
schedule.setCrontab("0 0 0 1/2 * ?");
schedulerList.add(schedule);
return schedulerList;
}
void putMsgWithParams(Map<String, Object> result, Status status, Object... statusParams) {
result.put(Constants.STATUS, status);
if (statusParams != null && statusParams.length > 0) {
result.put(Constants.MSG, MessageFormat.format(status.getMsg(), statusParams));
} else {
result.put(Constants.MSG, status.getMsg());
}
private Map<String, Object> checkProjectAndAuth() {
Map<String, Object> result = new HashMap<>();
result.put(Constants.STATUS, Status.SUCCESS);
return result;
}
}
}

2
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue

@ -210,7 +210,7 @@
*/
_start (item) {
this.getWorkerGroupsAll()
this.getStartCheck({ processDefinitionId: item.id }).then(res => {
this.getStartCheck({ processDefinitionCode: item.code }).then(res => {
this.startData = item
this.startDialog = true
}).catch(e => {

6
dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js

@ -124,7 +124,7 @@ export default {
*/
editExecutorsState ({ state }, payload) {
return new Promise((resolve, reject) => {
io.post(`projects/${state.projectName}/executors/execute`, {
io.post(`projects/${state.projectCode}/executors/execute`, {
processInstanceId: payload.processInstanceId,
executeType: payload.executeType
}, res => {
@ -506,7 +506,7 @@ export default {
*/
processStart ({ state }, payload) {
return new Promise((resolve, reject) => {
io.post(`projects/${state.projectName}/executors/start-process-instance`, payload, res => {
io.post(`projects/${state.projectCode}/executors/start-process-instance`, payload, res => {
resolve(res)
}).catch(e => {
reject(e)
@ -543,7 +543,7 @@ export default {
*/
getStartCheck ({ state }, payload) {
return new Promise((resolve, reject) => {
io.post(`projects/${state.projectName}/executors/start-check`, payload, res => {
io.post(`projects/${state.projectCode}/executors/start-check`, payload, res => {
resolve(res)
}).catch(e => {
reject(e)

Loading…
Cancel
Save