From 87ff13fc865ffb456e94aa197e367b72239249e8 Mon Sep 17 00:00:00 2001 From: wangxj3 <857234426@qq.com> Date: Sun, 7 Feb 2021 17:10:33 +0800 Subject: [PATCH] [Feature-#3805][server-master] global params of master (#4678) * global initParam and set Param * fix dataFormat error * fix deal outParams bug * fix code style * fix code style * fix code style * fix code style * fix code style * fix code style * fix code style * add test * fix code style (variable name) * fix reset globalParams bug Co-authored-by: wangxj --- .../dao/mapper/ProcessInstanceMapper.java | 3 + .../dao/mapper/ProcessInstanceMapper.xml | 6 +- .../command/TaskExecuteResponseCommand.java | 13 +- .../processor/TaskResponseProcessor.java | 4 +- .../processor/queue/TaskResponseEvent.java | 16 +- .../processor/queue/TaskResponseService.java | 3 +- .../master/runner/MasterExecThread.java | 51 ++++- .../queue/TaskResponseServiceTest.java | 3 +- .../service/process/ProcessService.java | 176 ++++++++++++------ .../service/process/ProcessServiceTest.java | 22 +++ 10 files changed, 231 insertions(+), 66 deletions(-) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java index a9ebbf000c..7116dc4c90 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java @@ -221,4 +221,7 @@ public interface ProcessInstanceMapper extends BaseMapper { @Param("processDefinitionId") int processDefinitionId, @Param("states") int[] states); + int updateGlobalParamById( + @Param("globalParams") String globalParams, + @Param("id") int id); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml index f66163541b..793b58e567 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml @@ -219,5 +219,9 @@ order by id asc - + + update t_ds_process_instance + set global_params = #{globalParams} + where id = #{id} + diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java index de5b82c729..21fe47198c 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java @@ -68,7 +68,10 @@ public class TaskExecuteResponseCommand implements Serializable { * varPool string */ private String varPool; - + /** + * task return result + */ + private String result; public void setVarPool(String varPool) { this.varPool = varPool; } @@ -139,4 +142,12 @@ public class TaskExecuteResponseCommand implements Serializable { + ", appIds='" + appIds + '\'' + '}'; } + + public String getResult() { + return result; + } + + public void setResult(String result) { + this.result = result; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index 080fdd540d..186c4f35ba 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -80,7 +80,9 @@ public class TaskResponseProcessor implements NettyRequestProcessor { responseCommand.getAppIds(), responseCommand.getTaskInstanceId(), responseCommand.getVarPool(), - channel); + channel, + responseCommand.getResult() + ); taskResponseService.addResponse(taskResponseEvent); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java index 0ca558a560..9789bccb3c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java @@ -92,6 +92,10 @@ public class TaskResponseEvent { * channel */ private Channel channel; + /** + * task return result + */ + private String result; public static TaskResponseEvent newAck(ExecutionStatus state, Date startTime, @@ -118,7 +122,8 @@ public class TaskResponseEvent { String appIds, int taskInstanceId, String varPool, - Channel channel) { + Channel channel, + String result) { TaskResponseEvent event = new TaskResponseEvent(); event.setState(state); event.setEndTime(endTime); @@ -128,6 +133,7 @@ public class TaskResponseEvent { event.setEvent(Event.RESULT); event.setVarPool(varPool); event.setChannel(channel); + event.setResult(result); return event; } @@ -226,4 +232,12 @@ public class TaskResponseEvent { public void setChannel(Channel channel) { this.channel = channel; } + + public String getResult() { + return result; + } + + public void setResult(String result) { + this.result = result; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java index 1b5eddbd6f..f3f2e7f15b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java @@ -165,7 +165,8 @@ public class TaskResponseService { taskResponseEvent.getProcessId(), taskResponseEvent.getAppIds(), taskResponseEvent.getTaskInstanceId(), - taskResponseEvent.getVarPool() + taskResponseEvent.getVarPool(), + taskResponseEvent.getResult() ); } // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 3b113b6536..0e6c0d8e21 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -22,11 +22,13 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; +import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.DependResult; +import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Flag; @@ -36,6 +38,7 @@ import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.process.ProcessDag; +import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils; @@ -67,6 +70,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Date; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -74,6 +78,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -491,7 +496,8 @@ public class MasterExecThread implements Runnable { */ private TaskInstance createTaskInstance(ProcessInstance processInstance, String nodeName, TaskNode taskNode) { - + //update processInstance for update the globalParams + this.processInstance = this.processService.findProcessInstanceById(this.processInstance.getId()); TaskInstance taskInstance = findTaskIfExists(nodeName); if (taskInstance == null) { taskInstance = new TaskInstance(); @@ -540,13 +546,53 @@ public class MasterExecThread implements Runnable { } else { taskInstance.setWorkerGroup(taskWorkerGroup); } - + //get process global + setProcessGlobal(taskNode, taskInstance); // delay execution time taskInstance.setDelayTime(taskNode.getDelayTime()); } return taskInstance; } + private void setProcessGlobal(TaskNode taskNode, TaskInstance taskInstance) { + String globalParams = this.processInstance.getGlobalParams(); + if (StringUtils.isNotEmpty(globalParams)) { + Map globalMap = getGlobalParamMap(globalParams); + if (globalMap != null) { + // the param save in localParams + Map result = JSONUtils.toMap(taskNode.getParams(), String.class, Object.class); + Object localParams = result.get(LOCAL_PARAMS); + if (localParams != null) { + List allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class); + for (Property info : allParam) { + if (info.getDirect().equals(Direct.IN)) { + String paramName = info.getProp(); + String value = globalMap.get(paramName); + if (StringUtils.isNotEmpty(value)) { + info.setValue(value); + } + } + } + result.put(LOCAL_PARAMS, allParam); + taskNode.setParams(JSONUtils.toJsonString(result)); + // task instance node json + taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode)); + } + } + } + } + + public Map getGlobalParamMap(String globalParams) { + List propList; + Map globalParamMap = new HashMap<>(); + if (StringUtils.isNotEmpty(globalParams)) { + propList = JSONUtils.toList(globalParams, Property.class); + globalParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)); + } + + return globalParamMap; + } + private void submitPostNode(String parentNodeName) { Set submitTaskNodeList = DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList); List taskInstances = new ArrayList<>(); @@ -952,6 +998,7 @@ public class MasterExecThread implements Runnable { // node success , post node submit if (task.getState() == ExecutionStatus.SUCCESS) { processInstance.setVarPool(task.getVarPool()); + processInstance = processService.findProcessInstanceById(processInstance.getId()); processService.updateProcessInstance(processInstance); completeTaskList.put(task.getName(), task); submitPostNode(task.getName()); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index 5d10f849c5..ec0807cbdd 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -70,7 +70,8 @@ public class TaskResponseServiceTest { "ids", 22, "varPol", - channel); + channel, + "[{\"id\":70000,\"database_name\":\"yuul\",\"status\":-1,\"create_time\":1601202829000,\"update_time\":1601202829000,\"table_name3\":\"\",\"table_name4\":\"\"}]"); taskInstance = new TaskInstance(); taskInstance.setId(22); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index b9065eca3e..d77a654b8c 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -24,6 +24,7 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PRO import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID; +import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS; import static java.util.stream.Collectors.toSet; @@ -32,6 +33,7 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CycleEnum; +import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Flag; @@ -89,6 +91,7 @@ import java.util.Date; import java.util.EnumMap; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -103,6 +106,8 @@ import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import com.cronutils.model.Cron; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; /** @@ -161,10 +166,10 @@ public class ProcessService { /** * handle Command (construct ProcessInstance from Command) , wrapped in transaction * - * @param logger logger - * @param host host + * @param logger logger + * @param host host * @param validThreadNum validThreadNum - * @param command found command + * @param command found command * @return process instance */ @Transactional(rollbackFor = Exception.class) @@ -204,7 +209,7 @@ public class ProcessService { /** * set process waiting thread * - * @param command command + * @param command command * @param processInstance processInstance * @return process instance */ @@ -222,7 +227,7 @@ public class ProcessService { /** * check thread num * - * @param command command + * @param command command * @param validThreadNum validThreadNum * @return if thread is enough */ @@ -425,7 +430,7 @@ public class ProcessService { * recursive query sub process definition id by parent id. * * @param parentId parentId - * @param ids ids + * @param ids ids */ public void recurseFindSubProcessId(int parentId, List ids) { ProcessDefinition processDefinition = processDefineMapper.selectById(parentId); @@ -456,7 +461,7 @@ public class ProcessService { * create recovery waiting thread command and delete origin command at the same time. * if the recovery command is exists, only update the field update_time * - * @param originCommand originCommand + * @param originCommand originCommand * @param processInstance processInstance */ public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) { @@ -508,7 +513,7 @@ public class ProcessService { /** * get schedule time from command * - * @param command command + * @param command command * @param cmdParam cmdParam map * @return date */ @@ -524,8 +529,8 @@ public class ProcessService { * generate a new work process instance from command. * * @param processDefinition processDefinition - * @param command command - * @param cmdParam cmdParam map + * @param command command + * @param cmdParam cmdParam map * @return process instance */ private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition, @@ -601,7 +606,7 @@ public class ProcessService { * use definition creator's tenant. * * @param tenantId tenantId - * @param userId userId + * @param userId userId * @return tenant */ public Tenant getTenantForProcess(int tenantId, int userId) { @@ -624,7 +629,7 @@ public class ProcessService { /** * check command parameters is valid * - * @param command command + * @param command command * @param cmdParam cmdParam map * @return whether command param is valid */ @@ -644,7 +649,7 @@ public class ProcessService { * construct process instance according to one command. * * @param command command - * @param host host + * @param host host * @return process instance */ private ProcessInstance constructProcessInstance(Command command, String host) { @@ -686,11 +691,6 @@ public class ProcessService { } else { processInstance = this.findProcessInstanceDetailById(processInstanceId); // Recalculate global parameters after rerun. - processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( - processDefinition.getGlobalParamMap(), - processDefinition.getGlobalParamList(), - getCommandTypeIfComplement(processInstance, command), - processInstance.getScheduleTime())); } processDefinition = processDefineMapper.selectById(processInstance.getProcessDefinitionId()); processInstance.setProcessDefinition(processDefinition); @@ -807,7 +807,7 @@ public class ProcessService { * return complement data if the process start with complement data * * @param processInstance processInstance - * @param command command + * @param command command * @return command type */ private CommandType getCommandTypeIfComplement(ProcessInstance processInstance, Command command) { @@ -822,8 +822,8 @@ public class ProcessService { * initialize complement data parameters * * @param processDefinition processDefinition - * @param processInstance processInstance - * @param cmdParam cmdParam + * @param processInstance processInstance + * @param cmdParam cmdParam */ private void initComplementDataParam(ProcessDefinition processDefinition, ProcessInstance processInstance, @@ -895,7 +895,7 @@ public class ProcessService { * only the keys doesn't in sub process global would be joined. * * @param parentGlobalParams parentGlobalParams - * @param subGlobalParams subGlobalParams + * @param subGlobalParams subGlobalParams * @return global params join */ private String joinGlobalParams(String parentGlobalParams, String subGlobalParams) { @@ -965,7 +965,7 @@ public class ProcessService { * set map {parent instance id, task instance id, 0(child instance id)} * * @param parentInstance parentInstance - * @param parentTask parentTask + * @param parentTask parentTask * @return process instance map */ private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) { @@ -994,7 +994,7 @@ public class ProcessService { * find previous task work process map. * * @param parentProcessInstance parentProcessInstance - * @param parentTask parentTask + * @param parentTask parentTask * @return process instance map */ private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance parentProcessInstance, @@ -1020,7 +1020,7 @@ public class ProcessService { * create sub work process command * * @param parentProcessInstance parentProcessInstance - * @param task task + * @param task task */ public void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task) { if (!task.isSubProcess()) { @@ -1119,7 +1119,7 @@ public class ProcessService { * update sub process definition * * @param parentProcessInstance parentProcessInstance - * @param childDefinitionId childDefinitionId + * @param childDefinitionId childDefinitionId */ private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, int childDefinitionId) { ProcessDefinition fatherDefinition = this.findProcessDefineById(parentProcessInstance.getProcessDefinitionId()); @@ -1133,7 +1133,7 @@ public class ProcessService { /** * submit task to mysql * - * @param taskInstance taskInstance + * @param taskInstance taskInstance * @param processInstance processInstance * @return task instance */ @@ -1187,16 +1187,16 @@ public class ProcessService { * return stop if work process state is ready stop * if all of above are not satisfied, return submit success * - * @param taskInstance taskInstance + * @param taskInstance taskInstance * @param processInstanceState processInstanceState * @return process instance state */ public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ExecutionStatus processInstanceState) { ExecutionStatus state = taskInstance.getState(); + // running, delayed or killed + // the task already exists in task queue + // return state if ( - // running, delayed or killed - // the task already exists in task queue - // return state state == ExecutionStatus.RUNNING_EXECUTION || state == ExecutionStatus.DELAY_EXECUTION || state == ExecutionStatus.KILL @@ -1363,7 +1363,7 @@ public class ProcessService { * get id list by task state * * @param instanceId instanceId - * @param state state + * @param state state * @return task instance states */ public List findTaskIdByInstanceState(int instanceId, ExecutionStatus state) { @@ -1418,7 +1418,7 @@ public class ProcessService { * find work process map by parent process id and parent task id. * * @param parentWorkProcessId parentWorkProcessId - * @param parentTaskId parentTaskId + * @param parentTaskId parentTaskId * @return process instance map */ public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) { @@ -1440,7 +1440,7 @@ public class ProcessService { * find sub process instance * * @param parentProcessId parentProcessId - * @param parentTaskId parentTaskId + * @param parentTaskId parentTaskId * @return process instance */ public ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId) { @@ -1472,12 +1472,12 @@ public class ProcessService { /** * change task state * - * @param state state - * @param startTime startTime - * @param host host + * @param state state + * @param startTime startTime + * @param host host * @param executePath executePath - * @param logPath logPath - * @param taskInstId taskInstId + * @param logPath logPath + * @param taskInstId taskInstId */ public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date startTime, String host, String executePath, @@ -1505,12 +1505,12 @@ public class ProcessService { * update the process instance * * @param processInstanceId processInstanceId - * @param processJson processJson - * @param globalParams globalParams - * @param scheduleTime scheduleTime - * @param flag flag - * @param locations locations - * @param connects connects + * @param processJson processJson + * @param globalParams globalParams + * @param scheduleTime scheduleTime + * @param flag flag + * @param locations locations + * @param connects connects * @return update process instance result */ public int updateProcessInstance(Integer processInstanceId, String processJson, @@ -1531,25 +1531,85 @@ public class ProcessService { /** * change task state * - * @param state state - * @param endTime endTime + * @param state state + * @param endTime endTime * @param taskInstId taskInstId - * @param varPool varPool + * @param varPool varPool */ public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date endTime, int processId, String appIds, int taskInstId, - String varPool) { + String varPool, + String result) { taskInstance.setPid(processId); taskInstance.setAppLink(appIds); taskInstance.setState(state); taskInstance.setEndTime(endTime); taskInstance.setVarPool(varPool); + changeOutParam(result, taskInstance); saveTaskInstance(taskInstance); } + public void changeOutParam(String result, TaskInstance taskInstance) { + if (StringUtils.isEmpty(result)) { + return; + } + List> workerResultParam = getListMapByString(result); + if (CollectionUtils.isEmpty(workerResultParam)) { + return; + } + //if the result more than one line,just get the first . + Map row = workerResultParam.get(0); + if (row == null || row.size() == 0) { + return; + } + TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(), TaskNode.class); + Map taskParams = JSONUtils.toMap(taskNode.getParams(), String.class, Object.class); + Object localParams = taskParams.get(LOCAL_PARAMS); + if (localParams == null) { + return; + } + ProcessInstance processInstance = this.processInstanceMapper.queryDetailById(taskInstance.getProcessInstanceId()); + List params4Process = JSONUtils.toList(processInstance.getGlobalParams(), Property.class); + Map allParamMap = params4Process.stream().collect(Collectors.toMap(Property::getProp, Property -> Property)); + + List allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class); + for (Property info : allParam) { + if (info.getDirect() == Direct.OUT) { + String paramName = info.getProp(); + Property property = allParamMap.get(paramName); + if (property == null) { + continue; + } + String value = row.get(paramName); + if (StringUtils.isNotEmpty(value)) { + property.setValue(value); + info.setValue(value); + } + } + } + taskParams.put(LOCAL_PARAMS, allParam); + taskNode.setParams(JSONUtils.toJsonString(taskParams)); + // task instance node json + taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode)); + String params4ProcessString = JSONUtils.toJsonString(params4Process); + int updateCount = this.processInstanceMapper.updateGlobalParamById(params4ProcessString, processInstance.getId()); + logger.info("updateCount:{}, params4Process:{}, processInstanceId:{}", updateCount, params4ProcessString, processInstance.getId()); + } + + public List> getListMapByString(String json) { + List> allParams = new ArrayList<>(); + ArrayNode paramsByJson = JSONUtils.parseArray(json); + Iterator listIterator = paramsByJson.iterator(); + while (listIterator.hasNext()) { + Map param = JSONUtils.toMap(listIterator.next().toString(), String.class, String.class); + allParams.add(param); + } + return allParams; + } + /** * convert integer list to string list * @@ -1642,7 +1702,7 @@ public class ProcessService { * update process instance state by id * * @param processInstanceId processInstanceId - * @param executionStatus executionStatus + * @param executionStatus executionStatus * @return update process result */ public int updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) { @@ -1679,7 +1739,7 @@ public class ProcessService { /** * find tenant code by resource name * - * @param resName resource name + * @param resName resource name * @param resourceType resource type * @return tenant code */ @@ -1703,9 +1763,9 @@ public class ProcessService { /** * get dependency cycle by work process define id and scheduler fire time * - * @param masterId masterId + * @param masterId masterId * @param processDefinitionId processDefinitionId - * @param scheduledFireTime the time the task schedule is expected to trigger + * @param scheduledFireTime the time the task schedule is expected to trigger * @return CycleDependency * @throws Exception if error throws Exception */ @@ -1718,8 +1778,8 @@ public class ProcessService { /** * get dependency cycle list by work process define id list and scheduler fire time * - * @param masterId masterId - * @param ids ids + * @param masterId masterId + * @param ids ids * @param scheduledFireTime the time the task schedule is expected to trigger * @return CycleDependency list * @throws Exception if error throws Exception @@ -1814,8 +1874,8 @@ public class ProcessService { * find last running process instance * * @param definitionId process definition id - * @param startTime start time - * @param endTime end time + * @param startTime start time + * @param endTime end time * @return process instance */ public ProcessInstance findLastRunningProcess(int definitionId, Date startTime, Date endTime) { @@ -1915,7 +1975,7 @@ public class ProcessService { /** * list unauthorized udf function * - * @param userId user id + * @param userId user id * @param needChecks data source id array * @return unauthorized udf function list */ diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 551c9bb09c..7eec3669b4 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -442,4 +442,26 @@ public class ProcessServiceTest { Assert.assertEquals(expect, processService.changeJson(newProcessData,oldJson)); } + + @Test + public void testChangeOutParam() { + String result = "[{\"d\":\"20210203\"}]"; + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setProcessInstanceId(62); + taskInstance.setTaskJson("{\"id\":\"tasks-86175\",\"name\":\"wew\",\"desc\":null,\"type\":\"SHELL\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0," + + "\"retryInterval\":1,\"params\":{\"rawScript\":\"echo 20210203\",\"localParams\":[{\"prop\":\"d\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"\"}]," + + "\"resourceList\":[]},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]}," + + "\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"workerGroupId\":null," + + "\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"delayTime\":0}"); + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setId(62); + processInstance.setGlobalParams("[{\"prop\":\"sql2\",\"direct\":null,\"type\":null,\"value\":\"\"},{\"prop\":\"out\",\"direct\":null,\"type\":null,\"value\":\"\"}," + + "{\"prop\":\"d\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]"); + String params4ProcessString = "[{\"prop\":\"sql2\",\"direct\":null,\"type\":null,\"value\":\"\"},{\"prop\":\"out\",\"direct\":null,\"type\":null,\"value\":\"\"}," + + "{\"prop\":\"d\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20210203\"}]"; + Mockito.when(processInstanceMapper.queryDetailById(taskInstance.getProcessInstanceId())).thenReturn(processInstance); + Mockito.when(this.processInstanceMapper.updateGlobalParamById(params4ProcessString, processInstance.getId())).thenReturn(1); + processService.changeOutParam(result,taskInstance); + } + }