Browse Source

[Feature-#3805][server-master] global params of master (#4678)

* global initParam and set Param

* fix dataFormat error

* fix deal outParams bug

* fix code style

* fix code style

* fix code style

* fix code style

* fix code style

* fix code style

* fix code style

* add test

* fix code style (variable name)

* fix reset globalParams bug

Co-authored-by: wangxj <wangxj31>
pull/3/MERGE
wangxj3 3 years ago committed by GitHub
parent
commit
87ff13fc86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
  2. 6
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
  3. 13
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
  4. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
  5. 16
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
  6. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
  7. 51
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  8. 3
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
  9. 176
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  10. 22
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

3
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java

@ -221,4 +221,7 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
@Param("processDefinitionId") int processDefinitionId,
@Param("states") int[] states);
int updateGlobalParamById(
@Param("globalParams") String globalParams,
@Param("id") int id);
}

6
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

@ -219,5 +219,9 @@
</foreach>
order by id asc
</select>
<update id="updateGlobalParamById">
update t_ds_process_instance
set global_params = #{globalParams}
where id = #{id}
</update>
</mapper>

13
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java

@ -68,7 +68,10 @@ public class TaskExecuteResponseCommand implements Serializable {
* varPool string
*/
private String varPool;
/**
* task return result
*/
private String result;
public void setVarPool(String varPool) {
this.varPool = varPool;
}
@ -139,4 +142,12 @@ public class TaskExecuteResponseCommand implements Serializable {
+ ", appIds='" + appIds + '\''
+ '}';
}
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
}

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java

@ -80,7 +80,9 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
responseCommand.getAppIds(),
responseCommand.getTaskInstanceId(),
responseCommand.getVarPool(),
channel);
channel,
responseCommand.getResult()
);
taskResponseService.addResponse(taskResponseEvent);
}

16
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java

