Browse Source

[Feature][JsonSplit-api] refactor method of task save (#6067)

* refactor method of task save

* fix ut

* fix ut

Co-authored-by: JinyLeeChina <297062848@qq.com>
2.0.7-release
JinyLeeChina 3 years ago committed by GitHub
parent
commit
ee4e64a9a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
  2. 1
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  3. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
  4. 112
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  5. 46
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  6. 83
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  7. 20
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  8. 17
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
  9. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
  10. 33
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
  11. 10
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
  12. 88
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java

@ -162,6 +162,7 @@ public class ProcessInstanceController extends BaseController {
* @param loginUser login user
* @param projectCode project code
* @param taskRelationJson process task relation json
* @param taskDefinitionJson taskDefinitionJson
* @param processInstanceId process instance id
* @param scheduleTime schedule time
* @param syncDefine sync define
@ -172,6 +173,7 @@ public class ProcessInstanceController extends BaseController {
@ApiOperation(value = "updateProcessInstance", notes = "UPDATE_PROCESS_INSTANCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "taskRelationJson", value = "TASK_RELATION_JSON", type = "String"),
@ApiImplicitParam(name = "taskDefinitionJson", value = "TASK_DEFINITION_JSON", type = "String"),
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", type = "String"),
@ApiImplicitParam(name = "syncDefine", value = "SYNC_DEFINE", required = true, type = "Boolean"),
@ -187,6 +189,7 @@ public class ProcessInstanceController extends BaseController {
public Result updateProcessInstance(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "taskRelationJson", required = true) String taskRelationJson,
@RequestParam(value = "taskDefinitionJson", required = true) String taskDefinitionJson,
@RequestParam(value = "processInstanceId") Integer processInstanceId,
@RequestParam(value = "scheduleTime", required = false) String scheduleTime,
@RequestParam(value = "syncDefine", required = true) Boolean syncDefine,
@ -195,7 +198,7 @@ public class ProcessInstanceController extends BaseController {
@RequestParam(value = "timeout", required = false, defaultValue = "0") int timeout,
@RequestParam(value = "tenantCode", required = true) String tenantCode) {
Map<String, Object> result = processInstanceService.updateProcessInstance(loginUser, projectCode, processInstanceId,
taskRelationJson, scheduleTime, syncDefine, globalParams, locations, timeout, tenantCode);
taskRelationJson, taskDefinitionJson, scheduleTime, syncDefine, globalParams, locations, timeout, tenantCode);
return returnDataList(result);
}

1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -276,6 +276,7 @@ public enum Status {
DELETE_TASK_DEFINE_BY_CODE_ERROR(50042, "delete task definition by code error", "删除任务定义错误"),
QUERY_DETAIL_OF_TASK_DEFINITION_ERROR(50043, "query detail of task definition error", "查询任务详细信息错误"),
QUERY_TASK_DEFINITION_LIST_PAGING_ERROR(50044, "query task definition list paging error", "分页查询任务定义列表错误"),
TASK_DEFINITION_NAME_EXISTED(50045, "task definition name [{0}] already exists", "任务定义名称[{0}]已经存在"),
HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
/**

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java

@ -116,6 +116,7 @@ public interface ProcessInstanceService {
* @param loginUser login user
* @param projectCode project code
* @param taskRelationJson process task relation json
* @param taskDefinitionJson taskDefinitionJson
* @param processInstanceId process instance id
* @param scheduleTime schedule time
* @param syncDefine sync define
@ -129,6 +130,7 @@ public interface ProcessInstanceService {
long projectCode,
Integer processInstanceId,
String taskRelationJson,
String taskDefinitionJson,
String scheduleTime,
Boolean syncDefine,
String globalParams,

112
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.SchedulerService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.FileUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
@ -61,7 +60,6 @@ import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
@ -120,9 +118,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
@Autowired
private ProjectService projectService;
@Autowired
private TaskDefinitionService taskDefinitionService;
@Autowired
private UserMapper userMapper;
@ -147,21 +142,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
@Autowired
private ProcessTaskRelationMapper processTaskRelationMapper;
@Autowired
private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
@Autowired
TaskDefinitionLogMapper taskDefinitionLogMapper;
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
@Autowired
private SchedulerService schedulerService;
@Autowired
private TenantMapper tenantMapper;
/**
* create process definition
*
@ -178,7 +158,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @return create result code
*/
@Override
@Transactional(rollbackFor = Exception.class)
@Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> createProcessDefinition(User loginUser,
long projectCode,
String name,
@ -202,9 +182,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name);
return result;
}
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
createTaskDefinition(result, loginUser, projectCode, taskDefinitionLogs, taskDefinitionJson);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
List<ProcessTaskRelationLog> taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
Map<String, Object> checkRelationJson = checkTaskRelationList(taskRelationList, taskRelationJson);
Map<String, Object> checkRelationJson = checkTaskRelationList(taskRelationList, taskRelationJson, taskDefinitionLogs);
if (checkRelationJson.get(Constants.STATUS) != Status.SUCCESS) {
return checkRelationJson;
}
@ -215,8 +199,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
taskDefinitionService.createTaskDefinition(loginUser, projectCode, taskDefinitionJson);
long processDefinitionCode;
try {
processDefinitionCode = SnowFlakeUtils.getInstance().nextId();
@ -227,16 +209,59 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinition = new ProcessDefinition(projectCode, name, processDefinitionCode, description,
globalParams, locations, timeout, loginUser.getId(), tenant.getId());
return createProcessDefine(loginUser, result, taskRelationList, processDefinition);
return createProcessDefine(loginUser, result, taskRelationList, processDefinition, taskDefinitionLogs);
}
@Autowired
TaskDefinitionLogMapper taskDefinitionLogMapper;
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
@Autowired
private SchedulerService schedulerService;
@Autowired
private TenantMapper tenantMapper;
private void createTaskDefinition(Map<String, Object> result,
User loginUser,
long projectCode,
List<TaskDefinitionLog> taskDefinitionLogs,
String taskDefinitionJson) {
if (taskDefinitionLogs.isEmpty()) {
logger.error("taskDefinitionJson invalid: {}", taskDefinitionJson);
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
return;
}
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionLog)) {
logger.error("task definition {} parameter invalid", taskDefinitionLog.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
return;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(projectCode, taskDefinitionLog.getName());
if (taskDefinition != null) {
logger.error("task definition name {} already exists", taskDefinitionLog.getName());
putMsg(result, Status.TASK_DEFINITION_NAME_EXISTED, taskDefinitionLog.getName());
return;
}
}
if (processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs)) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
}
}
private Map<String, Object> createProcessDefine(User loginUser,
Map<String, Object> result,
List<ProcessTaskRelationLog> taskRelationList,
ProcessDefinition processDefinition) {
ProcessDefinition processDefinition,
List<TaskDefinitionLog> taskDefinitionLogs) {
int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, true);
if (insertVersion > 0) {
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion, taskRelationList);
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs);
if (insertResult > 0) {
putMsg(result, Status.SUCCESS);
// return processDefinitionCode
@ -250,7 +275,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
private Map<String, Object> checkTaskRelationList(List<ProcessTaskRelationLog> taskRelationList, String taskRelationJson) {
private Map<String, Object> checkTaskRelationList(List<ProcessTaskRelationLog> taskRelationList, String taskRelationJson, List<TaskDefinitionLog> taskDefinitionLogs) {
Map<String, Object> result = new HashMap<>();
try {
if (taskRelationList == null || taskRelationList.isEmpty()) {
@ -259,7 +284,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
List<TaskNode> taskNodeList = processService.transformTask(taskRelationList);
List<TaskNode> taskNodeList = processService.transformTask(taskRelationList, taskDefinitionLogs);
if (taskNodeList.size() != taskRelationList.size()) {
Set<Long> postTaskCodes = taskRelationList.stream().map(ProcessTaskRelationLog::getPostTaskCode).collect(Collectors.toSet());
Set<Long> taskNodeCodes = taskNodeList.stream().map(TaskNode::getCode).collect(Collectors.toSet());
@ -276,7 +301,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
// check whether the task relation json is normal
for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
if (processTaskRelationLog.getPostTaskCode() == 0 || processTaskRelationLog.getPostTaskVersion() == 0) {
if (processTaskRelationLog.getPostTaskCode() == 0) {
logger.error("the post_task_code or post_task_version can't be zero");
putMsg(result, Status.CHECK_PROCESS_TASK_RELATION_ERROR);
return result;
@ -419,7 +444,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @param taskDefinitionJson taskDefinitionJson
* @return update result code
*/
@Transactional(rollbackFor = Exception.class)
@Transactional(rollbackFor = RuntimeException.class)
@Override
public Map<String, Object> updateProcessDefinition(User loginUser,
long projectCode,
@ -439,8 +464,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
createTaskDefinition(result, loginUser, projectCode, taskDefinitionLogs, taskDefinitionJson);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
List<ProcessTaskRelationLog> taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
Map<String, Object> checkRelationJson = checkTaskRelationList(taskRelationList, taskRelationJson);
Map<String, Object> checkRelationJson = checkTaskRelationList(taskRelationList, taskRelationJson, taskDefinitionLogs);
if (checkRelationJson.get(Constants.STATUS) != Status.SUCCESS) {
return checkRelationJson;
}
@ -470,20 +500,20 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
}
taskDefinitionService.createTaskDefinition(loginUser, projectCode, taskDefinitionJson);
processDefinition.set(projectCode, name, description, globalParams, locations, timeout, tenant.getId());
return updateProcessDefine(loginUser, result, taskRelationList, processDefinition);
return updateProcessDefine(loginUser, result, taskRelationList, processDefinition, taskDefinitionLogs);
}
private Map<String, Object> updateProcessDefine(User loginUser,
Map<String, Object> result,
List<ProcessTaskRelationLog> taskRelationList,
ProcessDefinition processDefinition) {
ProcessDefinition processDefinition,
List<TaskDefinitionLog> taskDefinitionLogs) {
processDefinition.setUpdateTime(new Date());
int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, true);
if (insertVersion > 0) {
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion, taskRelationList);
processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs);
if (insertResult > 0) {
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
@ -818,7 +848,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processTaskRelationLog.setPreTaskVersion(Constants.VERSION_FIRST);
processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
});
Map<String, Object> createProcessResult = createProcessDefine(loginUser, result, taskRelationList, processDefinition);
Map<String, Object> createProcessResult = createProcessDefine(loginUser, result, taskRelationList, processDefinition, null);
if (Status.SUCCESS.equals(createProcessResult.get(Constants.STATUS))) {
putMsg(createProcessResult, Status.SUCCESS);
} else {
@ -894,7 +924,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
List<ProcessTaskRelationLog> taskRelationList = JSONUtils.toList(processTaskRelationJson, ProcessTaskRelationLog.class);
// Check whether the task node is normal
List<TaskNode> taskNodes = processService.transformTask(taskRelationList);
List<TaskNode> taskNodes = processService.transformTask(taskRelationList, Lists.newArrayList());
if (CollectionUtils.isEmpty(taskNodes)) {
logger.error("process node info is empty");
@ -1254,9 +1284,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processDefinition.setProjectCode(targetProjectCode);
if (isCopy) {
processDefinition.setName(processDefinition.getName() + "_copy_" + DateUtils.getCurrentTimeStamp());
createProcessDefine(loginUser, result, taskRelationList, processDefinition);
createProcessDefine(loginUser, result, taskRelationList, processDefinition, Lists.newArrayList());
} else {
updateProcessDefine(loginUser, result, taskRelationList, processDefinition);
updateProcessDefine(loginUser, result, taskRelationList, processDefinition, Lists.newArrayList());
}
if (result.get(Constants.STATUS) != Status.SUCCESS) {
failedProcessList.add(processDefinition.getCode() + "[" + processDefinition.getName() + "]");

46
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.UsersService;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
@ -401,6 +402,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
* @param loginUser login user
* @param projectCode project code
* @param taskRelationJson process task relation json
* @param taskDefinitionJson taskDefinitionJson
* @param processInstanceId process instance id
* @param scheduleTime schedule time
* @param syncDefine sync define
@ -413,7 +415,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
@Transactional
@Override
public Map<String, Object> updateProcessInstance(User loginUser, long projectCode, Integer processInstanceId, String taskRelationJson,
String scheduleTime, Boolean syncDefine, String globalParams,
String taskDefinitionJson, String scheduleTime, Boolean syncDefine, String globalParams,
String locations, int timeout, String tenantCode) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
@ -433,26 +435,42 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
processInstance.getName(), processInstance.getState().toString(), "update");
return result;
}
ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
List<ProcessTaskRelationLog> taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
//check workflow json is valid
result = processDefinitionService.checkProcessNodeList(taskRelationJson);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
if (tenant == null) {
putMsg(result, Status.TENANT_NOT_EXIST);
return result;
}
setProcessInstance(processInstance, tenantCode, scheduleTime, globalParams, timeout);
if (Boolean.TRUE.equals(syncDefine)) {
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
if (taskDefinitionLogs.isEmpty()) {
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
return result;
}
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionLog)) {
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
return result;
}
}
if (!processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs)) {
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
return result;
}
ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
List<ProcessTaskRelationLog> taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
//check workflow json is valid
result = processDefinitionService.checkProcessNodeList(taskRelationJson);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
if (tenant == null) {
putMsg(result, Status.TENANT_NOT_EXIST);
return result;
}
processDefinition.set(projectCode, processDefinition.getName(), processDefinition.getDescription(), globalParams, locations, timeout, tenant.getId());
processDefinition.setUpdateTime(new Date());
int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, false);
if (insertVersion > 0) {
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion, taskRelationList);
processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs);
if (insertResult > 0) {
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);

83
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -111,85 +111,28 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
return result;
}
int totalSuccessNumber = 0;
List<Long> totalSuccessCode = new ArrayList<>();
Date now = new Date();
List<TaskDefinitionLog> newTaskDefinitionLogs = new ArrayList<>();
List<TaskDefinitionLog> updateTaskDefinitionLogs = new ArrayList<>();
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionLog)) {
logger.error("task definition {} parameter invalid", taskDefinitionLog.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
return result;
}
taskDefinitionLog.setProjectCode(projectCode);
taskDefinitionLog.setUpdateTime(now);
taskDefinitionLog.setOperateTime(now);
taskDefinitionLog.setOperator(loginUser.getId());
if (taskDefinitionLog.getCode() > 0 && taskDefinitionLog.getVersion() > 0) {
TaskDefinitionLog definitionCodeAndVersion = taskDefinitionLogMapper
.queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(), taskDefinitionLog.getVersion());
if (definitionCodeAndVersion != null) {
if (!taskDefinitionLog.equals(definitionCodeAndVersion)) {
taskDefinitionLog.setUserId(definitionCodeAndVersion.getUserId());
Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinitionLog.getCode());
if (version == null || version == 0) {
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionLog.getCode());
return result;
}
taskDefinitionLog.setVersion(version + 1);
taskDefinitionLog.setCreateTime(definitionCodeAndVersion.getCreateTime());
updateTaskDefinitionLogs.add(taskDefinitionLog);
totalSuccessCode.add(taskDefinitionLog.getCode());
}
continue;
}
}
taskDefinitionLog.setUserId(loginUser.getId());
taskDefinitionLog.setVersion(1);
taskDefinitionLog.setCreateTime(now);
if (taskDefinitionLog.getCode() == 0) {
long code;
try {
code = SnowFlakeUtils.getInstance().nextId();
taskDefinitionLog.setVersion(1);
taskDefinitionLog.setCode(code);
} catch (SnowFlakeException e) {
logger.error("Task code get error, ", e);
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code");
return result;
}
}
totalSuccessCode.add(taskDefinitionLog.getCode());
newTaskDefinitionLogs.add(taskDefinitionLog);
totalSuccessNumber++;
}
for (TaskDefinitionLog taskDefinitionToUpdate : updateTaskDefinitionLogs) {
TaskDefinition task = taskDefinitionMapper.queryByCode(taskDefinitionToUpdate.getCode());
if (task == null) {
newTaskDefinitionLogs.add(taskDefinitionToUpdate);
} else {
int update = taskDefinitionMapper.updateById(taskDefinitionToUpdate);
int insert = taskDefinitionLogMapper.insert(taskDefinitionToUpdate);
if ((update & insert) != 1) {
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
return result;
}
}
}
if (!newTaskDefinitionLogs.isEmpty()) {
int insert = taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
int logInsert = taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs);
if ((logInsert & insert) == 0) {
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(projectCode, taskDefinitionLog.getName());
if (taskDefinition != null) {
logger.error("task definition name {} already exists", taskDefinitionLog.getName());
putMsg(result, Status.TASK_DEFINITION_NAME_EXISTED, taskDefinitionLog.getName());
return result;
}
}
Map<String, Object> resData = new HashMap<>();
resData.put("total", totalSuccessNumber);
resData.put("code", totalSuccessCode);
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, resData);
if (processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs)) {
Map<String, Object> resData = new HashMap<>();
resData.put("total", taskDefinitionLogs.size());
resData.put("code", StringUtils.join(taskDefinitionLogs.stream().map(TaskDefinition::getCode).collect(Collectors.toList()), ","));
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, resData);
} else {
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
}
return result;
}

