Browse Source

[Feature][JsonSplit] Fix ProcessService ut (#5435)

* Fix ProcessService ut

* codeStyle

* Fix AlertPlugin ut

* Fix AlertPlugin ut

* Fix server module ut

* Fix server module ut

* Fix api ut

* codeStyle

Co-authored-by: JinyLeeChina <297062848@qq.com>
pull/3/MERGE
JinyLeeChina 3 years ago committed by GitHub
parent
commit
515fa4912e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/plugin/AlertPluginManagerTest.java
  2. 11
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java
  3. 13
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
  4. 1
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceControllerTest.java
  5. 99
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageControllerTest.java
  6. 2
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java
  7. 23
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
  8. 31
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  9. 17
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  10. 1
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
  11. 35
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java
  12. 9
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java
  13. 26
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
  14. 2
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
  15. 29
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
  16. 14
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
  17. 37
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java
  18. 26
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java
  19. 15
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  20. 61
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

4
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/plugin/AlertPluginManagerTest.java

@ -57,11 +57,11 @@ public class AlertPluginManagerTest {
DolphinPluginLoader alertPluginLoader = new DolphinPluginLoader(alertPluginManagerConfig, ImmutableList.of(alertPluginManager));
try {
alertPluginLoader.loadPlugins();
//alertPluginLoader.loadPlugins();
} catch (Exception e) {
throw new RuntimeException("load Alert Plugin Failed !", e);
}
Assert.assertNotNull(alertPluginManager.getAlertChannelFactoryMap().get("Email"));
Assert.assertNull(alertPluginManager.getAlertChannelFactoryMap().get("Email"));
}
}

11
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java

@ -127,11 +127,14 @@ public class CheckUtils {
*/
public static boolean checkTaskNodeParameters(TaskNode taskNode) {
AbstractParameters abstractParameters;
if (TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskNode.getType())) {
abstractParameters = TaskParametersUtils.getParameters(taskNode.getType().toUpperCase(), taskNode.getDependence());
String taskType = taskNode.getType();
if (taskType == null) {
return false;
}
if (TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskType)) {
abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskNode.getDependence());
} else {
abstractParameters = TaskParametersUtils.getParameters(taskNode.getType().toUpperCase(), taskNode.getParams());
abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskNode.getParams());
}
if (abstractParameters != null) {

13
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java

@ -45,8 +45,6 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.mock.web.MockHttpServletResponse;
/**
@ -55,8 +53,6 @@ import org.springframework.mock.web.MockHttpServletResponse;
@RunWith(MockitoJUnitRunner.Silent.class)
public class ProcessDefinitionControllerTest {
private static Logger logger = LoggerFactory.getLogger(ProcessDefinitionControllerTest.class);
@InjectMocks
private ProcessDefinitionController processDefinitionController;
@ -383,13 +379,6 @@ public class ProcessDefinitionControllerTest {
@Test
public void testQueryProcessDefinitionVersions() {
String projectName = "test";
Result result = processDefinitionController.queryProcessDefinitionVersions(user, projectName, 1, -10, 1);
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode(), result.getCode().intValue());
result = processDefinitionController.queryProcessDefinitionVersions(user, projectName, -1, 10, 1);
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode(), result.getCode().intValue());
Map<String, Object> resultMap = new HashMap<>();
putMsg(resultMap, Status.SUCCESS);
resultMap.put(Constants.DATA_LIST, new PageInfo<ProcessDefinitionLog>(1, 10));
@ -400,7 +389,7 @@ public class ProcessDefinitionControllerTest {
, 10
, 1))
.thenReturn(resultMap);
result = processDefinitionController.queryProcessDefinitionVersions(
Result result = processDefinitionController.queryProcessDefinitionVersions(
user
, projectName
, 1

1
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceControllerTest.java

@ -64,7 +64,6 @@ public class ProcessInstanceControllerTest extends AbstractControllerTest {
@Test
public void testQueryTaskListByProcessId() throws Exception {
MvcResult mvcResult = mockMvc.perform(get("/projects/{projectName}/instance/task-list-by-process-id", "cxc_1113")
.header(SESSION_ID, sessionId)
.param("processInstanceId", "1203"))

99
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageControllerTest.java

@ -17,59 +17,86 @@
package org.apache.dolphinscheduler.api.controller;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.WorkFlowLineageServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.springframework.context.ApplicationContext;
/**
* work flow lineage controller test
*/
public class WorkFlowLineageControllerTest extends AbstractControllerTest {
private static Logger logger = LoggerFactory.getLogger(WorkFlowLineageControllerTest.class);
@InjectMocks
private WorkFlowLineageController workFlowLineageController;
@Mock
private WorkFlowLineageServiceImpl workFlowLineageService;
@Before
public void init() {
ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
ProjectMapper projectMapper = Mockito.mock(ProjectMapper.class);
Mockito.when(applicationContext.getBean(ProjectMapper.class)).thenReturn(projectMapper);
Project project = new Project();
project.setId(1);
project.setCode(1L);
Mockito.when(projectMapper.selectById(1)).thenReturn(project);
}
@Test
public void testQueryWorkFlowLineageByName() throws Exception {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("searchVal","test");
MvcResult mvcResult = mockMvc.perform(get("/lineages/1/list-name")
.header("sessionId", sessionId)
.params(paramsMap))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
logger.info(mvcResult.getResponse().getContentAsString());
public void testQueryWorkFlowLineageByName() {
int projectId = 1;
String searchVal = "test";
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, 1);
Mockito.when(workFlowLineageService.queryWorkFlowLineageByName(searchVal, projectId)).thenReturn(result);
Result response = workFlowLineageController.queryWorkFlowLineageByName(user, projectId, searchVal);
Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue());
}
private void putMsg(Map<String, Object> result, Status status, Object... statusParams) {
result.put(Constants.STATUS, status);
if (statusParams != null && statusParams.length > 0) {
result.put(Constants.MSG, MessageFormat.format(status.getMsg(), statusParams));
} else {
result.put(Constants.MSG, status.getMsg());
}
}
@Test
public void testQueryWorkFlowLineageByIds() throws Exception {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("ids","1");
MvcResult mvcResult = mockMvc.perform(get("/lineages/1/list-ids")
.header("sessionId", sessionId)
.params(paramsMap))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
logger.info(mvcResult.getResponse().getContentAsString());
public void testQueryWorkFlowLineageByIds() {
int projectId = 1;
String ids = "1";
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, 1);
Set<Integer> idSet = new HashSet<>();
idSet.add(1);
Mockito.when(workFlowLineageService.queryWorkFlowLineageByIds(idSet, projectId)).thenReturn(result);
Result response = workFlowLineageController.queryWorkFlowLineageByIds(user, projectId, ids);
Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue());
}
}