@ -92,6 +92,10 @@ public class TaskResponseEvent {
* channel
*/
private Channel channel;
/**
* task return result
*/
private String result;
public static TaskResponseEvent newAck(ExecutionStatus state,
Date startTime,
@ -118,7 +122,8 @@ public class TaskResponseEvent {
String appIds,
int taskInstanceId,
String varPool,
Channel channel) {
Channel channel,
String result) {
TaskResponseEvent event = new TaskResponseEvent();
event.setState(state);
event.setEndTime(endTime);
@ -128,6 +133,7 @@ public class TaskResponseEvent {
event.setEvent(Event.RESULT);
event.setVarPool(varPool);
event.setChannel(channel);
event.setResult(result);
return event;
}
@ -226,4 +232,12 @@ public class TaskResponseEvent {
public void setChannel(Channel channel) {
this.channel = channel;
}
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
}

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java

@ -165,7 +165,8 @@ public class TaskResponseService {
taskResponseEvent.getProcessId(),
taskResponseEvent.getAppIds(),
taskResponseEvent.getTaskInstanceId(),
taskResponseEvent.getVarPool()
taskResponseEvent.getVarPool(),
taskResponseEvent.getResult()
);
}
// if taskInstance is null (maybe deleted) . retry will be meaningless . so response success

51
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -22,11 +22,13 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
@ -36,6 +38,7 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
@ -67,6 +70,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -74,6 +78,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -491,7 +496,8 @@ public class MasterExecThread implements Runnable {
*/
private TaskInstance createTaskInstance(ProcessInstance processInstance, String nodeName,
TaskNode taskNode) {
//update processInstance for update the globalParams
this.processInstance = this.processService.findProcessInstanceById(this.processInstance.getId());
TaskInstance taskInstance = findTaskIfExists(nodeName);
if (taskInstance == null) {
taskInstance = new TaskInstance();
@ -540,13 +546,53 @@ public class MasterExecThread implements Runnable {
} else {
taskInstance.setWorkerGroup(taskWorkerGroup);
}
//get process global
setProcessGlobal(taskNode, taskInstance);
// delay execution time
taskInstance.setDelayTime(taskNode.getDelayTime());
}
return taskInstance;
}
private void setProcessGlobal(TaskNode taskNode, TaskInstance taskInstance) {
String globalParams = this.processInstance.getGlobalParams();
if (StringUtils.isNotEmpty(globalParams)) {
Map<String, String> globalMap = getGlobalParamMap(globalParams);
if (globalMap != null) {
// the param save in localParams
Map<String, Object> result = JSONUtils.toMap(taskNode.getParams(), String.class, Object.class);
Object localParams = result.get(LOCAL_PARAMS);
if (localParams != null) {
List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
for (Property info : allParam) {
if (info.getDirect().equals(Direct.IN)) {
String paramName = info.getProp();
String value = globalMap.get(paramName);
if (StringUtils.isNotEmpty(value)) {
info.setValue(value);
}
}
}
result.put(LOCAL_PARAMS, allParam);
taskNode.setParams(JSONUtils.toJsonString(result));
// task instance node json
taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
}
}
}
}
public Map<String, String> getGlobalParamMap(String globalParams) {
List<Property> propList;
Map<String,String> globalParamMap = new HashMap<>();
if (StringUtils.isNotEmpty(globalParams)) {
propList = JSONUtils.toList(globalParams, Property.class);
globalParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
}
return globalParamMap;
}
private void submitPostNode(String parentNodeName) {
Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList);
List<TaskInstance> taskInstances = new ArrayList<>();
@ -952,6 +998,7 @@ public class MasterExecThread implements Runnable {
// node success , post node submit
if (task.getState() == ExecutionStatus.SUCCESS) {
processInstance.setVarPool(task.getVarPool());
processInstance = processService.findProcessInstanceById(processInstance.getId());
processService.updateProcessInstance(processInstance);
completeTaskList.put(task.getName(), task);
submitPostNode(task.getName());

3
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java

@ -70,7 +70,8 @@ public class TaskResponseServiceTest {
"ids",
22,
"varPol",
channel);
channel,
"[{\"id\":70000,\"database_name\":\"yuul\",\"status\":-1,\"create_time\":1601202829000,\"update_time\":1601202829000,\"table_name3\":\"\",\"table_name4\":\"\"}]");
taskInstance = new TaskInstance();
taskInstance.setId(22);

176
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -24,6 +24,7 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PRO
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
import static java.util.stream.Collectors.toSet;
@ -32,6 +33,7 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
@ -89,6 +91,7 @@ import java.util.Date;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -103,6 +106,8 @@ import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.cronutils.model.Cron;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
@ -161,10 +166,10 @@ public class ProcessService {
/**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction
*
* @param logger logger
* @param host host
* @param logger logger
* @param host host
* @param validThreadNum validThreadNum
* @param command found command
* @param command found command
* @return process instance
*/
@Transactional(rollbackFor = Exception.class)
@ -204,7 +209,7 @@ public class ProcessService {
/**
* set process waiting thread
*
* @param command command
* @param command command
* @param processInstance processInstance
* @return process instance
*/
@ -222,7 +227,7 @@ public class ProcessService {
/**
* check thread num
*
* @param command command
* @param command command
* @param validThreadNum validThreadNum
* @return if thread is enough
*/
@ -425,7 +430,7 @@ public class ProcessService {
* recursive query sub process definition id by parent id.
*
* @param parentId parentId
* @param ids ids
* @param ids ids
*/
public void recurseFindSubProcessId(int parentId, List<Integer> ids) {
ProcessDefinition processDefinition = processDefineMapper.selectById(parentId);
@ -456,7 +461,7 @@ public class ProcessService {
* create recovery waiting thread command and delete origin command at the same time.
* if the recovery command is exists, only update the field update_time
*
* @param originCommand originCommand
* @param originCommand originCommand
* @param processInstance processInstance
*/
public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) {
@ -508,7 +513,7 @@ public class ProcessService {
/**
* get schedule time from command
*
* @param command command
* @param command command
* @param cmdParam cmdParam map
* @return date
*/
@ -524,8 +529,8 @@ public class ProcessService {
* generate a new work process instance from command.
*
* @param processDefinition processDefinition
* @param command command
* @param cmdParam cmdParam map
* @param command command
* @param cmdParam cmdParam map
* @return process instance
*/
private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition,
@ -601,7 +606,7 @@ public class ProcessService {
* use definition creator's tenant.
*
* @param tenantId tenantId
* @param userId userId
* @param userId userId
* @return tenant
*/
public Tenant getTenantForProcess(int tenantId, int userId) {
@ -624,7 +629,7 @@ public class ProcessService {
/**
* check command parameters is valid
*
* @param command command
* @param command command
* @param cmdParam cmdParam map
* @return whether command param is valid
*/
@ -644,7 +649,7 @@ public class ProcessService {
* construct process instance according to one command.
*
* @param command command
* @param host host
* @param host host
* @return process instance
*/
private ProcessInstance constructProcessInstance(Command command, String host) {
@ -686,11 +691,6 @@ public class ProcessService {
} else {
processInstance = this.findProcessInstanceDetailById(processInstanceId);
// Recalculate global parameters after rerun.
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
getCommandTypeIfComplement(processInstance, command),
processInstance.getScheduleTime()));
}
processDefinition = processDefineMapper.selectById(processInstance.getProcessDefinitionId());
processInstance.setProcessDefinition(processDefinition);
@ -807,7 +807,7 @@ public class ProcessService {
* return complement data if the process start with complement data
*
* @param processInstance processInstance
* @param command command
* @param command command
* @return command type
*/
private CommandType getCommandTypeIfComplement(ProcessInstance processInstance, Command command) {
@ -822,8 +822,8 @@ public class ProcessService {
* initialize complement data parameters
*
* @param processDefinition processDefinition
* @param processInstance processInstance
* @param cmdParam cmdParam
* @param processInstance processInstance
* @param cmdParam cmdParam
*/
private void initComplementDataParam(ProcessDefinition processDefinition,
ProcessInstance processInstance,
@ -895,7 +895,7 @@ public class ProcessService {
* only the keys doesn't in sub process global would be joined.
*
* @param parentGlobalParams parentGlobalParams
* @param subGlobalParams subGlobalParams
* @param subGlobalParams subGlobalParams
* @return global params join
*/
private String joinGlobalParams(String parentGlobalParams, String subGlobalParams) {
@ -965,7 +965,7 @@ public class ProcessService {
* set map {parent instance id, task instance id, 0(child instance id)}
*
* @param parentInstance parentInstance
* @param parentTask parentTask
* @param parentTask parentTask
* @return process instance map
*/
private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) {
@ -994,7 +994,7 @@ public class ProcessService {
* find previous task work process map.
*
* @param parentProcessInstance parentProcessInstance
* @param parentTask parentTask
* @param parentTask parentTask
* @return process instance map
*/
private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance parentProcessInstance,
@ -1020,7 +1020,7 @@ public class ProcessService {
* create sub work process command
*
* @param parentProcessInstance parentProcessInstance
* @param task task
* @param task task
*/
public void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task) {
if (!task.isSubProcess()) {
@ -1119,7 +1119,7 @@ public class ProcessService {
* update sub process definition
*
* @param parentProcessInstance parentProcessInstance
* @param childDefinitionId childDefinitionId
* @param childDefinitionId childDefinitionId
*/
private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, int childDefinitionId) {
ProcessDefinition fatherDefinition = this.findProcessDefineById(parentProcessInstance.getProcessDefinitionId());
@ -1133,7 +1133,7 @@ public class ProcessService {
/**
* submit task to mysql
*
* @param taskInstance taskInstance
* @param taskInstance taskInstance
* @param processInstance processInstance
* @return task instance
*/
@ -1187,16 +1187,16 @@ public class ProcessService {
* return stop if work process state is ready stop
* if all of above are not satisfied, return submit success
*
* @param taskInstance taskInstance
* @param taskInstance taskInstance
* @param processInstanceState processInstanceState
* @return process instance state
*/
public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ExecutionStatus processInstanceState) {
ExecutionStatus state = taskInstance.getState();
// running, delayed or killed
// the task already exists in task queue
// return state
if (
// running, delayed or killed
// the task already exists in task queue
// return state
state == ExecutionStatus.RUNNING_EXECUTION
|| state == ExecutionStatus.DELAY_EXECUTION
|| state == ExecutionStatus.KILL
@ -1363,7 +1363,7 @@ public class ProcessService {
* get id list by task state
*
* @param instanceId instanceId
* @param state state
* @param state state
* @return task instance states
*/
public List<Integer> findTaskIdByInstanceState(int instanceId, ExecutionStatus state) {
@ -1418,7 +1418,7 @@ public class ProcessService {
* find work process map by parent process id and parent task id.
*
* @param parentWorkProcessId parentWorkProcessId
* @param parentTaskId parentTaskId
* @param parentTaskId parentTaskId
* @return process instance map
*/
public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) {
@ -1440,7 +1440,7 @@ public class ProcessService {
* find sub process instance
*
* @param parentProcessId parentProcessId
* @param parentTaskId parentTaskId
* @param parentTaskId parentTaskId
* @return process instance
*/
public ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId) {
@ -1472,12 +1472,12 @@ public class ProcessService {
/**
* change task state
*
* @param state state
* @param startTime startTime
* @param host host
* @param state state
* @param startTime startTime
* @param host host
* @param executePath executePath
* @param logPath logPath
* @param taskInstId taskInstId
* @param logPath logPath
* @param taskInstId taskInstId
*/
public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date startTime, String host,
String executePath,
@ -1505,12 +1505,12 @@ public class ProcessService {
* update the process instance
*
* @param processInstanceId processInstanceId
* @param processJson processJson
* @param globalParams globalParams
* @param scheduleTime scheduleTime
* @param flag flag
* @param locations locations
* @param connects connects
* @param processJson processJson
* @param globalParams globalParams
* @param scheduleTime scheduleTime
* @param flag flag
* @param locations locations
* @param connects connects
* @return update process instance result
*/
public int updateProcessInstance(Integer processInstanceId, String processJson,
@ -1531,25 +1531,85 @@ public class ProcessService {
/**
* change task state
*
* @param state state
* @param endTime endTime
* @param state state
* @param endTime endTime
* @param taskInstId taskInstId
* @param varPool varPool
* @param varPool varPool
*/
public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state,
Date endTime,
int processId,
String appIds,
int taskInstId,
String varPool) {
String varPool,
String result) {
taskInstance.setPid(processId);
taskInstance.setAppLink(appIds);
taskInstance.setState(state);
taskInstance.setEndTime(endTime);
taskInstance.setVarPool(varPool);
changeOutParam(result, taskInstance);
saveTaskInstance(taskInstance);
}
public void changeOutParam(String result, TaskInstance taskInstance) {
if (StringUtils.isEmpty(result)) {
return;
}
List<Map<String, String>> workerResultParam = getListMapByString(result);
if (CollectionUtils.isEmpty(workerResultParam)) {
return;
}
//if the result more than one line,just get the first .
Map<String, String> row = workerResultParam.get(0);
if (row == null || row.size() == 0) {
return;
}
TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(), TaskNode.class);
Map<String, Object> taskParams = JSONUtils.toMap(taskNode.getParams(), String.class, Object.class);
Object localParams = taskParams.get(LOCAL_PARAMS);
if (localParams == null) {
return;
}
ProcessInstance processInstance = this.processInstanceMapper.queryDetailById(taskInstance.getProcessInstanceId());
List<Property> params4Process = JSONUtils.toList(processInstance.getGlobalParams(), Property.class);
Map<String, Property> allParamMap = params4Process.stream().collect(Collectors.toMap(Property::getProp, Property -> Property));
List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
for (Property info : allParam) {
if (info.getDirect() == Direct.OUT) {
String paramName = info.getProp();
Property property = allParamMap.get(paramName);
if (property == null) {
continue;
}
String value = row.get(paramName);
if (StringUtils.isNotEmpty(value)) {
property.setValue(value);
info.setValue(value);
}
}
}
taskParams.put(LOCAL_PARAMS, allParam);
taskNode.setParams(JSONUtils.toJsonString(taskParams));
// task instance node json
taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
String params4ProcessString = JSONUtils.toJsonString(params4Process);
int updateCount = this.processInstanceMapper.updateGlobalParamById(params4ProcessString, processInstance.getId());
logger.info("updateCount:{}, params4Process:{}, processInstanceId:{}", updateCount, params4ProcessString, processInstance.getId());
}
public List<Map<String, String>> getListMapByString(String json) {
List<Map<String, String>> allParams = new ArrayList<>();
ArrayNode paramsByJson = JSONUtils.parseArray(json);
Iterator<JsonNode> listIterator = paramsByJson.iterator();
while (listIterator.hasNext()) {
Map<String, String> param = JSONUtils.toMap(listIterator.next().toString(), String.class, String.class);
allParams.add(param);
}
return allParams;
}
/**
* convert integer list to string list
*
@ -1642,7 +1702,7 @@ public class ProcessService {
* update process instance state by id
*
* @param processInstanceId processInstanceId
* @param executionStatus executionStatus
* @param executionStatus executionStatus
* @return update process result
*/
public int updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) {
@ -1679,7 +1739,7 @@ public class ProcessService {
/**
* find tenant code by resource name
*
* @param resName resource name
* @param resName resource name
* @param resourceType resource type
* @return tenant code
*/
@ -1703,9 +1763,9 @@ public class ProcessService {
/**
* get dependency cycle by work process define id and scheduler fire time
*
* @param masterId masterId
* @param masterId masterId
* @param processDefinitionId processDefinitionId
* @param scheduledFireTime the time the task schedule is expected to trigger
* @param scheduledFireTime the time the task schedule is expected to trigger
* @return CycleDependency
* @throws Exception if error throws Exception
*/
@ -1718,8 +1778,8 @@ public class ProcessService {
/**
* get dependency cycle list by work process define id list and scheduler fire time
*
* @param masterId masterId
* @param ids ids
* @param masterId masterId
* @param ids ids
* @param scheduledFireTime the time the task schedule is expected to trigger
* @return CycleDependency list
* @throws Exception if error throws Exception
@ -1814,8 +1874,8 @@ public class ProcessService {
* find last running process instance
*
* @param definitionId process definition id
* @param startTime start time
* @param endTime end time
* @param startTime start time
* @param endTime end time
* @return process instance
*/
public ProcessInstance findLastRunningProcess(int definitionId, Date startTime, Date endTime) {
@ -1915,7 +1975,7 @@ public class ProcessService {
/**
* list unauthorized udf function
*
* @param userId user id
* @param userId user id
* @param needChecks data source id array
* @return unauthorized udf function list
*/

22
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -442,4 +442,26 @@ public class ProcessServiceTest {
Assert.assertEquals(expect, processService.changeJson(newProcessData,oldJson));
}
@Test
public void testChangeOutParam() {
String result = "[{\"d\":\"20210203\"}]";
TaskInstance taskInstance = new TaskInstance();
taskInstance.setProcessInstanceId(62);
taskInstance.setTaskJson("{\"id\":\"tasks-86175\",\"name\":\"wew\",\"desc\":null,\"type\":\"SHELL\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,"
+ "\"retryInterval\":1,\"params\":{\"rawScript\":\"echo 20210203\",\"localParams\":[{\"prop\":\"d\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"\"}],"
+ "\"resourceList\":[]},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},"
+ "\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"workerGroupId\":null,"
+ "\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"delayTime\":0}");
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(62);
processInstance.setGlobalParams("[{\"prop\":\"sql2\",\"direct\":null,\"type\":null,\"value\":\"\"},{\"prop\":\"out\",\"direct\":null,\"type\":null,\"value\":\"\"},"
+ "{\"prop\":\"d\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
String params4ProcessString = "[{\"prop\":\"sql2\",\"direct\":null,\"type\":null,\"value\":\"\"},{\"prop\":\"out\",\"direct\":null,\"type\":null,\"value\":\"\"},"
+ "{\"prop\":\"d\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20210203\"}]";
Mockito.when(processInstanceMapper.queryDetailById(taskInstance.getProcessInstanceId())).thenReturn(processInstance);
Mockito.when(this.processInstanceMapper.updateGlobalParamById(params4ProcessString, processInstance.getId())).thenReturn(1);
processService.changeOutParam(result,taskInstance);
}
}

Loading…
Cancel
Save