20
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -116,6 +116,14 @@ public class ProcessInstanceServiceTest {
+ "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789,"
+ "\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]";
private String taskJson = "[{\"name\":\"shell1\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],"
+ "\"localParams\":[],\"rawScript\":\"echo 1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}},"
+ "\"flag\":\"NORMAL\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":\"0\",\"failRetryInterval\":\"1\","
+ "\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":null,\"delayTime\":\"0\"},{\"name\":\"shell2\",\"description\":\"\","
+ "\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 2\",\"conditionResult\":{\"successNode\""
+ ":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}},\"flag\":\"NORMAL\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\","
+ "\"failRetryTimes\":\"0\",\"failRetryInterval\":\"1\",\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":null,\"delayTime\":\"0\"}]";
@Test
public void testQueryProcessInstanceList() {
long projectCode = 1L;
@ -372,7 +380,7 @@ public class ProcessInstanceServiceTest {
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
Map<String, Object> proejctAuthFailRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
shellJson, "2020-02-21 00:00:00", true, "", "", 0, "");
shellJson, taskJson, "2020-02-21 00:00:00", true, "", "", 0, "");
Assert.assertEquals(Status.PROJECT_NOT_FOUNT, proejctAuthFailRes.get(Constants.STATUS));
//process instance null
@ -382,7 +390,7 @@ public class ProcessInstanceServiceTest {
when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
when(processService.findProcessInstanceDetailById(1)).thenReturn(null);
Map<String, Object> processInstanceNullRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
shellJson, "2020-02-21 00:00:00", true, "", "", 0, "");
shellJson, taskJson,"2020-02-21 00:00:00", true, "", "", 0, "");
Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceNullRes.get(Constants.STATUS));
//process instance not finish
@ -390,7 +398,7 @@ public class ProcessInstanceServiceTest {
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
putMsg(result, Status.SUCCESS, projectCode);
Map<String, Object> processInstanceNotFinishRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
shellJson, "2020-02-21 00:00:00", true, "", "", 0, "");
shellJson, taskJson,"2020-02-21 00:00:00", true, "", "", 0, "");
Assert.assertEquals(Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstanceNotFinishRes.get(Constants.STATUS));
//process instance finish
@ -410,15 +418,15 @@ public class ProcessInstanceServiceTest {
when(processDefinitionService.checkProcessNodeList(shellJson)).thenReturn(result);
putMsg(result, Status.SUCCESS, projectCode);
Map<String, Object> processInstanceFinishRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
shellJson, "2020-02-21 00:00:00", true, "", "", 0, "root");
Assert.assertEquals(Status.UPDATE_PROCESS_DEFINITION_ERROR, processInstanceFinishRes.get(Constants.STATUS));
shellJson, taskJson,"2020-02-21 00:00:00", true, "", "", 0, "root");
Assert.assertEquals(Status.CREATE_TASK_DEFINITION_ERROR, processInstanceFinishRes.get(Constants.STATUS));
//success
when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
putMsg(result, Status.SUCCESS, projectCode);
Map<String, Object> successRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
shellJson, "2020-02-21 00:00:00", false, "", "", 0, "root");
shellJson, taskJson,"2020-02-21 00:00:00", false, "", "", 0, "root");
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}

