|
|
@ -47,14 +47,18 @@ import org.apache.dolphinscheduler.common.utils.StringUtils; |
|
|
|
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils; |
|
|
|
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils; |
|
|
|
import org.apache.dolphinscheduler.dao.entity.ProcessData; |
|
|
|
import org.apache.dolphinscheduler.dao.entity.ProcessData; |
|
|
|
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; |
|
|
|
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; |
|
|
|
|
|
|
|
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; |
|
|
|
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
|
|
|
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
|
|
|
import org.apache.dolphinscheduler.dao.entity.Project; |
|
|
|
import org.apache.dolphinscheduler.dao.entity.Project; |
|
|
|
|
|
|
|
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; |
|
|
|
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
|
|
|
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
|
|
|
import org.apache.dolphinscheduler.dao.entity.Tenant; |
|
|
|
import org.apache.dolphinscheduler.dao.entity.Tenant; |
|
|
|
import org.apache.dolphinscheduler.dao.entity.User; |
|
|
|
import org.apache.dolphinscheduler.dao.entity.User; |
|
|
|
|
|
|
|
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; |
|
|
|
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; |
|
|
|
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; |
|
|
|
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; |
|
|
|
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; |
|
|
|
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; |
|
|
|
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; |
|
|
|
|
|
|
|
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; |
|
|
|
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; |
|
|
|
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; |
|
|
|
import org.apache.dolphinscheduler.dao.utils.DagHelper; |
|
|
|
import org.apache.dolphinscheduler.dao.utils.DagHelper; |
|
|
|
import org.apache.dolphinscheduler.service.process.ProcessService; |
|
|
|
import org.apache.dolphinscheduler.service.process.ProcessService; |
|
|
@ -93,6 +97,9 @@ public class ProcessInstanceService extends BaseService { |
|
|
|
|
|
|
|
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(ProcessInstanceService.class); |
|
|
|
private static final Logger logger = LoggerFactory.getLogger(ProcessInstanceService.class); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public static final String TASK_TYPE = "taskType"; |
|
|
|
|
|
|
|
public static final String LOCAL_PARAMS_LIST = "localParamsList"; |
|
|
|
|
|
|
|
|
|
|
|
@Autowired |
|
|
|
@Autowired |
|
|
|
ProjectMapper projectMapper; |
|
|
|
ProjectMapper projectMapper; |
|
|
|
|
|
|
|
|
|
|
@ -123,6 +130,11 @@ public class ProcessInstanceService extends BaseService { |
|
|
|
@Autowired |
|
|
|
@Autowired |
|
|
|
LoggerService loggerService; |
|
|
|
LoggerService loggerService; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Autowired |
|
|
|
|
|
|
|
ProcessDefinitionLogMapper processDefinitionLogMapper; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Autowired |
|
|
|
|
|
|
|
TaskDefinitionLogMapper taskDefinitionLogMapper; |
|
|
|
|
|
|
|
|
|
|
|
@Autowired |
|
|
|
@Autowired |
|
|
|
UsersService usersService; |
|
|
|
UsersService usersService; |
|
|
@ -608,34 +620,47 @@ public class ProcessInstanceService extends BaseService { |
|
|
|
Map<String, String> timeParams = BusinessTimeUtils |
|
|
|
Map<String, String> timeParams = BusinessTimeUtils |
|
|
|
.getBusinessTime(processInstance.getCmdTypeIfComplement(), |
|
|
|
.getBusinessTime(processInstance.getCmdTypeIfComplement(), |
|
|
|
processInstance.getScheduleTime()); |
|
|
|
processInstance.getScheduleTime()); |
|
|
|
|
|
|
|
|
|
|
|
String workflowInstanceJson = processInstance.getProcessInstanceJson(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ProcessData workflowData = JSONUtils.parseObject(workflowInstanceJson, ProcessData.class); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
String userDefinedParams = processInstance.getGlobalParams(); |
|
|
|
String userDefinedParams = processInstance.getGlobalParams(); |
|
|
|
|
|
|
|
|
|
|
|
// global params
|
|
|
|
// global params
|
|
|
|
List<Property> globalParams = new ArrayList<>(); |
|
|
|
List<Property> globalParams = new ArrayList<>(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// global param string
|
|
|
|
|
|
|
|
String globalParamStr = ParameterUtils.convertParameterPlaceholders(JSONUtils.toJsonString(globalParams), timeParams); |
|
|
|
|
|
|
|
globalParams = JSONUtils.toList(globalParamStr, Property.class); |
|
|
|
|
|
|
|
for (Property property : globalParams) { |
|
|
|
|
|
|
|
timeParams.put(property.getProp(), property.getValue()); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (userDefinedParams != null && userDefinedParams.length() > 0) { |
|
|
|
if (userDefinedParams != null && userDefinedParams.length() > 0) { |
|
|
|
globalParams = JSONUtils.toList(userDefinedParams, Property.class); |
|
|
|
globalParams = JSONUtils.toList(userDefinedParams, Property.class); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
List<TaskNode> taskNodeList = workflowData.getTasks(); |
|
|
|
Map<String, Map<String, Object>> localUserDefParams = getLocalParams(processInstance, timeParams); |
|
|
|
|
|
|
|
|
|
|
|
// global param string
|
|
|
|
Map<String, Object> resultMap = new HashMap<>(); |
|
|
|
String globalParamStr = JSONUtils.toJsonString(globalParams); |
|
|
|
|
|
|
|
globalParamStr = ParameterUtils.convertParameterPlaceholders(globalParamStr, timeParams); |
|
|
|
resultMap.put(GLOBAL_PARAMS, globalParams); |
|
|
|
globalParams = JSONUtils.toList(globalParamStr, Property.class); |
|
|
|
resultMap.put(LOCAL_PARAMS, localUserDefParams); |
|
|
|
for (Property property : globalParams) { |
|
|
|
|
|
|
|
timeParams.put(property.getProp(), property.getValue()); |
|
|
|
result.put(DATA_LIST, resultMap); |
|
|
|
|
|
|
|
putMsg(result, Status.SUCCESS); |
|
|
|
|
|
|
|
return result; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// local params
|
|
|
|
/** |
|
|
|
|
|
|
|
* get local params |
|
|
|
|
|
|
|
* |
|
|
|
|
|
|
|
* @param processInstance |
|
|
|
|
|
|
|
* @param timeParams |
|
|
|
|
|
|
|
* @return |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
private Map<String, Map<String, Object>> getLocalParams(ProcessInstance processInstance, Map<String, String> timeParams) { |
|
|
|
Map<String, Map<String, Object>> localUserDefParams = new HashMap<>(); |
|
|
|
Map<String, Map<String, Object>> localUserDefParams = new HashMap<>(); |
|
|
|
for (TaskNode taskNode : taskNodeList) { |
|
|
|
List<TaskInstance> taskInstanceList = taskInstanceMapper.findValidTaskListByProcessId(processInstance.getId(), Flag.YES); |
|
|
|
String parameter = taskNode.getParams(); |
|
|
|
for (TaskInstance taskInstance : taskInstanceList) { |
|
|
|
|
|
|
|
TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion( |
|
|
|
|
|
|
|
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()); |
|
|
|
|
|
|
|
String parameter = taskDefinitionLog.getTaskParams(); |
|
|
|
Map<String, String> map = JSONUtils.toMap(parameter); |
|
|
|
Map<String, String> map = JSONUtils.toMap(parameter); |
|
|
|
String localParams = map.get(LOCAL_PARAMS); |
|
|
|
String localParams = map.get(LOCAL_PARAMS); |
|
|
|
if (localParams != null && !localParams.isEmpty()) { |
|
|
|
if (localParams != null && !localParams.isEmpty()) { |
|
|
@ -643,23 +668,15 @@ public class ProcessInstanceService extends BaseService { |
|
|
|
List<Property> localParamsList = JSONUtils.toList(localParams, Property.class); |
|
|
|
List<Property> localParamsList = JSONUtils.toList(localParams, Property.class); |
|
|
|
|
|
|
|
|
|
|
|
Map<String, Object> localParamsMap = new HashMap<>(); |
|
|
|
Map<String, Object> localParamsMap = new HashMap<>(); |
|
|
|
localParamsMap.put("taskType", taskNode.getType()); |
|
|
|
localParamsMap.put(TASK_TYPE, taskDefinitionLog.getTaskType()); |
|
|
|
localParamsMap.put("localParamsList", localParamsList); |
|
|
|
localParamsMap.put(LOCAL_PARAMS_LIST, localParamsList); |
|
|
|
if (CollectionUtils.isNotEmpty(localParamsList)) { |
|
|
|
if (CollectionUtils.isNotEmpty(localParamsList)) { |
|
|
|
localUserDefParams.put(taskNode.getName(), localParamsMap); |
|
|
|
localUserDefParams.put(taskDefinitionLog.getName(), localParamsMap); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return localUserDefParams; |
|
|
|
Map<String, Object> resultMap = new HashMap<>(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
resultMap.put(GLOBAL_PARAMS, globalParams); |
|
|
|
|
|
|
|
resultMap.put(LOCAL_PARAMS, localUserDefParams); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
result.put(DATA_LIST, resultMap); |
|
|
|
|
|
|
|
putMsg(result, Status.SUCCESS); |
|
|
|
|
|
|
|
return result; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
@ -678,9 +695,15 @@ public class ProcessInstanceService extends BaseService { |
|
|
|
throw new RuntimeException("workflow instance is null"); |
|
|
|
throw new RuntimeException("workflow instance is null"); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
GanttDto ganttDto = new GanttDto(); |
|
|
|
ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper.queryByDefinitionCodeAndVersion( |
|
|
|
|
|
|
|
processInstance.getProcessDefinitionCode(), |
|
|
|
|
|
|
|
processInstance.getProcessDefinitionVersion() |
|
|
|
|
|
|
|
); |
|
|
|
|
|
|
|
ProcessDefinition processDefinition = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinitionLog), |
|
|
|
|
|
|
|
ProcessDefinition.class); |
|
|
|
|
|
|
|
|
|
|
|
DAG<String, TaskNode, TaskNodeRelation> dag = processInstance2DAG(processInstance); |
|
|
|
GanttDto ganttDto = new GanttDto(); |
|
|
|
|
|
|
|
DAG<String, TaskNode, TaskNodeRelation> dag = processService.genDagGraph(processDefinition); |
|
|
|
//topological sort
|
|
|
|
//topological sort
|
|
|
|
List<String> nodeList = dag.topologicalSort(); |
|
|
|
List<String> nodeList = dag.topologicalSort(); |
|
|
|
|
|
|
|
|
|
|
@ -712,21 +735,6 @@ public class ProcessInstanceService extends BaseService { |
|
|
|
return result; |
|
|
|
return result; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* process instance to DAG |
|
|
|
|
|
|
|
* |
|
|
|
|
|
|
|
* @param processInstance input process instance |
|
|
|
|
|
|
|
* @return process instance dag. |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
private static DAG<String, TaskNode, TaskNodeRelation> processInstance2DAG(ProcessInstance processInstance) { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
String processDefinitionJson = processInstance.getProcessInstanceJson(); |
|
|
|
|
|
|
|
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); |
|
|
|
|
|
|
|
List<TaskNode> taskNodeList = processData.getTasks(); |
|
|
|
|
|
|
|
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList); |
|
|
|
|
|
|
|
return DagHelper.buildDagGraph(processDag); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* query process instance by processDefinitionId and stateArray |
|
|
|
* query process instance by processDefinitionId and stateArray |
|
|
|
* |
|
|
|
* |
|
|
|