diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 1d7d84937f..1d3413b8c7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -17,14 +17,10 @@ package org.apache.dolphinscheduler.server.master.runner; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; -import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING; - +import com.google.common.collect.Lists; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.math.NumberUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.FailureStrategy; @@ -34,7 +30,6 @@ import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; @@ -58,6 +53,7 @@ import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters; import org.apache.dolphinscheduler.remote.command.HostUpdateCommand; @@ -71,9 +67,8 @@ import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; @@ -89,11 +84,15 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; +import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING; /** * master exec thread,split dag @@ -448,7 +447,6 @@ public class WorkflowExecuteThread { if (taskInstance.getState().typeIsSuccess()) { completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); - processInstance.setVarPool(taskInstance.getVarPool()); processService.saveProcessInstance(processInstance); if (!processInstance.isBlocked()) { submitPostNode(Long.toString(taskInstance.getTaskCode())); @@ -1280,6 +1278,24 @@ public class WorkflowExecuteThread { TaskInstance task = createTaskInstance(processInstance, taskNodeObject); taskInstances.add(task); } + //the end node of the branch of the dag + if (StringUtils.isNotEmpty(parentNodeCode) && dag.getEndNode().contains(parentNodeCode)){ + TaskInstance endTaskInstance = taskInstanceMap.get(completeTaskMap.get(NumberUtils.toLong(parentNodeCode))); + String taskInstanceVarPool = endTaskInstance.getVarPool(); + if(StringUtils.isNotEmpty(taskInstanceVarPool)) { + Set taskProperties = JSONUtils.toList(taskInstanceVarPool, Property.class) + .stream().collect(Collectors.toSet()); + String processInstanceVarPool = processInstance.getVarPool(); + if (StringUtils.isNotEmpty(processInstanceVarPool)) { + Set properties = JSONUtils.toList(processInstanceVarPool, Property.class) + .stream().collect(Collectors.toSet()); + properties.addAll(taskProperties); + processInstance.setVarPool(JSONUtils.toJsonString(properties)); + }else{ + processInstance.setVarPool(JSONUtils.toJsonString(taskProperties)); + } + } + } // if previous node success , post node submit for (TaskInstance task : taskInstances) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java index bdd00bf7e4..faf632028f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java @@ -17,21 +17,29 @@ package org.apache.dolphinscheduler.server.master.runner.task; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS; - -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.auto.service.AutoService; +import org.apache.commons.lang.StringUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.Date; +import java.util.List; +import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; -import com.google.auto.service.AutoService; +import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS; /** * subtask processor @@ -111,10 +119,40 @@ public class SubTaskProcessor extends BaseTaskProcessor { if (subProcessInstance != null && subProcessInstance.getState().typeIsFinished()) { taskInstance.setState(subProcessInstance.getState()); taskInstance.setEndTime(new Date()); + dealFinish(); processService.saveTaskInstance(taskInstance); } } + /** + * get the params from subProcessInstance to this subProcessTask + */ + private void dealFinish() { + String thisTaskInstanceVarPool = taskInstance.getVarPool(); + if (StringUtils.isNotEmpty(thisTaskInstanceVarPool)) { + String subProcessInstanceVarPool = subProcessInstance.getVarPool(); + if (StringUtils.isNotEmpty(subProcessInstanceVarPool)) { + List varPoolProperties = JSONUtils.toList(thisTaskInstanceVarPool, Property.class); + Map taskParams = JSONUtils.parseObject(taskInstance.getTaskParams(), new TypeReference>() { + }); + Object localParams = taskParams.get(LOCAL_PARAMS); + if (localParams != null) { + List properties = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class); + Map subProcessParam = JSONUtils.toList(subProcessInstanceVarPool, Property.class).stream() + .collect(Collectors.toMap(Property::getProp, Property::getValue)); + List outProperties = properties.stream().filter(r -> Direct.OUT == r.getDirect()).collect(Collectors.toList()); + for (Property info : outProperties) { + info.setValue(subProcessParam.get(info.getProp())); + varPoolProperties.add(info); + } + taskInstance.setVarPool(JSONUtils.toJsonString(varPoolProperties)); + //deal with localParam for show in the page + processService.changeOutParam(taskInstance); + } + } + } + } + @Override protected boolean pauseTask() { pauseSubWorkFlow(); @@ -180,4 +218,5 @@ public class SubTaskProcessor extends BaseTaskProcessor { public String getType() { return TASK_TYPE_SUB_PROCESS; } + } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java index 1a71d85428..200dec0b3d 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java @@ -17,19 +17,24 @@ package org.apache.dolphinscheduler.server.master; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.runner.task.SubTaskProcessor; +import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; - +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -39,8 +44,11 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import org.springframework.context.ApplicationContext; +import java.util.ArrayList; +import java.util.List; + @RunWith(PowerMockRunner.class) -@PrepareForTest({ Stopper.class }) +@PrepareForTest({Stopper.class}) public class SubProcessTaskTest { /** @@ -76,17 +84,17 @@ public class SubProcessTaskTest { TaskInstance taskInstance = getTaskInstance(); Mockito.when(processService - .findProcessInstanceById(processInstance.getId())) + .findProcessInstanceById(processInstance.getId())) .thenReturn(processInstance); // for SubProcessTaskExecThread.setTaskInstanceState Mockito.when(processService - .updateTaskInstance(Mockito.any())) + .updateTaskInstance(Mockito.any())) .thenReturn(true); // for MasterBaseTaskExecThread.submit Mockito.when(processService - .submitTask(processInstance, taskInstance)) + .submitTask(processInstance, taskInstance)) .thenAnswer(t -> t.getArgument(0)); TaskDefinition taskDefinition = new TaskDefinition(); @@ -101,12 +109,13 @@ public class SubProcessTaskTest { TaskInstance taskInstance = getTaskInstance(getTaskNode(), processInstance); ProcessInstance subProcessInstance = getSubProcessInstance(expectResult); + subProcessInstance.setVarPool(getProperty()); // for SubProcessTaskExecThread.waitTaskQuit Mockito.when(processService - .findProcessInstanceById(subProcessInstance.getId())) + .findProcessInstanceById(subProcessInstance.getId())) .thenReturn(subProcessInstance); Mockito.when(processService - .findSubProcessInstance(processInstance.getId(), taskInstance.getId())) + .findSubProcessInstance(processInstance.getId(), taskInstance.getId())) .thenReturn(subProcessInstance); return taskInstance; @@ -120,6 +129,34 @@ public class SubProcessTaskTest { //Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecThread.getTaskInstance().getState()); } + @Test + public void testFinish() { + TaskInstance taskInstance = testBasicInit(ExecutionStatus.SUCCESS); + taskInstance.setVarPool(getProperty()); + taskInstance.setTaskParams("{\"processDefinitionCode\":110," + + "\"dependence\":{},\"localParams\":[{\"prop\":\"key\"," + + "\"direct\":\"out\",\"type\":\"VARCHAR\",\"value\":\"\"}," + + "{\"prop\":\"database_name\",\"direct\":\"OUT\"," + + "\"type\":\"VARCHAR\",\"value\":\"\"}]," + + "\"conditionResult\":{\"successNode\":[],\"failedNode\":[]}," + + "\"waitStartTimeout\":{},\"switchResult\":{}}"); + SubTaskProcessor subTaskProcessor = new SubTaskProcessor(); + subTaskProcessor.init(taskInstance, processInstance); + subTaskProcessor.action(TaskAction.RUN); + ExecutionStatus status = taskInstance.getState(); + Assert.assertEquals(ExecutionStatus.SUCCESS, status); + } + + private String getProperty() { + List varPools = new ArrayList<>(); + Property property = new Property(); + property.setProp("key"); + property.setValue("1"); + property.setDirect(Direct.OUT); + varPools.add(property); + return JSONUtils.toJsonString(varPools); + } + @Test public void testBasicFailure() { TaskInstance taskInstance = testBasicInit(ExecutionStatus.FAILURE); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 8a9427f5f8..bf377d8358 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -17,19 +17,13 @@ package org.apache.dolphinscheduler.service.process; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID; -import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; -import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID; - -import static java.util.stream.Collectors.toSet; - +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.math.NumberUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -130,9 +124,11 @@ import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.dolphinscheduler.spi.enums.ResourceType; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import java.util.ArrayList; import java.util.Arrays; @@ -147,16 +143,17 @@ import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; - -import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; +import static java.util.stream.Collectors.toSet; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID; +import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; +import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID; /** * process relative dao that some mappers in this. @@ -269,8 +266,8 @@ public class ProcessService { /** * handle Command (construct ProcessInstance from Command) , wrapped in transaction * - * @param logger logger - * @param host host + * @param logger logger + * @param host host * @param command found command * @return process instance */ @@ -371,7 +368,7 @@ public class ProcessService { /** * set process waiting thread * - * @param command command + * @param command command * @param processInstance processInstance * @return process instance */ @@ -584,6 +581,7 @@ public class ProcessService { /** * recursive delete all task instance by process instance id + * * @param processInstanceId */ public void deleteWorkTaskInstanceByProcessInstanceId(int processInstanceId) { @@ -605,7 +603,7 @@ public class ProcessService { * recursive query sub process definition id by parent id. * * @param parentCode parentCode - * @param ids ids + * @param ids ids */ public void recurseFindSubProcess(long parentCode, List ids) { List taskNodeList = this.getTaskNodeListByDefinition(parentCode); @@ -630,7 +628,7 @@ public class ProcessService { * create recovery waiting thread command and delete origin command at the same time. * if the recovery command is exists, only update the field update_time * - * @param originCommand originCommand + * @param originCommand originCommand * @param processInstance processInstance */ public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) { @@ -686,7 +684,7 @@ public class ProcessService { /** * get schedule time from command * - * @param command command + * @param command command * @param cmdParam cmdParam map * @return date */ @@ -715,8 +713,8 @@ public class ProcessService { * generate a new work process instance from command. * * @param processDefinition processDefinition - * @param command command - * @param cmdParam cmdParam map + * @param command command + * @param cmdParam cmdParam map * @return process instance */ private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition, @@ -801,7 +799,7 @@ public class ProcessService { * use definition creator's tenant. * * @param tenantId tenantId - * @param userId userId + * @param userId userId * @return tenant */ public Tenant getTenantForProcess(int tenantId, int userId) { @@ -839,7 +837,7 @@ public class ProcessService { /** * check command parameters is valid * - * @param command command + * @param command command * @param cmdParam cmdParam map * @return whether command param is valid */ @@ -859,7 +857,7 @@ public class ProcessService { * construct process instance according to one command. * * @param command command - * @param host host + * @param host host * @return process instance */ protected ProcessInstance constructProcessInstance(Command command, String host) { @@ -1038,7 +1036,7 @@ public class ProcessService { * return complement data if the process start with complement data * * @param processInstance processInstance - * @param command command + * @param command command * @return command type */ private CommandType getCommandTypeIfComplement(ProcessInstance processInstance, Command command) { @@ -1053,8 +1051,8 @@ public class ProcessService { * initialize complement data parameters * * @param processDefinition processDefinition - * @param processInstance processInstance - * @param cmdParam cmdParam + * @param processInstance processInstance + * @param cmdParam cmdParam */ private void initComplementDataParam(ProcessDefinition processDefinition, ProcessInstance processInstance, @@ -1127,7 +1125,7 @@ public class ProcessService { * only the keys doesn't in sub process global would be joined. * * @param parentGlobalParams parentGlobalParams - * @param subGlobalParams subGlobalParams + * @param subGlobalParams subGlobalParams * @return global params join */ private String joinGlobalParams(String parentGlobalParams, String subGlobalParams) { @@ -1194,7 +1192,7 @@ public class ProcessService { * submit sub process to command * * @param processInstance processInstance - * @param taskInstance taskInstance + * @param taskInstance taskInstance * @return task instance */ @Transactional(rollbackFor = Exception.class) @@ -1225,7 +1223,7 @@ public class ProcessService { * set map {parent instance id, task instance id, 0(child instance id)} * * @param parentInstance parentInstance - * @param parentTask parentTask + * @param parentTask parentTask * @return process instance map */ private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) { @@ -1254,7 +1252,7 @@ public class ProcessService { * find previous task work process map. * * @param parentProcessInstance parentProcessInstance - * @param parentTask parentTask + * @param parentTask parentTask * @return process instance map */ private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance parentProcessInstance, @@ -1280,7 +1278,7 @@ public class ProcessService { * create sub work process command * * @param parentProcessInstance parentProcessInstance - * @param task task + * @param task task */ public void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task) { if (!task.isSubProcess()) { @@ -1345,19 +1343,22 @@ public class ProcessService { ProcessInstanceMap instanceMap, TaskInstance task) { CommandType commandType = getSubCommandType(parentProcessInstance, childInstance); - Map subProcessParam = JSONUtils.toMap(task.getTaskParams()); + Map subProcessParam = JSONUtils.toMap(task.getTaskParams(), String.class, Object.class); long childDefineCode = 0L; if (subProcessParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE)) { - childDefineCode = Long.parseLong(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE)); + childDefineCode = NumberUtils.toLong(String.valueOf(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE))); } ProcessDefinition subProcessDefinition = processDefineMapper.queryByCode(childDefineCode); Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS); List allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class); - Map globalMap = this.getGlobalParamMap(parentProcessInstance.getGlobalParams()); + Map globalMap = this.getGlobalParamMap(task.getVarPool()); Map fatherParams = new HashMap<>(); if (CollectionUtils.isNotEmpty(allParam)) { for (Property info : allParam) { + if (Direct.OUT == info.getDirect()) { + continue; + } fatherParams.put(info.getProp(), globalMap.get(info.getProp())); } } @@ -1411,7 +1412,7 @@ public class ProcessService { * update sub process definition * * @param parentProcessInstance parentProcessInstance - * @param childDefinitionCode childDefinitionId + * @param childDefinitionCode childDefinitionId */ private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, long childDefinitionCode) { ProcessDefinition fatherDefinition = this.findProcessDefinition(parentProcessInstance.getProcessDefinitionCode(), @@ -1426,7 +1427,7 @@ public class ProcessService { /** * submit task to mysql * - * @param taskInstance taskInstance + * @param taskInstance taskInstance * @param processInstance processInstance * @return task instance */ @@ -1462,7 +1463,7 @@ public class ProcessService { * return stop if work process state is ready stop * if all of above are not satisfied, return submit success * - * @param taskInstance taskInstance + * @param taskInstance taskInstance * @param processInstance processInstance * @return process instance state */ @@ -1688,7 +1689,7 @@ public class ProcessService { * get id list by task state * * @param instanceId instanceId - * @param state state + * @param state state * @return task instance states */ public List findTaskIdByInstanceState(int instanceId, ExecutionStatus state) { @@ -1743,7 +1744,7 @@ public class ProcessService { * find work process map by parent process id and parent task id. * * @param parentWorkProcessId parentWorkProcessId - * @param parentTaskId parentTaskId + * @param parentTaskId parentTaskId * @return process instance map */ public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) { @@ -1765,7 +1766,7 @@ public class ProcessService { * find sub process instance * * @param parentProcessId parentProcessId - * @param parentTaskId parentTaskId + * @param parentTaskId parentTaskId * @return process instance */ public ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId) { @@ -1797,11 +1798,11 @@ public class ProcessService { /** * change task state * - * @param state state - * @param startTime startTime - * @param host host + * @param state state + * @param startTime startTime + * @param host host * @param executePath executePath - * @param logPath logPath + * @param logPath logPath */ public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, @@ -1830,7 +1831,7 @@ public class ProcessService { /** * change task state * - * @param state state + * @param state state * @param endTime endTime * @param varPool varPool */ @@ -2005,7 +2006,7 @@ public class ProcessService { * update process instance state by id * * @param processInstanceId processInstanceId - * @param executionStatus executionStatus + * @param executionStatus executionStatus * @return update process result */ public int updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) { @@ -2041,7 +2042,7 @@ public class ProcessService { /** * find tenant code by resource name * - * @param resName resource name + * @param resName resource name * @param resourceType resource type * @return tenant code */ @@ -2079,7 +2080,7 @@ public class ProcessService { * find last scheduler process instance in the date interval * * @param definitionCode definitionCode - * @param dateInterval dateInterval + * @param dateInterval dateInterval * @return process instance */ public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval) { @@ -2092,7 +2093,7 @@ public class ProcessService { * find last manual process instance interval * * @param definitionCode process definition code - * @param dateInterval dateInterval + * @param dateInterval dateInterval * @return process instance */ public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval) { @@ -2105,8 +2106,8 @@ public class ProcessService { * find last running process instance * * @param definitionCode process definition code - * @param startTime start time - * @param endTime end time + * @param startTime start time + * @param endTime end time * @return process instance */ public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime) { @@ -2190,7 +2191,7 @@ public class ProcessService { /** * list unauthorized udf function * - * @param userId user id + * @param userId user id * @param needChecks data source id array * @return unauthorized udf function list */ @@ -2573,7 +2574,7 @@ public class ProcessService { taskCodeVersionMap.put(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion()); } } - taskCodeVersionMap.forEach((code,version) -> { + taskCodeVersionMap.forEach((code, version) -> { taskDefinitionLogs.add((TaskDefinitionLog) this.findTaskDefinition(code, version)); }); return taskDefinitionLogs; @@ -2598,7 +2599,7 @@ public class ProcessService { * add authorized resources * * @param ownResources own resources - * @param userId userId + * @param userId userId */ private void addAuthorizedResources(List ownResources, int userId) { List relationResourceIds = resourceUserMapper.queryResourcesIdListByUserIdAndPerm(userId, 7); @@ -2685,7 +2686,7 @@ public class ProcessService { public int updateDqExecuteResultUserId(int taskInstanceId) { DqExecuteResult dqExecuteResult = - dqExecuteResultMapper.selectOne(new QueryWrapper().eq(TASK_INSTANCE_ID,taskInstanceId)); + dqExecuteResultMapper.selectOne(new QueryWrapper().eq(TASK_INSTANCE_ID, taskInstanceId)); if (dqExecuteResult == null) { return -1; } @@ -2713,13 +2714,13 @@ public class ProcessService { public int deleteDqExecuteResultByTaskInstanceId(int taskInstanceId) { return dqExecuteResultMapper.delete( new QueryWrapper() - .eq(TASK_INSTANCE_ID,taskInstanceId)); + .eq(TASK_INSTANCE_ID, taskInstanceId)); } public int deleteTaskStatisticsValueByTaskInstanceId(int taskInstanceId) { return dqTaskStatisticsValueMapper.delete( new QueryWrapper() - .eq(TASK_INSTANCE_ID,taskInstanceId)); + .eq(TASK_INSTANCE_ID, taskInstanceId)); } public DqRule getDqRule(int ruleId) { @@ -2740,6 +2741,7 @@ public class ProcessService { /** * the first time (when submit the task ) get the resource of the task group + * * @param taskId task id * @param taskName * @param groupId @@ -2785,13 +2787,14 @@ public class ProcessService { } /** - * try to get the task group resource(when other task release the resource) + * try to get the task group resource(when other task release the resource) + * * @param taskGroupQueue * @return */ public boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue) { TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId()); - int affectedCount = taskGroupMapper.updateTaskGroupResource(taskGroup.getId(),taskGroupQueue.getId(), + int affectedCount = taskGroupMapper.updateTaskGroupResource(taskGroup.getId(), taskGroupQueue.getId(), TaskGroupQueueStatus.WAIT_QUEUE.getCode()); if (affectedCount > 0) { taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS); @@ -2838,9 +2841,9 @@ public class ProcessService { taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId()); } } catch (Exception e) { - logger.error("release the task group error",e); + logger.error("release the task group error", e); } - logger.info("updateTask:{}",taskInstance.getName()); + logger.info("updateTask:{}", taskInstance.getName()); changeTaskGroupQueueStatus(taskInstance.getId(), TaskGroupQueueStatus.RELEASE); TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(), TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(), Flag.NO.getCode()); @@ -2903,7 +2906,7 @@ public class ProcessService { return this.taskGroupQueueMapper.queryByTaskId(taskId); } - public void sendStartTask2Master(ProcessInstance processInstance,int taskId, + public void sendStartTask2Master(ProcessInstance processInstance, int taskId, org.apache.dolphinscheduler.remote.command.CommandType taskType) { String host = processInstance.getHost(); String address = host.split(":")[0];