17
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java

@ -95,9 +95,8 @@ public class TaskDefinitionServiceImplTest {
+ "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0,"
+ "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
+ "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]";
List<TaskDefinition> taskDefinitions = JSONUtils.toList(createTaskDefinitionJson, TaskDefinition.class);
Mockito.when(taskDefinitionMapper.batchInsert(Mockito.anyList())).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.batchInsert(Mockito.anyList())).thenReturn(1);
List<TaskDefinitionLog> taskDefinitions = JSONUtils.toList(createTaskDefinitionJson, TaskDefinitionLog.class);
Mockito.when(processService.saveTaskDefine(loginUser, projectCode, taskDefinitions)).thenReturn(true);
Map<String, Object> relation = taskDefinitionService
.createTaskDefinition(loginUser, projectCode, createTaskDefinitionJson);
Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
@ -250,11 +249,23 @@ public class TaskDefinitionServiceImplTest {
+ "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]";
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
Assert.assertFalse(taskDefinitionLogs.isEmpty());
String taskJson = "[{\"name\":\"shell1\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],"
+ "\"localParams\":[],\"rawScript\":\"echo 1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}},"
+ "\"flag\":\"NORMAL\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":\"0\",\"failRetryInterval\":\"1\","
+ "\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":null,\"delayTime\":\"0\"},{\"name\":\"shell2\",\"description\":\"\","
+ "\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 2\",\"conditionResult\":{\"successNode\""
+ ":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}},\"flag\":\"NORMAL\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\","
+ "\"failRetryTimes\":\"0\",\"failRetryInterval\":\"1\",\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":null,\"delayTime\":\"0\"}]";
taskDefinitionLogs = JSONUtils.toList(taskJson, TaskDefinitionLog.class);
Assert.assertFalse(taskDefinitionLogs.isEmpty());
String taskParams = "{\"resourceList\":[],\"localParams\":[{\"prop\":\"datetime\",\"direct\":\"IN\",\"type\":\"VARCHAR\","
+ "\"value\":\"${system.datetime}\"}],\"rawScript\":\"echo ${datetime}\",\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],"
+ "\\\"failedNode\\\":[\\\"\\\"]}\",\"dependence\":{}}";
ShellParameters parameters = JSONUtils.parseObject(taskParams, ShellParameters.class);
Assert.assertNotNull(parameters);
String params = "{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}";
ShellParameters parameters1 = JSONUtils.parseObject(params, ShellParameters.class);
Assert.assertNotNull(parameters1);
}
@Test

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java