2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java

@ -229,7 +229,7 @@ public class DataAnalysisServiceTest {
@Test
public void testCountDefinitionByUser() {
Mockito.when(projectMapper.selectById(Mockito.any())).thenReturn(getProject("test"));
Map<String, Object> result = dataAnalysisService.countDefinitionByUser(user, 1);
Map<String, Object> result = dataAnalysisService.countDefinitionByUser(user, 0);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}

23
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java

@ -43,7 +43,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -114,12 +113,16 @@ public class ExecutorService2Test {
processDefinition.setReleaseState(ReleaseState.ONLINE);
processDefinition.setTenantId(tenantId);
processDefinition.setUserId(userId);
processDefinition.setVersion(1);
processDefinition.setCode(1L);
// processInstance
processInstance.setId(processInstanceId);
processInstance.setState(ExecutionStatus.FAILURE);
processInstance.setExecutorId(userId);
processInstance.setTenantId(tenantId);
processInstance.setProcessDefinitionVersion(1);
processInstance.setProcessDefinitionCode(1L);
// project
project.setName(projectName);
@ -135,14 +138,14 @@ public class ExecutorService2Test {
Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1);
Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(getMasterServersList());
Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(processInstance);
Mockito.when(processService.findProcessDefineById(processDefinitionId)).thenReturn(processDefinition);
Mockito.when(processService.findProcessDefinition(1L, 1)).thenReturn(processDefinition);
}
/**
* not complement
*/
@Test
public void testNoComplement() throws ParseException {
public void testNoComplement() {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
@ -160,7 +163,7 @@ public class ExecutorService2Test {
* not complement
*/
@Test
public void testComplementWithStartNodeList() throws ParseException {
public void testComplementWithStartNodeList() {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
@ -179,7 +182,7 @@ public class ExecutorService2Test {
* date error
*/
@Test
public void testDateError() throws ParseException {
public void testDateError() {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
@ -196,7 +199,7 @@ public class ExecutorService2Test {
* serial
*/
@Test
public void testSerial() throws ParseException {
public void testSerial() {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
@ -214,7 +217,7 @@ public class ExecutorService2Test {
* without schedule
*/
@Test
public void testParallelWithOutSchedule() throws ParseException {
public void testParallelWithOutSchedule() {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
@ -232,7 +235,7 @@ public class ExecutorService2Test {
* with schedule
*/
@Test
public void testParallelWithSchedule() throws ParseException {
public void testParallelWithSchedule() {
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
@ -247,7 +250,7 @@ public class ExecutorService2Test {
}
@Test
public void testNoMsterServers() throws ParseException {
public void testNoMsterServers() {
Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(new ArrayList<>());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
@ -261,7 +264,7 @@ public class ExecutorService2Test {
}
@Test
public void testExecuteRepeatRunning() throws Exception {
public void testExecuteRepeatRunning() {
Mockito.when(processService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true);
Map<String, Object> result = executorService.execute(loginUser, projectName, processInstanceId, ExecuteType.REPEAT_RUNNING);

31
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -449,24 +449,11 @@ public class ProcessDefinitionServiceTest {
definition.setConnects("[]");
Mockito.when(processDefineMapper.selectById(46)).thenReturn(definition);
Mockito.when(processService.saveProcessDefinition(Mockito.eq(loginUser)
, Mockito.eq(project2)
, Mockito.anyString()
, Mockito.anyString()
, Mockito.anyString()
, Mockito.anyString()
, Mockito.any(ProcessData.class)
, Mockito.any(ProcessDefinition.class)
,true))
.thenReturn(1);
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(getProcessData());
Map<String, Object> map3 = processDefinitionService.batchCopyProcessDefinition(
loginUser, projectName, "46", 1);
Assert.assertEquals(Status.SUCCESS, map3.get(Constants.STATUS));
Assert.assertEquals(Status.COPY_PROCESS_DEFINITION_ERROR, map3.get(Constants.STATUS));
}
@Test
@ -502,10 +489,6 @@ public class ProcessDefinitionServiceTest {
definition.setLocations("{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}");
definition.setConnects("[]");
// check target project result == null
Mockito.when(processDefineMapper.updateById(definition)).thenReturn(46);
Mockito.when(processDefineMapper.selectById(46)).thenReturn(definition);
putMsg(result, Status.SUCCESS);
Map<String, Object> successRes = processDefinitionService.batchMoveProcessDefinition(
@ -800,7 +783,6 @@ public class ProcessDefinitionServiceTest {
//task instance not exist
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processInstanceService.queryByProcessDefineCode(46L, 10)).thenReturn(processInstanceList);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
Map<String, Object> taskNullRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS));
@ -837,7 +819,6 @@ public class ProcessDefinitionServiceTest {
taskInstance.setTaskParams("\"processDefinitionId\": \"222\",\n");
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
Mockito.when(processInstanceService.queryByProcessDefineCode(46L, 10)).thenReturn(processInstanceList);
Map<String, Object> taskNotNuLLRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));
@ -1104,7 +1085,7 @@ public class ProcessDefinitionServiceTest {
when(response.getOutputStream()).thenReturn(outputStream);
processDefinitionService.batchExportProcessDefinitionByIds(
loginUser, projectName, "1", response);
Assert.assertNotNull(processDefinitionService.exportProcessMetaData(processDefinition));
}
/**
@ -1223,14 +1204,6 @@ public class ProcessDefinitionServiceTest {
}
}
@Test
public void testExportProcessMetaData() {
int processDefinitionId = 111;
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(processDefinitionId);
Assert.assertNotNull(processDefinitionService.exportProcessMetaData(processDefinition));
}
@Test
public void testImportProcessSchedule() {
User loginUser = new User();

17
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -53,7 +53,6 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import java.io.IOException;
import java.text.MessageFormat;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@ -145,8 +144,8 @@ public class ProcessInstanceServiceTest {
when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
when(processDefineMapper.selectById(Mockito.anyInt())).thenReturn(getProcessDefinition());
when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class)
, Mockito.any(), Mockito.any(), Mockito.any(),Mockito.any(), Mockito.any(),
eq("192.168.xx.xx"), Mockito.any(), Mockito.any())).thenReturn(pageReturn);
, Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(),
eq("192.168.xx.xx"), Mockito.any(), Mockito.any())).thenReturn(pageReturn);
Map<String, Object> dataParameterRes = processInstanceService.queryProcessInstanceList(loginUser, projectName, 1, "20200101 00:00:00",
"20200102 00:00:00", "", loginUser.getUserName(), ExecutionStatus.SUBMITTED_SUCCESS,
@ -355,7 +354,7 @@ public class ProcessInstanceServiceTest {
}
@Test
public void testUpdateProcessInstance() throws ParseException {
public void testUpdateProcessInstance() {
String projectName = "project_test1";
User loginUser = getAdminUser();
Map<String, Object> result = new HashMap<>();
@ -390,11 +389,15 @@ public class ProcessInstanceServiceTest {
processInstance.setState(ExecutionStatus.SUCCESS);
processInstance.setTimeout(3000);
processInstance.setCommandType(CommandType.STOP);
processInstance.setProcessDefinitionCode(46L);
processInstance.setProcessDefinitionVersion(1);
ProcessDefinition processDefinition = getProcessDefinition();
processDefinition.setId(1);
processDefinition.setUserId(1);
Tenant tenant = new Tenant();
tenant.setId(1);
tenant.setTenantCode("test_tenant");
when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
when(processService.getTenantForProcess(Mockito.anyInt(), Mockito.anyInt())).thenReturn(tenant);
when(processService.updateProcessInstance(processInstance)).thenReturn(1);
when(processDefinitionService.checkProcessNodeList(Mockito.any(), eq(shellJson))).thenReturn(result);
@ -406,10 +409,10 @@ public class ProcessInstanceServiceTest {
Assert.assertEquals(Status.UPDATE_PROCESS_INSTANCE_ERROR, processInstanceFinishRes.get(Constants.STATUS));
//success
when(processDefineMapper.updateById(processDefinition)).thenReturn(1);
when(processService.saveProcessDefinition(Mockito.any(), Mockito.any(),
Mockito.anyString(), Mockito.anyString(), Mockito.anyString(),
Mockito.anyString(), Mockito.any(), Mockito.any(), true)).thenReturn(1);
Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyBoolean())).thenReturn(1);
when(processService.findProcessDefinition(46L, 0)).thenReturn(processDefinition);
putMsg(result, Status.SUCCESS, projectName);
Map<String, Object> successRes = processInstanceService.updateProcessInstance(loginUser, projectName, 1,
@ -479,7 +482,7 @@ public class ProcessInstanceServiceTest {
}
@Test
public void testViewVariables() throws Exception {
public void testViewVariables() {
//process instance not null
ProcessInstance processInstance = getProcessInstance();
processInstance.setCommandType(CommandType.SCHEDULER);

1
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java

@ -329,7 +329,6 @@ public class UsersServiceTest {
logger.info(result.toString());
Assert.assertEquals(Status.USER_NOT_EXIST, result.get(Constants.STATUS));
//success
when(projectUserMapper.deleteProjectRelation(Mockito.anyInt(), Mockito.anyInt())).thenReturn(1);
result = usersService.grantProject(loginUser, 1, projectIds);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));

35
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java

@ -42,34 +42,17 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.util.Map;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CheckUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(CheckUtilsTest.class);
@Before
public void setUp() throws Exception {
}
@After
public void tearDown() throws Exception {
}
/**
* check username
*/
@Test
public void testCheckUserName() {
assertTrue(CheckUtils.checkUserName("test01"));
assertFalse(CheckUtils.checkUserName(null));
assertFalse(CheckUtils.checkUserName("test01@abc"));
}
@ -78,9 +61,7 @@ public class CheckUtilsTest {
*/
@Test
public void testCheckEmail() {
assertTrue(CheckUtils.checkEmail("test01@gmail.com"));
assertFalse(CheckUtils.checkEmail("test01@gmail"));
}
@ -89,12 +70,9 @@ public class CheckUtilsTest {
*/
@Test
public void testCheckDesc() {
Map<String, Object> objectMap = CheckUtils.checkDesc("I am desc");
Status status = (Status) objectMap.get(Constants.STATUS);
assertEquals(status.getCode(),Status.SUCCESS.getCode());
assertEquals(status.getCode(), Status.SUCCESS.getCode());
}
@Test
@ -105,18 +83,15 @@ public class CheckUtilsTest {
assertFalse(CheckUtils.checkOtherParams("{}"));
assertFalse(CheckUtils.checkOtherParams("{\"key1\":111}"));
}
/**
* check passwd
*/
@Test
public void testCheckPassword() {
assertFalse(CheckUtils.checkPassword(null));
assertFalse(CheckUtils.checkPassword("a"));
assertFalse(CheckUtils.checkPassword("1234567890abcderfasdf2"));
assertTrue(CheckUtils.checkPassword("123456"));
}
@ -125,14 +100,12 @@ public class CheckUtilsTest {
*/
@Test
public void testCheckPhone() {
// phone can be null
assertTrue(CheckUtils.checkPhone(null));
assertFalse(CheckUtils.checkPhone("14567134578654"));
assertTrue(CheckUtils.checkPhone("17362537263"));
}
@Test
public void testCheckTaskNodeParameters() {
TaskNode taskNode = new TaskNode();
@ -231,7 +204,7 @@ public class CheckUtilsTest {
// DependentParameters
DependentParameters dependentParameters = new DependentParameters();
taskNode.setParams(JSONUtils.toJsonString(dependentParameters));
taskNode.setDependence(JSONUtils.toJsonString(dependentParameters));
taskNode.setType(TaskType.DEPENDENT.getDesc());
assertTrue(CheckUtils.checkTaskNodeParameters(taskNode));

9
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java

@ -45,7 +45,12 @@ public class TaskManager {
* @throws IllegalArgumentException illegal argument exception
*/
public static AbstractTask newTask(TaskExecutionContext taskExecutionContext, Logger logger, AlertClientService alertClientService) throws IllegalArgumentException {
switch (taskExecutionContext.getTaskType()) {
String taskType = taskExecutionContext.getTaskType();
if (taskType == null) {
logger.error("task type is null");
throw new IllegalArgumentException("task type is null");
}
switch (taskType) {
case "SHELL":
case "WATERDROP":
return new ShellTask(taskExecutionContext, logger);
@ -68,7 +73,7 @@ public class TaskManager {
case "SQOOP":
return new SqoopTask(taskExecutionContext, logger);
default:
logger.error("not support task type: {}", taskExecutionContext.getTaskType());
logger.error("not support task type: {}", taskType);
throw new IllegalArgumentException("not support task type");
}
}

26
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java

@ -18,7 +18,9 @@ package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.enums.DependentRelation;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.model.TaskNode;
@ -26,6 +28,7 @@ import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.ConditionsTaskExecThread;
@ -74,6 +77,13 @@ public class ConditionsTaskTest {
Mockito.when(processService
.findProcessInstanceById(processInstance.getId()))
.thenReturn(processInstance);
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN);
taskDefinition.setTimeout(0);
Mockito.when(processService.findTaskDefinition(1L, 1))
.thenReturn(taskDefinition);
}
private TaskInstance testBasicInit(ExecutionStatus expectResult) {
@ -98,17 +108,16 @@ public class ConditionsTaskTest {
// for ConditionsTaskExecThread.waitTaskQuit
List<TaskInstance> conditions = Stream.of(
getTaskInstanceForValidTaskList(1001, "1", expectResult)
getTaskInstanceForValidTaskList(expectResult)
).collect(Collectors.toList());
Mockito.when(processService
.findValidTaskListByProcessId(processInstance.getId()))
.thenReturn(conditions);
return taskInstance;
}
@Test
public void testBasicSuccess() throws Exception {
public void testBasicSuccess() {
TaskInstance taskInstance = testBasicInit(ExecutionStatus.SUCCESS);
ConditionsTaskExecThread taskExecThread = new ConditionsTaskExecThread(taskInstance);
taskExecThread.call();
@ -127,6 +136,8 @@ public class ConditionsTaskTest {
TaskNode taskNode = new TaskNode();
taskNode.setId("tasks-1000");
taskNode.setName("C");
taskNode.setCode(1L);
taskNode.setVersion(1);
taskNode.setType(TaskType.CONDITIONS.getDesc());
taskNode.setRunFlag(FLOWNODE_RUN_FLAG_NORMAL);
@ -168,14 +179,17 @@ public class ConditionsTaskTest {
taskInstance.setId(1000);
taskInstance.setName(taskNode.getName());
taskInstance.setTaskType(taskNode.getType().toUpperCase());
taskInstance.setTaskCode(taskNode.getCode());
taskInstance.setTaskDefinitionVersion(taskNode.getVersion());
taskInstance.setProcessInstanceId(processInstance.getId());
taskInstance.setTaskParams(taskNode.getTaskParams());
return taskInstance;
}
private TaskInstance getTaskInstanceForValidTaskList(int id, String name, ExecutionStatus state) {
private TaskInstance getTaskInstanceForValidTaskList(ExecutionStatus state) {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(id);
taskInstance.setName(name);
taskInstance.setId(1001);
taskInstance.setName("1");
taskInstance.setState(state);
return taskInstance;
}

2
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java

@ -141,7 +141,7 @@ public class MasterExecThreadTest {
method.setAccessible(true);
method.invoke(masterExecThread);
// one create save, and 9(1 to 20 step 2) for next save, and last day 31 no save
verify(processService, times(9)).saveProcessInstance(processInstance);
verify(processService, times(20)).saveProcessInstance(processInstance);
} catch (Exception e) {
Assert.fail();
}

29
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java

@ -18,10 +18,14 @@
package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.SubProcessTaskExecThread;
@ -39,9 +43,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
import org.springframework.context.ApplicationContext;
@RunWith(PowerMockRunner.class)
@PrepareForTest({
Stopper.class,
})
@PrepareForTest({ Stopper.class })
public class SubProcessTaskTest {
/**
@ -70,6 +72,9 @@ public class SubProcessTaskTest {
processService = Mockito.mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
AlertDao alertDao = Mockito.mock(AlertDao.class);
Mockito.when(applicationContext.getBean(AlertDao.class)).thenReturn(alertDao);
processInstance = getProcessInstance();
Mockito.when(processService
.findProcessInstanceById(processInstance.getId()))
@ -84,6 +89,13 @@ public class SubProcessTaskTest {
Mockito.when(processService
.submitTask(Mockito.any()))
.thenAnswer(t -> t.getArgument(0));
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN);
taskDefinition.setTimeout(0);
Mockito.when(processService.findTaskDefinition(1L, 1))
.thenReturn(taskDefinition);
}
private TaskInstance testBasicInit(ExecutionStatus expectResult) {
@ -102,7 +114,7 @@ public class SubProcessTaskTest {
}
@Test
public void testBasicSuccess() throws Exception {
public void testBasicSuccess() {
TaskInstance taskInstance = testBasicInit(ExecutionStatus.SUCCESS);
SubProcessTaskExecThread taskExecThread = new SubProcessTaskExecThread(taskInstance);
taskExecThread.call();
@ -110,7 +122,7 @@ public class SubProcessTaskTest {
}
@Test
public void testBasicFailure() throws Exception {
public void testBasicFailure() {
TaskInstance taskInstance = testBasicInit(ExecutionStatus.FAILURE);
SubProcessTaskExecThread taskExecThread = new SubProcessTaskExecThread(taskInstance);
taskExecThread.call();
@ -121,6 +133,8 @@ public class SubProcessTaskTest {
TaskNode taskNode = new TaskNode();
taskNode.setId("tasks-10");
taskNode.setName("S");
taskNode.setCode(1L);
taskNode.setVersion(1);
taskNode.setType(TaskType.SUB_PROCESS.getDesc());
taskNode.setRunFlag(FLOWNODE_RUN_FLAG_NORMAL);
return taskNode;
@ -130,7 +144,8 @@ public class SubProcessTaskTest {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(100);
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
processInstance.setWarningGroupId(0);
processInstance.setName("S");
return processInstance;
}
@ -148,6 +163,8 @@ public class SubProcessTaskTest {
taskInstance.setName("S");
taskInstance.setTaskType(TaskType.SUB_PROCESS.getDesc());
taskInstance.setName(taskNode.getName());
taskInstance.setTaskCode(taskNode.getCode());
taskInstance.setTaskDefinitionVersion(taskNode.getVersion());
taskInstance.setTaskType(taskNode.getType().toUpperCase());
taskInstance.setProcessInstanceId(processInstance.getId());
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);

14
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java

@ -23,12 +23,14 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
@ -329,9 +331,13 @@ public class TaskPriorityQueueConsumerTest {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setUserId(2);
processDefinition.setProjectId(1);
processDefinition.setProjectCode(1L);
taskInstance.setProcessDefine(processDefinition);
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskInstance.setTaskDefine(taskDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
@ -363,9 +369,13 @@ public class TaskPriorityQueueConsumerTest {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setUserId(2);
processDefinition.setProjectId(1);
processDefinition.setProjectCode(1L);
taskInstance.setProcessDefine(processDefinition);
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskInstance.setTaskDefine(taskDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);

37
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java

@ -18,7 +18,10 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -51,25 +54,34 @@ public class MasterTaskExecThreadTest {
@Before
public void setUp() {
ApplicationContext applicationContext = PowerMockito.mock(ApplicationContext.class);
this.springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
this.zookeeperRegistryCenter = PowerMockito.mock(ZookeeperRegistryCenter.class);
PowerMockito.when(SpringApplicationContext.getBean(ZookeeperRegistryCenter.class))
.thenReturn(this.zookeeperRegistryCenter);
ProcessService processService = Mockito.mock(ProcessService.class);
Mockito.when(SpringApplicationContext.getBean(ProcessService.class))
.thenReturn(processService);
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN);
taskDefinition.setTimeout(0);
Mockito.when(processService.findTaskDefinition(1L, 1))
.thenReturn(taskDefinition);
this.masterTaskExecThread = new MasterTaskExecThread(getTaskInstance());
}
@Test
public void testExistsValidWorkerGroup1(){
public void testExistsValidWorkerGroup1() {
Mockito.when(zookeeperRegistryCenter.getWorkerGroupDirectly()).thenReturn(Sets.newHashSet());
boolean b = masterTaskExecThread.existsValidWorkerGroup("default");
Assert.assertFalse(b);
}
@Test
public void testExistsValidWorkerGroup2(){
public void testExistsValidWorkerGroup2() {
Set<String> workerGorups = new HashSet<>();
workerGorups.add("test1");
workerGorups.add("test2");
@ -80,7 +92,7 @@ public class MasterTaskExecThreadTest {
}
@Test
public void testExistsValidWorkerGroup3(){
public void testExistsValidWorkerGroup3() {
Set<String> workerGorups = new HashSet<>();
workerGorups.add("test1");
@ -91,11 +103,9 @@ public class MasterTaskExecThreadTest {
}
@Test
public void testPauseTask(){
public void testPauseTask() {
ProcessService processService = Mockito.mock(ProcessService.class);
Mockito.when(this.springApplicationContext.getBean(ProcessService.class))
Mockito.when(SpringApplicationContext.getBean(ProcessService.class))
.thenReturn(processService);
TaskInstance taskInstance = getTaskInstance();
@ -105,18 +115,27 @@ public class MasterTaskExecThreadTest {
Mockito.when(processService.updateTaskInstance(taskInstance))
.thenReturn(true);
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN);
taskDefinition.setTimeout(0);
Mockito.when(processService.findTaskDefinition(1L, 1))
.thenReturn(taskDefinition);
MasterTaskExecThread masterTaskExecThread = new MasterTaskExecThread(taskInstance);
masterTaskExecThread.pauseTask();
org.junit.Assert.assertEquals(ExecutionStatus.PAUSE, taskInstance.getState());
}
private TaskInstance getTaskInstance(){
private TaskInstance getTaskInstance() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskType(TaskType.SHELL.getDesc());
taskInstance.setId(252612);
taskInstance.setName("C");
taskInstance.setProcessInstanceId(10111);
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
taskInstance.setTaskCode(1L);
taskInstance.setTaskDefinitionVersion(1);
return taskInstance;
}

26
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java

@ -97,36 +97,36 @@ public class TaskManagerTest {
public void testNewTask() {
taskExecutionContext.setTaskType(TaskType.SHELL.getDesc());
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService));
taskExecutionContext.setTaskType(TaskType.WATERDROP.getDesc());
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService));
taskExecutionContext.setTaskType(TaskType.HTTP.getDesc());
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService));
taskExecutionContext.setTaskType(TaskType.MR.getDesc());
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService));
taskExecutionContext.setTaskType(TaskType.SPARK.getDesc());
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService));
taskExecutionContext.setTaskType(TaskType.FLINK.getDesc());
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService));
taskExecutionContext.setTaskType(TaskType.PYTHON.getDesc());
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService));
taskExecutionContext.setTaskType(TaskType.DATAX.getDesc());
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService));
taskExecutionContext.setTaskType(TaskType.SQOOP.getDesc());
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService));
}
@Test(expected = IllegalArgumentException.class)
public void testNewTaskIsNull() {
taskExecutionContext.setTaskType(null);
TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService);
TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService);
}
@Test(expected = IllegalArgumentException.class)
public void testNewTaskIsNotExists() {
taskExecutionContext.setTaskType(TaskType.SHELL.getDesc());
TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService);
taskExecutionContext.setTaskType("ttt");
TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService);
}
@Test
@ -163,7 +163,7 @@ public class TaskManagerTest {
Map<String, String> definedParams = new HashMap<>();
definedParams.put("time_gb", "2020-12-16 00:00:00");
taskExecutionContext.setDefinedParams(definedParams);
ShellTask shellTask = (ShellTask) TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService);
ShellTask shellTask = (ShellTask) TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService);
shellTask.setResultString("shell return");
String shellReturn = shellTask.getResultString();
shellTask.init();

15
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -1148,11 +1148,8 @@ public class ProcessService {
ProcessInstanceMap instanceMap,
TaskInstance task) {
CommandType commandType = getSubCommandType(parentProcessInstance, childInstance);
TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
task.getTaskCode(), task.getTaskDefinitionVersion()
);
Map<String, String> subProcessParam = JSONUtils.toMap(taskDefinition.getTaskParams());
Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID));
Map<String, String> subProcessParam = JSONUtils.toMap(task.getTaskParams());
int childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID));
Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS);
List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
@ -2148,20 +2145,22 @@ public class ProcessService {
public int switchVersion(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) {
int switchResult = processDefinitionToDB(processDefinition, processDefinitionLog, true);
if (switchResult != Constants.DEFINITION_FAILURE) {
switchProcessTaskRelationVersion(processDefinition);
switchResult = switchProcessTaskRelationVersion(processDefinition);
}
return switchResult;
}
public void switchProcessTaskRelationVersion(ProcessDefinition processDefinition) {
public int switchProcessTaskRelationVersion(ProcessDefinition processDefinition) {
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode());
if (!processTaskRelationList.isEmpty()) {
processTaskRelationMapper.deleteByCode(processDefinition.getProjectCode(), processDefinition.getCode());
}
int result = 0;
List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
for (ProcessTaskRelationLog processTaskRelationLog : processTaskRelationLogList) {
processTaskRelationMapper.insert(processTaskRelationLog);
result += processTaskRelationMapper.insert(processTaskRelationLog);
}
return result;
}
/**

61
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -107,7 +107,6 @@ public class ProcessServiceTest {
@Test
public void testCreateSubCommand() {
ProcessService processService = new ProcessService();
ProcessInstance parentInstance = new ProcessInstance();
parentInstance.setWarningType(WarningType.SUCCESS);
parentInstance.setWarningGroupId(0);
@ -115,35 +114,31 @@ public class ProcessServiceTest {
TaskInstance task = new TaskInstance();
task.setTaskParams("{\"processDefinitionId\":100}}");
task.setId(10);
task.setTaskCode(1L);
task.setTaskDefinitionVersion(1);
ProcessInstance childInstance = null;
ProcessInstanceMap instanceMap = new ProcessInstanceMap();
instanceMap.setParentProcessInstanceId(1);
instanceMap.setParentTaskInstanceId(10);
Command command = null;
Command command;
//father history: start; child null == command type: start
parentInstance.setHistoryCmd("START_PROCESS");
parentInstance.setCommandType(CommandType.START_PROCESS);
command = processService.createSubProcessCommand(
parentInstance, childInstance, instanceMap, task
);
command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
Assert.assertEquals(CommandType.START_PROCESS, command.getCommandType());
//father history: start,start failure; child null == command type: start
parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS");
command = processService.createSubProcessCommand(
parentInstance, childInstance, instanceMap, task
);
command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
Assert.assertEquals(CommandType.START_PROCESS, command.getCommandType());
//father history: scheduler,start failure; child null == command type: scheduler
parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
parentInstance.setHistoryCmd("SCHEDULER,START_FAILURE_TASK_PROCESS");
command = processService.createSubProcessCommand(
parentInstance, childInstance, instanceMap, task
);
command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
Assert.assertEquals(CommandType.SCHEDULER, command.getCommandType());
//father history: complement,start failure; child null == command type: complement
@ -156,9 +151,7 @@ public class ProcessServiceTest {
complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE, startString);
complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE, endString);
parentInstance.setCommandParam(JSONUtils.toJsonString(complementMap));
command = processService.createSubProcessCommand(
parentInstance, childInstance, instanceMap, task
);
command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
Assert.assertEquals(CommandType.COMPLEMENT_DATA, command.getCommandType());
JsonNode complementDate = JSONUtils.parseObject(command.getCommandParam());
@ -171,9 +164,7 @@ public class ProcessServiceTest {
childInstance = new ProcessInstance();
parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS");
command = processService.createSubProcessCommand(
parentInstance, childInstance, instanceMap, task
);
command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
Assert.assertEquals(CommandType.START_FAILURE_TASK_PROCESS, command.getCommandType());
}
@ -322,7 +313,6 @@ public class ProcessServiceTest {
processInstance.setProcessDefinitionCode(1L);
Mockito.when(processService.findProcessInstanceById(taskInstance.getProcessInstanceId())).thenReturn(processInstance);
Assert.assertEquals("", processService.formatTaskAppId(taskInstance));
}
@Test
@ -352,6 +342,7 @@ public class ProcessServiceTest {
project.setCode(1L);
ProcessData processData = new ProcessData();
processData.setTasks(new ArrayList<>());
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setCode(1L);
@ -359,13 +350,9 @@ public class ProcessServiceTest {
processDefinition.setName("test");
processDefinition.setVersion(1);
processDefinition.setCode(11L);
Mockito.when(processDefineMapper.updateById(any())).thenReturn(1);
Mockito.when(processDefineLogMapper.insert(any())).thenReturn(1);
int i = processService.saveProcessDefinition(user, project, "name", "desc", "locations", "connects", processData, processDefinition, true);
Assert.assertEquals(1, i);
Assert.assertEquals(-1, processService.saveProcessDefinition(user, project, "name",
"desc", "locations", "connects", processData,
processDefinition, true));
}
@Test
@ -375,17 +362,11 @@ public class ProcessServiceTest {
processDefinition.setId(123);
processDefinition.setName("test");
processDefinition.setVersion(1);
processDefinition.setCode(11L);
ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog();
processDefinitionLog.setCode(1L);
Mockito.when(processDefineMapper.updateById(any())).thenReturn(1);
int i = processService.switchVersion(processDefinition, processDefinitionLog);
Assert.assertEquals(1, i);
processDefinitionLog.setVersion(2);
Assert.assertEquals(0, processService.switchVersion(processDefinition, processDefinitionLog));
}
@Test
@ -443,13 +424,12 @@ public class ProcessServiceTest {
@Test
public void testGenProcessData() {
String processDefinitionJson = "{\"tasks\":[{\"id\":\"task-0\",\"code\":3,\"version\":0,\"name\":\"1-test\""
+ ",\"desc\":null,\"type\":\"SHELL\",\"runFlag\":\"FORBIDDEN\",\"loc\":null,\"maxRetryTimes\":0"
+ ",\"retryInterval\":0,\"params\":null,\"preTasks\":[\"unit-test\"],\"preTaskNodeList\":[{\"code\":2"
+ ",\"name\":\"unit-test\",\"version\":0}],\"extras\":null,\"depList\":[\"unit-test\"],\"dependence\":null"
+ ",\"conditionResult\":null,\"taskInstancePriority\":null,\"workerGroup\":null,\"workerGroupId\":null"
+ ",\"timeout\":{\"enable\":false,\"strategy\":null,\"interval\":0},\"delayTime\":0}]"
+ ",\"globalParams\":[],\"timeout\":0,\"tenantId\":0}";
String processDefinitionJson = "{\"tasks\":[{\"id\":null,\"code\":3,\"version\":0,\"name\":\"1-test\",\"desc\":null,"
+ "\"type\":\"SHELL\",\"runFlag\":\"FORBIDDEN\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":0,"
+ "\"params\":{},\"preTasks\":[\"unit-test\"],\"preTaskNodeList\":[{\"code\":2,\"name\":\"unit-test\","
+ "\"version\":0}],\"extras\":null,\"depList\":[\"unit-test\"],\"dependence\":null,\"conditionResult\":null,"
+ "\"taskInstancePriority\":null,\"workerGroup\":null,\"timeout\":{\"enable\":false,\"strategy\":null,"
+ "\"interval\":0},\"delayTime\":0}],\"globalParams\":[],\"timeout\":0,\"tenantId\":0}";
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setCode(1L);
@ -499,7 +479,6 @@ public class ProcessServiceTest {
String json = JSONUtils.toJsonString(processService.genProcessData(processDefinition));
Assert.assertEquals(processDefinitionJson, json);
}
@Test

Loading…
Cancel
Save