From 40e00585aa0fb0496b600788062e44a4d29a01dc Mon Sep 17 00:00:00 2001 From: wind Date: Tue, 30 Nov 2021 17:20:57 +0800 Subject: [PATCH] [cherry-pick][DS-6849] add delete command check (#7074) * delete command with check * add test Co-authored-by: caishunfeng <534328519@qq.com> --- .../service/process/ProcessService.java | 173 ++++++++++++------ .../service/process/ProcessServiceTest.java | 157 +++++++++------- 2 files changed, 207 insertions(+), 123 deletions(-) diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 198af0b33b..a1ebe3a92d 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -17,13 +17,29 @@ package org.apache.dolphinscheduler.service.process; -import com.facebook.presto.jdbc.internal.guava.collect.Lists; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang.StringUtils; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID; +import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; + +import static java.util.stream.Collectors.toSet; + import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.*; +import org.apache.dolphinscheduler.common.enums.AuthorizationType; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.Direct; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.ReleaseState; +import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.TimeoutFlag; +import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.DateInterval; import org.apache.dolphinscheduler.common.model.TaskNode; @@ -34,27 +50,85 @@ import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException; -import org.apache.dolphinscheduler.dao.entity.*; -import org.apache.dolphinscheduler.dao.mapper.*; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.ParameterUtils; +import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.DagData; +import org.apache.dolphinscheduler.dao.entity.DataSource; +import org.apache.dolphinscheduler.dao.entity.Environment; +import org.apache.dolphinscheduler.dao.entity.ErrorCommand; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; +import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.ProjectUser; +import org.apache.dolphinscheduler.dao.entity.Resource; +import org.apache.dolphinscheduler.dao.entity.Schedule; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.Tenant; +import org.apache.dolphinscheduler.dao.entity.UdfFunc; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.CommandMapper; +import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; +import org.apache.dolphinscheduler.dao.mapper.EnvironmentMapper; +import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.ResourceMapper; +import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper; +import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.TenantMapper; +import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper; +import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.apache.dolphinscheduler.spi.enums.ResourceType; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; -import java.util.*; -import java.util.Map.Entry; -import java.util.stream.Collectors; - -import static java.util.stream.Collectors.toSet; -import static org.apache.dolphinscheduler.common.Constants.*; +import com.facebook.presto.jdbc.internal.guava.collect.Lists; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.node.ObjectNode; /** * process relative dao that some mappers in this. @@ -65,10 +139,10 @@ public class ProcessService { private final Logger logger = LoggerFactory.getLogger(getClass()); private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), - ExecutionStatus.RUNNING_EXECUTION.ordinal(), - ExecutionStatus.DELAY_EXECUTION.ordinal(), - ExecutionStatus.READY_PAUSE.ordinal(), - ExecutionStatus.READY_STOP.ordinal()}; + ExecutionStatus.RUNNING_EXECUTION.ordinal(), + ExecutionStatus.DELAY_EXECUTION.ordinal(), + ExecutionStatus.READY_PAUSE.ordinal(), + ExecutionStatus.READY_STOP.ordinal()}; @Autowired private UserMapper userMapper; @@ -136,7 +210,6 @@ public class ProcessService { * @param logger logger * @param host host * @param command found command - * @param processDefinitionCacheMaps * @return process instance */ @Transactional @@ -152,7 +225,7 @@ public class ProcessService { processInstance.addHistoryCmd(command.getCommandType()); saveProcessInstance(processInstance); this.setSubProcessParam(processInstance); - this.commandMapper.deleteById(command.getId()); + this.deleteCommandWithCheck(command.getId()); return processInstance; } @@ -202,10 +275,6 @@ public class ProcessService { /** * get command page - * - * @param pageSize - * @param pageNumber - * @return */ public List findCommandPage(int pageSize, int pageNumber) { return commandMapper.queryCommandPage(pageSize, pageNumber * pageSize); @@ -426,21 +495,21 @@ public class ProcessService { // process instance quit by "waiting thread" state if (originCommand == null) { Command command = new Command( - CommandType.RECOVER_WAITING_THREAD, - processInstance.getTaskDependType(), - processInstance.getFailureStrategy(), - processInstance.getExecutorId(), - processInstance.getProcessDefinition().getCode(), - JSONUtils.toJsonString(cmdParam), - processInstance.getWarningType(), - processInstance.getWarningGroupId(), - processInstance.getScheduleTime(), - processInstance.getWorkerGroup(), - processInstance.getEnvironmentCode(), - processInstance.getProcessInstancePriority(), - processInstance.getDryRun(), - processInstance.getId(), - processInstance.getProcessDefinitionVersion() + CommandType.RECOVER_WAITING_THREAD, + processInstance.getTaskDependType(), + processInstance.getFailureStrategy(), + processInstance.getExecutorId(), + processInstance.getProcessDefinition().getCode(), + JSONUtils.toJsonString(cmdParam), + processInstance.getWarningType(), + processInstance.getWarningGroupId(), + processInstance.getScheduleTime(), + processInstance.getWorkerGroup(), + processInstance.getEnvironmentCode(), + processInstance.getProcessInstancePriority(), + processInstance.getDryRun(), + processInstance.getId(), + processInstance.getProcessDefinitionVersion() ); saveCommand(command); return; @@ -532,10 +601,10 @@ public class ProcessService { // curing global params processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( - processDefinition.getGlobalParamMap(), - processDefinition.getGlobalParamList(), - getCommandTypeIfComplement(processInstance, command), - processInstance.getScheduleTime())); + processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + getCommandTypeIfComplement(processInstance, command), + processInstance.getScheduleTime())); // set process instance priority processInstance.setProcessInstancePriority(command.getProcessInstancePriority()); @@ -947,11 +1016,6 @@ public class ProcessService { /** * retry submit task to db - * - * @param taskInstance - * @param commitRetryTimes - * @param commitInterval - * @return */ public TaskInstance submitTask(TaskInstance taskInstance, int commitRetryTimes, int commitInterval) { @@ -1672,8 +1736,6 @@ public class ProcessService { /** * for show in page of taskInstance - * - * @param taskInstance */ public void changeOutParam(TaskInstance taskInstance) { if (StringUtils.isEmpty(taskInstance.getVarPool())) { @@ -1786,7 +1848,7 @@ public class ProcessService { */ public List queryNeedFailoverTaskInstances(String host) { return taskInstanceMapper.queryByHostAndStatus(host, - stateArray); + stateArray); } /** @@ -2401,4 +2463,11 @@ public class ProcessService { } return processTaskMap; } + + private void deleteCommandWithCheck(int commandId) { + int delete = this.commandMapper.deleteById(commandId); + if (delete != 1) { + throw new ServiceException("delete command fail, id:" + commandId); + } + } } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 45b195239f..da7146085f 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.service.process; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; + import static org.mockito.ArgumentMatchers.any; import org.apache.dolphinscheduler.common.Constants; @@ -245,7 +246,7 @@ public class ProcessServiceTest { command.setProcessDefinitionCode(222); command.setCommandType(CommandType.REPEAT_RUNNING); command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\",\"" - + CMD_PARAM_SUB_PROCESS_DEFINE_CODE + "\":\"222\"}"); + + CMD_PARAM_SUB_PROCESS_DEFINE_CODE + "\":\"222\"}"); Assert.assertNull(processService.handleCommand(logger, host, command, processDefinitionCacheMaps)); int definitionVersion = 1; @@ -253,10 +254,13 @@ public class ProcessServiceTest { int processInstanceId = 222; //there is not enough thread for this command Command command1 = new Command(); + command1.setId(1); command1.setProcessDefinitionCode(definitionCode); command1.setProcessDefinitionVersion(definitionVersion); command1.setCommandParam("{\"ProcessInstanceId\":222}"); command1.setCommandType(CommandType.START_PROCESS); + Mockito.when(commandMapper.deleteById(1)).thenReturn(1); + ProcessDefinition processDefinition = new ProcessDefinition(); processDefinition.setId(123); processDefinition.setName("test"); @@ -268,36 +272,45 @@ public class ProcessServiceTest { processInstance.setProcessDefinitionCode(definitionCode); processInstance.setProcessDefinitionVersion(definitionVersion); Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(), - processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition)); + processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition)); Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance); Assert.assertNotNull(processService.handleCommand(logger, host, command1, processDefinitionCacheMaps)); Command command2 = new Command(); + command2.setId(2); command2.setCommandParam("{\"ProcessInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}"); command2.setProcessDefinitionCode(definitionCode); command2.setProcessDefinitionVersion(definitionVersion); command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS); command2.setProcessInstanceId(processInstanceId); + Mockito.when(commandMapper.deleteById(2)).thenReturn(1); Assert.assertNotNull(processService.handleCommand(logger, host, command2, processDefinitionCacheMaps)); Command command3 = new Command(); + command3.setId(3); command3.setProcessDefinitionCode(definitionCode); command3.setProcessDefinitionVersion(definitionVersion); command3.setProcessInstanceId(processInstanceId); command3.setCommandParam("{\"WaitingThreadInstanceId\":222}"); command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); + Mockito.when(commandMapper.deleteById(3)).thenReturn(1); + Assert.assertNotNull(processService.handleCommand(logger, host, command3, processDefinitionCacheMaps)); Command command4 = new Command(); + command4.setId(4); command4.setProcessDefinitionCode(definitionCode); command4.setProcessDefinitionVersion(definitionVersion); command4.setCommandParam("{\"WaitingThreadInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}"); command4.setCommandType(CommandType.REPEAT_RUNNING); command4.setProcessInstanceId(processInstanceId); + Mockito.when(commandMapper.deleteById(4)).thenReturn(1); + Assert.assertNotNull(processService.handleCommand(logger, host, command4, processDefinitionCacheMaps)); Command command5 = new Command(); + command5.setId(5); command5.setProcessDefinitionCode(definitionCode); command5.setProcessDefinitionVersion(definitionVersion); HashMap startParams = new HashMap<>(); @@ -307,6 +320,8 @@ public class ProcessServiceTest { command5.setCommandParam(JSONUtils.toJsonString(commandParams)); command5.setCommandType(CommandType.START_PROCESS); command5.setDryRun(Constants.DRY_RUN_FLAG_NO); + Mockito.when(commandMapper.deleteById(5)).thenReturn(1); + ProcessInstance processInstance1 = processService.handleCommand(logger, host, command5, processDefinitionCacheMaps); Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\"")); } @@ -389,14 +404,14 @@ public class ProcessServiceTest { operator.setUserType(UserType.GENERAL_USER); long projectCode = 751485690568704L; String taskJson = "[{\"code\":751500437479424,\"name\":\"aa\",\"version\":1,\"description\":\"\",\"delayTime\":0," - + "\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"sleep 1s\\necho 11\"," - + "\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"waitStartTimeout\":{}}," - + "\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"yarn\",\"failRetryTimes\":0,\"failRetryInterval\":1," - + "\"timeoutFlag\":\"OPEN\",\"timeoutNotifyStrategy\":\"FAILED\",\"timeout\":1,\"environmentCode\":751496815697920}," - + "{\"code\":751516889636864,\"name\":\"bb\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[]," - + "\"localParams\":[],\"rawScript\":\"echo 22\",\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]}," - + "\"waitStartTimeout\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":\"0\"," - + "\"failRetryInterval\":\"1\",\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":0,\"delayTime\":\"0\",\"environmentCode\":-1}]"; + + "\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"sleep 1s\\necho 11\"," + + "\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"waitStartTimeout\":{}}," + + "\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"yarn\",\"failRetryTimes\":0,\"failRetryInterval\":1," + + "\"timeoutFlag\":\"OPEN\",\"timeoutNotifyStrategy\":\"FAILED\",\"timeout\":1,\"environmentCode\":751496815697920}," + + "{\"code\":751516889636864,\"name\":\"bb\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[]," + + "\"localParams\":[],\"rawScript\":\"echo 22\",\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]}," + + "\"waitStartTimeout\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":\"0\"," + + "\"failRetryInterval\":\"1\",\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":0,\"delayTime\":\"0\",\"environmentCode\":-1}]"; List taskDefinitionLogs = JSONUtils.toList(taskJson, TaskDefinitionLog.class); TaskDefinitionLog taskDefinition = new TaskDefinitionLog(); taskDefinition.setCode(751500437479424L); @@ -487,10 +502,10 @@ public class ProcessServiceTest { processInstance.setId(62); taskInstance.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"\"}]"); taskInstance.setTaskParams("{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select id from tb_test limit 1\"," - + "\"udfs\":\"\",\"sqlType\":\"0\",\"sendEmail\":false,\"displayRows\":10,\"title\":\"\"," - + "\"groupId\":null,\"localParams\":[{\"prop\":\"test1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"12\"}]," - + "\"connParams\":\"\",\"preStatements\":[],\"postStatements\":[],\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"]," - + "\\\"failedNode\\\":[\\\"\\\"]}\",\"dependence\":\"{}\"}"); + + "\"udfs\":\"\",\"sqlType\":\"0\",\"sendEmail\":false,\"displayRows\":10,\"title\":\"\"," + + "\"groupId\":null,\"localParams\":[{\"prop\":\"test1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"12\"}]," + + "\"connParams\":\"\",\"preStatements\":[],\"postStatements\":[],\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"]," + + "\\\"failedNode\\\":[\\\"\\\"]}\",\"dependence\":\"{}\"}"); processService.changeOutParam(taskInstance); } @@ -498,65 +513,65 @@ public class ProcessServiceTest { public void testUpdateTaskDefinitionResources() throws Exception { TaskDefinition taskDefinition = new TaskDefinition(); String taskParameters = "{\n" - + " \"mainClass\": \"org.apache.dolphinscheduler.SparkTest\",\n" - + " \"mainJar\": {\n" - + " \"id\": 1\n" - + " },\n" - + " \"deployMode\": \"cluster\",\n" - + " \"resourceList\": [\n" - + " {\n" - + " \"id\": 3\n" - + " },\n" - + " {\n" - + " \"id\": 4\n" - + " }\n" - + " ],\n" - + " \"localParams\": [],\n" - + " \"driverCores\": 1,\n" - + " \"driverMemory\": \"512M\",\n" - + " \"numExecutors\": 2,\n" - + " \"executorMemory\": \"2G\",\n" - + " \"executorCores\": 2,\n" - + " \"appName\": \"\",\n" - + " \"mainArgs\": \"\",\n" - + " \"others\": \"\",\n" - + " \"programType\": \"JAVA\",\n" - + " \"sparkVersion\": \"SPARK2\",\n" - + " \"dependence\": {},\n" - + " \"conditionResult\": {\n" - + " \"successNode\": [\n" - + " \"\"\n" - + " ],\n" - + " \"failedNode\": [\n" - + " \"\"\n" - + " ]\n" - + " },\n" - + " \"waitStartTimeout\": {}\n" - + "}"; + + " \"mainClass\": \"org.apache.dolphinscheduler.SparkTest\",\n" + + " \"mainJar\": {\n" + + " \"id\": 1\n" + + " },\n" + + " \"deployMode\": \"cluster\",\n" + + " \"resourceList\": [\n" + + " {\n" + + " \"id\": 3\n" + + " },\n" + + " {\n" + + " \"id\": 4\n" + + " }\n" + + " ],\n" + + " \"localParams\": [],\n" + + " \"driverCores\": 1,\n" + + " \"driverMemory\": \"512M\",\n" + + " \"numExecutors\": 2,\n" + + " \"executorMemory\": \"2G\",\n" + + " \"executorCores\": 2,\n" + + " \"appName\": \"\",\n" + + " \"mainArgs\": \"\",\n" + + " \"others\": \"\",\n" + + " \"programType\": \"JAVA\",\n" + + " \"sparkVersion\": \"SPARK2\",\n" + + " \"dependence\": {},\n" + + " \"conditionResult\": {\n" + + " \"successNode\": [\n" + + " \"\"\n" + + " ],\n" + + " \"failedNode\": [\n" + + " \"\"\n" + + " ]\n" + + " },\n" + + " \"waitStartTimeout\": {}\n" + + "}"; taskDefinition.setTaskParams(taskParameters); Map resourceMap = - Stream.of(1, 3, 4) - .map(i -> { - Resource resource = new Resource(); - resource.setId(i); - resource.setFileName("file" + i); - resource.setFullName("/file" + i); - return resource; - }) - .collect( - Collectors.toMap( - Resource::getId, - resource -> resource) - ); + Stream.of(1, 3, 4) + .map(i -> { + Resource resource = new Resource(); + resource.setId(i); + resource.setFileName("file" + i); + resource.setFullName("/file" + i); + return resource; + }) + .collect( + Collectors.toMap( + Resource::getId, + resource -> resource) + ); for (Integer integer : Arrays.asList(1, 3, 4)) { Mockito.when(resourceMapper.selectById(integer)) - .thenReturn(resourceMap.get(integer)); + .thenReturn(resourceMap.get(integer)); } Whitebox.invokeMethod(processService, - "updateTaskDefinitionResources", - taskDefinition); + "updateTaskDefinitionResources", + taskDefinition); String taskParams = taskDefinition.getTaskParams(); SparkParameters sparkParameters = JSONUtils.parseObject(taskParams, SparkParameters.class); @@ -582,15 +597,15 @@ public class ProcessServiceTest { // test if input is null ResourceInfo resourceInfoNull = null; ResourceInfo updatedResourceInfo1 = Whitebox.invokeMethod(processService, - "updateResourceInfo", - resourceInfoNull); + "updateResourceInfo", + resourceInfoNull); Assert.assertNull(updatedResourceInfo1); // test if resource id less than 1 ResourceInfo resourceInfoVoid = new ResourceInfo(); ResourceInfo updatedResourceInfo2 = Whitebox.invokeMethod(processService, - "updateResourceInfo", - resourceInfoVoid); + "updateResourceInfo", + resourceInfoVoid); Assert.assertNull(updatedResourceInfo2); // test normal situation @@ -602,8 +617,8 @@ public class ProcessServiceTest { resource.setFullName("/test.txt"); Mockito.when(resourceMapper.selectById(1)).thenReturn(resource); ResourceInfo updatedResourceInfo3 = Whitebox.invokeMethod(processService, - "updateResourceInfo", - resourceInfoNormal); + "updateResourceInfo", + resourceInfoNormal); Assert.assertEquals(1, updatedResourceInfo3.getId()); Assert.assertEquals("test.txt", updatedResourceInfo3.getRes());