@ -159,7 +159,6 @@ public class JSONUtils {
}
try {
CollectionType listType = objectMapper.getTypeFactory().constructCollectionType(ArrayList.class, clazz);
return objectMapper.readValue(json, listType);
} catch (Exception e) {

33
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.dao.entity;
import org.apache.dolphinscheduler.common.enums.ConditionType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.util.Date;
@ -25,6 +26,8 @@ import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
/**
* process task relation
@ -86,6 +89,8 @@ public class ProcessTaskRelation {
/**
* condition parameters
*/
@JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String conditionParams;
/**
@ -236,19 +241,19 @@ public class ProcessTaskRelation {
@Override
public String toString() {
return "ProcessTaskRelation{"
+ "id=" + id
+ ", name='" + name + '\''
+ ", processDefinitionVersion=" + processDefinitionVersion
+ ", projectCode=" + projectCode
+ ", processDefinitionCode=" + processDefinitionCode
+ ", preTaskCode=" + preTaskCode
+ ", preTaskVersion=" + preTaskVersion
+ ", postTaskCode=" + postTaskCode
+ ", postTaskVersion=" + postTaskVersion
+ ", conditionType=" + conditionType
+ ", conditionParams='" + conditionParams + '\''
+ ", createTime=" + createTime
+ ", updateTime=" + updateTime
+ '}';
+ "id=" + id
+ ", name='" + name + '\''
+ ", processDefinitionVersion=" + processDefinitionVersion
+ ", projectCode=" + projectCode
+ ", processDefinitionCode=" + processDefinitionCode
+ ", preTaskCode=" + preTaskCode
+ ", preTaskVersion=" + preTaskVersion
+ ", postTaskCode=" + postTaskCode
+ ", postTaskVersion=" + postTaskVersion
+ ", conditionType=" + conditionType
+ ", conditionParams='" + conditionParams + '\''
+ ", createTime=" + createTime
+ ", updateTime=" + updateTime
+ '}';
}
}

10
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java

@ -38,6 +38,8 @@ import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
/**
* task definition
@ -89,6 +91,8 @@ public class TaskDefinition {
/**
* user defined parameters
*/
@JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String taskParams;
/**
@ -437,12 +441,6 @@ public class TaskDefinition {
&& Objects.equals(resourceIds, that.resourceIds);
}
@Override
public int hashCode() {
return Objects.hash(name, description, taskType, taskParams, flag, taskPriority, workerGroup, failRetryTimes,
failRetryInterval, timeoutFlag, timeoutNotifyStrategy, timeout, delayTime, resourceIds);
}
@Override
public String toString() {
return "TaskDefinition{"

88
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -57,6 +57,8 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
@ -123,6 +125,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.facebook.presto.jdbc.internal.guava.collect.Lists;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
@ -2090,6 +2093,63 @@ public class ProcessService {
return StringUtils.join(resourceIds, ",");
}
public boolean saveTaskDefine(User operator, long projectCode, List<TaskDefinitionLog> taskDefinitionLogs) {
Date now = new Date();
List<TaskDefinitionLog> newTaskDefinitionLogs = new ArrayList<>();
List<TaskDefinitionLog> updateTaskDefinitionLogs = new ArrayList<>();
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
taskDefinitionLog.setProjectCode(projectCode);
taskDefinitionLog.setUpdateTime(now);
taskDefinitionLog.setOperateTime(now);
taskDefinitionLog.setOperator(operator.getId());
taskDefinitionLog.setResourceIds(getResourceIds(taskDefinitionLog));
if (taskDefinitionLog.getCode() > 0 && taskDefinitionLog.getVersion() > 0) {
TaskDefinitionLog definitionCodeAndVersion = taskDefinitionLogMapper
.queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(), taskDefinitionLog.getVersion());
if (definitionCodeAndVersion != null) {
if (!taskDefinitionLog.equals(definitionCodeAndVersion)) {
taskDefinitionLog.setUserId(definitionCodeAndVersion.getUserId());
Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinitionLog.getCode());
taskDefinitionLog.setVersion(version + 1);
taskDefinitionLog.setCreateTime(definitionCodeAndVersion.getCreateTime());
updateTaskDefinitionLogs.add(taskDefinitionLog);
}
continue;
}
}
taskDefinitionLog.setUserId(operator.getId());
taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
taskDefinitionLog.setCreateTime(now);
if (taskDefinitionLog.getCode() == 0) {
try {
taskDefinitionLog.setCode(SnowFlakeUtils.getInstance().nextId());
} catch (SnowFlakeException e) {
logger.error("Task code get error, ", e);
return false;
}
}
newTaskDefinitionLogs.add(taskDefinitionLog);
}
for (TaskDefinitionLog taskDefinitionToUpdate : updateTaskDefinitionLogs) {
TaskDefinition task = taskDefinitionMapper.queryByCode(taskDefinitionToUpdate.getCode());
if (task == null) {
newTaskDefinitionLogs.add(taskDefinitionToUpdate);
} else {
int update = taskDefinitionMapper.updateById(taskDefinitionToUpdate);
int insert = taskDefinitionLogMapper.insert(taskDefinitionToUpdate);
if ((update & insert) != 1) {
return false;
}
}
}
if (!newTaskDefinitionLogs.isEmpty()) {
int insert = taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
int logInsert = taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs);
return (logInsert & insert) != 0;
}
return true;
}
/**
* save processDefinition (including create or update processDefinition)
*/
@ -2116,21 +2176,33 @@ public class ProcessService {
* save task relations
*/
public int saveTaskRelation(User operator, long projectCode, long processDefinitionCode, int processDefinitionVersion,
List<ProcessTaskRelationLog> taskRelationList) {
List<ProcessTaskRelationLog> taskRelationList, List<TaskDefinitionLog> taskDefinitionLogs) {
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
if (!processTaskRelationList.isEmpty()) {
processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode);
}
Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
taskDefinitionLogMap = taskDefinitionLogs.stream()
.collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog));
}
Date now = new Date();
taskRelationList.forEach(processTaskRelationLog -> {
for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
processTaskRelationLog.setProjectCode(projectCode);
processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion);
if (taskDefinitionLogMap != null) {
TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMap.get(processTaskRelationLog.getPreTaskCode());
if (taskDefinitionLog != null) {
processTaskRelationLog.setPreTaskVersion(taskDefinitionLog.getVersion());
}
processTaskRelationLog.setPostTaskVersion(taskDefinitionLogMap.get(processTaskRelationLog.getPostTaskCode()).getVersion());
}
processTaskRelationLog.setCreateTime(now);
processTaskRelationLog.setUpdateTime(now);
processTaskRelationLog.setOperator(operator.getId());
processTaskRelationLog.setOperateTime(now);
});
}
int result = processTaskRelationMapper.batchInsert(taskRelationList);
int resultLog = processTaskRelationLogMapper.batchInsert(taskRelationList);
return result & resultLog;
@ -2162,7 +2234,7 @@ public class ProcessService {
*/
public DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) {
List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
List<TaskNode> taskNodeList = transformTask(processTaskRelations);
List<TaskNode> taskNodeList = transformTask(processTaskRelations, Lists.newArrayList());
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, new ArrayList<>(processTaskRelations));
// Generate concrete Dag to be executed
return DagHelper.buildDagGraph(processDag);
@ -2288,7 +2360,7 @@ public class ProcessService {
/**
* Use temporarily before refactoring taskNode
*/
public List<TaskNode> transformTask(List<ProcessTaskRelationLog> taskRelationList) {
public List<TaskNode> transformTask(List<ProcessTaskRelationLog> taskRelationList, List<TaskDefinitionLog> taskDefinitionLogs) {
Map<Long, List<Long>> taskCodeMap = new HashMap<>();
for (ProcessTaskRelationLog processTaskRelation : taskRelationList) {
taskCodeMap.compute(processTaskRelation.getPostTaskCode(), (k, v) -> {
@ -2301,7 +2373,9 @@ public class ProcessService {
return v;
});
}
List<TaskDefinitionLog> taskDefinitionLogs = genTaskDefineList(taskRelationList);
if (CollectionUtils.isEmpty(taskDefinitionLogs)) {
taskDefinitionLogs = genTaskDefineList(taskRelationList);
}
Map<Long, TaskDefinitionLog> taskDefinitionLogMap = taskDefinitionLogs.stream()
.collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
List<TaskNode> taskNodeList = new ArrayList<>();
@ -2318,7 +2392,7 @@ public class ProcessService {
taskNode.setMaxRetryTimes(taskDefinitionLog.getFailRetryTimes());
taskNode.setRetryInterval(taskDefinitionLog.getFailRetryInterval());
Map<String, Object> taskParamsMap = taskNode.taskParamsToJsonObj(taskDefinitionLog.getTaskParams());
taskNode.setConditionResult((String) taskParamsMap.get(Constants.CONDITION_RESULT));
taskNode.setConditionResult(JSONUtils.toJsonString(taskParamsMap.get(Constants.CONDITION_RESULT)));
taskNode.setDependence(JSONUtils.toJsonString(taskParamsMap.get(Constants.DEPENDENCE)));
taskParamsMap.remove(Constants.CONDITION_RESULT);
taskParamsMap.remove(Constants.DEPENDENCE);

Loading…
Cancel
Save